start of rework to allow read write concurrency in decoder

This commit is contained in:
Muaz Ahmad 2023-10-11 16:43:29 +05:00
parent 78e0e5c7cc
commit 6cf6bba795
5 changed files with 30 additions and 32 deletions

View file

@ -1,11 +1,12 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, mpsc};
use std::error::Error; use std::error::Error;
use std::thread;
use crate::util; use crate::util;
use crate::decode::codecs::Decoder; use crate::decode::codecs::Decoder;
use crate::decode::codecs; use crate::decode::codecs;
pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder + Clone + Send + 'static, impl Decoder + Clone + Send + 'static), Box<dyn Error>> { pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> {
let v = match metadata.video.codec { let v = match metadata.video.codec {
Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?, Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?,
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
@ -14,5 +15,17 @@ pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder + Clone + Se
Some(util::AudioCodec::AAC) => codecs::audio::new_aac(metadata)?, Some(util::AudioCodec::AAC) => codecs::audio::new_aac(metadata)?,
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
}; };
return Ok((Arc::new(Mutex::new(v)), Arc::new(Mutex::new(a)))); return Ok((v, a));
}
pub fn handle_decoder(decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
let mut stdin = decoder.stdin();
let mut stdout = decoder.stdout();
let c_err2 = c_err.clone();
thread::spawn(move || {
read_nalu_loop(stdin, c_in, c_err);
});
thread::spawn(move || {
write_raw_loop(stdout, c_out, c_err2);
});
} }

View file

@ -6,12 +6,11 @@ use std::error::Error;
use std::thread; use std::thread;
use crate::util; use crate::util;
use crate::decode::codecs::Decoder;
pub fn spawn( pub fn spawn(
v: mpsc::Receiver<util::NALUPacket>, v: mpsc::Receiver<util::NALUPacket>,
a: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>,
metadata: &Arc<util::Metadata> metadata: Arc<util::Metadata>
) -> Result< ) -> Result<
( (
mpsc::Receiver<util::RawMedia>, mpsc::Receiver<util::RawMedia>,
@ -22,28 +21,14 @@ pub fn spawn(
> { > {
let (raw_v_in, raw_v_out) = mpsc::channel(); let (raw_v_in, raw_v_out) = mpsc::channel();
let (raw_a_in, raw_a_out) = mpsc::channel(); let (raw_a_in, raw_a_out) = mpsc::channel();
let (err_in, err_out) = mpsc::channel(); let (err_in_v, err_out) = mpsc::channel();
let err_in_a = err_in_v.clone();
let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?; let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?;
spawn_threads(v, raw_v_in, v_decoder, a, raw_a_in, a_decoder, err_in); thread::spawn(move || {
cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v);
});
thread::spawn(move || {
cmds::handle_decoder(a_decoder, a, raw_a_in, err_in_a);
});
return Ok((raw_v_out, raw_a_out, err_out)); return Ok((raw_v_out, raw_a_out, err_out));
} }
fn spawn_threads(
v: mpsc::Receiver<util::NALUPacket>,
raw_v_in: mpsc::Sender<util::RawMedia>,
v_decoder: impl Decoder + Clone + Send + 'static,
a: mpsc::Receiver<util::NALUPacket>,
raw_a_in: mpsc::Sender<util::RawMedia>,
a_decoder: impl Decoder + Clone + Send + 'static,
err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>
) {
let err_clone = err_in.clone();
let tmp_v = v_decoder.clone();
let tmp_a = a_decoder.clone();
thread::spawn(move || {
tmp_v.write_loop(v, err_clone);
});
thread::spawn(move || {
tmp_a.write_loop(a, err_in);
});
}

View file

@ -7,7 +7,7 @@ use crate::util;
pub fn spawn( pub fn spawn(
v: mpsc::Receiver<util::RawMedia>, v: mpsc::Receiver<util::RawMedia>,
a: mpsc::Receiver<util::RawMedia>, a: mpsc::Receiver<util::RawMedia>,
metadata: &Arc<util::Metadata> metadata: Arc<util::Metadata>
) -> Result< ) -> Result<
( (
mpsc::Receiver<util::NALUPacket>, mpsc::Receiver<util::NALUPacket>,

View file

@ -11,9 +11,9 @@ use std::time::Duration;
fn init() -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>>{ fn init() -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>>{
let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?; let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?;
let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, &metadata)?; let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?;
let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, &metadata)?; let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone())?;
let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, &metadata)?; let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone())?;
return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]); return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]);
} }

View file

@ -7,7 +7,7 @@ use crate::util;
pub fn spawn( pub fn spawn(
v: mpsc::Receiver<util::NALUPacket>, v: mpsc::Receiver<util::NALUPacket>,
a: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>,
metadata: &Arc<util::Metadata> metadata: Arc<util::Metadata>
) -> Result< ) -> Result<
mpsc::Receiver<Box<dyn Error + Send + Sync>>, mpsc::Receiver<Box<dyn Error + Send + Sync>>,
Box<dyn Error> Box<dyn Error>