async read write rework done (in principle)

This commit is contained in:
Muaz Ahmad 2023-10-12 12:45:38 +05:00
parent 6cf6bba795
commit 0b910df389
6 changed files with 97 additions and 86 deletions

View file

@ -18,14 +18,17 @@ pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decode
return Ok((v, a)); return Ok((v, a));
} }
pub fn handle_decoder(decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) { pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
let mut stdin = decoder.stdin(); let stdin = decoder.stdin();
let mut stdout = decoder.stdout(); let stdout = decoder.stdout();
let raw_buff_size = decoder.raw_buff_size();
let raw_sample_type = decoder.media_type();
let c_err2 = c_err.clone(); let c_err2 = c_err.clone();
thread::spawn(move || { thread::spawn(move || {
read_nalu_loop(stdin, c_in, c_err); codecs::read_nalu_loop(stdin, c_in, c_err);
}); });
thread::spawn(move || { thread::spawn(move || {
write_raw_loop(stdout, c_out, c_err2); codecs::write_raw_loop(stdout, raw_buff_size, raw_sample_type, c_out, c_err2);
}); });
} }

View file

@ -1,39 +1,31 @@
use std::error::Error; use std::error::Error;
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio, ChildStdin, ChildStdout};
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::io::{Read, Write};
use crate::decode::codecs::Decoder; use crate::decode::codecs::Decoder;
use crate::util; use crate::util;
pub struct AACDecoder { pub struct AACDecoder {
cmd: Child, cmd: Child,
bytes_per_sample_chunk: usize raw_buff_size: usize,
raw_sample_type: util::RawMediaType,
} }
impl Decoder for Arc<Mutex<AACDecoder>> { impl Decoder for AACDecoder {
fn close_clean(&self) { fn stdin(&mut self) -> ChildStdin {
let mut self_ptr = self.lock().unwrap(); self.cmd.stdin.take().unwrap()
drop(self_ptr.cmd.stdin.take().unwrap());
} }
fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut self_ptr = self.lock().unwrap(); fn stdout(&mut self) -> ChildStdout {
let mut pipe = match &self_ptr.cmd.stdin { self.cmd.stdout.take().unwrap()
Some(x) => x,
None => {return Err(Box::new(util::DecoderError::BrokenInPipe))}
};
pipe.write_all(nalu.packet_data.as_slice())?;
Ok(())
} }
fn read_raw(&self) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
let mut self_ptr = self.lock().unwrap(); fn raw_buff_size(&self) -> usize {
let mut pcm_buff = vec![0u8; self_ptr.bytes_per_sample_chunk]; self.raw_buff_size
let mut pipe = match &mut self_ptr.cmd.stdout { }
Some(x) => x,
None => {return Err(Box::new(util::DecoderError::BrokenOutPipe))} fn media_type(&self) -> util::RawMediaType {
}; self.raw_sample_type
pipe.read_exact(pcm_buff.as_mut_slice())?;
return Ok(util::RawMedia {media_type: util::RawMediaType::PCM16BE, sample: pcm_buff});
} }
} }
@ -42,5 +34,5 @@ pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Erro
.args(["-f", "2", "-w", "-q", "-"]) .args(["-f", "2", "-w", "-q", "-"])
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample_chunk = 1024 * 2 * metadata.audio.channels as usize; let bytes_per_sample_chunk = 1024 * 2 * metadata.audio.channels as usize;
return Ok(AACDecoder {cmd: cmd, bytes_per_sample_chunk: bytes_per_sample_chunk}) return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16BE})
} }

View file

@ -3,36 +3,60 @@ pub mod audio;
use std::sync::mpsc; use std::sync::mpsc;
use std::error::Error; use std::error::Error;
use std::process::{ChildStdin, ChildStdout};
use std::io::{Read, Write};
use std::thread; use std::thread;
use std::io;
use crate::util; use crate::util;
pub trait Decoder { pub trait Decoder {
fn close_clean(&self); fn stdin(&mut self) -> ChildStdin;
fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>>;
fn write_loop(&self, chan_in: mpsc::Receiver<util::NALUPacket>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>) { fn stdout(&mut self) -> ChildStdout;
loop {
let nalu = match chan_in.recv() { fn raw_buff_size(&self) -> usize;
Ok(x) => x,
Err(err) => {self.close_clean(); thread::park(); return} fn media_type(&self) -> util::RawMediaType;
}; }
match self.write_nalu(nalu) {
Ok(_) => (), fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
Err(err) => {util::thread_freeze(err_in, err); return} cmd_in.write_all(nalu.packet_data.as_slice())?;
} Ok(())
} }
}
fn read_raw(&self) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>>; fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
fn read_loop(&self, chan_out: mpsc::Sender<util::RawMedia>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>) { let mut raw_buff = vec![0u8; raw_buff_size];
loop { match cmd_out.read_exact(raw_buff.as_mut_slice()) {
let raw_sample = match self.read_raw() { Ok(_) => (),
Ok(x) => x, Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DecoderError::EOF)),
Err(err) => {util::thread_freeze(err_in, err); return} Err(err) => return Err(Box::new(err))
}; };
match chan_out.send(raw_sample) { Ok(util::RawMedia {media_type: media_type, sample: raw_buff})
Ok(_) => (), }
Err(err) => {util::thread_freeze(err_in, Box::new(err)); return}
} pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
loop {
let nalu = match c_in.recv() {
Ok(x) => x,
Err(err) => {drop(cmd_in); thread::park();return}
};
match write_nalu(&mut cmd_in, nalu) {
Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, err); return}
}
}
}
pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: util::RawMediaType, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
loop {
let media = match read_raw(&mut cmd_out, buff_size, media_type) {
Ok(x) => x,
Err(err) => {drop(c_out); thread::park(); return}
};
match c_out.send(media) {
Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, Box::new(err)); return}
} }
} }
} }

View file

@ -1,49 +1,38 @@
use std::error::Error; use std::error::Error;
use std::process::{Child, Stdio, Command}; use std::process::{Child, Stdio, Command, ChildStdin, ChildStdout};
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::io::{Read, Write};
use std::fs::File;
use crate::decode::codecs::Decoder; use crate::decode::codecs::Decoder;
use crate::util; use crate::util;
pub struct H264Decoder { pub struct H264Decoder {
cmd: Child, cmd: Child,
bytes_per_sample: usize raw_buff_size: usize,
raw_sample_type: util::RawMediaType,
} }
impl Decoder for Arc<Mutex<H264Decoder>> { impl Decoder for H264Decoder {
fn close_clean(&self) { fn stdin(&mut self) -> ChildStdin {
let mut self_ptr = self.lock().unwrap(); self.cmd.stdin.take().unwrap()
drop(self_ptr.cmd.stdin.take().unwrap());
} }
fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> { fn stdout(&mut self) -> ChildStdout {
let mut self_ptr = self.lock().unwrap(); self.cmd.stdout.take().unwrap()
let mut pipe = match &self_ptr.cmd.stdin {
Some(x) => x,
None => {return Err(Box::new(util::DecoderError::BrokenInPipe))}
};
pipe.write_all(nalu.packet_data.as_slice())?;
Ok(())
} }
fn read_raw(&self) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
let mut self_ptr = self.lock().unwrap(); fn raw_buff_size(&self) -> usize {
let mut yuv_buff = vec![0u8; self_ptr.bytes_per_sample]; self.raw_buff_size
let mut pipe = match &mut self_ptr.cmd.stdout { }
Some(x) => x,
None => {return Err(Box::new(util::DecoderError::BrokenOutPipe))} fn media_type(&self) -> util::RawMediaType {
}; self.raw_sample_type
pipe.read_exact(yuv_buff.as_mut_slice());
return Ok(util::RawMedia {media_type: util::RawMediaType::YUV420P, sample: yuv_buff});
} }
} }
pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> { pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> {
let f = File::create("out.yuv")?;
let cmd = Command::new("ffmpeg") let cmd = Command::new("ffmpeg")
.args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"]) .args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"])
.stdin(Stdio::piped()).stdout(f).spawn()?; .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2;
return Ok(H264Decoder {cmd: cmd, bytes_per_sample: bytes_per_sample as usize}); return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P});
} }

View file

@ -1,7 +1,7 @@
mod cmds; mod cmds;
mod codecs; mod codecs;
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc};
use std::error::Error; use std::error::Error;
use std::thread; use std::thread;

View file

@ -59,6 +59,7 @@ pub struct NALUPacket {
pub packet_data: Vec<u8>, pub packet_data: Vec<u8>,
} }
#[derive(Copy, Clone)]
pub enum RawMediaType { pub enum RawMediaType {
YUV420P, YUV420P,
PCM16BE, PCM16BE,
@ -132,6 +133,7 @@ pub enum DecoderError {
CodecNotImplemented, CodecNotImplemented,
BrokenInPipe, BrokenInPipe,
BrokenOutPipe, BrokenOutPipe,
EOF,
} }
impl Error for DecoderError {} impl Error for DecoderError {}
@ -143,6 +145,7 @@ impl fmt::Debug for DecoderError {
DecoderError::CodecNotImplemented => write!(f, "Codec not recognized"), DecoderError::CodecNotImplemented => write!(f, "Codec not recognized"),
DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdin closed"), DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdin closed"),
DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdout closed"), DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdout closed"),
DecoderError::EOF => write!(f, "Decoder output closed, expected EOF"),
_ => write!(f, "Error not described yet") _ => write!(f, "Error not described yet")
} }
} }