diff --git a/src/decode/cmds.rs b/src/decode/cmds.rs index 9bc5fcd..edad85b 100644 --- a/src/decode/cmds.rs +++ b/src/decode/cmds.rs @@ -7,6 +7,7 @@ use crate::decode::codecs::Decoder; use crate::decode::codecs; pub fn spawn(metadata: Arc) -> Result<(impl Decoder, impl Decoder), Box> { + // create appropriate decoder implementation for input codec let v = match metadata.video.codec { Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?, _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} @@ -19,9 +20,10 @@ pub fn spawn(metadata: Arc) -> Result<(impl Decoder, impl Decode } pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver, c_out: mpsc::Sender, c_err: mpsc::Sender>) { + // take stdin and stdout for process to read from and write to on different threads let stdin = decoder.stdin(); let stdout = decoder.stdout(); - let raw_buff_size = decoder.raw_buff_size(); + let raw_buff_size = decoder.raw_buff_size(); // determined by input parameters and codec let raw_sample_type = decoder.media_type(); let c_err2 = c_err.clone(); thread::spawn(move || { diff --git a/src/decode/codecs/audio.rs b/src/decode/codecs/audio.rs index bf81a0c..b27e408 100644 --- a/src/decode/codecs/audio.rs +++ b/src/decode/codecs/audio.rs @@ -30,9 +30,10 @@ impl Decoder for AACDecoder { } pub fn new_aac(metadata: Arc) -> Result> { + // faad for AAC -> raw PCM 16-bit little-e, piped let cmd = Command::new("faad") .args(["-f", "2", "-w", "-q", "-"]) .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; - let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; + let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; // channels * 2 bytes per sample, * samplerate for bytes per second return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE}) } diff --git a/src/decode/codecs/mod.rs b/src/decode/codecs/mod.rs index 78cd5e7..cc538e3 100644 --- a/src/decode/codecs/mod.rs +++ b/src/decode/codecs/mod.rs @@ -20,11 +20,14 @@ pub trait Decoder { fn media_type(&self) -> util::RawMediaType; } +// write to decoder fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box> { - cmd_in.write_all(nalu.packet_data.as_slice())?; + cmd_in.write_all(nalu.packet_data.as_slice())?; // nalus should already be in bytestream + // format, send as is Ok(()) } +// read from decoder fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result> { let mut raw_buff = vec![0u8; raw_buff_size]; match cmd_out.read_exact(raw_buff.as_mut_slice()) { @@ -39,11 +42,14 @@ pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver x, - Err(err) => {drop(cmd_in); thread::park();return} + Err(err) => {drop(cmd_in); thread::park();return} // drop cmd_in to signal EOF to + // decoder, letting the decoder close + // stdout when done }; match write_nalu(&mut cmd_in, nalu) { Ok(_) => (), - Err(err) => {util::thread_freeze(c_err, err); return} + Err(err) => {util::thread_freeze(c_err, err); return} // pause thread, wait for main to + // exit } } } @@ -52,7 +58,7 @@ pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: ut loop { let media = match read_raw(&mut cmd_out, buff_size, media_type) { Ok(x) => x, - Err(err) => {drop(c_out); thread::park(); return} + Err(err) => {drop(c_out); thread::park(); return} // drop to signal EOF to encoder }; match c_out.send(media) { Ok(_) => (), diff --git a/src/decode/codecs/video.rs b/src/decode/codecs/video.rs index 0652ecf..6303d6e 100644 --- a/src/decode/codecs/video.rs +++ b/src/decode/codecs/video.rs @@ -30,9 +30,12 @@ impl Decoder for H264Decoder { } pub fn new_h264(metadata: Arc) -> Result> { + // ffmpeg pipe input output, H264 -> raw YUV420P sequence let cmd = Command::new("ffmpeg") .args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"]) .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; - let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; + let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; // size of single + // yuv is + // 1.5*num_pixels return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P}); } diff --git a/src/decode/mod.rs b/src/decode/mod.rs index 7058541..91ba914 100644 --- a/src/decode/mod.rs +++ b/src/decode/mod.rs @@ -22,7 +22,7 @@ pub fn spawn( let (raw_v_in, raw_v_out) = mpsc::channel(); let (raw_a_in, raw_a_out) = mpsc::channel(); let (err_in_v, err_out) = mpsc::channel(); - let err_in_a = err_in_v.clone(); + let err_in_a = err_in_v.clone(); // double for 2 threads let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?; thread::spawn(move || { cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v); diff --git a/src/demux/flv.rs b/src/demux/flv.rs index 3a03b6d..3a374a9 100644 --- a/src/demux/flv.rs +++ b/src/demux/flv.rs @@ -30,6 +30,8 @@ pub struct FLVReader { impl FLVReader { fn check_signature(&mut self) -> Result<(), Box> { + // check for "LV" for FLV header + // "F" already used in spawning, header[..4] always bytes "F" "L" "V" 0x1 let mut sig_bytes_rem = [0u8; 3]; self.stdin.read_exact(&mut sig_bytes_rem)?; if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 { @@ -39,6 +41,7 @@ impl FLVReader { } fn skip_init_header(&mut self) -> Result<(), Box> { + // skip rest of header. Header is (FLV1 + 1byte + 4byte offset + 4byte reserved) let mut rest_of_header = [0u8; 5]; self.stdin.read_exact(&mut rest_of_header)?; let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4; @@ -48,13 +51,15 @@ impl FLVReader { } fn read_packet(&mut self) -> Result<(FLVTagType, Vec), Box> { + // FLV packet header always 11 bytes let mut tag_head = [0u8; 11]; match self.stdin.read_exact(&mut tag_head) { Ok(_) => (), - Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), // assumes EOF while reading header means 0 bytes were read, not just less than len Err(err) => return Err(Box::new(err)) }; let tag_type = FLVTagType::try_from(tag_head[0])?; + // len is u24 not u32, u24 does not exist let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize; let mut packet_data = vec![0u8; tag_data_len]; self.stdin.read_exact(packet_data.as_mut_slice())?; @@ -85,6 +90,7 @@ impl input::FileReader for FLVReader { fn read_nalu(&mut self, metadata: &util::Metadata) -> Result> { let (packet_type, mut data) = self.read_packet()?; return match packet_type { + // add headers to raw bytestreams as needed FLVTagType::Audio => Ok(util::NALUPacket { packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()), packet_data: nalu::prepend_a(data, &metadata), @@ -106,8 +112,10 @@ pub fn new_reader(stdin: io::Stdin) -> Result> { fn process_metadata(mut data: Vec) -> Result> { let mut metadata = util::Metadata { ..Default::default() }; - data.drain(..13); + data.drain(..13); // skip until ECMA array, hacky, but seems fixed length and I'm not + // implementing a proper amf parser for 1 single tag that seems to be fixed length offset let ecma_sync_byte = data.drain(..1).next().unwrap(); + // AMF ECMA array object marker if ecma_sync_byte != 8 { return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail)); } @@ -115,8 +123,12 @@ fn process_metadata(mut data: Vec) -> Result> for _ in 0..arr_len { let key = String::fetch_amf_val(&mut data); let amf_marker = u8::fetch_amf_val(&mut data); - if !handle_relevant_keys(key, &mut data, &mut metadata)? { + if !handle_relevant_keys(key, &mut data, &mut metadata)? { // most are completely useless + // or unused (levels), if they + // are, get the value match amf_marker { + // added functions to base types, see below. + // would try generics, but restricting types is weird 0 => {let _ = f64::fetch_amf_val(&mut data);}, 1 => {let _ = bool::fetch_amf_val(&mut data);}, 2 => {let _ = String::fetch_amf_val(&mut data);}, @@ -124,11 +136,13 @@ fn process_metadata(mut data: Vec) -> Result> } } } + // check if all needed values are set return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))}; } fn handle_relevant_keys(key: String, data: &mut Vec, metadata: &mut util::Metadata) -> Result> { match key.as_str() { + // fetches relevant values, should try to use above instead, but codecs complicate things "width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)}, "height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)}, "framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)}, diff --git a/src/demux/input.rs b/src/demux/input.rs index 66644b3..46fae63 100644 --- a/src/demux/input.rs +++ b/src/demux/input.rs @@ -15,18 +15,20 @@ pub trait FileReader { let nalu = match self.read_nalu(&metadata) { Ok(x) => x, Err(err) => { + // pause loop on EOF if let Some(e) = err.downcast_ref::() { match e { util::DemuxerError::EOF => { + // drop to trigger RecvError on decode module to end drop(v_in); drop(a_in); - thread::park(); + thread::park(); // prevent dropping err_out to keep main running return }, _ => () } }; - util::thread_freeze(err_in, err); + util::thread_freeze(err_in, err); // pause thread, trigger main to exit return } }; @@ -50,7 +52,7 @@ pub trait FileReader { pub fn new_reader() -> Result> { let mut stdin_hndl = io::stdin(); - let mut byte1 = [0u8; 1]; + let mut byte1 = [0u8; 1]; // sync byte to determine filetype, limited checking capability stdin_hndl.read_exact(&mut byte1)?; match &byte1[0] { 0x46 => flv::new_reader(stdin_hndl), diff --git a/src/demux/mod.rs b/src/demux/mod.rs index 5240f7b..8bbadf5 100644 --- a/src/demux/mod.rs +++ b/src/demux/mod.rs @@ -1,6 +1,6 @@ mod input; mod flv; -mod nalu_flv; +mod nalu_flv; // probably should organize into an flv submodule instead, but eh use std::sync::mpsc; use std::error::Error; @@ -20,7 +20,7 @@ pub fn spawn() -> Result< Box > { let mut reader = input::new_reader()?; - let metadata = Arc::new(reader.init()?); + let metadata = Arc::new(reader.init()?); // arc to share across modules let (v_in, v_out) = mpsc::channel(); let (a_in, a_out) = mpsc::channel(); let (err_in, err_out) = mpsc::channel(); diff --git a/src/demux/nalu_flv.rs b/src/demux/nalu_flv.rs index 8a00c84..be09fad 100644 --- a/src/demux/nalu_flv.rs +++ b/src/demux/nalu_flv.rs @@ -29,25 +29,28 @@ fn parse_avcc(mut data: Vec) -> (Vec, Vec) { } fn preprocess_h264(mut data: Vec) -> Vec { - let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; + let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; // base nalu delimiter for H264 match data[1] { 0 => { - data.drain(..5); + // sequence header contains avcC as in ISOBMF binding, + // contains sps and pps needed for bytestream + // must be segmented for NALU format + data.drain(..5); // composition time info, etc, see FLV spec let (mut sps, mut pps) = parse_avcc(data); nalu.append(&mut sps); nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]); nalu.append(&mut pps); return nalu; }, - 2 => {return vec![0u8; 0];}, + 2 => {return vec![0u8; 0];}, // end seq, return null 1 => { data.drain(..5); - loop { + loop { // may be multiple NALU packets per tag let mut buff = [0u8; 4]; buff.copy_from_slice(data.drain(..4).as_slice()); let nalu_len = u32::from_be_bytes(buff) as usize; nalu.extend_from_slice(data.drain(..nalu_len).as_slice()); - if data.len() <= 4 {return nalu} + if data.len() <= 4 {return nalu} // at end of packet nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]); } }, @@ -56,6 +59,7 @@ fn preprocess_h264(mut data: Vec) -> Vec { } fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec { + // see multimedia wiki for details let aot_idx = 1u8; let frame_len = (data_len + 7) as u16; let samplerate_idx: u8 = match metadata.audio.samplerate { @@ -75,7 +79,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec { _ => {panic!("invalid samplerate")} }; let chan_conf_idx = metadata.audio.channels; - let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; + let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; // sync bytes and guaranteed + // value set head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2); head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8); head[4] = ((frame_len >> 3) & 0xff) as u8; @@ -86,6 +91,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec { fn preprocess_aac(mut data: Vec, metadata: &util::Metadata) -> Vec { match data[1] { 0 => { + // bytestream needs no codec config, all information already in metadata, also would + // not be persistent across packets return vec![0u8; 0]; }, 1 => { diff --git a/src/encode/cmds.rs b/src/encode/cmds.rs index 60df077..9caca48 100644 --- a/src/encode/cmds.rs +++ b/src/encode/cmds.rs @@ -6,6 +6,7 @@ use crate::util; use crate::encode::codecs::Encoder; use crate::encode::codecs; +// spawn encoder for a given output encoding pub fn spawn(metadata: Arc, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box> { let v = match v_target { util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?, diff --git a/src/encode/codecs/audio.rs b/src/encode/codecs/audio.rs index b833681..db4b689 100644 --- a/src/encode/codecs/audio.rs +++ b/src/encode/codecs/audio.rs @@ -18,19 +18,21 @@ impl Encoder for OpusEncoder { } fn preprocess_stream(&mut self) -> Result<(), Box> { + // opus outputs Ogg format, can be parsed with the same parser as the rest return Ok(()); } fn read_nalu(&mut self) -> Result> { let mut stdout = self.cmd.stdout.as_mut().unwrap(); - let mut buff = [0u8; 27]; + let mut buff = [0u8; 27]; // each packet has a 27 byte header, mostly irrelevant match stdout.read_exact(&mut buff) { Ok(_) => (), - Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume EOF at header means EOF proper Err(err) => return Err(Box::new(err)) }; let num_segments = buff[buff.len() - 1] as usize; - let mut segment_table_buff = vec![0u8; num_segments]; + let mut segment_table_buff = vec![0u8; num_segments]; // dictates actual size of data as + // raw sum of u8 stdout.read_exact(&mut segment_table_buff)?; let mut page_size:usize = 0; for i in segment_table_buff { @@ -45,6 +47,7 @@ impl Encoder for OpusEncoder { pub fn new_opus(metadata: Arc) -> Result> { let channels = metadata.audio.channels.to_string(); let samplerate = metadata.audio.samplerate.to_string(); + // opus enc for PCM LE 16 -> Opus let cmd = Command::new("opusenc") .args([ "--quiet", @@ -52,7 +55,7 @@ pub fn new_opus(metadata: Arc) -> Result ChildStdin; + // read from encoder fn read_nalu(&mut self) -> Result>; + // for specific encoder peculiarities + // svtav1 gives DKIF format, which has its own extra header which breaks just parsing as is + // opus outputs Ogg which needs full and header follows the same format as the rest for packets + // use as needed fn preprocess_stream(&mut self) -> Result<(), Box>; fn write_nalu_loop(&mut self, c_out: mpsc::Sender, c_err: mpsc::Sender>) { @@ -24,7 +29,9 @@ pub trait Encoder { loop { let nalu = match self.read_nalu() { Ok(x) => x, - Err(err) => {drop(c_out); thread::park(); return} + Err(err) => {drop(c_out); thread::park(); return} // drop c_out to signal EOF to + // muxer, park to keep c_err + // allocated }; match c_out.send(nalu) { Ok(_) => (), @@ -34,6 +41,7 @@ pub trait Encoder { } } +// write to encoder fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box> { cmd_in.write_all(media.sample.as_slice())?; Ok(()) @@ -43,11 +51,15 @@ pub fn read_raw_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver x, - Err(err) => {drop(cmd_in); thread::park(); return} + Err(err) => {drop(cmd_in); thread::park(); return} // drop stdin to signal EOF, which + // lets encoder close stdout as + // needed, park to keep c_err + // allocated }; match write_raw(&mut cmd_in, media) { Ok(_) => (), - Err(err) => {util::thread_freeze(c_err, err); return} + Err(err) => {util::thread_freeze(c_err, err); return} // park thread, wait for main to + // exit } } } diff --git a/src/encode/codecs/video.rs b/src/encode/codecs/video.rs index cdcf0f9..92393ca 100644 --- a/src/encode/codecs/video.rs +++ b/src/encode/codecs/video.rs @@ -17,7 +17,9 @@ impl Encoder for AV1Encoder { } fn preprocess_stream(&mut self) -> Result<(), Box> { + // SVTAV1 outputs Duck IVF framing, check specs if needed let mut stdout = self.cmd.stdout.as_mut().unwrap(); + // remove dkif header let mut buff = [0u8; 8]; stdout.read_exact(&mut buff)?; let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize; @@ -28,16 +30,18 @@ impl Encoder for AV1Encoder { fn read_nalu(&mut self) -> Result> { let mut stdout = self.cmd.stdout.as_mut().unwrap(); - let mut buff = [0u8; 12]; + let mut buff = [0u8; 12]; // frame always 12 byte header match stdout.read_exact(&mut buff) { Ok(_) => (), - Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume eof if header read fails Err(err) => return Err(Box::new(err)) } - let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; + let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; // actual sample + // size in frame + // header let mut data = vec![0u8; nalu_len]; stdout.read_exact(data.as_mut_slice())?; - data.drain(..2); + data.drain(..2); // skip temporal delimiter packet (2 bytes, always first of each dkif frame) return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data}); } } @@ -46,15 +50,16 @@ pub fn new_av1(metadata: Arc) -> Result AV1, piped input output let cmd = Command::new("SvtAv1EncApp") .args([ - "--errlog", "/dev/null", + "--errlog", "/dev/null", // dump errors "--progress", "0", "-w", width.as_str(), "-h", height.as_str(), "--fps-num", fps_num.as_str(), - "--preset", "12", - "--keyint", "30", + "--preset", "12", // super fast + "--keyint", "30", // keyframe every 30 frames "-i", "-", "-b", "-" ]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; diff --git a/src/encode/mod.rs b/src/encode/mod.rs index d7eaee9..20dafe8 100644 --- a/src/encode/mod.rs +++ b/src/encode/mod.rs @@ -24,7 +24,7 @@ pub fn spawn( let (v_in, v_out) = mpsc::channel(); let (a_in, a_out) = mpsc::channel(); let (err_in_v, err_out) = mpsc::channel(); - let err_in_a = err_in_v.clone(); + let err_in_a = err_in_v.clone(); // double for each thread let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?; thread::spawn(move || { cmds::handle_encoder(v_encoder, v, v_in, err_in_v); diff --git a/src/main.rs b/src/main.rs index 520d155..642b1bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,21 +9,28 @@ use std::sync::mpsc; use std::thread; use std::time::Duration; -fn init(args: util::HLSArgs) -> Result<[mpsc::Receiver>; 4], Box>{ +// spawns threads for each subtask, linking input/output channels +// returns an array of error receivers to exit if needed +// passes input metadata around for video and audio metadata, makes scaling and resampling +// impossible +fn init(args: util::HLSArgs) -> Result<[mpsc::Receiver>; 4], Box> { let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?; let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?; + // output codecs hardcoded, probably want to pass in via cmd args instead let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?; let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), args)?; return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]); } +// post init loop. Modules are initialized without error, loop and check for errors fn process(error_handles: [mpsc::Receiver>; 4]) -> Result<(), Box> { - let (mut v_eof, mut a_eof) = (false, false); + let (mut v_eof, mut a_eof) = (false, false); // proper EOF handling loop { - if v_eof && a_eof {return Ok(())} + if v_eof && a_eof {return Ok(())} // if both video and audio streams have ended for i in 0..4 { match error_handles[i].try_recv() { Ok(err) => { + // check if EOF error found. if let Some(eof_error) = err.downcast_ref::() { match eof_error { util::MuxerError::VideoEOF => {v_eof = true;}, @@ -34,17 +41,22 @@ fn process(error_handles: [mpsc::Receiver (), + Err(mpsc::TryRecvError::Empty) => (), // do nothing since nothing was sent and + // thread still active Err(err) => return Err(Box::new(util::ThreadError(i))), } } - thread::sleep(Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); // sleep so isn't running constantly for no + // reason } } + +// gets args +// bad, replace with clap impl if meaning to actually use fn parse_args() -> util::HLSArgs { let mut args = std::env::args(); - args.next(); + args.next(); // skip prog name util::HLSArgs { segment_time: args.next().expect("no time interval").parse().expect("not int"), segment_prepend: args.next().expect("no prepend"), diff --git a/src/muxer/hls/handler.rs b/src/muxer/hls/handler.rs index 1b1d185..bf55ad1 100644 --- a/src/muxer/hls/handler.rs +++ b/src/muxer/hls/handler.rs @@ -26,20 +26,22 @@ impl HLSHandler { match self.muxer.get_segment(self.curr_segment_idx, self.args.segment_time) { None => (), Some(segment) => { - let del_list = self.new_segment(segment); + let del_list = self.new_segment(segment); // making a new segment should push + // out old segments self.delete_files(del_list); self.dump_playlist(); self.curr_segment_idx += 1; } } - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(time::Duration::from_millis(100)); // don't need to have it constantly + // running, } } fn new_segment(&mut self, segment: segments::Segment) -> Vec { - segment.dump(); - self.curr_segments.push(segment.filename); - let to_drain = if self.curr_segments.len() > self.args.max_segments {self.curr_segments.len() - self.args.max_segments} else {0}; + segment.dump(); // write out the segment, + self.curr_segments.push(segment.filename); // add its filename to the curr playlist, + let to_drain = if self.curr_segments.len() > self.args.max_segments {self.curr_segments.len() - self.args.max_segments} else {0}; // and remove segement filenames that are old self.curr_segments.drain(..to_drain).collect() } @@ -47,18 +49,23 @@ impl HLSHandler { fn delete_files(&mut self, mut del_list: Vec) { for filename in &del_list { _ = fs::remove_file(filename); - self.curr_media_seq_idx += 1; + self.curr_media_seq_idx += 1; // media sequence must increment per deletion as per HLS + // spec } } fn dump_playlist(&self) { let mut segment_listing = String::new(); for segment_filename in &self.curr_segments { + // each segment has its duration and access URI, should also have Datetime, but neither + // really needed nor "simple" with std rust, would need chrono or time or be ready for + // headaches segment_listing += format!("\ #EXTINF:{},\n\ {}\n\ ", self.args.segment_time, (self.args.segment_prepend.clone() + segment_filename)).as_str(); } + // full playlist string, see HLS spec for details let playlist_string = format!("\ #EXTM3U\n\ #EXT-X-VERSION:7\n\ diff --git a/src/muxer/hls/mp4/atoms.rs b/src/muxer/hls/mp4/atoms.rs index 168c190..aa154a3 100644 --- a/src/muxer/hls/mp4/atoms.rs +++ b/src/muxer/hls/mp4/atoms.rs @@ -1,3 +1,8 @@ +// cursed abomination, not well thought out, but works fine, rust also complicates inheritance +// every necessary MP4 atom to create readable files, +// lookup the corresponding item in the ISO doc or the quicktime spec (for everything not under +// moof), because I am not doing all of that here. + use std::sync::Arc; use std::io::Write; use std::fs::File; @@ -921,6 +926,7 @@ pub fn construct_moov(stsd_v: STSD, stsd_a: STSD, metadata: &Arc } } +// video will always be track 1, audio in trak 2 fn new_trak(stsd: STSD, metadata: &Arc) -> TRAK { let mut tkhd = TKHD {..Default::default() }; match stsd.entry { @@ -1192,6 +1198,11 @@ impl MP4Atom for MDAT { } pub fn new_moof(v_samples: &Vec, default_dur_v: u32, a_samples: &Vec, default_dur_a: u32, mdat_v_offset: usize, mdat_a_offset: usize, moof_idx: usize) -> MOOF { + // hacky fix, since sticking to a standard format and defaults for segments, only trun is + // variable which has a predictable size regardless, moof offset + v and a trun sample table + // len and mdat header offset, will break using version 1 atoms for time, + // alternative is to somehow record idx for base offset (which breaks MP4atoms impl) to modify + // later, or calc full length properly, also complicated let mdat_data_start = 152 + v_samples.len() * 8 + a_samples.len() * 8 + 8; MOOF { mfhd: MFHD { seq_num: moof_idx as u32, ..Default::default() }, @@ -1241,6 +1252,7 @@ fn make_sample_table(samples: &Vec) -> Vec { return table; } +// takes content, applies MP4 atom encapsulation fn make_box(mut content: Vec, box_name: [u8; 4]) -> Vec { let mut head = vec![0u8; 8]; head.splice(..4, ((content.len() + 8) as u32).to_be_bytes()); diff --git a/src/muxer/hls/mp4/mod.rs b/src/muxer/hls/mp4/mod.rs index d7ab478..6d2f5b2 100644 --- a/src/muxer/hls/mp4/mod.rs +++ b/src/muxer/hls/mp4/mod.rs @@ -8,6 +8,7 @@ use std::error::Error; use crate::util; pub struct MP4Muxer { + // option to allow being taken by threads v: Option>, pub v_samples: Arc, a: Option>, @@ -22,15 +23,22 @@ pub fn new_muxer(v: mpsc::Receiver, a: mpsc::Receiver> for multithread write access v_samples: Arc::new(samples::SampleQueue { queue: Mutex::new(Vec::new()), - default_duration: 1000, + default_duration: 1000, // timescale as fps * 1000, each sample then always 1000 + // regardless of fps samples_per_sec: metadata.video.framerate as usize, }), a_samples: Arc::new(samples::SampleQueue { queue: Mutex::new(Vec::new()), - default_duration: metadata.audio.samplerate * 20 / 1000, - samples_per_sec: 50, + default_duration: metadata.audio.samplerate * 20 / 1000, // timescale set as + // samplerate, duration per + // sample is sample duration + // (20ms packet) * samples per + // time (samplerate) + samples_per_sec: 50, // opus default is 20ms packets (1000/20 = 50), would need to change this if + // changing that }), }; muxer.gen_init()?; diff --git a/src/muxer/hls/mp4/mp4muxer.rs b/src/muxer/hls/mp4/mp4muxer.rs index ca7b3ba..86a2876 100644 --- a/src/muxer/hls/mp4/mp4muxer.rs +++ b/src/muxer/hls/mp4/mp4muxer.rs @@ -11,12 +11,17 @@ impl mp4::MP4Muxer { pub fn gen_init(&mut self) -> Result<(), Box> { let stsd_v = self.handle_v_cc()?; let stsd_a = self.handle_a_cc()?; + // only need moov for codec declarations, ftyp is uneeded apparently let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata); - mp4::atoms::dump_init_tree(init_tree)?; + mp4::atoms::dump_init_tree(init_tree)?; // write out moov in init.mp4 segment Ok(()) } fn handle_v_cc(&mut self) -> Result> { + // handles getting stsd per sample, parsing codec config mostly + // and peculiarities per codec + // AV1 has the first nalu as is in the samples + // also contains info for codec config let v_cc = self.v.as_ref().expect("must be owned at init").recv()?; return match v_cc.packet_type { util::NALUPacketType::Video(util::VideoCodec::AV1) => { @@ -28,6 +33,10 @@ impl mp4::MP4Muxer { } fn handle_a_cc(&mut self) -> Result> { + // handles getting stsd per sample, parsing codec config mostly + // and peculiarities per codec + // Ogg (Opus) first packet has codec config info, not needed in samples + // Second packet has no discernable use for ISOBMF? Also not needed in samples let a_cc = self.a.as_ref().expect("must be owned at init").recv()?; return match a_cc.packet_type { util::NALUPacketType::Audio(util::AudioCodec::OPUS) => { @@ -48,8 +57,15 @@ impl mp4::MP4Muxer { let mut i = 1; loop { match v.recv() { + // hacky fix for video keyframe sample flags + // set at 30 frames per keyframe, would need some way to send the same value to + // the encoder as here or attempt to parse the actual OBU (even the header + // seems cursed, just use a lib instead) + // no clue how those flags are parsed, but they work (0x02 is keyframe, 0x0101 + // is interframe) Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});}, Err(_) => { + // send EOF to main for proper shutdown util::thread_freeze(err_in1, Box::new(util::MuxerError::VideoEOF)); return } @@ -61,8 +77,11 @@ impl mp4::MP4Muxer { thread::spawn(move || { loop { match a.recv() { + // audio should be all keyframes, should use the tfhd default var, but + // more work to decouple video and audio impls Ok(x) => {a_queue.push(x.packet_data, 0x02000000);}, Err(_) => { + // send EOF for proper shutdown util::thread_freeze(err_in2, Box::new(util::MuxerError::AudioEOF)); return } @@ -72,6 +91,9 @@ impl mp4::MP4Muxer { } pub fn get_segment(&mut self, idx: usize, time: usize) -> Option { + // if sample queues are long enough to extract a segment, do so, otherwise return None + // probably should not be an option, instead loop here until ready to allow flushing at + // exit if self.v_samples.segment_ready(time) && self.a_samples.segment_ready(time) { return Some(segments::Segment { filename: idx.to_string() + ".m4s", @@ -100,7 +122,7 @@ fn get_av1_stsd(mut sample: Vec, metadata: &Arc) -> mp4::ato dref_idx: 1, width: metadata.video.width as u16, height: metadata.video.height as u16, - resolution: (72u64 << 48) + (72u64 << 16), + resolution: (72u64 << 48) + (72u64 << 16), // 72 dpi, fixed point numbers (72.00 + 72.00) sample_per_frame: 1, encoder_name: String::from("libsvtav1"), pixel_depth: 24, @@ -115,6 +137,7 @@ fn get_av1_stsd(mut sample: Vec, metadata: &Arc) -> mp4::ato } fn get_opus_stsd(sample: Vec, metadata: &Arc) -> mp4::atoms::STSD { + // see opus encapsulation in ISOBMF for details let mut opus_binding = vec![0u8; 11]; opus_binding[1] = metadata.audio.channels; let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap()); @@ -132,7 +155,7 @@ fn get_opus_stsd(sample: Vec, metadata: &Arc) -> mp4::atoms: dref_idx: 1, channels: metadata.audio.channels as u16, sample_size: 16, - sample_rate: metadata.audio.samplerate << 16, + sample_rate: metadata.audio.samplerate << 16, // fixed point (u16 int + u16 decimals) codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding} }; return mp4::atoms::STSD { diff --git a/src/muxer/hls/mp4/samples.rs b/src/muxer/hls/mp4/samples.rs index d22f386..25e0e30 100644 --- a/src/muxer/hls/mp4/samples.rs +++ b/src/muxer/hls/mp4/samples.rs @@ -2,6 +2,8 @@ use std::sync::{Arc, Mutex}; use crate::muxer::hls::segments; +// forces a common duration, no composition time offset +// flags declared per sample as well as size pub struct Sample { pub data: Vec, pub size: u32, diff --git a/src/muxer/hls/segments.rs b/src/muxer/hls/segments.rs index 233db87..b5ad6e0 100644 --- a/src/muxer/hls/segments.rs +++ b/src/muxer/hls/segments.rs @@ -10,14 +10,16 @@ pub struct Segment { pub filename: String, pub idx: usize, pub segment_video: Vec, - pub sample_duration_v: u32, + pub sample_duration_v: u32, // default interval per sample at designated timescale (same for + // audio instead below) pub segment_audio: Vec, pub sample_duration_a: u32, } impl Segment { pub fn dump(&self) { - let mdat = self.new_mdat(); + let mdat = self.new_mdat(); // sample full byte length needed for moof generation + // increment idx (0-indexed) for moofs (1-indexed) let moof = atoms::new_moof(&self.segment_video, self.sample_duration_v, &self.segment_audio, self.sample_duration_a, 0, mdat.v_samples.len(), self.idx + 1); let mut segment_bytes = moof.marshall(); segment_bytes.append(&mut mdat.marshall()); diff --git a/src/util/mod.rs b/src/util/mod.rs index 2fc5403..deb7177 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,3 +1,5 @@ +// general use structs, functions and errors go here + use std::error::Error; use std::fmt; use std::thread; @@ -222,11 +224,15 @@ impl fmt::Display for MuxerError { } // funcs +// parks a thread and sends an error over the param channel +// used to pause threads keeping channels open, so that main can receive the actual error and then +// exit, instead of a "RecvError" pub fn thread_freeze(err_sender: mpsc::Sender>, err: Box) { err_sender.send(err).unwrap(); thread::park(); } +// should probably make this a macro instead, eh pub fn blank_vec(n: usize) -> Vec { return vec![0u8; n]; }