using multithreading to push in samples

This commit is contained in:
Muaz Ahmad 2023-10-23 14:46:24 +05:00
parent b2c2163928
commit 89fc3c63db
4 changed files with 56 additions and 19 deletions

View file

@ -2,34 +2,34 @@ mod mp4muxer;
mod atoms; mod atoms;
mod samples; mod samples;
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc, Mutex};
use std::error::Error; use std::error::Error;
use crate::util; use crate::util;
pub struct MP4Muxer { pub struct MP4Muxer {
v: mpsc::Receiver<util::NALUPacket>, v: Option<mpsc::Receiver<util::NALUPacket>>,
v_samples: samples::SampleQueue, v_samples: Arc<samples::SampleQueue>,
a: mpsc::Receiver<util::NALUPacket>, a: Option<mpsc::Receiver<util::NALUPacket>>,
a_samples: samples::SampleQueue, a_samples: Arc<samples::SampleQueue>,
metadata: Arc<util::Metadata>, metadata: Arc<util::Metadata>,
err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>,
} }
pub fn new_muxer(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, metadata: Arc<util::Metadata>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>) -> Result<MP4Muxer, Box<dyn Error>> { pub fn new_muxer(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, metadata: Arc<util::Metadata>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>) -> Result<MP4Muxer, Box<dyn Error>> {
let mut muxer = MP4Muxer { let mut muxer = MP4Muxer {
v: v, v: Some(v),
a: a, a: Some(a),
metadata: metadata, metadata: metadata,
err_in: err_in, err_in: err_in,
v_samples: samples::SampleQueue { v_samples: Arc::new(samples::SampleQueue {
queue: Vec::new(), queue: Mutex::new(Vec::new()),
default_duration: 1000, default_duration: 1000,
}, }),
a_samples: samples::SampleQueue { a_samples: Arc::new(samples::SampleQueue {
queue: Vec::new(), queue: Mutex::new(Vec::new()),
default_duration: 960, default_duration: 960,
}, }),
}; };
muxer.gen_init()?; muxer.gen_init()?;
return Ok(muxer); return Ok(muxer);

View file

@ -1,5 +1,6 @@
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use crate::util; use crate::util;
use crate::muxer::hls::mp4; use crate::muxer::hls::mp4;
@ -15,7 +16,7 @@ impl mp4::MP4Muxer {
} }
fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> { fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
let v_cc = self.v.recv()?; let v_cc = self.v.as_ref().expect("must be owned at init").recv()?;
return match v_cc.packet_type { return match v_cc.packet_type {
util::NALUPacketType::Video(util::VideoCodec::AV1) => { util::NALUPacketType::Video(util::VideoCodec::AV1) => {
self.v_samples.push(v_cc.packet_data.clone(), 0x02000000); self.v_samples.push(v_cc.packet_data.clone(), 0x02000000);
@ -26,15 +27,47 @@ impl mp4::MP4Muxer {
} }
fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> { fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
let a_cc = self.a.recv()?; let a_cc = self.a.as_ref().expect("must be owned at init").recv()?;
return match a_cc.packet_type { return match a_cc.packet_type {
util::NALUPacketType::Audio(util::AudioCodec::OPUS) => { util::NALUPacketType::Audio(util::AudioCodec::OPUS) => {
self.a.recv()?; self.a.as_ref().expect("must be owned at init").recv()?;
Ok(get_opus_stsd(a_cc.packet_data, &self.metadata)) Ok(get_opus_stsd(a_cc.packet_data, &self.metadata))
}, },
_ => Err(Box::new(util::MuxerError::InvalidCodec)) _ => Err(Box::new(util::MuxerError::InvalidCodec))
} }
} }
pub fn spawn_read_loops(&mut self) {
let (mut v, mut a) = (self.v.take().unwrap(), self.a.take().unwrap());
let v_queue = self.v_samples.clone();
let a_queue = self.a_samples.clone();
let (err_in1, err_in2) = (self.err_in.clone(), self.err_in.clone());
thread::spawn(move || {
let mut i = 1;
loop {
match v.recv() {
Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});},
Err(_) => {
util::thread_freeze(err_in1, Box::new(util::MuxerError::EOF));
return
}
}
}
});
thread::spawn(move || {
loop {
match a.recv() {
Ok(x) => {a_queue.push(x.packet_data, 0x01010000);},
Err(_) => {
util::thread_freeze(err_in2, Box::new(util::MuxerError::EOF));
return
}
}
}
});
}
} }
fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD { fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD {

View file

@ -1,3 +1,5 @@
use std::sync::{Arc, Mutex};
pub struct Sample { pub struct Sample {
pub data: Vec<u8>, pub data: Vec<u8>,
pub size: u32, pub size: u32,
@ -5,13 +7,13 @@ pub struct Sample {
} }
pub struct SampleQueue { pub struct SampleQueue {
pub queue: Vec<Sample>, pub queue: Mutex<Vec<Sample>>,
pub default_duration: u32, pub default_duration: u32,
} }
impl SampleQueue { impl SampleQueue {
pub fn push(&mut self, data: Vec<u8>, flags: u32) { pub fn push(&self, data: Vec<u8>, flags: u32) {
self.queue.push(Sample { self.queue.lock().unwrap().push(Sample {
size: data.len() as u32, size: data.len() as u32,
data: data, data: data,
flags: flags, flags: flags,

View file

@ -198,6 +198,7 @@ impl fmt::Display for EncoderError {
pub enum MuxerError { pub enum MuxerError {
InvalidCodec, InvalidCodec,
CCParseError, CCParseError,
EOF,
} }
impl Error for MuxerError {} impl Error for MuxerError {}
@ -208,6 +209,7 @@ impl fmt::Debug for MuxerError {
match self { match self {
MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"), MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"),
MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"), MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"),
MuxerError::EOF => write!(f, "input EOF"),
_ => write!(f, "Error not described yet") _ => write!(f, "Error not described yet")
} }
} }