From 89fc3c63db04bd37b3492d368f424dc74635c46d Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Mon, 23 Oct 2023 14:46:24 +0500 Subject: [PATCH] using multithreading to push in samples --- src/muxer/hls/mp4/mod.rs | 26 +++++++++++------------ src/muxer/hls/mp4/mp4muxer.rs | 39 ++++++++++++++++++++++++++++++++--- src/muxer/hls/mp4/samples.rs | 8 ++++--- src/util/mod.rs | 2 ++ 4 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/muxer/hls/mp4/mod.rs b/src/muxer/hls/mp4/mod.rs index 9ca103b..b7b6dba 100644 --- a/src/muxer/hls/mp4/mod.rs +++ b/src/muxer/hls/mp4/mod.rs @@ -2,34 +2,34 @@ mod mp4muxer; mod atoms; mod samples; -use std::sync::{mpsc, Arc}; +use std::sync::{mpsc, Arc, Mutex}; use std::error::Error; use crate::util; pub struct MP4Muxer { - v: mpsc::Receiver, - v_samples: samples::SampleQueue, - a: mpsc::Receiver, - a_samples: samples::SampleQueue, + v: Option>, + v_samples: Arc, + a: Option>, + a_samples: Arc, metadata: Arc, err_in: mpsc::Sender>, } pub fn new_muxer(v: mpsc::Receiver, a: mpsc::Receiver, metadata: Arc, err_in: mpsc::Sender>) -> Result> { let mut muxer = MP4Muxer { - v: v, - a: a, + v: Some(v), + a: Some(a), metadata: metadata, err_in: err_in, - v_samples: samples::SampleQueue { - queue: Vec::new(), + v_samples: Arc::new(samples::SampleQueue { + queue: Mutex::new(Vec::new()), default_duration: 1000, - }, - a_samples: samples::SampleQueue { - queue: Vec::new(), + }), + a_samples: Arc::new(samples::SampleQueue { + queue: Mutex::new(Vec::new()), default_duration: 960, - }, + }), }; muxer.gen_init()?; return Ok(muxer); diff --git a/src/muxer/hls/mp4/mp4muxer.rs b/src/muxer/hls/mp4/mp4muxer.rs index f4a7740..ae18955 100644 --- a/src/muxer/hls/mp4/mp4muxer.rs +++ b/src/muxer/hls/mp4/mp4muxer.rs @@ -1,5 +1,6 @@ use std::error::Error; use std::sync::Arc; +use std::thread; use crate::util; use crate::muxer::hls::mp4; @@ -15,7 +16,7 @@ impl mp4::MP4Muxer { } fn handle_v_cc(&mut self) -> Result> { - let v_cc = self.v.recv()?; + let v_cc = self.v.as_ref().expect("must be owned at init").recv()?; return match v_cc.packet_type { util::NALUPacketType::Video(util::VideoCodec::AV1) => { self.v_samples.push(v_cc.packet_data.clone(), 0x02000000); @@ -26,15 +27,47 @@ impl mp4::MP4Muxer { } fn handle_a_cc(&mut self) -> Result> { - let a_cc = self.a.recv()?; + let a_cc = self.a.as_ref().expect("must be owned at init").recv()?; return match a_cc.packet_type { util::NALUPacketType::Audio(util::AudioCodec::OPUS) => { - self.a.recv()?; + self.a.as_ref().expect("must be owned at init").recv()?; Ok(get_opus_stsd(a_cc.packet_data, &self.metadata)) }, _ => Err(Box::new(util::MuxerError::InvalidCodec)) } } + + pub fn spawn_read_loops(&mut self) { + let (mut v, mut a) = (self.v.take().unwrap(), self.a.take().unwrap()); + let v_queue = self.v_samples.clone(); + let a_queue = self.a_samples.clone(); + let (err_in1, err_in2) = (self.err_in.clone(), self.err_in.clone()); + + thread::spawn(move || { + let mut i = 1; + loop { + match v.recv() { + Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});}, + Err(_) => { + util::thread_freeze(err_in1, Box::new(util::MuxerError::EOF)); + return + } + } + } + }); + + thread::spawn(move || { + loop { + match a.recv() { + Ok(x) => {a_queue.push(x.packet_data, 0x01010000);}, + Err(_) => { + util::thread_freeze(err_in2, Box::new(util::MuxerError::EOF)); + return + } + } + } + }); + } } fn get_av1_stsd(mut sample: Vec, metadata: &Arc) -> mp4::atoms::STSD { diff --git a/src/muxer/hls/mp4/samples.rs b/src/muxer/hls/mp4/samples.rs index db5c512..1b8e447 100644 --- a/src/muxer/hls/mp4/samples.rs +++ b/src/muxer/hls/mp4/samples.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, Mutex}; + pub struct Sample { pub data: Vec, pub size: u32, @@ -5,13 +7,13 @@ pub struct Sample { } pub struct SampleQueue { - pub queue: Vec, + pub queue: Mutex>, pub default_duration: u32, } impl SampleQueue { - pub fn push(&mut self, data: Vec, flags: u32) { - self.queue.push(Sample { + pub fn push(&self, data: Vec, flags: u32) { + self.queue.lock().unwrap().push(Sample { size: data.len() as u32, data: data, flags: flags, diff --git a/src/util/mod.rs b/src/util/mod.rs index 8e1c94d..a70aaab 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -198,6 +198,7 @@ impl fmt::Display for EncoderError { pub enum MuxerError { InvalidCodec, CCParseError, + EOF, } impl Error for MuxerError {} @@ -208,6 +209,7 @@ impl fmt::Debug for MuxerError { match self { MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"), MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"), + MuxerError::EOF => write!(f, "input EOF"), _ => write!(f, "Error not described yet") } }