diff --git a/src/decode/codecs/audio.rs b/src/decode/codecs/audio.rs index 09a270d..375f1a3 100644 --- a/src/decode/codecs/audio.rs +++ b/src/decode/codecs/audio.rs @@ -13,6 +13,10 @@ pub struct AACDecoder { } impl Decoder for Arc> { + fn close_clean(&self) { + let mut self_ptr = self.lock().unwrap(); + drop(self_ptr.cmd.stdin.take().unwrap()); + } fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box> { let mut self_ptr = self.lock().unwrap(); let mut pipe = match &self_ptr.cmd.stdin { diff --git a/src/decode/codecs/mod.rs b/src/decode/codecs/mod.rs index a1b2ad7..a081da3 100644 --- a/src/decode/codecs/mod.rs +++ b/src/decode/codecs/mod.rs @@ -3,16 +3,18 @@ pub mod audio; use std::sync::mpsc; use std::error::Error; +use std::thread; use crate::util; pub trait Decoder { + fn close_clean(&self); fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box>; fn write_loop(&self, chan_in: mpsc::Receiver, err_in: mpsc::Sender>) { loop { let nalu = match chan_in.recv() { Ok(x) => x, - Err(err) => {util::thread_freeze(err_in, Box::new(err)); return} + Err(err) => {self.close_clean(); thread::park(); return} }; match self.write_nalu(nalu) { Ok(_) => (), diff --git a/src/decode/codecs/video.rs b/src/decode/codecs/video.rs index e49118b..70ea6d3 100644 --- a/src/decode/codecs/video.rs +++ b/src/decode/codecs/video.rs @@ -13,6 +13,11 @@ pub struct H264Decoder { } impl Decoder for Arc> { + fn close_clean(&self) { + let mut self_ptr = self.lock().unwrap(); + drop(self_ptr.cmd.stdin.take().unwrap()); + } + fn write_nalu(&self, nalu: util::NALUPacket) -> Result<(), Box> { let mut self_ptr = self.lock().unwrap(); let mut pipe = match &self_ptr.cmd.stdin { diff --git a/src/demux/input.rs b/src/demux/input.rs index 58e2640..51990b4 100644 --- a/src/demux/input.rs +++ b/src/demux/input.rs @@ -2,6 +2,7 @@ use std::error::Error; use std::io; use std::io::Read; use std::sync::{mpsc, Arc}; +use std::thread; use crate::util; use crate::demux::flv; @@ -13,7 +14,21 @@ pub trait FileReader { loop { let nalu = match self.read_nalu(&metadata) { Ok(x) => x, - Err(err) => {util::thread_freeze(err_in, err); return} + Err(err) => { + if let Some(e) = err.downcast_ref::() { + match e { + util::DemuxerError::EOF => { + drop(v_in); + drop(a_in); + thread::park(); + return + }, + _ => () + } + }; + util::thread_freeze(err_in, err); + return + } }; match nalu.packet_type { util::NALUPacketType::Audio => {