From 0d65197ce9ab8f382b120657cabc4a2ba8728848 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Fri, 6 Oct 2023 13:05:09 +0500 Subject: [PATCH] demuxer data loop impl --- src/demux/flv.rs | 2 +- src/demux/input.rs | 25 ++++++++++++++++++++++++- src/util/mod.rs | 9 +++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/demux/flv.rs b/src/demux/flv.rs index b023383..97cb56e 100644 --- a/src/demux/flv.rs +++ b/src/demux/flv.rs @@ -74,7 +74,7 @@ impl input::FileReader for FLVReader { self.skip_init_header()?; return self.read_metadata() } - fn read_nalu(&mut self) -> Result> { + fn read_nalu(&mut self, metadata: &util::Metadata) -> Result> { todo!(); } } diff --git a/src/demux/input.rs b/src/demux/input.rs index 26d8f06..a4dece6 100644 --- a/src/demux/input.rs +++ b/src/demux/input.rs @@ -1,13 +1,36 @@ use std::error::Error; use std::io; use std::io::Read; +use std::sync::mpsc; use crate::util; use crate::demux::flv; pub trait FileReader { fn init(&mut self) -> Result>; - fn read_nalu(&mut self) -> Result>; + fn read_nalu(&mut self, metadata: &util::Metadata) -> Result>; + fn data_loop(&mut self, v_in: mpsc::Sender, a_in: mpsc::Sender, metadata: &util::Metadata, err_in: mpsc::Sender>) { + loop { + let nalu = match self.read_nalu(metadata) { + Ok(x) => x, + Err(err) => {util::thread_freeze(err_in, err); return} + }; + match nalu.packet_type { + util::NALUPacketType::Audio => { + match a_in.send(nalu) { + Ok(_) => (), + Err(err) => {util::thread_freeze(err_in, Box::new(err)); return} + } + } + util::NALUPacketType::Video => { + match v_in.send(nalu) { + Ok(_) => (), + Err(err) => {util::thread_freeze(err_in, Box::new(err)); return} + } + } + } + } + } } pub fn new_reader() -> Result> { diff --git a/src/util/mod.rs b/src/util/mod.rs index c5eba22..f6a3ce2 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,5 +1,7 @@ use std::error::Error; use std::fmt; +use std::thread; +use std::sync::mpsc; // Data structs/enums @@ -108,3 +110,10 @@ impl fmt::Display for DemuxerError { fmt::Debug::fmt(self, f) } } + +// funcs + +pub fn thread_freeze(err_sender: mpsc::Sender>, err: Box) { + err_sender.send(err).unwrap(); + thread::park(); +}