Compare commits

...

10 commits

23 changed files with 545 additions and 81 deletions

15
README.md Normal file
View file

@ -0,0 +1,15 @@
# hls-transcoder-2
Extremely basic hls segmenter for live video written in Rust. 1st was scrapped since I had 0 clue what I was doing at all. Singular clue has since been acquired. Would not recommend using or expanding on this for anything other than figuring out the how and what.
Currently only works with FLV input (so RTMP only) and H264/AAC codec inputs, and outputs AV1 and Opus codecs in a fragmented mp4 playlist. Decoding and encoding happen via 3rd party programs accessible on $PATH (ffmpeg and faad for decoding, SvtAv1EncApp and opusenc for encoding). Would use lib versions, but would only do so for a proper project using extern crates.
Uses only Rust's stdlib.
**Not intended for actual use**. Will have limitations and some aspects are just not well implemented, also relies on progs being installed for encoding and decoding.
`cargo build` to produce the binary. Usage is `hls-transcoder-2 <hls_time> <uri_prepend> <hls_list_size>`
* `hls_time`: duration of each segment (usize only currently)
* `uri_prepend`: string to add to each segment and init.mp4, for http server uris
* `hls_list_size`: max number of segments to include in the playlist, older segments will be deleted, (usize)

View file

@ -7,6 +7,7 @@ use crate::decode::codecs::Decoder;
use crate::decode::codecs; use crate::decode::codecs;
pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> { pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> {
// create appropriate decoder implementation for input codec
let v = match metadata.video.codec { let v = match metadata.video.codec {
Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?, Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?,
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));} _ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
@ -19,9 +20,10 @@ pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decode
} }
pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) { pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
// take stdin and stdout for process to read from and write to on different threads
let stdin = decoder.stdin(); let stdin = decoder.stdin();
let stdout = decoder.stdout(); let stdout = decoder.stdout();
let raw_buff_size = decoder.raw_buff_size(); let raw_buff_size = decoder.raw_buff_size(); // determined by input parameters and codec
let raw_sample_type = decoder.media_type(); let raw_sample_type = decoder.media_type();
let c_err2 = c_err.clone(); let c_err2 = c_err.clone();
thread::spawn(move || { thread::spawn(move || {

View file

@ -30,9 +30,10 @@ impl Decoder for AACDecoder {
} }
pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Error>> { pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Error>> {
// faad for AAC -> raw PCM 16-bit little-e, piped
let cmd = Command::new("faad") let cmd = Command::new("faad")
.args(["-f", "2", "-w", "-q", "-"]) .args(["-f", "2", "-w", "-q", "-"])
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; // channels * 2 bytes per sample, * samplerate for bytes per second
return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE}) return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE})
} }

View file

@ -20,11 +20,14 @@ pub trait Decoder {
fn media_type(&self) -> util::RawMediaType; fn media_type(&self) -> util::RawMediaType;
} }
// write to decoder
fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> { fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
cmd_in.write_all(nalu.packet_data.as_slice())?; cmd_in.write_all(nalu.packet_data.as_slice())?; // nalus should already be in bytestream
// format, send as is
Ok(()) Ok(())
} }
// read from decoder
fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> { fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
let mut raw_buff = vec![0u8; raw_buff_size]; let mut raw_buff = vec![0u8; raw_buff_size];
match cmd_out.read_exact(raw_buff.as_mut_slice()) { match cmd_out.read_exact(raw_buff.as_mut_slice()) {
@ -39,11 +42,14 @@ pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::NALUPac
loop { loop {
let nalu = match c_in.recv() { let nalu = match c_in.recv() {
Ok(x) => x, Ok(x) => x,
Err(err) => {drop(cmd_in); thread::park();return} Err(err) => {drop(cmd_in); thread::park();return} // drop cmd_in to signal EOF to
// decoder, letting the decoder close
// stdout when done
}; };
match write_nalu(&mut cmd_in, nalu) { match write_nalu(&mut cmd_in, nalu) {
Ok(_) => (), Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, err); return} Err(err) => {util::thread_freeze(c_err, err); return} // pause thread, wait for main to
// exit
} }
} }
} }
@ -52,7 +58,7 @@ pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: ut
loop { loop {
let media = match read_raw(&mut cmd_out, buff_size, media_type) { let media = match read_raw(&mut cmd_out, buff_size, media_type) {
Ok(x) => x, Ok(x) => x,
Err(err) => {drop(c_out); thread::park(); return} Err(err) => {drop(c_out); thread::park(); return} // drop to signal EOF to encoder
}; };
match c_out.send(media) { match c_out.send(media) {
Ok(_) => (), Ok(_) => (),

View file

@ -30,9 +30,12 @@ impl Decoder for H264Decoder {
} }
pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> { pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> {
// ffmpeg pipe input output, H264 -> raw YUV420P sequence
let cmd = Command::new("ffmpeg") let cmd = Command::new("ffmpeg")
.args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"]) .args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"])
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; .stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; // size of single
// yuv is
// 1.5*num_pixels
return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P}); return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P});
} }

View file

@ -22,7 +22,7 @@ pub fn spawn(
let (raw_v_in, raw_v_out) = mpsc::channel(); let (raw_v_in, raw_v_out) = mpsc::channel();
let (raw_a_in, raw_a_out) = mpsc::channel(); let (raw_a_in, raw_a_out) = mpsc::channel();
let (err_in_v, err_out) = mpsc::channel(); let (err_in_v, err_out) = mpsc::channel();
let err_in_a = err_in_v.clone(); let err_in_a = err_in_v.clone(); // double for 2 threads
let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?; let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?;
thread::spawn(move || { thread::spawn(move || {
cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v); cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v);

View file

@ -30,6 +30,8 @@ pub struct FLVReader {
impl FLVReader { impl FLVReader {
fn check_signature(&mut self) -> Result<(), Box<dyn Error>> { fn check_signature(&mut self) -> Result<(), Box<dyn Error>> {
// check for "LV" for FLV header
// "F" already used in spawning, header[..4] always bytes "F" "L" "V" 0x1
let mut sig_bytes_rem = [0u8; 3]; let mut sig_bytes_rem = [0u8; 3];
self.stdin.read_exact(&mut sig_bytes_rem)?; self.stdin.read_exact(&mut sig_bytes_rem)?;
if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 { if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 {
@ -39,6 +41,7 @@ impl FLVReader {
} }
fn skip_init_header(&mut self) -> Result<(), Box<dyn Error>> { fn skip_init_header(&mut self) -> Result<(), Box<dyn Error>> {
// skip rest of header. Header is (FLV1 + 1byte + 4byte offset + 4byte reserved)
let mut rest_of_header = [0u8; 5]; let mut rest_of_header = [0u8; 5];
self.stdin.read_exact(&mut rest_of_header)?; self.stdin.read_exact(&mut rest_of_header)?;
let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4; let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4;
@ -48,13 +51,15 @@ impl FLVReader {
} }
fn read_packet(&mut self) -> Result<(FLVTagType, Vec<u8>), Box<dyn Error + Send + Sync>> { fn read_packet(&mut self) -> Result<(FLVTagType, Vec<u8>), Box<dyn Error + Send + Sync>> {
// FLV packet header always 11 bytes
let mut tag_head = [0u8; 11]; let mut tag_head = [0u8; 11];
match self.stdin.read_exact(&mut tag_head) { match self.stdin.read_exact(&mut tag_head) {
Ok(_) => (), Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), // assumes EOF while reading header means 0 bytes were read, not just less than len
Err(err) => return Err(Box::new(err)) Err(err) => return Err(Box::new(err))
}; };
let tag_type = FLVTagType::try_from(tag_head[0])?; let tag_type = FLVTagType::try_from(tag_head[0])?;
// len is u24 not u32, u24 does not exist
let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize; let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize;
let mut packet_data = vec![0u8; tag_data_len]; let mut packet_data = vec![0u8; tag_data_len];
self.stdin.read_exact(packet_data.as_mut_slice())?; self.stdin.read_exact(packet_data.as_mut_slice())?;
@ -85,6 +90,7 @@ impl input::FileReader for FLVReader {
fn read_nalu(&mut self, metadata: &util::Metadata) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> { fn read_nalu(&mut self, metadata: &util::Metadata) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let (packet_type, mut data) = self.read_packet()?; let (packet_type, mut data) = self.read_packet()?;
return match packet_type { return match packet_type {
// add headers to raw bytestreams as needed
FLVTagType::Audio => Ok(util::NALUPacket { FLVTagType::Audio => Ok(util::NALUPacket {
packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()), packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()),
packet_data: nalu::prepend_a(data, &metadata), packet_data: nalu::prepend_a(data, &metadata),
@ -106,8 +112,10 @@ pub fn new_reader(stdin: io::Stdin) -> Result<FLVReader, Box<dyn Error>> {
fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>> { fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>> {
let mut metadata = util::Metadata { ..Default::default() }; let mut metadata = util::Metadata { ..Default::default() };
data.drain(..13); data.drain(..13); // skip until ECMA array, hacky, but seems fixed length and I'm not
// implementing a proper amf parser for 1 single tag that seems to be fixed length offset
let ecma_sync_byte = data.drain(..1).next().unwrap(); let ecma_sync_byte = data.drain(..1).next().unwrap();
// AMF ECMA array object marker
if ecma_sync_byte != 8 { if ecma_sync_byte != 8 {
return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail)); return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail));
} }
@ -115,8 +123,12 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
for _ in 0..arr_len { for _ in 0..arr_len {
let key = String::fetch_amf_val(&mut data); let key = String::fetch_amf_val(&mut data);
let amf_marker = u8::fetch_amf_val(&mut data); let amf_marker = u8::fetch_amf_val(&mut data);
if !handle_relevant_keys(key, &mut data, &mut metadata)? { if !handle_relevant_keys(key, &mut data, &mut metadata)? { // most are completely useless
// or unused (levels), if they
// are, get the value
match amf_marker { match amf_marker {
// added functions to base types, see below.
// would try generics, but restricting types is weird
0 => {let _ = f64::fetch_amf_val(&mut data);}, 0 => {let _ = f64::fetch_amf_val(&mut data);},
1 => {let _ = bool::fetch_amf_val(&mut data);}, 1 => {let _ = bool::fetch_amf_val(&mut data);},
2 => {let _ = String::fetch_amf_val(&mut data);}, 2 => {let _ = String::fetch_amf_val(&mut data);},
@ -124,11 +136,13 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
} }
} }
} }
// check if all needed values are set
return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))}; return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))};
} }
fn handle_relevant_keys(key: String, data: &mut Vec<u8>, metadata: &mut util::Metadata) -> Result<bool, Box<dyn Error>> { fn handle_relevant_keys(key: String, data: &mut Vec<u8>, metadata: &mut util::Metadata) -> Result<bool, Box<dyn Error>> {
match key.as_str() { match key.as_str() {
// fetches relevant values, should try to use above instead, but codecs complicate things
"width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)}, "width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)},
"height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)}, "height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)},
"framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)}, "framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)},

View file

@ -15,18 +15,20 @@ pub trait FileReader {
let nalu = match self.read_nalu(&metadata) { let nalu = match self.read_nalu(&metadata) {
Ok(x) => x, Ok(x) => x,
Err(err) => { Err(err) => {
// pause loop on EOF
if let Some(e) = err.downcast_ref::<util::DemuxerError>() { if let Some(e) = err.downcast_ref::<util::DemuxerError>() {
match e { match e {
util::DemuxerError::EOF => { util::DemuxerError::EOF => {
// drop to trigger RecvError on decode module to end
drop(v_in); drop(v_in);
drop(a_in); drop(a_in);
thread::park(); thread::park(); // prevent dropping err_out to keep main running
return return
}, },
_ => () _ => ()
} }
}; };
util::thread_freeze(err_in, err); util::thread_freeze(err_in, err); // pause thread, trigger main to exit
return return
} }
}; };
@ -50,7 +52,7 @@ pub trait FileReader {
pub fn new_reader() -> Result<impl FileReader, Box<dyn Error>> { pub fn new_reader() -> Result<impl FileReader, Box<dyn Error>> {
let mut stdin_hndl = io::stdin(); let mut stdin_hndl = io::stdin();
let mut byte1 = [0u8; 1]; let mut byte1 = [0u8; 1]; // sync byte to determine filetype, limited checking capability
stdin_hndl.read_exact(&mut byte1)?; stdin_hndl.read_exact(&mut byte1)?;
match &byte1[0] { match &byte1[0] {
0x46 => flv::new_reader(stdin_hndl), 0x46 => flv::new_reader(stdin_hndl),

View file

@ -1,6 +1,6 @@
mod input; mod input;
mod flv; mod flv;
mod nalu_flv; mod nalu_flv; // probably should organize into an flv submodule instead, but eh
use std::sync::mpsc; use std::sync::mpsc;
use std::error::Error; use std::error::Error;
@ -20,7 +20,7 @@ pub fn spawn() -> Result<
Box<dyn Error> Box<dyn Error>
> { > {
let mut reader = input::new_reader()?; let mut reader = input::new_reader()?;
let metadata = Arc::new(reader.init()?); let metadata = Arc::new(reader.init()?); // arc to share across modules
let (v_in, v_out) = mpsc::channel(); let (v_in, v_out) = mpsc::channel();
let (a_in, a_out) = mpsc::channel(); let (a_in, a_out) = mpsc::channel();
let (err_in, err_out) = mpsc::channel(); let (err_in, err_out) = mpsc::channel();

View file

@ -29,25 +29,28 @@ fn parse_avcc(mut data: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
} }
fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> { fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; // base nalu delimiter for H264
match data[1] { match data[1] {
0 => { 0 => {
data.drain(..5); // sequence header contains avcC as in ISOBMF binding,
// contains sps and pps needed for bytestream
// must be segmented for NALU format
data.drain(..5); // composition time info, etc, see FLV spec
let (mut sps, mut pps) = parse_avcc(data); let (mut sps, mut pps) = parse_avcc(data);
nalu.append(&mut sps); nalu.append(&mut sps);
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]); nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
nalu.append(&mut pps); nalu.append(&mut pps);
return nalu; return nalu;
}, },
2 => {return vec![0u8; 0];}, 2 => {return vec![0u8; 0];}, // end seq, return null
1 => { 1 => {
data.drain(..5); data.drain(..5);
loop { loop { // may be multiple NALU packets per tag
let mut buff = [0u8; 4]; let mut buff = [0u8; 4];
buff.copy_from_slice(data.drain(..4).as_slice()); buff.copy_from_slice(data.drain(..4).as_slice());
let nalu_len = u32::from_be_bytes(buff) as usize; let nalu_len = u32::from_be_bytes(buff) as usize;
nalu.extend_from_slice(data.drain(..nalu_len).as_slice()); nalu.extend_from_slice(data.drain(..nalu_len).as_slice());
if data.len() <= 4 {return nalu} if data.len() <= 4 {return nalu} // at end of packet
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]); nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
} }
}, },
@ -56,6 +59,7 @@ fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
} }
fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> { fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
// see multimedia wiki for details
let aot_idx = 1u8; let aot_idx = 1u8;
let frame_len = (data_len + 7) as u16; let frame_len = (data_len + 7) as u16;
let samplerate_idx: u8 = match metadata.audio.samplerate { let samplerate_idx: u8 = match metadata.audio.samplerate {
@ -75,7 +79,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
_ => {panic!("invalid samplerate")} _ => {panic!("invalid samplerate")}
}; };
let chan_conf_idx = metadata.audio.channels; let chan_conf_idx = metadata.audio.channels;
let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; // sync bytes and guaranteed
// value set
head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2); head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2);
head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8); head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8);
head[4] = ((frame_len >> 3) & 0xff) as u8; head[4] = ((frame_len >> 3) & 0xff) as u8;
@ -86,6 +91,8 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
fn preprocess_aac(mut data: Vec<u8>, metadata: &util::Metadata) -> Vec<u8> { fn preprocess_aac(mut data: Vec<u8>, metadata: &util::Metadata) -> Vec<u8> {
match data[1] { match data[1] {
0 => { 0 => {
// bytestream needs no codec config, all information already in metadata, also would
// not be persistent across packets
return vec![0u8; 0]; return vec![0u8; 0];
}, },
1 => { 1 => {

View file

@ -6,6 +6,7 @@ use crate::util;
use crate::encode::codecs::Encoder; use crate::encode::codecs::Encoder;
use crate::encode::codecs; use crate::encode::codecs;
// spawn encoder for a given output encoding
pub fn spawn(metadata: Arc<util::Metadata>, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box<dyn Error>> { pub fn spawn(metadata: Arc<util::Metadata>, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box<dyn Error>> {
let v = match v_target { let v = match v_target {
util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?, util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?,

View file

@ -18,19 +18,21 @@ impl Encoder for OpusEncoder {
} }
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> { fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// opus outputs Ogg format, can be parsed with the same parser as the rest
return Ok(()); return Ok(());
} }
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> { fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let mut stdout = self.cmd.stdout.as_mut().unwrap(); let mut stdout = self.cmd.stdout.as_mut().unwrap();
let mut buff = [0u8; 27]; let mut buff = [0u8; 27]; // each packet has a 27 byte header, mostly irrelevant
match stdout.read_exact(&mut buff) { match stdout.read_exact(&mut buff) {
Ok(_) => (), Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume EOF at header means EOF proper
Err(err) => return Err(Box::new(err)) Err(err) => return Err(Box::new(err))
}; };
let num_segments = buff[buff.len() - 1] as usize; let num_segments = buff[buff.len() - 1] as usize;
let mut segment_table_buff = vec![0u8; num_segments]; let mut segment_table_buff = vec![0u8; num_segments]; // dictates actual size of data as
// raw sum of u8
stdout.read_exact(&mut segment_table_buff)?; stdout.read_exact(&mut segment_table_buff)?;
let mut page_size:usize = 0; let mut page_size:usize = 0;
for i in segment_table_buff { for i in segment_table_buff {
@ -45,6 +47,7 @@ impl Encoder for OpusEncoder {
pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Error>> { pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Error>> {
let channels = metadata.audio.channels.to_string(); let channels = metadata.audio.channels.to_string();
let samplerate = metadata.audio.samplerate.to_string(); let samplerate = metadata.audio.samplerate.to_string();
// opus enc for PCM LE 16 -> Opus
let cmd = Command::new("opusenc") let cmd = Command::new("opusenc")
.args([ .args([
"--quiet", "--quiet",
@ -52,7 +55,7 @@ pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Er
"--raw-bits", "16", "--raw-bits", "16",
"--raw-rate", samplerate.as_str(), "--raw-rate", samplerate.as_str(),
"--raw-chan", channels.as_str(), "--raw-chan", channels.as_str(),
"--max-delay", "1", "--max-delay", "1", // 0 works worse than 1 somehow
"-", "-" "-", "-"
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; ]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
return Ok(OpusEncoder {cmd: cmd}); return Ok(OpusEncoder {cmd: cmd});

View file

@ -12,8 +12,13 @@ use crate::util;
pub trait Encoder { pub trait Encoder {
fn stdin(&mut self) -> ChildStdin; fn stdin(&mut self) -> ChildStdin;
// read from encoder
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>>; fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>>;
// for specific encoder peculiarities
// svtav1 gives DKIF format, which has its own extra header which breaks just parsing as is
// opus outputs Ogg which needs full and header follows the same format as the rest for packets
// use as needed
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>; fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
fn write_nalu_loop(&mut self, c_out: mpsc::Sender<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) { fn write_nalu_loop(&mut self, c_out: mpsc::Sender<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
@ -24,7 +29,9 @@ pub trait Encoder {
loop { loop {
let nalu = match self.read_nalu() { let nalu = match self.read_nalu() {
Ok(x) => x, Ok(x) => x,
Err(err) => {drop(c_out); thread::park(); return} Err(err) => {drop(c_out); thread::park(); return} // drop c_out to signal EOF to
// muxer, park to keep c_err
// allocated
}; };
match c_out.send(nalu) { match c_out.send(nalu) {
Ok(_) => (), Ok(_) => (),
@ -34,6 +41,7 @@ pub trait Encoder {
} }
} }
// write to encoder
fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box<dyn Error + Send + Sync>> { fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box<dyn Error + Send + Sync>> {
cmd_in.write_all(media.sample.as_slice())?; cmd_in.write_all(media.sample.as_slice())?;
Ok(()) Ok(())
@ -43,11 +51,15 @@ pub fn read_raw_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::RawMedia
loop { loop {
let media = match c_in.recv() { let media = match c_in.recv() {
Ok(x) => x, Ok(x) => x,
Err(err) => {drop(cmd_in); thread::park(); return} Err(err) => {drop(cmd_in); thread::park(); return} // drop stdin to signal EOF, which
// lets encoder close stdout as
// needed, park to keep c_err
// allocated
}; };
match write_raw(&mut cmd_in, media) { match write_raw(&mut cmd_in, media) {
Ok(_) => (), Ok(_) => (),
Err(err) => {util::thread_freeze(c_err, err); return} Err(err) => {util::thread_freeze(c_err, err); return} // park thread, wait for main to
// exit
} }
} }
} }

View file

@ -17,7 +17,9 @@ impl Encoder for AV1Encoder {
} }
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> { fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// SVTAV1 outputs Duck IVF framing, check specs if needed
let mut stdout = self.cmd.stdout.as_mut().unwrap(); let mut stdout = self.cmd.stdout.as_mut().unwrap();
// remove dkif header
let mut buff = [0u8; 8]; let mut buff = [0u8; 8];
stdout.read_exact(&mut buff)?; stdout.read_exact(&mut buff)?;
let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize; let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize;
@ -28,16 +30,18 @@ impl Encoder for AV1Encoder {
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> { fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
let mut stdout = self.cmd.stdout.as_mut().unwrap(); let mut stdout = self.cmd.stdout.as_mut().unwrap();
let mut buff = [0u8; 12]; let mut buff = [0u8; 12]; // frame always 12 byte header
match stdout.read_exact(&mut buff) { match stdout.read_exact(&mut buff) {
Ok(_) => (), Ok(_) => (),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume eof if header read fails
Err(err) => return Err(Box::new(err)) Err(err) => return Err(Box::new(err))
} }
let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; // actual sample
// size in frame
// header
let mut data = vec![0u8; nalu_len]; let mut data = vec![0u8; nalu_len];
stdout.read_exact(data.as_mut_slice())?; stdout.read_exact(data.as_mut_slice())?;
data.drain(..2); data.drain(..2); // skip temporal delimiter packet (2 bytes, always first of each dkif frame)
return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data}); return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data});
} }
} }
@ -46,15 +50,16 @@ pub fn new_av1(metadata: Arc<util::Metadata>) -> Result<AV1Encoder, Box<dyn Erro
let width = metadata.video.width.to_string(); let width = metadata.video.width.to_string();
let height = metadata.video.height.to_string(); let height = metadata.video.height.to_string();
let fps_num = ((metadata.video.framerate * 1000.0) as u32).to_string(); let fps_num = ((metadata.video.framerate * 1000.0) as u32).to_string();
// SVTAV1 for YUV420 -> AV1, piped input output
let cmd = Command::new("SvtAv1EncApp") let cmd = Command::new("SvtAv1EncApp")
.args([ .args([
"--errlog", "/dev/null", "--errlog", "/dev/null", // dump errors
"--progress", "0", "--progress", "0",
"-w", width.as_str(), "-w", width.as_str(),
"-h", height.as_str(), "-h", height.as_str(),
"--fps-num", fps_num.as_str(), "--fps-num", fps_num.as_str(),
"--preset", "12", "--preset", "12", // super fast
"--keyint", "30", "--keyint", "30", // keyframe every 30 frames
"-i", "-", "-i", "-",
"-b", "-" "-b", "-"
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?; ]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;

View file

@ -24,7 +24,7 @@ pub fn spawn(
let (v_in, v_out) = mpsc::channel(); let (v_in, v_out) = mpsc::channel();
let (a_in, a_out) = mpsc::channel(); let (a_in, a_out) = mpsc::channel();
let (err_in_v, err_out) = mpsc::channel(); let (err_in_v, err_out) = mpsc::channel();
let err_in_a = err_in_v.clone(); let err_in_a = err_in_v.clone(); // double for each thread
let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?; let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?;
thread::spawn(move || { thread::spawn(move || {
cmds::handle_encoder(v_encoder, v, v_in, err_in_v); cmds::handle_encoder(v_encoder, v, v_in, err_in_v);

View file

@ -9,29 +9,63 @@ use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
fn init() -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>>{ // spawns threads for each subtask, linking input/output channels
// returns an array of error receivers to exit if needed
// passes input metadata around for video and audio metadata, makes scaling and resampling
// impossible
fn init(args: util::HLSArgs) -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>> {
let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?; let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?;
let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?; let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?;
// output codecs hardcoded, probably want to pass in via cmd args instead
let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?; let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?;
let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), util::HLSArgs{segment_time: 4, segment_prepend: String::from("/vid/tmp/"), max_segments: 4})?; let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), args)?;
return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]); return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]);
} }
// post init loop. Modules are initialized without error, loop and check for errors
fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4]) -> Result<(), Box<dyn std::error::Error>> { fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4]) -> Result<(), Box<dyn std::error::Error>> {
let (mut v_eof, mut a_eof) = (false, false); // proper EOF handling
loop { loop {
if v_eof && a_eof {return Ok(())} // if both video and audio streams have ended
for i in 0..4 { for i in 0..4 {
match error_handles[i].try_recv() { match error_handles[i].try_recv() {
Ok(err) => return Err(err), Ok(err) => {
Err(mpsc::TryRecvError::Empty) => (), // check if EOF error found.
if let Some(eof_error) = err.downcast_ref::<util::MuxerError>() {
match eof_error {
util::MuxerError::VideoEOF => {v_eof = true;},
util::MuxerError::AudioEOF => {a_eof = true;},
_ => return Err(err)
}
} else {
return Err(err)
}
},
Err(mpsc::TryRecvError::Empty) => (), // do nothing since nothing was sent and
// thread still active
Err(err) => return Err(Box::new(util::ThreadError(i))), Err(err) => return Err(Box::new(util::ThreadError(i))),
} }
} }
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100)); // sleep so isn't running constantly for no
// reason
}
}
// gets args
// bad, replace with clap impl if meaning to actually use
fn parse_args() -> util::HLSArgs {
let mut args = std::env::args();
args.next(); // skip prog name
util::HLSArgs {
segment_time: args.next().expect("no time interval").parse().expect("not int"),
segment_prepend: args.next().expect("no prepend"),
max_segments: args.next().expect("no max list size").parse().expect("not int"),
} }
} }
fn main() { fn main() {
let error_handles = match init() { let error_handles = match init(parse_args()) {
Ok(x) => x, Ok(x) => x,
Err(err) => {dbg!(err); exit(1);} Err(err) => {dbg!(err); exit(1);}
}; };

View file

@ -3,6 +3,8 @@ use std::sync::{mpsc, Arc};
use std::time; use std::time;
use std::thread; use std::thread;
use std::fs; use std::fs;
use std::cmp::max;
use std::io::Write;
use crate::util; use crate::util;
use crate::muxer::hls::mp4; use crate::muxer::hls::mp4;
@ -14,46 +16,72 @@ pub struct HLSHandler {
args: util::HLSArgs, args: util::HLSArgs,
curr_segment_idx: usize, curr_segment_idx: usize,
curr_segments: Vec<String>, curr_segments: Vec<String>,
curr_media_seq_idx: usize,
} }
impl HLSHandler { impl HLSHandler {
pub fn data_loop(&mut self) { pub fn data_loop(&mut self) {
self.muxer.spawn_read_loops(); self.muxer.spawn_read_loops();
loop { loop {
match self.muxer.get_segment(self.curr_segment_idx) { match self.muxer.get_segment(self.curr_segment_idx, self.args.segment_time) {
None => (), None => (),
Some(segment) => { Some(segment) => {
let del_list = self.new_segment(segment); let del_list = self.new_segment(segment); // making a new segment should push
delete_files(del_list); // out old segments
self.delete_files(del_list);
self.dump_playlist(); self.dump_playlist();
self.curr_segment_idx += 1; self.curr_segment_idx += 1;
} }
} }
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100)); // don't need to have it constantly
// running,
} }
} }
fn new_segment(&mut self, segment: segments::Segment) -> std::vec::Drain<String> { fn new_segment(&mut self, segment: segments::Segment) -> Vec<String> {
segment.dump(); segment.dump(); // write out the segment,
self.curr_segments.push(segment.filename); self.curr_segments.push(segment.filename); // add its filename to the curr playlist,
self.curr_segments.drain(..self.curr_segments.len() - self.args.max_segments) let to_drain = if self.curr_segments.len() > self.args.max_segments {self.curr_segments.len() - self.args.max_segments} else {0}; // and remove segement filenames that are old
self.curr_segments.drain(..to_drain).collect()
} }
fn delete_files(&mut self, mut del_list: Vec<String>) {
for filename in &del_list {
_ = fs::remove_file(filename);
self.curr_media_seq_idx += 1; // media sequence must increment per deletion as per HLS
// spec
}
}
fn dump_playlist(&self) { fn dump_playlist(&self) {
todo!(); let mut segment_listing = String::new();
for segment_filename in &self.curr_segments {
// each segment has its duration and access URI, should also have Datetime, but neither
// really needed nor "simple" with std rust, would need chrono or time or be ready for
// headaches
segment_listing += format!("\
#EXTINF:{},\n\
{}\n\
", self.args.segment_time, (self.args.segment_prepend.clone() + segment_filename)).as_str();
}
// full playlist string, see HLS spec for details
let playlist_string = format!("\
#EXTM3U\n\
#EXT-X-VERSION:7\n\
#EXT-X-TARGETDURATION:{}\n\
#EXT-X-MEDIA-SEQUENCE:{}\n\
#EXT-X-MAP:URI=\"{}init.mp4\"\n\
{}\
#EXT-X-ENDLIST\
", self.args.segment_time, self.curr_media_seq_idx, self.args.segment_prepend.clone(), segment_listing);
let mut f = fs::File::create("stream.m3u8").unwrap();
f.write_all(&playlist_string.as_bytes());
} }
} }
pub fn spawn(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>, metadata: Arc<util::Metadata>, args: util::HLSArgs) -> Result<HLSHandler, Box<dyn Error>> { pub fn spawn(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>, metadata: Arc<util::Metadata>, args: util::HLSArgs) -> Result<HLSHandler, Box<dyn Error>> {
Ok(HLSHandler {muxer: mp4::new_muxer(v, a, metadata, err_in.clone())?, err_in: err_in, args: args, curr_segment_idx: 0, curr_segments: Vec::new()}) Ok(HLSHandler {muxer: mp4::new_muxer(v, a, metadata, err_in.clone())?, err_in: err_in, args: args, curr_segment_idx: 0, curr_segments: Vec::new(), curr_media_seq_idx: 0})
} }
fn delete_files(mut del_list: std::vec::Drain<String>) {
loop {
match del_list.next() {
Some(filename) => fs::remove_file(filename),
None => return
};
}
}

View file

@ -1,8 +1,14 @@
// cursed abomination, not well thought out, but works fine, rust also complicates inheritance
// every necessary MP4 atom to create readable files,
// lookup the corresponding item in the ISO doc or the quicktime spec (for everything not under
// moof), because I am not doing all of that here.
use std::sync::Arc; use std::sync::Arc;
use std::io::Write; use std::io::Write;
use std::fs::File; use std::fs::File;
use crate::util; use crate::util;
use crate::muxer::hls::mp4::samples;
pub trait MP4Atom { pub trait MP4Atom {
fn marshall(&self) -> Vec<u8>; fn marshall(&self) -> Vec<u8>;
@ -224,7 +230,7 @@ impl Default for TKHD {
fn default() -> Self { fn default() -> Self {
TKHD { TKHD {
version: 0, version: 0,
flags: 1, flags: 3,
create_time: 0, create_time: 0,
modify_time: 0, modify_time: 0,
track_id: 0, track_id: 0,
@ -434,7 +440,7 @@ impl Default for VMHD {
fn default() -> Self { fn default() -> Self {
VMHD { VMHD {
version: 0, version: 0,
flags: 0, flags: 1,
mode: 0, mode: 0,
opcolor: [0,0,0], opcolor: [0,0,0],
} }
@ -920,6 +926,7 @@ pub fn construct_moov(stsd_v: STSD, stsd_a: STSD, metadata: &Arc<util::Metadata>
} }
} }
// video will always be track 1, audio in trak 2
fn new_trak(stsd: STSD, metadata: &Arc<util::Metadata>) -> TRAK { fn new_trak(stsd: STSD, metadata: &Arc<util::Metadata>) -> TRAK {
let mut tkhd = TKHD {..Default::default() }; let mut tkhd = TKHD {..Default::default() };
match stsd.entry { match stsd.entry {
@ -1014,6 +1021,238 @@ pub fn dump_init_tree(moov: MOOV) -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
pub struct MOOF {
mfhd: MFHD,
trafs: [TRAF; 2],
}
impl MP4Atom for MOOF {
fn marshall(&self) -> Vec<u8> {
let mut content = self.mfhd.marshall();
content.append(&mut self.trafs[0].marshall());
content.append(&mut self.trafs[1].marshall());
make_box(content, *b"moof")
}
}
struct MFHD {
version: u8,
flags: u32,
seq_num: u32,
}
impl Default for MFHD {
fn default() -> Self {
MFHD {
version: 0,
flags: 0,
seq_num: 0,
}
}
}
impl MP4Atom for MFHD {
fn marshall(&self) -> Vec<u8> {
let mut content = Vec::new();
content.push(self.version);
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
content.extend_from_slice(&self.seq_num.to_be_bytes());
make_box(content, *b"mfhd")
}
}
struct TRAF {
tfhd: TFHD,
tfdt: TFDT,
trun: TRUN,
}
impl MP4Atom for TRAF {
fn marshall(&self) -> Vec<u8> {
let mut content = self.tfhd.marshall();
content.append(&mut self.tfdt.marshall());
content.append(&mut self.trun.marshall());
make_box(content, *b"traf")
}
}
struct TFHD {
version: u8,
flags: u32,
track_id: u32,
default_duration: u32,
}
impl MP4Atom for TFHD {
fn marshall(&self) -> Vec<u8> {
let mut content = Vec::new();
content.push(self.version);
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
content.extend_from_slice(&self.track_id.to_be_bytes());
content.extend_from_slice(&self.default_duration.to_be_bytes());
make_box(content, *b"tfhd")
}
}
impl Default for TFHD {
fn default() -> Self {
TFHD {
version: 0,
flags: 0x00020008,
track_id: 0,
default_duration: 0,
}
}
}
struct TFDT {
version: u8,
flags: u32,
start_time: u64,
}
impl MP4Atom for TFDT {
fn marshall(&self) -> Vec<u8> {
let mut content = Vec::new();
content.push(self.version);
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
if self.version == 1 {
content.extend_from_slice(&self.start_time.to_be_bytes());
} else {
content.extend_from_slice(&(self.start_time as u32).to_be_bytes());
}
make_box(content, *b"tfdt")
}
}
impl Default for TFDT {
fn default() -> Self {
TFDT {
version: 0,
flags: 0,
start_time: 0,
}
}
}
struct TRUN {
version: u8,
flags: u32,
num_samples: u32,
offset: u32,
samples: Vec<SampleData>,
}
impl Default for TRUN {
fn default() -> Self {
TRUN {
version: 0,
flags: 0x00000601,
num_samples: 0,
offset: 0,
samples: Vec::new(),
}
}
}
impl MP4Atom for TRUN {
fn marshall(&self) -> Vec<u8> {
let mut content = Vec::new();
content.push(self.version);
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
content.extend_from_slice(&self.num_samples.to_be_bytes());
content.extend_from_slice(&self.offset.to_be_bytes());
for sample in &self.samples {
content.append(&mut sample.marshall());
}
make_box(content, *b"trun")
}
}
struct SampleData {
size: u32,
flags: u32,
}
impl MP4Atom for SampleData {
fn marshall(&self) -> Vec<u8> {
let mut content = Vec::new();
content.extend_from_slice(&self.size.to_be_bytes());
content.extend_from_slice(&self.flags.to_be_bytes());
return content;
}
}
pub struct MDAT {
pub v_samples: Vec<u8>,
pub a_samples: Vec<u8>,
}
impl MP4Atom for MDAT {
fn marshall(&self) -> Vec<u8> {
let mut content = self.v_samples.clone();
content.append(&mut self.a_samples.clone());
make_box(content, *b"mdat")
}
}
pub fn new_moof(v_samples: &Vec<samples::Sample>, default_dur_v: u32, a_samples: &Vec<samples::Sample>, default_dur_a: u32, mdat_v_offset: usize, mdat_a_offset: usize, moof_idx: usize) -> MOOF {
// hacky fix, since sticking to a standard format and defaults for segments, only trun is
// variable which has a predictable size regardless, moof offset + v and a trun sample table
// len and mdat header offset, will break using version 1 atoms for time,
// alternative is to somehow record idx for base offset (which breaks MP4atoms impl) to modify
// later, or calc full length properly, also complicated
let mdat_data_start = 152 + v_samples.len() * 8 + a_samples.len() * 8 + 8;
MOOF {
mfhd: MFHD { seq_num: moof_idx as u32, ..Default::default() },
trafs: [ TRAF {
tfhd: TFHD {
track_id: 1,
default_duration: default_dur_v,
..Default::default()
},
trun: new_trun(v_samples, mdat_v_offset + mdat_data_start),
tfdt: TFDT {
start_time: default_dur_v as u64 * (v_samples.len() * (moof_idx - 1)) as u64,
..Default::default()
},
}, TRAF{
tfhd: TFHD {
track_id: 2,
default_duration: default_dur_a,
..Default::default()
},
trun: new_trun(a_samples, mdat_a_offset + mdat_data_start),
tfdt: TFDT {
start_time: default_dur_a as u64 * (a_samples.len() * (moof_idx - 1)) as u64,
..Default::default()
},
} ],
}
}
fn new_trun(samples: &Vec<samples::Sample>, data_offset: usize) -> TRUN {
TRUN {
num_samples: samples.len() as u32,
offset: data_offset as u32,
samples: make_sample_table(samples),
..Default::default()
}
}
fn make_sample_table(samples: &Vec<samples::Sample>) -> Vec<SampleData> {
let mut table = Vec::new();
for sample in samples {
table.push( SampleData {
size: sample.size,
flags: sample.flags,
});
}
return table;
}
// takes content, applies MP4 atom encapsulation
fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> { fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
let mut head = vec![0u8; 8]; let mut head = vec![0u8; 8];
head.splice(..4, ((content.len() + 8) as u32).to_be_bytes()); head.splice(..4, ((content.len() + 8) as u32).to_be_bytes());
@ -1021,3 +1260,5 @@ fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
head.append(&mut content); head.append(&mut content);
return head; return head;
} }

View file

@ -1,5 +1,5 @@
mod mp4muxer; mod mp4muxer;
mod atoms; pub mod atoms;
pub mod samples; pub mod samples;
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
@ -8,6 +8,7 @@ use std::error::Error;
use crate::util; use crate::util;
pub struct MP4Muxer { pub struct MP4Muxer {
// option to allow being taken by threads
v: Option<mpsc::Receiver<util::NALUPacket>>, v: Option<mpsc::Receiver<util::NALUPacket>>,
pub v_samples: Arc<samples::SampleQueue>, pub v_samples: Arc<samples::SampleQueue>,
a: Option<mpsc::Receiver<util::NALUPacket>>, a: Option<mpsc::Receiver<util::NALUPacket>>,
@ -22,15 +23,22 @@ pub fn new_muxer(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NA
a: Some(a), a: Some(a),
metadata: metadata.clone(), metadata: metadata.clone(),
err_in: err_in, err_in: err_in,
// Arc<Mutex<Vec>> for multithread write access
v_samples: Arc::new(samples::SampleQueue { v_samples: Arc::new(samples::SampleQueue {
queue: Mutex::new(Vec::new()), queue: Mutex::new(Vec::new()),
default_duration: 1000, default_duration: 1000, // timescale as fps * 1000, each sample then always 1000
// regardless of fps
samples_per_sec: metadata.video.framerate as usize, samples_per_sec: metadata.video.framerate as usize,
}), }),
a_samples: Arc::new(samples::SampleQueue { a_samples: Arc::new(samples::SampleQueue {
queue: Mutex::new(Vec::new()), queue: Mutex::new(Vec::new()),
default_duration: metadata.audio.samplerate * 20 / 1000, default_duration: metadata.audio.samplerate * 20 / 1000, // timescale set as
samples_per_sec: 50, // samplerate, duration per
// sample is sample duration
// (20ms packet) * samples per
// time (samplerate)
samples_per_sec: 50, // opus default is 20ms packets (1000/20 = 50), would need to change this if
// changing that
}), }),
}; };
muxer.gen_init()?; muxer.gen_init()?;

View file

@ -11,12 +11,17 @@ impl mp4::MP4Muxer {
pub fn gen_init(&mut self) -> Result<(), Box<dyn Error>> { pub fn gen_init(&mut self) -> Result<(), Box<dyn Error>> {
let stsd_v = self.handle_v_cc()?; let stsd_v = self.handle_v_cc()?;
let stsd_a = self.handle_a_cc()?; let stsd_a = self.handle_a_cc()?;
// only need moov for codec declarations, ftyp is uneeded apparently
let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata); let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata);
mp4::atoms::dump_init_tree(init_tree)?; mp4::atoms::dump_init_tree(init_tree)?; // write out moov in init.mp4 segment
Ok(()) Ok(())
} }
fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> { fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
// handles getting stsd per sample, parsing codec config mostly
// and peculiarities per codec
// AV1 has the first nalu as is in the samples
// also contains info for codec config
let v_cc = self.v.as_ref().expect("must be owned at init").recv()?; let v_cc = self.v.as_ref().expect("must be owned at init").recv()?;
return match v_cc.packet_type { return match v_cc.packet_type {
util::NALUPacketType::Video(util::VideoCodec::AV1) => { util::NALUPacketType::Video(util::VideoCodec::AV1) => {
@ -28,6 +33,10 @@ impl mp4::MP4Muxer {
} }
fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> { fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
// handles getting stsd per sample, parsing codec config mostly
// and peculiarities per codec
// Ogg (Opus) first packet has codec config info, not needed in samples
// Second packet has no discernable use for ISOBMF? Also not needed in samples
let a_cc = self.a.as_ref().expect("must be owned at init").recv()?; let a_cc = self.a.as_ref().expect("must be owned at init").recv()?;
return match a_cc.packet_type { return match a_cc.packet_type {
util::NALUPacketType::Audio(util::AudioCodec::OPUS) => { util::NALUPacketType::Audio(util::AudioCodec::OPUS) => {
@ -48,21 +57,32 @@ impl mp4::MP4Muxer {
let mut i = 1; let mut i = 1;
loop { loop {
match v.recv() { match v.recv() {
// hacky fix for video keyframe sample flags
// set at 30 frames per keyframe, would need some way to send the same value to
// the encoder as here or attempt to parse the actual OBU (even the header
// seems cursed, just use a lib instead)
// no clue how those flags are parsed, but they work (0x02 is keyframe, 0x0101
// is interframe)
Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});}, Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});},
Err(_) => { Err(_) => {
util::thread_freeze(err_in1, Box::new(util::MuxerError::EOF)); // send EOF to main for proper shutdown
util::thread_freeze(err_in1, Box::new(util::MuxerError::VideoEOF));
return return
} }
} }
i += 1;
} }
}); });
thread::spawn(move || { thread::spawn(move || {
loop { loop {
match a.recv() { match a.recv() {
Ok(x) => {a_queue.push(x.packet_data, 0x01010000);}, // audio should be all keyframes, should use the tfhd default var, but
// more work to decouple video and audio impls
Ok(x) => {a_queue.push(x.packet_data, 0x02000000);},
Err(_) => { Err(_) => {
util::thread_freeze(err_in2, Box::new(util::MuxerError::EOF)); // send EOF for proper shutdown
util::thread_freeze(err_in2, Box::new(util::MuxerError::AudioEOF));
return return
} }
} }
@ -70,8 +90,21 @@ impl mp4::MP4Muxer {
}); });
} }
pub fn get_segment(&mut self, idx: usize) -> Option<segments::Segment> { pub fn get_segment(&mut self, idx: usize, time: usize) -> Option<segments::Segment> {
todo!(); // if sample queues are long enough to extract a segment, do so, otherwise return None
// probably should not be an option, instead loop here until ready to allow flushing at
// exit
if self.v_samples.segment_ready(time) && self.a_samples.segment_ready(time) {
return Some(segments::Segment {
filename: idx.to_string() + ".m4s",
idx: idx,
segment_video: self.v_samples.get_segment_samples(time),
sample_duration_v: self.v_samples.default_duration,
segment_audio: self.a_samples.get_segment_samples(time),
sample_duration_a: self.a_samples.default_duration,
})
}
return None
} }
} }
@ -89,7 +122,7 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
dref_idx: 1, dref_idx: 1,
width: metadata.video.width as u16, width: metadata.video.width as u16,
height: metadata.video.height as u16, height: metadata.video.height as u16,
resolution: (72u64 << 48) + (72u64 << 16), resolution: (72u64 << 48) + (72u64 << 16), // 72 dpi, fixed point numbers (72.00 + 72.00)
sample_per_frame: 1, sample_per_frame: 1,
encoder_name: String::from("libsvtav1"), encoder_name: String::from("libsvtav1"),
pixel_depth: 24, pixel_depth: 24,
@ -104,6 +137,7 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
} }
fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD { fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD {
// see opus encapsulation in ISOBMF for details
let mut opus_binding = vec![0u8; 11]; let mut opus_binding = vec![0u8; 11];
opus_binding[1] = metadata.audio.channels; opus_binding[1] = metadata.audio.channels;
let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap()); let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap());
@ -121,7 +155,7 @@ fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms:
dref_idx: 1, dref_idx: 1,
channels: metadata.audio.channels as u16, channels: metadata.audio.channels as u16,
sample_size: 16, sample_size: 16,
sample_rate: metadata.audio.samplerate, sample_rate: metadata.audio.samplerate << 16, // fixed point (u16 int + u16 decimals)
codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding} codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding}
}; };
return mp4::atoms::STSD { return mp4::atoms::STSD {

View file

@ -1,5 +1,9 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::muxer::hls::segments;
// forces a common duration, no composition time offset
// flags declared per sample as well as size
pub struct Sample { pub struct Sample {
pub data: Vec<u8>, pub data: Vec<u8>,
pub size: u32, pub size: u32,
@ -20,4 +24,13 @@ impl SampleQueue {
flags: flags, flags: flags,
}) })
} }
pub fn segment_ready(&self, time: usize) -> bool {
self.queue.lock().unwrap().len() >= time * self.samples_per_sec
}
pub fn get_segment_samples(&self, time: usize) -> Vec<Sample> {
let samples_to_drain = time * self.samples_per_sec;
self.queue.lock().unwrap().drain(..samples_to_drain).collect()
}
} }

View file

@ -1,15 +1,44 @@
use std::time; use std::time;
use std::fs::File;
use std::io::Write;
use crate::muxer::hls::mp4::samples; use crate::muxer::hls::mp4::samples;
use crate::muxer::hls::mp4::atoms;
use crate::muxer::hls::mp4::atoms::MP4Atom;
pub struct Segment { pub struct Segment {
segment_idx: usize,
pub filename: String, pub filename: String,
segment_video: Vec<samples::Sample>, pub idx: usize,
segment_audio: Vec<samples::Sample>, pub segment_video: Vec<samples::Sample>,
pub sample_duration_v: u32, // default interval per sample at designated timescale (same for
// audio instead below)
pub segment_audio: Vec<samples::Sample>,
pub sample_duration_a: u32,
} }
impl Segment { impl Segment {
pub fn dump(&self) { pub fn dump(&self) {
todo!(); let mdat = self.new_mdat(); // sample full byte length needed for moof generation
// increment idx (0-indexed) for moofs (1-indexed)
let moof = atoms::new_moof(&self.segment_video, self.sample_duration_v, &self.segment_audio, self.sample_duration_a, 0, mdat.v_samples.len(), self.idx + 1);
let mut segment_bytes = moof.marshall();
segment_bytes.append(&mut mdat.marshall());
let mut f = File::create(&self.filename).unwrap();
f.write_all(&segment_bytes);
}
fn new_mdat(&self) -> atoms::MDAT {
let (mut v_samples, mut a_samples) = (Vec::new(), Vec::new());
for sample in &self.segment_video {
v_samples.extend_from_slice(sample.data.as_slice());
}
for sample in &self.segment_audio {
a_samples.extend_from_slice(sample.data.as_slice());
}
atoms::MDAT {
v_samples: v_samples,
a_samples: a_samples,
}
} }
} }

View file

@ -1,3 +1,5 @@
// general use structs, functions and errors go here
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::thread; use std::thread;
@ -198,7 +200,8 @@ impl fmt::Display for EncoderError {
pub enum MuxerError { pub enum MuxerError {
InvalidCodec, InvalidCodec,
CCParseError, CCParseError,
EOF, VideoEOF,
AudioEOF,
} }
impl Error for MuxerError {} impl Error for MuxerError {}
@ -209,7 +212,6 @@ impl fmt::Debug for MuxerError {
match self { match self {
MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"), MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"),
MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"), MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"),
MuxerError::EOF => write!(f, "input EOF"),
_ => write!(f, "Error not described yet") _ => write!(f, "Error not described yet")
} }
} }
@ -222,11 +224,15 @@ impl fmt::Display for MuxerError {
} }
// funcs // funcs
// parks a thread and sends an error over the param channel
// used to pause threads keeping channels open, so that main can receive the actual error and then
// exit, instead of a "RecvError"
pub fn thread_freeze(err_sender: mpsc::Sender<Box<dyn Error + Send + Sync>>, err: Box<dyn Error + Send + Sync>) { pub fn thread_freeze(err_sender: mpsc::Sender<Box<dyn Error + Send + Sync>>, err: Box<dyn Error + Send + Sync>) {
err_sender.send(err).unwrap(); err_sender.send(err).unwrap();
thread::park(); thread::park();
} }
// should probably make this a macro instead, eh
pub fn blank_vec(n: usize) -> Vec<u8> { pub fn blank_vec(n: usize) -> Vec<u8> {
return vec![0u8; n]; return vec![0u8; n];
} }