From 6cf6bba795a49c22585defebfac7588acb70d185 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Wed, 11 Oct 2023 16:43:29 +0500 Subject: [PATCH] start of rework to allow read write concurrency in decoder --- src/decode/cmds.rs | 19 ++++++++++++++++--- src/decode/mod.rs | 33 +++++++++------------------------ src/encode/mod.rs | 2 +- src/main.rs | 6 +++--- src/muxer/mod.rs | 2 +- 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/decode/cmds.rs b/src/decode/cmds.rs index 23c73a6..861f157 100644 --- a/src/decode/cmds.rs +++ b/src/decode/cmds.rs @@ -1,11 +1,12 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, mpsc}; use std::error::Error; +use std::thread; use crate::util; use crate::decode::codecs::Decoder; use crate::decode::codecs; -pub fn spawn(metadata: Arc) -> Result<(impl Decoder + Clone + Send + 'static, impl Decoder + Clone + Send + 'static), Box> { +pub fn spawn(metadata: Arc) -> Result<(impl Decoder, impl Decoder), Box> { let v = match metadata.video.codec { Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?, _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} @@ -14,5 +15,17 @@ pub fn spawn(metadata: Arc) -> Result<(impl Decoder + Clone + Se Some(util::AudioCodec::AAC) => codecs::audio::new_aac(metadata)?, _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} }; - return Ok((Arc::new(Mutex::new(v)), Arc::new(Mutex::new(a)))); + return Ok((v, a)); +} + +pub fn handle_decoder(decoder: impl Decoder, c_in: mpsc::Receiver, c_out: mpsc::Sender, c_err: mpsc::Sender>) { + let mut stdin = decoder.stdin(); + let mut stdout = decoder.stdout(); + let c_err2 = c_err.clone(); + thread::spawn(move || { + read_nalu_loop(stdin, c_in, c_err); + }); + thread::spawn(move || { + write_raw_loop(stdout, c_out, c_err2); + }); } diff --git a/src/decode/mod.rs b/src/decode/mod.rs index 69f6f42..50d6bb0 100644 --- a/src/decode/mod.rs +++ b/src/decode/mod.rs @@ -6,12 +6,11 @@ use std::error::Error; use std::thread; use crate::util; -use crate::decode::codecs::Decoder; pub fn spawn( v: mpsc::Receiver, a: mpsc::Receiver, - metadata: &Arc + metadata: Arc ) -> Result< ( mpsc::Receiver, @@ -22,28 +21,14 @@ pub fn spawn( > { let (raw_v_in, raw_v_out) = mpsc::channel(); let (raw_a_in, raw_a_out) = mpsc::channel(); - let (err_in, err_out) = mpsc::channel(); + let (err_in_v, err_out) = mpsc::channel(); + let err_in_a = err_in_v.clone(); let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?; - spawn_threads(v, raw_v_in, v_decoder, a, raw_a_in, a_decoder, err_in); + thread::spawn(move || { + cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v); + }); + thread::spawn(move || { + cmds::handle_decoder(a_decoder, a, raw_a_in, err_in_a); + }); return Ok((raw_v_out, raw_a_out, err_out)); } - -fn spawn_threads( - v: mpsc::Receiver, - raw_v_in: mpsc::Sender, - v_decoder: impl Decoder + Clone + Send + 'static, - a: mpsc::Receiver, - raw_a_in: mpsc::Sender, - a_decoder: impl Decoder + Clone + Send + 'static, - err_in: mpsc::Sender> -) { - let err_clone = err_in.clone(); - let tmp_v = v_decoder.clone(); - let tmp_a = a_decoder.clone(); - thread::spawn(move || { - tmp_v.write_loop(v, err_clone); - }); - thread::spawn(move || { - tmp_a.write_loop(a, err_in); - }); -} diff --git a/src/encode/mod.rs b/src/encode/mod.rs index efcb990..8553ec6 100644 --- a/src/encode/mod.rs +++ b/src/encode/mod.rs @@ -7,7 +7,7 @@ use crate::util; pub fn spawn( v: mpsc::Receiver, a: mpsc::Receiver, - metadata: &Arc + metadata: Arc ) -> Result< ( mpsc::Receiver, diff --git a/src/main.rs b/src/main.rs index 9f4336c..ea0f692 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,9 +11,9 @@ use std::time::Duration; fn init() -> Result<[mpsc::Receiver>; 4], Box>{ let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?; - let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, &metadata)?; - let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, &metadata)?; - let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, &metadata)?; + let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?; + let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone())?; + let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone())?; return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]); } diff --git a/src/muxer/mod.rs b/src/muxer/mod.rs index 3d79825..ad1bddf 100644 --- a/src/muxer/mod.rs +++ b/src/muxer/mod.rs @@ -7,7 +7,7 @@ use crate::util; pub fn spawn( v: mpsc::Receiver, a: mpsc::Receiver, - metadata: &Arc + metadata: Arc ) -> Result< mpsc::Receiver>, Box