From 0b910df3898f0aea5343fb957c127c28c0c8667e Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Thu, 12 Oct 2023 12:45:38 +0500 Subject: [PATCH] async read write rework done (in principle) --- src/decode/cmds.rs | 13 ++++--- src/decode/codecs/audio.rs | 44 ++++++++++------------- src/decode/codecs/mod.rs | 74 +++++++++++++++++++++++++------------- src/decode/codecs/video.rs | 47 ++++++++++-------------- src/decode/mod.rs | 2 +- src/util/mod.rs | 3 ++ 6 files changed, 97 insertions(+), 86 deletions(-) diff --git a/src/decode/cmds.rs b/src/decode/cmds.rs index 861f157..9bc5fcd 100644 --- a/src/decode/cmds.rs +++ b/src/decode/cmds.rs @@ -18,14 +18,17 @@ pub fn spawn(metadata: Arc) -> Result<(impl Decoder, impl Decode return Ok((v, a)); } -pub fn handle_decoder(decoder: impl Decoder, c_in: mpsc::Receiver, c_out: mpsc::Sender, c_err: mpsc::Sender>) { - let mut stdin = decoder.stdin(); - let mut stdout = decoder.stdout(); +pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver, c_out: mpsc::Sender, c_err: mpsc::Sender>) { + let stdin = decoder.stdin(); + let stdout = decoder.stdout(); + let raw_buff_size = decoder.raw_buff_size(); + let raw_sample_type = decoder.media_type(); let c_err2 = c_err.clone(); thread::spawn(move || { - read_nalu_loop(stdin, c_in, c_err); + codecs::read_nalu_loop(stdin, c_in, c_err); }); thread::spawn(move || { - write_raw_loop(stdout, c_out, c_err2); + codecs::write_raw_loop(stdout, raw_buff_size, raw_sample_type, c_out, c_err2); }); } + diff --git a/src/decode/codecs/audio.rs b/src/decode/codecs/audio.rs index c7d8471..a13123f 100644 --- a/src/decode/codecs/audio.rs +++ b/src/decode/codecs/audio.rs @@ -1,39 +1,31 @@ use std::error::Error; -use std::process::{Child, Command, Stdio}; -use std::sync::{Arc, Mutex}; -use std::io::{Read, Write}; +use std::process::{Child, Command, Stdio, ChildStdin, ChildStdout}; +use std::sync::Arc; use crate::decode::codecs::Decoder; use crate::util; pub struct AACDecoder { cmd: Child, - bytes_per_sample_chunk: usize + raw_buff_size: usize, + raw_sample_type: util::RawMediaType, } -impl Decoder for Arc> { - fn close_clean(&self) { - let mut self_ptr = self.lock().unwrap(); - drop(self_ptr.cmd.stdin.take().unwrap()); +impl Decoder for AACDecoder { + fn stdin(&mut self) -> ChildStdin { + self.cmd.stdin.take().unwrap() } - fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box> { - let mut self_ptr = self.lock().unwrap(); - let mut pipe = match &self_ptr.cmd.stdin { - Some(x) => x, - None => {return Err(Box::new(util::DecoderError::BrokenInPipe))} - }; - pipe.write_all(nalu.packet_data.as_slice())?; - Ok(()) + + fn stdout(&mut self) -> ChildStdout { + self.cmd.stdout.take().unwrap() } - fn read_raw(&self) -> Result> { - let mut self_ptr = self.lock().unwrap(); - let mut pcm_buff = vec![0u8; self_ptr.bytes_per_sample_chunk]; - let mut pipe = match &mut self_ptr.cmd.stdout { - Some(x) => x, - None => {return Err(Box::new(util::DecoderError::BrokenOutPipe))} - }; - pipe.read_exact(pcm_buff.as_mut_slice())?; - return Ok(util::RawMedia {media_type: util::RawMediaType::PCM16BE, sample: pcm_buff}); + + fn raw_buff_size(&self) -> usize { + self.raw_buff_size + } + + fn media_type(&self) -> util::RawMediaType { + self.raw_sample_type } } @@ -42,5 +34,5 @@ pub fn new_aac(metadata: Arc) -> Result Result<(), Box>; - fn write_loop(&self, chan_in: mpsc::Receiver, err_in: mpsc::Sender>) { - loop { - let nalu = match chan_in.recv() { - Ok(x) => x, - Err(err) => {self.close_clean(); thread::park(); return} - }; - match self.write_nalu(nalu) { - Ok(_) => (), - Err(err) => {util::thread_freeze(err_in, err); return} - } - } - } - fn read_raw(&self) -> Result>; - fn read_loop(&self, chan_out: mpsc::Sender, err_in: mpsc::Sender>) { - loop { - let raw_sample = match self.read_raw() { - Ok(x) => x, - Err(err) => {util::thread_freeze(err_in, err); return} - }; - match chan_out.send(raw_sample) { - Ok(_) => (), - Err(err) => {util::thread_freeze(err_in, Box::new(err)); return} - } + fn stdin(&mut self) -> ChildStdin; + + fn stdout(&mut self) -> ChildStdout; + + fn raw_buff_size(&self) -> usize; + + fn media_type(&self) -> util::RawMediaType; +} + +fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box> { + cmd_in.write_all(nalu.packet_data.as_slice())?; + Ok(()) +} + +fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result> { + let mut raw_buff = vec![0u8; raw_buff_size]; + match cmd_out.read_exact(raw_buff.as_mut_slice()) { + Ok(_) => (), + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DecoderError::EOF)), + Err(err) => return Err(Box::new(err)) + }; + Ok(util::RawMedia {media_type: media_type, sample: raw_buff}) +} + +pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver, c_err: mpsc::Sender>) { + loop { + let nalu = match c_in.recv() { + Ok(x) => x, + Err(err) => {drop(cmd_in); thread::park();return} + }; + match write_nalu(&mut cmd_in, nalu) { + Ok(_) => (), + Err(err) => {util::thread_freeze(c_err, err); return} + } + } +} + +pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: util::RawMediaType, c_out: mpsc::Sender, c_err: mpsc::Sender>) { + loop { + let media = match read_raw(&mut cmd_out, buff_size, media_type) { + Ok(x) => x, + Err(err) => {drop(c_out); thread::park(); return} + }; + match c_out.send(media) { + Ok(_) => (), + Err(err) => {util::thread_freeze(c_err, Box::new(err)); return} } } } diff --git a/src/decode/codecs/video.rs b/src/decode/codecs/video.rs index 5720cc5..0652ecf 100644 --- a/src/decode/codecs/video.rs +++ b/src/decode/codecs/video.rs @@ -1,49 +1,38 @@ use std::error::Error; -use std::process::{Child, Stdio, Command}; -use std::sync::{Arc, Mutex}; -use std::io::{Read, Write}; -use std::fs::File; +use std::process::{Child, Stdio, Command, ChildStdin, ChildStdout}; +use std::sync::Arc; use crate::decode::codecs::Decoder; use crate::util; pub struct H264Decoder { cmd: Child, - bytes_per_sample: usize + raw_buff_size: usize, + raw_sample_type: util::RawMediaType, } -impl Decoder for Arc> { - fn close_clean(&self) { - let mut self_ptr = self.lock().unwrap(); - drop(self_ptr.cmd.stdin.take().unwrap()); +impl Decoder for H264Decoder { + fn stdin(&mut self) -> ChildStdin { + self.cmd.stdin.take().unwrap() } - fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box> { - let mut self_ptr = self.lock().unwrap(); - let mut pipe = match &self_ptr.cmd.stdin { - Some(x) => x, - None => {return Err(Box::new(util::DecoderError::BrokenInPipe))} - }; - pipe.write_all(nalu.packet_data.as_slice())?; - Ok(()) + fn stdout(&mut self) -> ChildStdout { + self.cmd.stdout.take().unwrap() } - fn read_raw(&self) -> Result> { - let mut self_ptr = self.lock().unwrap(); - let mut yuv_buff = vec![0u8; self_ptr.bytes_per_sample]; - let mut pipe = match &mut self_ptr.cmd.stdout { - Some(x) => x, - None => {return Err(Box::new(util::DecoderError::BrokenOutPipe))} - }; - pipe.read_exact(yuv_buff.as_mut_slice()); - return Ok(util::RawMedia {media_type: util::RawMediaType::YUV420P, sample: yuv_buff}); + + fn raw_buff_size(&self) -> usize { + self.raw_buff_size + } + + fn media_type(&self) -> util::RawMediaType { + self.raw_sample_type } } pub fn new_h264(metadata: Arc) -> Result> { - let f = File::create("out.yuv")?; let cmd = Command::new("ffmpeg") .args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"]) - .stdin(Stdio::piped()).stdout(f).spawn()?; + .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; - return Ok(H264Decoder {cmd: cmd, bytes_per_sample: bytes_per_sample as usize}); + return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P}); } diff --git a/src/decode/mod.rs b/src/decode/mod.rs index 50d6bb0..7058541 100644 --- a/src/decode/mod.rs +++ b/src/decode/mod.rs @@ -1,7 +1,7 @@ mod cmds; mod codecs; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc}; use std::error::Error; use std::thread; diff --git a/src/util/mod.rs b/src/util/mod.rs index 47d3f05..a6ae57d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -59,6 +59,7 @@ pub struct NALUPacket { pub packet_data: Vec, } +#[derive(Copy, Clone)] pub enum RawMediaType { YUV420P, PCM16BE, @@ -132,6 +133,7 @@ pub enum DecoderError { CodecNotImplemented, BrokenInPipe, BrokenOutPipe, + EOF, } impl Error for DecoderError {} @@ -143,6 +145,7 @@ impl fmt::Debug for DecoderError { DecoderError::CodecNotImplemented => write!(f, "Codec not recognized"), DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdin closed"), DecoderError::BrokenInPipe => write!(f, "Decoder has invalid piping, stdout closed"), + DecoderError::EOF => write!(f, "Decoder output closed, expected EOF"), _ => write!(f, "Error not described yet") } }