Muaz Ahmad 2023-10-26 13:35:54 +05:00
parent f21850d181
commit 889a07516e
22 changed files with 185 additions and 57 deletions

View file

@@ -7,6 +7,7 @@ use crate::decode::codecs::Decoder;
use crate::decode::codecs;
pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> {
// create appropriate decoder implementation for input codec
let v = match metadata.video.codec {
Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?,
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
@@ -19,9 +20,10 @@ pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decode
}
pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
// take stdin and stdout for process to read from and write to on different threads
let stdin = decoder.stdin();
let stdout = decoder.stdout();
let raw_buff_size = decoder.raw_buff_size(); // determined by input parameters and codec
let raw_sample_type = decoder.media_type();
let c_err2 = c_err.clone();
thread::spawn(move || {

View file

@@ -30,9 +30,10 @@ impl Decoder for AACDecoder {
}
pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Error>> {
// faad for AAC -> raw PCM 16-bit little-endian, piped
let cmd = Command::new("faad")
.args(["-f", "2", "-w", "-q", "-"])
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; // channels * 2 bytes per sample, * samplerate for bytes per second
return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE})
}

View file

@@ -20,11 +20,14 @@ pub trait Decoder {
fn media_type(&self) -> util::RawMediaType;
}
// write to decoder
fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
cmd_in.write_all(nalu.packet_data.as_slice())?; // nalus should already be in bytestream format, send as is
Ok(())
}
// read from decoder
fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
let mut raw_buff = vec![0u8; raw_buff_size];
match cmd_out.read_exact(raw_buff.as_mut_slice()) {
@@ -39,11 +42,14 @@ pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::NALUPac
loop {
let nalu = match c_in.recv() {
Ok(x) => x,
Err(err) => {drop(cmd_in); thread::park(); return} // drop cmd_in to signal EOF to decoder, letting the decoder close stdout when done
};
match write_nalu(&mut cmd_in, nalu) {
Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, err); return} // pause thread, wait for main to exit
}
}
}
@@ -52,7 +58,7 @@ pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: ut
loop {
let media = match read_raw(&mut cmd_out, buff_size, media_type) {
Ok(x) => x,
Err(err) => {drop(c_out); thread::park(); return} // drop to signal EOF to encoder
};
match c_out.send(media) {
Ok(_) => (),

View file

@@ -30,9 +30,12 @@ impl Decoder for H264Decoder {
}
pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> {
// ffmpeg pipe input output, H264 -> raw YUV420P sequence
let cmd = Command::new("ffmpeg")
.args(["-y", "-hide_banner", "-loglevel", "error", "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"])
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; // size of single yuv frame is 1.5*num_pixels
return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P});
}
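As a quick illustration of the buffer sizing above: yuv420p keeps one full-resolution luma plane plus two quarter-resolution chroma planes, which is where the 3/2 factor comes from. A minimal sketch, using an assumed 1280x720 frame (not a value taken from this repo):

fn yuv420p_frame_size(width: usize, height: usize) -> usize {
    let y = width * height;        // full-resolution luma plane
    let u = width * height / 4;    // chroma subsampled 2x2
    let v = width * height / 4;
    y + u + v                      // == width * height * 3 / 2
}
// e.g. yuv420p_frame_size(1280, 720) == 921_600 + 230_400 + 230_400 == 1_382_400 bytes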

View file

@@ -22,7 +22,7 @@ pub fn spawn(
let (raw_v_in, raw_v_out) = mpsc::channel();
let (raw_a_in, raw_a_out) = mpsc::channel();
let (err_in_v, err_out) = mpsc::channel();
let err_in_a = err_in_v.clone(); // double for 2 threads
let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?;
thread::spawn(move || {
cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v);

View file

@@ -30,6 +30,8 @@ pub struct FLVReader {
impl FLVReader {
fn check_signature(&mut self) -> Result<(), Box<dyn Error>> {
// check for "LV" for FLV header
// "F" already used in spawning, header[..4] always bytes "F" "L" "V" 0x1
let mut sig_bytes_rem = [0u8; 3];
self.stdin.read_exact(&mut sig_bytes_rem)?;
if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 {
@@ -39,6 +41,7 @@ impl FLVReader {
}
fn skip_init_header(&mut self) -> Result<(), Box<dyn Error>> {
// skip rest of header. Header is (FLV1 + 1byte + 4byte offset + 4byte reserved)
let mut rest_of_header = [0u8; 5];
self.stdin.read_exact(&mut rest_of_header)?;
let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4;
@@ -48,13 +51,15 @@ impl FLVReader {
}
fn read_packet(&mut self) -> Result<(FLVTagType, Vec<u8>), Box<dyn Error + Send + Sync>> {
// FLV packet header always 11 bytes
let mut tag_head = [0u8; 11];
match self.stdin.read_exact(&mut tag_head) {
Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), // assumes EOF while reading header means 0 bytes were read, not just less than len
Err(err) => return Err(Box::new(err))
};
let tag_type = FLVTagType::try_from(tag_head[0])?;
// len is u24 not u32, u24 does not exist
let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize;
let mut packet_data = vec![0u8; tag_data_len];
self.stdin.read_exact(packet_data.as_mut_slice())?;
@@ -85,6 +90,7 @@ impl input::FileReader for FLVReader {
fn read_nalu(&mut self, metadata: &util::Metadata) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let (packet_type, mut data) = self.read_packet()?;
return match packet_type {
// add headers to raw bytestreams as needed
FLVTagType::Audio => Ok(util::NALUPacket {
packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()),
packet_data: nalu::prepend_a(data, &metadata),
@@ -106,8 +112,10 @@ pub fn new_reader(stdin: io::Stdin) -> Result<FLVReader, Box<dyn Error>> {
fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>> {
let mut metadata = util::Metadata { ..Default::default() };
data.drain(..13); // skip until ECMA array, hacky, but seems fixed length and I'm not implementing a proper amf parser for 1 single tag that seems to be fixed length offset
let ecma_sync_byte = data.drain(..1).next().unwrap();
// AMF ECMA array object marker
if ecma_sync_byte != 8 {
return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail));
}
@@ -115,8 +123,12 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
for _ in 0..arr_len {
let key = String::fetch_amf_val(&mut data);
let amf_marker = u8::fetch_amf_val(&mut data);
if !handle_relevant_keys(key, &mut data, &mut metadata)? { // most are completely useless or unused (levels), if they are, get the value
match amf_marker {
// added functions to base types, see below.
// would try generics, but restricting types is weird
0 => {let _ = f64::fetch_amf_val(&mut data);},
1 => {let _ = bool::fetch_amf_val(&mut data);},
2 => {let _ = String::fetch_amf_val(&mut data);},
@@ -124,11 +136,13 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
}
}
}
// check if all needed values are set
return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))};
}
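For context on the fetch_amf_val calls above: AMF0 numbers are 8-byte big-endian doubles and short strings are a u16 big-endian length followed by UTF-8 bytes. A rough sketch of what such helpers can look like when draining the metadata buffer (take_amf_f64 and take_amf_string are hypothetical names, not the repo's actual trait methods):

fn take_amf_f64(data: &mut Vec<u8>) -> f64 {
    // AMF0 number payload: 8-byte IEEE-754 double, big-endian
    let bytes: Vec<u8> = data.drain(..8).collect();
    f64::from_be_bytes(bytes.try_into().unwrap())
}

fn take_amf_string(data: &mut Vec<u8>) -> String {
    // AMF0 short string payload: u16 big-endian length, then UTF-8 bytes
    let len_bytes: Vec<u8> = data.drain(..2).collect();
    let len = u16::from_be_bytes(len_bytes.try_into().unwrap()) as usize;
    String::from_utf8(data.drain(..len).collect()).unwrap_or_default()
}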
fn handle_relevant_keys(key: String, data: &mut Vec<u8>, metadata: &mut util::Metadata) -> Result<bool, Box<dyn Error>> {
match key.as_str() {
// fetches relevant values, should try to use above instead, but codecs complicate things
"width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)},
"height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)},
"framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)},

View file

@@ -15,18 +15,20 @@ pub trait FileReader {
let nalu = match self.read_nalu(&metadata) {
Ok(x) => x,
Err(err) => {
// pause loop on EOF
if let Some(e) = err.downcast_ref::<util::DemuxerError>() {
match e {
util::DemuxerError::EOF => {
// drop to trigger RecvError on decode module to end
drop(v_in);
drop(a_in);
thread::park(); // prevent dropping err_out to keep main running
return
},
_ => ()
}
};
util::thread_freeze(err_in, err); // pause thread, trigger main to exit
return
}
};
@@ -50,7 +52,7 @@ pub trait FileReader {
pub fn new_reader() -> Result<impl FileReader, Box<dyn Error>> {
let mut stdin_hndl = io::stdin();
let mut byte1 = [0u8; 1]; // sync byte to determine filetype, limited checking capability
stdin_hndl.read_exact(&mut byte1)?;
match &byte1[0] {
0x46 => flv::new_reader(stdin_hndl),

View file

@@ -1,6 +1,6 @@
mod input;
mod flv;
mod nalu_flv; // probably should organize into an flv submodule instead, but eh
use std::sync::mpsc;
use std::error::Error;
@@ -20,7 +20,7 @@ pub fn spawn() -> Result<
Box<dyn Error>
> {
let mut reader = input::new_reader()?;
let metadata = Arc::new(reader.init()?); // arc to share across modules
let (v_in, v_out) = mpsc::channel();
let (a_in, a_out) = mpsc::channel();
let (err_in, err_out) = mpsc::channel();

View file

@@ -29,25 +29,28 @@ fn parse_avcc(mut data: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
}
fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; // base nalu delimiter for H264
match data[1] {
0 => {
// sequence header contains avcC as in ISOBMF binding,
// contains sps and pps needed for bytestream
// must be segmented for NALU format
data.drain(..5); // composition time info, etc, see FLV spec
let (mut sps, mut pps) = parse_avcc(data);
nalu.append(&mut sps);
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
nalu.append(&mut pps);
return nalu;
},
2 => {return vec![0u8; 0];}, // end seq, return null
1 => {
data.drain(..5);
loop { // may be multiple NALU packets per tag
let mut buff = [0u8; 4];
buff.copy_from_slice(data.drain(..4).as_slice());
let nalu_len = u32::from_be_bytes(buff) as usize;
nalu.extend_from_slice(data.drain(..nalu_len).as_slice());
if data.len() <= 4 {return nalu} // at end of packet
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
}
},
@@ -56,6 +59,7 @@ fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
}
fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
// see multimedia wiki for details
let aot_idx = 1u8;
let frame_len = (data_len + 7) as u16;
let samplerate_idx: u8 = match metadata.audio.samplerate {
@@ -75,7 +79,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
_ => {panic!("invalid samplerate")}
};
let chan_conf_idx = metadata.audio.channels;
let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; // sync bytes and guaranteed value set
head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2);
head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8);
head[4] = ((frame_len >> 3) & 0xff) as u8;
@@ -86,6 +91,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
fn preprocess_aac(mut data: Vec<u8>, metadata: &util::Metadata) -> Vec<u8> {
match data[1] {
0 => {
// bytestream needs no codec config, all information already in metadata, also would
// not be persistent across packets
return vec![0u8; 0];
},
1 => {
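A worked example of the ADTS packing in make_adts_head, with assumed inputs (48 kHz stereo AAC-LC and a 249-byte raw frame, so samplerate_idx = 3, chan_conf_idx = 2, frame_len = 256; none of these values come from the repo):

let (aot_idx, samplerate_idx, chan_conf_idx) = (1u8, 3u8, 2u8);
let frame_len = 249u16 + 7; // ADTS header adds 7 bytes
let byte2 = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2); // 0x4c
let byte3 = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8);      // 0x80
let byte4 = ((frame_len >> 3) & 0xff) as u8;                               // 0x20
assert_eq!([byte2, byte3, byte4], [0x4c, 0x80, 0x20]);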

View file

@@ -6,6 +6,7 @@ use crate::util;
use crate::encode::codecs::Encoder;
use crate::encode::codecs;
// spawn encoder for a given output encoding
pub fn spawn(metadata: Arc<util::Metadata>, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box<dyn Error>> {
let v = match v_target {
util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?,

View file

@@ -18,19 +18,21 @@ impl Encoder for OpusEncoder {
}
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// opus outputs Ogg format, can be parsed with the same parser as the rest
return Ok(());
}
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let mut stdout = self.cmd.stdout.as_mut().unwrap();
let mut buff = [0u8; 27]; // each packet has a 27 byte header, mostly irrelevant
match stdout.read_exact(&mut buff) {
Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume EOF at header means EOF proper
Err(err) => return Err(Box::new(err))
};
let num_segments = buff[buff.len() - 1] as usize;
let mut segment_table_buff = vec![0u8; num_segments]; // dictates actual size of data as raw sum of u8
stdout.read_exact(&mut segment_table_buff)?;
let mut page_size: usize = 0;
for i in segment_table_buff {
@@ -45,6 +47,7 @@ impl Encoder for OpusEncoder {
pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Error>> {
let channels = metadata.audio.channels.to_string();
let samplerate = metadata.audio.samplerate.to_string();
// opus enc for PCM LE 16 -> Opus
let cmd = Command::new("opusenc")
.args([
"--quiet",
@@ -52,7 +55,7 @@ pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Er
"--raw-bits", "16",
"--raw-rate", samplerate.as_str(),
"--raw-chan", channels.as_str(),
"--max-delay", "1", // 0 works worse than 1 somehow
"-", "-"
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
return Ok(OpusEncoder {cmd: cmd});
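The read_nalu above leans on standard Ogg page framing: a fixed 27-byte header (starting with the "OggS" capture pattern) whose last byte is the segment count, followed by a lacing table whose values sum to the page body length. A minimal standalone sketch of that parse (assumed helper, not the repo's code):

use std::io::Read;

fn read_ogg_page_body(r: &mut impl Read) -> std::io::Result<Vec<u8>> {
    let mut header = [0u8; 27];
    r.read_exact(&mut header)?;              // fixed-size page header
    let num_segments = header[26] as usize;  // last header byte: segment count
    let mut lacing = vec![0u8; num_segments];
    r.read_exact(&mut lacing)?;
    let body_len: usize = lacing.iter().map(|&b| b as usize).sum();
    let mut body = vec![0u8; body_len];
    r.read_exact(&mut body)?;
    Ok(body)
}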

View file

@@ -12,8 +12,13 @@ use crate::util;
pub trait Encoder {
fn stdin(&mut self) -> ChildStdin;
// read from encoder
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>>;
// for specific encoder peculiarities
// svtav1 gives DKIF format, which has its own extra header which breaks just parsing as is
// opus outputs Ogg, whose header pages follow the same format as the rest of its packets
// use as needed
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
fn write_nalu_loop(&mut self, c_out: mpsc::Sender<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
@@ -24,7 +29,9 @@ pub trait Encoder {
loop {
let nalu = match self.read_nalu() {
Ok(x) => x,
Err(err) => {drop(c_out); thread::park(); return} // drop c_out to signal EOF to muxer, park to keep c_err allocated
};
match c_out.send(nalu) {
Ok(_) => (),
@@ -34,6 +41,7 @@ pub trait Encoder {
}
}
// write to encoder
fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box<dyn Error + Send + Sync>> {
cmd_in.write_all(media.sample.as_slice())?;
Ok(())
@@ -43,11 +51,15 @@ pub fn read_raw_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::RawMedia
loop {
let media = match c_in.recv() {
Ok(x) => x,
Err(err) => {drop(cmd_in); thread::park(); return} // drop stdin to signal EOF, which lets encoder close stdout as needed, park to keep c_err allocated
};
match write_raw(&mut cmd_in, media) {
Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, err); return} // park thread, wait for main to exit
}
}
}

View file

@@ -17,7 +17,9 @@ impl Encoder for AV1Encoder {
}
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// SVTAV1 outputs Duck IVF framing, check specs if needed
let mut stdout = self.cmd.stdout.as_mut().unwrap();
// remove dkif header
let mut buff = [0u8; 8];
stdout.read_exact(&mut buff)?;
let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize;
@@ -28,16 +30,18 @@ impl Encoder for AV1Encoder {
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let mut stdout = self.cmd.stdout.as_mut().unwrap();
let mut buff = [0u8; 12]; // frame always 12 byte header
match stdout.read_exact(&mut buff) {
Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume eof if header read fails
Err(err) => return Err(Box::new(err))
}
let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; // actual sample size in frame header
let mut data = vec![0u8; nalu_len];
stdout.read_exact(data.as_mut_slice())?;
data.drain(..2); // skip temporal delimiter packet (2 bytes, always first of each dkif frame)
return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data});
}
}
@@ -46,15 +50,16 @@ pub fn new_av1(metadata: Arc<util::Metadata>) -> Result<AV1Encoder, Box<dyn Erro
let width = metadata.video.width.to_string();
let height = metadata.video.height.to_string();
let fps_num = ((metadata.video.framerate * 1000.0) as u32).to_string();
// SVTAV1 for YUV420 -> AV1, piped input output
let cmd = Command::new("SvtAv1EncApp")
.args([
"--errlog", "/dev/null", // dump errors
"--progress", "0",
"-w", width.as_str(),
"-h", height.as_str(),
"--fps-num", fps_num.as_str(),
"--preset", "12", // super fast
"--keyint", "30", // keyframe every 30 frames
"-i", "-",
"-b", "-"
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
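For reference on the IVF/DKIF framing handled above: the file starts with a 32-byte header whose length field sits at offset 6 (u16 little-endian), and each frame record then begins with a 12-byte header holding a u32 little-endian frame size plus a u64 timestamp. A rough sketch of the header skip (hypothetical helper; details are assumptions from the standard layout, not repo code):

use std::io::Read;

fn skip_ivf_file_header(r: &mut impl Read) -> std::io::Result<()> {
    let mut start = [0u8; 8]; // b"DKIF", version (u16 LE), header length (u16 LE)
    r.read_exact(&mut start)?;
    let head_len = u16::from_le_bytes(start[6..8].try_into().unwrap()) as usize;
    let mut rest = vec![0u8; head_len.saturating_sub(8)]; // normally 32 - 8 = 24 bytes left
    r.read_exact(&mut rest)?;
    Ok(())
}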

View file

@@ -24,7 +24,7 @@ pub fn spawn(
let (v_in, v_out) = mpsc::channel();
let (a_in, a_out) = mpsc::channel();
let (err_in_v, err_out) = mpsc::channel();
let err_in_a = err_in_v.clone(); // double for each thread
let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?;
thread::spawn(move || {
cmds::handle_encoder(v_encoder, v, v_in, err_in_v);

View file

@@ -9,21 +9,28 @@ use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// spawns threads for each subtask, linking input/output channels
// returns an array of error receivers to exit if needed
// passes input metadata around for video and audio metadata, makes scaling and resampling
// impossible
fn init(args: util::HLSArgs) -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>> {
let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?;
let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?;
// output codecs hardcoded, probably want to pass in via cmd args instead
let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?;
let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), args)?;
return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]);
}
// post init loop. Modules are initialized without error, loop and check for errors
fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4]) -> Result<(), Box<dyn std::error::Error>> {
let (mut v_eof, mut a_eof) = (false, false); // proper EOF handling
loop {
if v_eof && a_eof {return Ok(())} // if both video and audio streams have ended
for i in 0..4 {
match error_handles[i].try_recv() {
Ok(err) => {
// check if EOF error found.
if let Some(eof_error) = err.downcast_ref::<util::MuxerError>() {
match eof_error {
util::MuxerError::VideoEOF => {v_eof = true;},
@@ -34,17 +41,22 @@ fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Syn
return Err(err)
}
},
Err(mpsc::TryRecvError::Empty) => (), // do nothing since nothing was sent and thread still active
Err(err) => return Err(Box::new(util::ThreadError(i))),
}
}
thread::sleep(Duration::from_millis(100)); // sleep so isn't running constantly for no reason
}
}
// gets args
// bad, replace with clap impl if meaning to actually use
fn parse_args() -> util::HLSArgs {
let mut args = std::env::args();
args.next(); // skip prog name
util::HLSArgs {
segment_time: args.next().expect("no time interval").parse().expect("not int"),
segment_prepend: args.next().expect("no prepend"),
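On the "replace with clap" note above, a derive-based version could look roughly like this (sketch only: the field names mirror how HLSArgs is used elsewhere in this diff, the argument order and types are assumptions, and the crate would need clap with the derive feature enabled):

use clap::Parser;

#[derive(Parser)]
struct Cli {
    /// segment length in seconds
    segment_time: usize,
    /// prefix prepended to segment URIs in the playlist
    segment_prepend: String,
    /// maximum number of segments kept in the live playlist
    max_segments: usize,
}

fn parse_args() -> util::HLSArgs {
    let cli = Cli::parse();
    util::HLSArgs {
        segment_time: cli.segment_time,
        segment_prepend: cli.segment_prepend,
        max_segments: cli.max_segments,
    }
}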

View file

@@ -26,20 +26,22 @@ impl HLSHandler {
match self.muxer.get_segment(self.curr_segment_idx, self.args.segment_time) {
None => (),
Some(segment) => {
let del_list = self.new_segment(segment); // making a new segment should push out old segments
self.delete_files(del_list);
self.dump_playlist();
self.curr_segment_idx += 1;
}
}
thread::sleep(time::Duration::from_millis(100)); // don't need to have it constantly running
}
}
fn new_segment(&mut self, segment: segments::Segment) -> Vec<String> {
segment.dump(); // write out the segment,
self.curr_segments.push(segment.filename); // add its filename to the curr playlist,
let to_drain = if self.curr_segments.len() > self.args.max_segments {self.curr_segments.len() - self.args.max_segments} else {0}; // and remove segment filenames that are old
self.curr_segments.drain(..to_drain).collect()
}
@@ -47,18 +49,23 @@ impl HLSHandler {
fn delete_files(&mut self, mut del_list: Vec<String>) {
for filename in &del_list {
_ = fs::remove_file(filename);
self.curr_media_seq_idx += 1; // media sequence must increment per deletion as per HLS spec
}
}
fn dump_playlist(&self) {
let mut segment_listing = String::new();
for segment_filename in &self.curr_segments {
// each segment has its duration and access URI, should also have Datetime, but neither
// really needed nor "simple" with std rust, would need chrono or time or be ready for
// headaches
segment_listing += format!("\
#EXTINF:{},\n\
{}\n\
", self.args.segment_time, (self.args.segment_prepend.clone() + segment_filename)).as_str();
}
// full playlist string, see HLS spec for details
let playlist_string = format!("\
#EXTM3U\n\
#EXT-X-VERSION:7\n\
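The hunk cuts off after #EXT-X-VERSION, but the live playlist this builds toward is plain text along these lines (illustrative only; the exact tags and values below are assumptions, with segment names matching the idx.to_string() + ".m4s" scheme used elsewhere in this commit):

#EXTM3U
#EXT-X-VERSION:7
#EXT-X-TARGETDURATION:4
#EXT-X-MEDIA-SEQUENCE:12
#EXT-X-MAP:URI="init.mp4"
#EXTINF:4,
17.m4s
#EXTINF:4,
18.m4s
#EXTINF:4,
19.m4s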

View file

@@ -1,3 +1,8 @@
// cursed abomination, not well thought out, but works fine, rust also complicates inheritance
// every necessary MP4 atom to create readable files,
// lookup the corresponding item in the ISO doc or the quicktime spec (for everything not under
// moof), because I am not doing all of that here.
use std::sync::Arc;
use std::io::Write;
use std::fs::File;
@@ -921,6 +926,7 @@ pub fn construct_moov(stsd_v: STSD, stsd_a: STSD, metadata: &Arc<util::Metadata>
}
}
// video will always be track 1, audio in trak 2
fn new_trak(stsd: STSD, metadata: &Arc<util::Metadata>) -> TRAK {
let mut tkhd = TKHD {..Default::default() };
match stsd.entry {
@@ -1192,6 +1198,11 @@ impl MP4Atom for MDAT {
}
pub fn new_moof(v_samples: &Vec<samples::Sample>, default_dur_v: u32, a_samples: &Vec<samples::Sample>, default_dur_a: u32, mdat_v_offset: usize, mdat_a_offset: usize, moof_idx: usize) -> MOOF {
// hacky fix, since sticking to a standard format and defaults for segments, only trun is
// variable which has a predictable size regardless, moof offset + v and a trun sample table
// len and mdat header offset, will break using version 1 atoms for time,
// alternative is to somehow record idx for base offset (which breaks MP4atoms impl) to modify
// later, or calc full length properly, also complicated
let mdat_data_start = 152 + v_samples.len() * 8 + a_samples.len() * 8 + 8;
MOOF {
mfhd: MFHD { seq_num: moof_idx as u32, ..Default::default() },
@@ -1241,6 +1252,7 @@ fn make_sample_table(samples: &Vec<samples::Sample>) -> Vec<SampleData> {
return table;
}
// takes content, applies MP4 atom encapsulation
fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
let mut head = vec![0u8; 8];
head.splice(..4, ((content.len() + 8) as u32).to_be_bytes());
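A quick illustrative check of the box framing make_box produces (standard MP4 layout, not repo code): a 4-byte big-endian size that counts the 8 header bytes, followed by the 4-byte type, then the payload.

let content = vec![0u8; 100];
let mut boxed = ((content.len() + 8) as u32).to_be_bytes().to_vec();
boxed.extend_from_slice(b"moof");
boxed.extend_from_slice(&content);
assert_eq!(boxed.len(), 108);
assert_eq!(&boxed[..4], &108u32.to_be_bytes());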

View file

@@ -8,6 +8,7 @@ use std::error::Error;
use crate::util;
pub struct MP4Muxer {
// option to allow being taken by threads
v: Option<mpsc::Receiver<util::NALUPacket>>,
pub v_samples: Arc<samples::SampleQueue>,
a: Option<mpsc::Receiver<util::NALUPacket>>,
@@ -22,15 +23,22 @@ pub fn new_muxer(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NA
a: Some(a),
metadata: metadata.clone(),
err_in: err_in,
// Arc<Mutex<Vec>> for multithread write access
v_samples: Arc::new(samples::SampleQueue {
queue: Mutex::new(Vec::new()),
default_duration: 1000, // timescale as fps * 1000, each sample then always 1000 regardless of fps
samples_per_sec: metadata.video.framerate as usize,
}),
a_samples: Arc::new(samples::SampleQueue {
queue: Mutex::new(Vec::new()),
default_duration: metadata.audio.samplerate * 20 / 1000, // timescale is set to the samplerate, so each sample's duration is the packet length (20ms) * samplerate
samples_per_sec: 50, // opus default is 20ms packets (1000/20 = 50), would need to change this if changing that
}),
};
muxer.gen_init()?;
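Worked numbers for the timescale comments above, using assumed inputs of 30 fps video and 48 kHz audio (not values from the repo):

let framerate = 30u32;
let samplerate = 48_000u32;

let video_timescale = framerate * 1000;                  // 30_000 ticks per second
let video_sample_duration = 1000u32;                     // every frame is 1000 ticks, whatever the fps
assert_eq!(video_sample_duration * framerate, video_timescale);

let audio_timescale = samplerate;                        // 48_000 ticks per second
let audio_sample_duration = samplerate * 20 / 1000;      // 960 ticks per 20 ms Opus packet
assert_eq!(audio_sample_duration * 50, audio_timescale); // 50 packets fill one second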

View file

@@ -11,12 +11,17 @@ impl mp4::MP4Muxer {
pub fn gen_init(&mut self) -> Result<(), Box<dyn Error>> {
let stsd_v = self.handle_v_cc()?;
let stsd_a = self.handle_a_cc()?;
// only need moov for codec declarations, ftyp is unneeded apparently
let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata);
mp4::atoms::dump_init_tree(init_tree)?; // write out moov in init.mp4 segment
Ok(())
}
fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
// handles getting stsd per sample, parsing codec config mostly
// and peculiarities per codec
// AV1 has the first nalu as is in the samples
// also contains info for codec config
let v_cc = self.v.as_ref().expect("must be owned at init").recv()?;
return match v_cc.packet_type {
util::NALUPacketType::Video(util::VideoCodec::AV1) => {
@@ -28,6 +33,10 @@ impl mp4::MP4Muxer {
}
fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
// handles getting stsd per sample, parsing codec config mostly
// and peculiarities per codec
// Ogg (Opus) first packet has codec config info, not needed in samples
// Second packet has no discernable use for ISOBMF? Also not needed in samples
let a_cc = self.a.as_ref().expect("must be owned at init").recv()?;
return match a_cc.packet_type {
util::NALUPacketType::Audio(util::AudioCodec::OPUS) => {
@@ -48,8 +57,15 @@ impl mp4::MP4Muxer {
let mut i = 1;
loop {
match v.recv() {
// hacky fix for video keyframe sample flags
// set at 30 frames per keyframe, would need some way to send the same value to
// the encoder as here or attempt to parse the actual OBU (even the header
// seems cursed, just use a lib instead)
// no clue how those flags are parsed, but they work (0x02 is keyframe, 0x0101
// is interframe)
Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});},
Err(_) => {
// send EOF to main for proper shutdown
util::thread_freeze(err_in1, Box::new(util::MuxerError::VideoEOF));
return
}
@@ -61,8 +77,11 @@ impl mp4::MP4Muxer {
thread::spawn(move || {
loop {
match a.recv() {
// audio should be all keyframes, should use the tfhd default var, but
// more work to decouple video and audio impls
Ok(x) => {a_queue.push(x.packet_data, 0x02000000);},
Err(_) => {
// send EOF for proper shutdown
util::thread_freeze(err_in2, Box::new(util::MuxerError::AudioEOF));
return
}
@@ -72,6 +91,9 @@ impl mp4::MP4Muxer {
}
pub fn get_segment(&mut self, idx: usize, time: usize) -> Option<segments::Segment> {
// if sample queues are long enough to extract a segment, do so, otherwise return None
// probably should not be an option, instead loop here until ready to allow flushing at
// exit
if self.v_samples.segment_ready(time) && self.a_samples.segment_ready(time) {
return Some(segments::Segment {
filename: idx.to_string() + ".m4s",
@@ -100,7 +122,7 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
dref_idx: 1,
width: metadata.video.width as u16,
height: metadata.video.height as u16,
resolution: (72u64 << 48) + (72u64 << 16), // 72 dpi, fixed point numbers (72.00 + 72.00)
sample_per_frame: 1,
encoder_name: String::from("libsvtav1"),
pixel_depth: 24,
@@ -115,6 +137,7 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
}
fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD {
// see opus encapsulation in ISOBMF for details
let mut opus_binding = vec![0u8; 11];
opus_binding[1] = metadata.audio.channels;
let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap());
@@ -132,7 +155,7 @@ fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms:
dref_idx: 1,
channels: metadata.audio.channels as u16,
sample_size: 16,
sample_rate: metadata.audio.samplerate << 16, // fixed point (u16 int + u16 decimals)
codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding}
};
return mp4::atoms::STSD {
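Both fixed-point comments above follow the usual 16.16 convention; with assumed example values of 72 dpi and a 48 kHz samplerate:

let dpi = 72u64;
let resolution = (dpi << 48) + (dpi << 16); // 16.16 horizontal and 16.16 vertical packed into one u64
assert_eq!(resolution, 0x0048_0000_0048_0000);

let samplerate: u32 = 48_000;
assert_eq!(samplerate << 16, 0xBB80_0000); // integer part 48000 (0xBB80), zero fractional part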

View file

@@ -2,6 +2,8 @@ use std::sync::{Arc, Mutex};
use crate::muxer::hls::segments;
// forces a common duration, no composition time offset
// flags declared per sample as well as size
pub struct Sample {
pub data: Vec<u8>,
pub size: u32,

View file

@@ -10,14 +10,16 @@ pub struct Segment {
pub filename: String,
pub idx: usize,
pub segment_video: Vec<samples::Sample>,
pub sample_duration_v: u32, // default interval per sample at designated timescale (same for audio instead below)
pub segment_audio: Vec<samples::Sample>,
pub sample_duration_a: u32,
}
impl Segment {
pub fn dump(&self) {
let mdat = self.new_mdat(); // sample full byte length needed for moof generation
// increment idx (0-indexed) for moofs (1-indexed)
let moof = atoms::new_moof(&self.segment_video, self.sample_duration_v, &self.segment_audio, self.sample_duration_a, 0, mdat.v_samples.len(), self.idx + 1);
let mut segment_bytes = moof.marshall();
segment_bytes.append(&mut mdat.marshall());

View file

@@ -1,3 +1,5 @@
// general use structs, functions and errors go here
use std::error::Error;
use std::fmt;
use std::thread;
@@ -222,11 +224,15 @@ impl fmt::Display for MuxerError {
}
// funcs
// parks a thread and sends an error over the param channel
// used to pause threads keeping channels open, so that main can receive the actual error and then
// exit, instead of a "RecvError"
pub fn thread_freeze(err_sender: mpsc::Sender<Box<dyn Error + Send + Sync>>, err: Box<dyn Error + Send + Sync>) {
err_sender.send(err).unwrap();
thread::park();
}
// should probably make this a macro instead, eh
pub fn blank_vec(n: usize) -> Vec<u8> {
return vec![0u8; n];
}