Compare commits
No commits in common. "7d24ea4d5c87607a13800286c7dd9aa7d86ace39" and "3889ed32e88199eb030955207b973502fd1c715c" have entirely different histories.
7d24ea4d5c
...
3889ed32e8
23 changed files with 81 additions and 545 deletions
15
README.md
15
README.md
|
@ -1,15 +0,0 @@
|
||||||
# hls-transcoder-2
|
|
||||||
|
|
||||||
Extremely basic hls segmenter for live video written in Rust. 1st was scrapped since I had 0 clue what I was doing at all. Singular clue has since been acquired. Would not recommend using or expanding on this for anything other than figuring out the how and what.
|
|
||||||
|
|
||||||
Currently only works with FLV input (so RTMP only), H264 and AAC codec inputs, and outputs AV1 and Opus codecs in a fragmented mp4 playlist. Decoding and encoding happens via 3rd party programs accessible on $PATH. (ffmpeg and faad for decoding, SvtEncAv1App and opusenc for encoding). Would use lib versions, but would only do so for a proper project using extern crates
|
|
||||||
|
|
||||||
Uses only uses Rust's stdlib.
|
|
||||||
|
|
||||||
**Not intended for actual use**. Will have limitations and some aspects are just not well implemented, also relies on progs being installed for encoding and decoding.
|
|
||||||
|
|
||||||
`cargo build` to a bin. Usage is `hls-transcoder-2 <hls_time> <uri_prepend> <hls_list_size>`
|
|
||||||
|
|
||||||
* `hls_time`: duration of each segment (usize only currently)
|
|
||||||
* `uri_prepend`: string to add to each segment and init.mp4, for http server uris
|
|
||||||
* `hls_list_size`: max number of segments to include in the playlist, older segments will be deleted, (usize)
|
|
|
@ -7,7 +7,6 @@ use crate::decode::codecs::Decoder;
|
||||||
use crate::decode::codecs;
|
use crate::decode::codecs;
|
||||||
|
|
||||||
pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> {
|
pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decoder), Box<dyn Error>> {
|
||||||
// create appropriate decoder implementation for input codec
|
|
||||||
let v = match metadata.video.codec {
|
let v = match metadata.video.codec {
|
||||||
Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?,
|
Some(util::VideoCodec::H264) => codecs::video::new_h264(metadata.clone())?,
|
||||||
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
|
_ => {return Err(Box::new(util::DecoderError::CodecNotImplemented));}
|
||||||
|
@ -20,10 +19,9 @@ pub fn spawn(metadata: Arc<util::Metadata>) -> Result<(impl Decoder, impl Decode
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
|
pub fn handle_decoder(mut decoder: impl Decoder, c_in: mpsc::Receiver<util::NALUPacket>, c_out: mpsc::Sender<util::RawMedia>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
|
||||||
// take stdin and stdout for process to read from and write to on different threads
|
|
||||||
let stdin = decoder.stdin();
|
let stdin = decoder.stdin();
|
||||||
let stdout = decoder.stdout();
|
let stdout = decoder.stdout();
|
||||||
let raw_buff_size = decoder.raw_buff_size(); // determined by input parameters and codec
|
let raw_buff_size = decoder.raw_buff_size();
|
||||||
let raw_sample_type = decoder.media_type();
|
let raw_sample_type = decoder.media_type();
|
||||||
let c_err2 = c_err.clone();
|
let c_err2 = c_err.clone();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
|
|
|
@ -30,10 +30,9 @@ impl Decoder for AACDecoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Error>> {
|
pub fn new_aac(metadata: Arc<util::Metadata>) -> Result<AACDecoder, Box<dyn Error>> {
|
||||||
// faad for AAC -> raw PCM 16-bit little-e, piped
|
|
||||||
let cmd = Command::new("faad")
|
let cmd = Command::new("faad")
|
||||||
.args(["-f", "2", "-w", "-q", "-"])
|
.args(["-f", "2", "-w", "-q", "-"])
|
||||||
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
||||||
let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize; // channels * 2 bytes per sample, * samplerate for bytes per second
|
let bytes_per_sample_chunk = (metadata.audio.samplerate * 2 * metadata.audio.channels as u32) as usize;
|
||||||
return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE})
|
return Ok(AACDecoder {cmd: cmd, raw_buff_size: bytes_per_sample_chunk, raw_sample_type: util::RawMediaType::PCM16LE})
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,11 @@ pub trait Decoder {
|
||||||
fn media_type(&self) -> util::RawMediaType;
|
fn media_type(&self) -> util::RawMediaType;
|
||||||
}
|
}
|
||||||
|
|
||||||
// write to decoder
|
|
||||||
fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
|
fn write_nalu(cmd_in: &mut ChildStdin, nalu: util::NALUPacket) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
cmd_in.write_all(nalu.packet_data.as_slice())?; // nalus should already be in bytestream
|
cmd_in.write_all(nalu.packet_data.as_slice())?;
|
||||||
// format, send as is
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// read from decoder
|
|
||||||
fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
|
fn read_raw(cmd_out: &mut ChildStdout, raw_buff_size: usize, media_type: util::RawMediaType) -> Result<util::RawMedia, Box<dyn Error + Send + Sync>> {
|
||||||
let mut raw_buff = vec![0u8; raw_buff_size];
|
let mut raw_buff = vec![0u8; raw_buff_size];
|
||||||
match cmd_out.read_exact(raw_buff.as_mut_slice()) {
|
match cmd_out.read_exact(raw_buff.as_mut_slice()) {
|
||||||
|
@ -42,14 +39,11 @@ pub fn read_nalu_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::NALUPac
|
||||||
loop {
|
loop {
|
||||||
let nalu = match c_in.recv() {
|
let nalu = match c_in.recv() {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {drop(cmd_in); thread::park();return} // drop cmd_in to signal EOF to
|
Err(err) => {drop(cmd_in); thread::park();return}
|
||||||
// decoder, letting the decoder close
|
|
||||||
// stdout when done
|
|
||||||
};
|
};
|
||||||
match write_nalu(&mut cmd_in, nalu) {
|
match write_nalu(&mut cmd_in, nalu) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {util::thread_freeze(c_err, err); return} // pause thread, wait for main to
|
Err(err) => {util::thread_freeze(c_err, err); return}
|
||||||
// exit
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,7 +52,7 @@ pub fn write_raw_loop(mut cmd_out: ChildStdout, buff_size: usize, media_type: ut
|
||||||
loop {
|
loop {
|
||||||
let media = match read_raw(&mut cmd_out, buff_size, media_type) {
|
let media = match read_raw(&mut cmd_out, buff_size, media_type) {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {drop(c_out); thread::park(); return} // drop to signal EOF to encoder
|
Err(err) => {drop(c_out); thread::park(); return}
|
||||||
};
|
};
|
||||||
match c_out.send(media) {
|
match c_out.send(media) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
|
|
@ -30,12 +30,9 @@ impl Decoder for H264Decoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> {
|
pub fn new_h264(metadata: Arc<util::Metadata>) -> Result<H264Decoder, Box<dyn Error>> {
|
||||||
// ffmpeg pipe input output, H264 -> raw YUV420P sequence
|
|
||||||
let cmd = Command::new("ffmpeg")
|
let cmd = Command::new("ffmpeg")
|
||||||
.args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"])
|
.args(["-y", "-hide_banner", "-loglevel", "error" , "-i", "-", "-c:v", "rawvideo", "-pix_fmt", "yuv420p", "-f", "rawvideo", "-"])
|
||||||
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
||||||
let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2; // size of single
|
let bytes_per_sample = metadata.video.width * metadata.video.height * 3 / 2;
|
||||||
// yuv is
|
|
||||||
// 1.5*num_pixels
|
|
||||||
return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P});
|
return Ok(H264Decoder {cmd: cmd, raw_buff_size: bytes_per_sample as usize, raw_sample_type: util::RawMediaType::YUV420P});
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ pub fn spawn(
|
||||||
let (raw_v_in, raw_v_out) = mpsc::channel();
|
let (raw_v_in, raw_v_out) = mpsc::channel();
|
||||||
let (raw_a_in, raw_a_out) = mpsc::channel();
|
let (raw_a_in, raw_a_out) = mpsc::channel();
|
||||||
let (err_in_v, err_out) = mpsc::channel();
|
let (err_in_v, err_out) = mpsc::channel();
|
||||||
let err_in_a = err_in_v.clone(); // double for 2 threads
|
let err_in_a = err_in_v.clone();
|
||||||
let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?;
|
let (v_decoder, a_decoder) = cmds::spawn(metadata.clone())?;
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v);
|
cmds::handle_decoder(v_decoder, v, raw_v_in, err_in_v);
|
||||||
|
|
|
@ -30,8 +30,6 @@ pub struct FLVReader {
|
||||||
|
|
||||||
impl FLVReader {
|
impl FLVReader {
|
||||||
fn check_signature(&mut self) -> Result<(), Box<dyn Error>> {
|
fn check_signature(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
// check for "LV" for FLV header
|
|
||||||
// "F" already used in spawning, header[..4] always bytes "F" "L" "V" 0x1
|
|
||||||
let mut sig_bytes_rem = [0u8; 3];
|
let mut sig_bytes_rem = [0u8; 3];
|
||||||
self.stdin.read_exact(&mut sig_bytes_rem)?;
|
self.stdin.read_exact(&mut sig_bytes_rem)?;
|
||||||
if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 {
|
if String::from_utf8(sig_bytes_rem[..2].to_vec())? != "LV" || sig_bytes_rem[2] != 1 {
|
||||||
|
@ -41,7 +39,6 @@ impl FLVReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn skip_init_header(&mut self) -> Result<(), Box<dyn Error>> {
|
fn skip_init_header(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
// skip rest of header. Header is (FLV1 + 1byte + 4byte offset + 4byte reserved)
|
|
||||||
let mut rest_of_header = [0u8; 5];
|
let mut rest_of_header = [0u8; 5];
|
||||||
self.stdin.read_exact(&mut rest_of_header)?;
|
self.stdin.read_exact(&mut rest_of_header)?;
|
||||||
let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4;
|
let head_offset = u32::from_be_bytes(rest_of_header[1..].try_into()?) as usize - 9 + 4;
|
||||||
|
@ -51,15 +48,13 @@ impl FLVReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_packet(&mut self) -> Result<(FLVTagType, Vec<u8>), Box<dyn Error + Send + Sync>> {
|
fn read_packet(&mut self) -> Result<(FLVTagType, Vec<u8>), Box<dyn Error + Send + Sync>> {
|
||||||
// FLV packet header always 11 bytes
|
|
||||||
let mut tag_head = [0u8; 11];
|
let mut tag_head = [0u8; 11];
|
||||||
match self.stdin.read_exact(&mut tag_head) {
|
match self.stdin.read_exact(&mut tag_head) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)), // assumes EOF while reading header means 0 bytes were read, not just less than len
|
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::DemuxerError::EOF)),
|
||||||
Err(err) => return Err(Box::new(err))
|
Err(err) => return Err(Box::new(err))
|
||||||
};
|
};
|
||||||
let tag_type = FLVTagType::try_from(tag_head[0])?;
|
let tag_type = FLVTagType::try_from(tag_head[0])?;
|
||||||
// len is u24 not u32, u24 does not exist
|
|
||||||
let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize;
|
let tag_data_len = (u32::from_be_bytes(tag_head[..4].try_into()?) & 0xffffff) as usize;
|
||||||
let mut packet_data = vec![0u8; tag_data_len];
|
let mut packet_data = vec![0u8; tag_data_len];
|
||||||
self.stdin.read_exact(packet_data.as_mut_slice())?;
|
self.stdin.read_exact(packet_data.as_mut_slice())?;
|
||||||
|
@ -90,7 +85,6 @@ impl input::FileReader for FLVReader {
|
||||||
fn read_nalu(&mut self, metadata: &util::Metadata) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
fn read_nalu(&mut self, metadata: &util::Metadata) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
||||||
let (packet_type, mut data) = self.read_packet()?;
|
let (packet_type, mut data) = self.read_packet()?;
|
||||||
return match packet_type {
|
return match packet_type {
|
||||||
// add headers to raw bytestreams as needed
|
|
||||||
FLVTagType::Audio => Ok(util::NALUPacket {
|
FLVTagType::Audio => Ok(util::NALUPacket {
|
||||||
packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()),
|
packet_type: util::NALUPacketType::Audio(*metadata.audio.codec.as_ref().unwrap()),
|
||||||
packet_data: nalu::prepend_a(data, &metadata),
|
packet_data: nalu::prepend_a(data, &metadata),
|
||||||
|
@ -112,10 +106,8 @@ pub fn new_reader(stdin: io::Stdin) -> Result<FLVReader, Box<dyn Error>> {
|
||||||
|
|
||||||
fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>> {
|
fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>> {
|
||||||
let mut metadata = util::Metadata { ..Default::default() };
|
let mut metadata = util::Metadata { ..Default::default() };
|
||||||
data.drain(..13); // skip until ECMA array, hacky, but seems fixed length and I'm not
|
data.drain(..13);
|
||||||
// implementing a proper amf parser for 1 single tag that seems to be fixed length offset
|
|
||||||
let ecma_sync_byte = data.drain(..1).next().unwrap();
|
let ecma_sync_byte = data.drain(..1).next().unwrap();
|
||||||
// AMF ECMA array object marker
|
|
||||||
if ecma_sync_byte != 8 {
|
if ecma_sync_byte != 8 {
|
||||||
return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail));
|
return Err(Box::new(util::DemuxerError::FLVMetadataSyncFail));
|
||||||
}
|
}
|
||||||
|
@ -123,12 +115,8 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
|
||||||
for _ in 0..arr_len {
|
for _ in 0..arr_len {
|
||||||
let key = String::fetch_amf_val(&mut data);
|
let key = String::fetch_amf_val(&mut data);
|
||||||
let amf_marker = u8::fetch_amf_val(&mut data);
|
let amf_marker = u8::fetch_amf_val(&mut data);
|
||||||
if !handle_relevant_keys(key, &mut data, &mut metadata)? { // most are completely useless
|
if !handle_relevant_keys(key, &mut data, &mut metadata)? {
|
||||||
// or unused (levels), if they
|
|
||||||
// are, get the value
|
|
||||||
match amf_marker {
|
match amf_marker {
|
||||||
// added functions to base types, see below.
|
|
||||||
// would try generics, but restricting types is weird
|
|
||||||
0 => {let _ = f64::fetch_amf_val(&mut data);},
|
0 => {let _ = f64::fetch_amf_val(&mut data);},
|
||||||
1 => {let _ = bool::fetch_amf_val(&mut data);},
|
1 => {let _ = bool::fetch_amf_val(&mut data);},
|
||||||
2 => {let _ = String::fetch_amf_val(&mut data);},
|
2 => {let _ = String::fetch_amf_val(&mut data);},
|
||||||
|
@ -136,13 +124,11 @@ fn process_metadata(mut data: Vec<u8>) -> Result<util::Metadata, Box<dyn Error>>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check if all needed values are set
|
|
||||||
return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))};
|
return if metadata.is_valid() {Ok(metadata)} else {Err(Box::new(util::DemuxerError::MetadataValNotSet))};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_relevant_keys(key: String, data: &mut Vec<u8>, metadata: &mut util::Metadata) -> Result<bool, Box<dyn Error>> {
|
fn handle_relevant_keys(key: String, data: &mut Vec<u8>, metadata: &mut util::Metadata) -> Result<bool, Box<dyn Error>> {
|
||||||
match key.as_str() {
|
match key.as_str() {
|
||||||
// fetches relevant values, should try to use above instead, but codecs complicate things
|
|
||||||
"width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)},
|
"width" => {metadata.video.width = f64::fetch_amf_val(data) as u32; Ok(true)},
|
||||||
"height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)},
|
"height" => {metadata.video.height = f64::fetch_amf_val(data) as u32; Ok(true)},
|
||||||
"framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)},
|
"framerate" => {metadata.video.framerate = f64::fetch_amf_val(data) as f32; Ok(true)},
|
||||||
|
|
|
@ -15,20 +15,18 @@ pub trait FileReader {
|
||||||
let nalu = match self.read_nalu(&metadata) {
|
let nalu = match self.read_nalu(&metadata) {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// pause loop on EOF
|
|
||||||
if let Some(e) = err.downcast_ref::<util::DemuxerError>() {
|
if let Some(e) = err.downcast_ref::<util::DemuxerError>() {
|
||||||
match e {
|
match e {
|
||||||
util::DemuxerError::EOF => {
|
util::DemuxerError::EOF => {
|
||||||
// drop to trigger RecvError on decode module to end
|
|
||||||
drop(v_in);
|
drop(v_in);
|
||||||
drop(a_in);
|
drop(a_in);
|
||||||
thread::park(); // prevent dropping err_out to keep main running
|
thread::park();
|
||||||
return
|
return
|
||||||
},
|
},
|
||||||
_ => ()
|
_ => ()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
util::thread_freeze(err_in, err); // pause thread, trigger main to exit
|
util::thread_freeze(err_in, err);
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -52,7 +50,7 @@ pub trait FileReader {
|
||||||
|
|
||||||
pub fn new_reader() -> Result<impl FileReader, Box<dyn Error>> {
|
pub fn new_reader() -> Result<impl FileReader, Box<dyn Error>> {
|
||||||
let mut stdin_hndl = io::stdin();
|
let mut stdin_hndl = io::stdin();
|
||||||
let mut byte1 = [0u8; 1]; // sync byte to determine filetype, limited checking capability
|
let mut byte1 = [0u8; 1];
|
||||||
stdin_hndl.read_exact(&mut byte1)?;
|
stdin_hndl.read_exact(&mut byte1)?;
|
||||||
match &byte1[0] {
|
match &byte1[0] {
|
||||||
0x46 => flv::new_reader(stdin_hndl),
|
0x46 => flv::new_reader(stdin_hndl),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
mod input;
|
mod input;
|
||||||
mod flv;
|
mod flv;
|
||||||
mod nalu_flv; // probably should organize into an flv submodule instead, but eh
|
mod nalu_flv;
|
||||||
|
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
@ -20,7 +20,7 @@ pub fn spawn() -> Result<
|
||||||
Box<dyn Error>
|
Box<dyn Error>
|
||||||
> {
|
> {
|
||||||
let mut reader = input::new_reader()?;
|
let mut reader = input::new_reader()?;
|
||||||
let metadata = Arc::new(reader.init()?); // arc to share across modules
|
let metadata = Arc::new(reader.init()?);
|
||||||
let (v_in, v_out) = mpsc::channel();
|
let (v_in, v_out) = mpsc::channel();
|
||||||
let (a_in, a_out) = mpsc::channel();
|
let (a_in, a_out) = mpsc::channel();
|
||||||
let (err_in, err_out) = mpsc::channel();
|
let (err_in, err_out) = mpsc::channel();
|
||||||
|
|
|
@ -29,28 +29,25 @@ fn parse_avcc(mut data: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
|
fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
|
||||||
let mut nalu = vec![0u8, 0u8, 0u8, 1u8]; // base nalu delimiter for H264
|
let mut nalu = vec![0u8, 0u8, 0u8, 1u8];
|
||||||
match data[1] {
|
match data[1] {
|
||||||
0 => {
|
0 => {
|
||||||
// sequence header contains avcC as in ISOBMF binding,
|
data.drain(..5);
|
||||||
// contains sps and pps needed for bytestream
|
|
||||||
// must be segmented for NALU format
|
|
||||||
data.drain(..5); // composition time info, etc, see FLV spec
|
|
||||||
let (mut sps, mut pps) = parse_avcc(data);
|
let (mut sps, mut pps) = parse_avcc(data);
|
||||||
nalu.append(&mut sps);
|
nalu.append(&mut sps);
|
||||||
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
|
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
|
||||||
nalu.append(&mut pps);
|
nalu.append(&mut pps);
|
||||||
return nalu;
|
return nalu;
|
||||||
},
|
},
|
||||||
2 => {return vec![0u8; 0];}, // end seq, return null
|
2 => {return vec![0u8; 0];},
|
||||||
1 => {
|
1 => {
|
||||||
data.drain(..5);
|
data.drain(..5);
|
||||||
loop { // may be multiple NALU packets per tag
|
loop {
|
||||||
let mut buff = [0u8; 4];
|
let mut buff = [0u8; 4];
|
||||||
buff.copy_from_slice(data.drain(..4).as_slice());
|
buff.copy_from_slice(data.drain(..4).as_slice());
|
||||||
let nalu_len = u32::from_be_bytes(buff) as usize;
|
let nalu_len = u32::from_be_bytes(buff) as usize;
|
||||||
nalu.extend_from_slice(data.drain(..nalu_len).as_slice());
|
nalu.extend_from_slice(data.drain(..nalu_len).as_slice());
|
||||||
if data.len() <= 4 {return nalu} // at end of packet
|
if data.len() <= 4 {return nalu}
|
||||||
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
|
nalu.append(&mut vec![0u8, 0u8, 0u8, 1u8]);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -59,7 +56,6 @@ fn preprocess_h264(mut data: Vec<u8>) -> Vec<u8> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
|
fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
|
||||||
// see multimedia wiki for details
|
|
||||||
let aot_idx = 1u8;
|
let aot_idx = 1u8;
|
||||||
let frame_len = (data_len + 7) as u16;
|
let frame_len = (data_len + 7) as u16;
|
||||||
let samplerate_idx: u8 = match metadata.audio.samplerate {
|
let samplerate_idx: u8 = match metadata.audio.samplerate {
|
||||||
|
@ -79,8 +75,7 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
|
||||||
_ => {panic!("invalid samplerate")}
|
_ => {panic!("invalid samplerate")}
|
||||||
};
|
};
|
||||||
let chan_conf_idx = metadata.audio.channels;
|
let chan_conf_idx = metadata.audio.channels;
|
||||||
let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8]; // sync bytes and guaranteed
|
let mut head = vec![0xffu8, 0xf1u8, 0u8, 0u8, 0u8, 0u8, 0xfcu8];
|
||||||
// value set
|
|
||||||
head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2);
|
head[2] = (aot_idx << 6) | (samplerate_idx << 2) | (chan_conf_idx >> 2);
|
||||||
head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8);
|
head[3] = ((chan_conf_idx & 0x3) << 6) | ((frame_len >> 11) as u8);
|
||||||
head[4] = ((frame_len >> 3) & 0xff) as u8;
|
head[4] = ((frame_len >> 3) & 0xff) as u8;
|
||||||
|
@ -91,8 +86,6 @@ fn make_adts_head(metadata: &util::Metadata, data_len: usize) -> Vec<u8> {
|
||||||
fn preprocess_aac(mut data: Vec<u8>, metadata: &util::Metadata) -> Vec<u8> {
|
fn preprocess_aac(mut data: Vec<u8>, metadata: &util::Metadata) -> Vec<u8> {
|
||||||
match data[1] {
|
match data[1] {
|
||||||
0 => {
|
0 => {
|
||||||
// bytestream needs no codec config, all information already in metadata, also would
|
|
||||||
// not be persistent across packets
|
|
||||||
return vec![0u8; 0];
|
return vec![0u8; 0];
|
||||||
},
|
},
|
||||||
1 => {
|
1 => {
|
||||||
|
|
|
@ -6,7 +6,6 @@ use crate::util;
|
||||||
use crate::encode::codecs::Encoder;
|
use crate::encode::codecs::Encoder;
|
||||||
use crate::encode::codecs;
|
use crate::encode::codecs;
|
||||||
|
|
||||||
// spawn encoder for a given output encoding
|
|
||||||
pub fn spawn(metadata: Arc<util::Metadata>, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box<dyn Error>> {
|
pub fn spawn(metadata: Arc<util::Metadata>, v_target: util::VideoCodec, a_target: util::AudioCodec) -> Result<(impl Encoder + Send, impl Encoder + Send), Box<dyn Error>> {
|
||||||
let v = match v_target {
|
let v = match v_target {
|
||||||
util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?,
|
util::VideoCodec::AV1 => codecs::video::new_av1(metadata.clone())?,
|
||||||
|
|
|
@ -18,21 +18,19 @@ impl Encoder for OpusEncoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
// opus outputs Ogg format, can be parsed with the same parser as the rest
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
||||||
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
||||||
let mut buff = [0u8; 27]; // each packet has a 27 byte header, mostly irrelevant
|
let mut buff = [0u8; 27];
|
||||||
match stdout.read_exact(&mut buff) {
|
match stdout.read_exact(&mut buff) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume EOF at header means EOF proper
|
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)),
|
||||||
Err(err) => return Err(Box::new(err))
|
Err(err) => return Err(Box::new(err))
|
||||||
};
|
};
|
||||||
let num_segments = buff[buff.len() - 1] as usize;
|
let num_segments = buff[buff.len() - 1] as usize;
|
||||||
let mut segment_table_buff = vec![0u8; num_segments]; // dictates actual size of data as
|
let mut segment_table_buff = vec![0u8; num_segments];
|
||||||
// raw sum of u8
|
|
||||||
stdout.read_exact(&mut segment_table_buff)?;
|
stdout.read_exact(&mut segment_table_buff)?;
|
||||||
let mut page_size:usize = 0;
|
let mut page_size:usize = 0;
|
||||||
for i in segment_table_buff {
|
for i in segment_table_buff {
|
||||||
|
@ -47,7 +45,6 @@ impl Encoder for OpusEncoder {
|
||||||
pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Error>> {
|
pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Error>> {
|
||||||
let channels = metadata.audio.channels.to_string();
|
let channels = metadata.audio.channels.to_string();
|
||||||
let samplerate = metadata.audio.samplerate.to_string();
|
let samplerate = metadata.audio.samplerate.to_string();
|
||||||
// opus enc for PCM LE 16 -> Opus
|
|
||||||
let cmd = Command::new("opusenc")
|
let cmd = Command::new("opusenc")
|
||||||
.args([
|
.args([
|
||||||
"--quiet",
|
"--quiet",
|
||||||
|
@ -55,7 +52,7 @@ pub fn new_opus(metadata: Arc<util::Metadata>) -> Result<OpusEncoder, Box<dyn Er
|
||||||
"--raw-bits", "16",
|
"--raw-bits", "16",
|
||||||
"--raw-rate", samplerate.as_str(),
|
"--raw-rate", samplerate.as_str(),
|
||||||
"--raw-chan", channels.as_str(),
|
"--raw-chan", channels.as_str(),
|
||||||
"--max-delay", "1", // 0 works worse than 1 somehow
|
"--max-delay", "1",
|
||||||
"-", "-"
|
"-", "-"
|
||||||
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
||||||
return Ok(OpusEncoder {cmd: cmd});
|
return Ok(OpusEncoder {cmd: cmd});
|
||||||
|
|
|
@ -12,13 +12,8 @@ use crate::util;
|
||||||
pub trait Encoder {
|
pub trait Encoder {
|
||||||
fn stdin(&mut self) -> ChildStdin;
|
fn stdin(&mut self) -> ChildStdin;
|
||||||
|
|
||||||
// read from encoder
|
|
||||||
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>>;
|
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>>;
|
||||||
|
|
||||||
// for specific encoder peculiarities
|
|
||||||
// svtav1 gives DKIF format, which has its own extra header which breaks just parsing as is
|
|
||||||
// opus outputs Ogg which needs full and header follows the same format as the rest for packets
|
|
||||||
// use as needed
|
|
||||||
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
|
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
|
||||||
|
|
||||||
fn write_nalu_loop(&mut self, c_out: mpsc::Sender<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
|
fn write_nalu_loop(&mut self, c_out: mpsc::Sender<util::NALUPacket>, c_err: mpsc::Sender<Box<dyn Error + Send + Sync>>) {
|
||||||
|
@ -29,9 +24,7 @@ pub trait Encoder {
|
||||||
loop {
|
loop {
|
||||||
let nalu = match self.read_nalu() {
|
let nalu = match self.read_nalu() {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {drop(c_out); thread::park(); return} // drop c_out to signal EOF to
|
Err(err) => {drop(c_out); thread::park(); return}
|
||||||
// muxer, park to keep c_err
|
|
||||||
// allocated
|
|
||||||
};
|
};
|
||||||
match c_out.send(nalu) {
|
match c_out.send(nalu) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
@ -41,7 +34,6 @@ pub trait Encoder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// write to encoder
|
|
||||||
fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box<dyn Error + Send + Sync>> {
|
fn write_raw(cmd_in: &mut ChildStdin, media: util::RawMedia) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
cmd_in.write_all(media.sample.as_slice())?;
|
cmd_in.write_all(media.sample.as_slice())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -51,15 +43,11 @@ pub fn read_raw_loop(mut cmd_in: ChildStdin, c_in: mpsc::Receiver<util::RawMedia
|
||||||
loop {
|
loop {
|
||||||
let media = match c_in.recv() {
|
let media = match c_in.recv() {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {drop(cmd_in); thread::park(); return} // drop stdin to signal EOF, which
|
Err(err) => {drop(cmd_in); thread::park(); return}
|
||||||
// lets encoder close stdout as
|
|
||||||
// needed, park to keep c_err
|
|
||||||
// allocated
|
|
||||||
};
|
};
|
||||||
match write_raw(&mut cmd_in, media) {
|
match write_raw(&mut cmd_in, media) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {util::thread_freeze(c_err, err); return} // park thread, wait for main to
|
Err(err) => {util::thread_freeze(c_err, err); return}
|
||||||
// exit
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,7 @@ impl Encoder for AV1Encoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
fn preprocess_stream(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
// SVTAV1 outputs Duck IVF framing, check specs if needed
|
|
||||||
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
||||||
// remove dkif header
|
|
||||||
let mut buff = [0u8; 8];
|
let mut buff = [0u8; 8];
|
||||||
stdout.read_exact(&mut buff)?;
|
stdout.read_exact(&mut buff)?;
|
||||||
let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize;
|
let head_len = u16::from_le_bytes(buff[6..].try_into().unwrap()) as usize;
|
||||||
|
@ -30,18 +28,16 @@ impl Encoder for AV1Encoder {
|
||||||
|
|
||||||
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
fn read_nalu(&mut self) -> Result<util::NALUPacket, Box<dyn Error + Send + Sync>> {
|
||||||
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
let mut stdout = self.cmd.stdout.as_mut().unwrap();
|
||||||
let mut buff = [0u8; 12]; // frame always 12 byte header
|
let mut buff = [0u8; 12];
|
||||||
match stdout.read_exact(&mut buff) {
|
match stdout.read_exact(&mut buff) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)), // assume eof if header read fails
|
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Err(Box::new(util::EncoderError::EOF)),
|
||||||
Err(err) => return Err(Box::new(err))
|
Err(err) => return Err(Box::new(err))
|
||||||
}
|
}
|
||||||
let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize; // actual sample
|
let nalu_len = u32::from_le_bytes(buff[..4].try_into().unwrap()) as usize;
|
||||||
// size in frame
|
|
||||||
// header
|
|
||||||
let mut data = vec![0u8; nalu_len];
|
let mut data = vec![0u8; nalu_len];
|
||||||
stdout.read_exact(data.as_mut_slice())?;
|
stdout.read_exact(data.as_mut_slice())?;
|
||||||
data.drain(..2); // skip temporal delimiter packet (2 bytes, always first of each dkif frame)
|
data.drain(..2);
|
||||||
return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data});
|
return Ok(util::NALUPacket {packet_type: util::NALUPacketType::Video(util::VideoCodec::AV1), packet_data: data});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,16 +46,15 @@ pub fn new_av1(metadata: Arc<util::Metadata>) -> Result<AV1Encoder, Box<dyn Erro
|
||||||
let width = metadata.video.width.to_string();
|
let width = metadata.video.width.to_string();
|
||||||
let height = metadata.video.height.to_string();
|
let height = metadata.video.height.to_string();
|
||||||
let fps_num = ((metadata.video.framerate * 1000.0) as u32).to_string();
|
let fps_num = ((metadata.video.framerate * 1000.0) as u32).to_string();
|
||||||
// SVTAV1 for YUV420 -> AV1, piped input output
|
|
||||||
let cmd = Command::new("SvtAv1EncApp")
|
let cmd = Command::new("SvtAv1EncApp")
|
||||||
.args([
|
.args([
|
||||||
"--errlog", "/dev/null", // dump errors
|
"--errlog", "/dev/null",
|
||||||
"--progress", "0",
|
"--progress", "0",
|
||||||
"-w", width.as_str(),
|
"-w", width.as_str(),
|
||||||
"-h", height.as_str(),
|
"-h", height.as_str(),
|
||||||
"--fps-num", fps_num.as_str(),
|
"--fps-num", fps_num.as_str(),
|
||||||
"--preset", "12", // super fast
|
"--preset", "12",
|
||||||
"--keyint", "30", // keyframe every 30 frames
|
"--keyint", "30",
|
||||||
"-i", "-",
|
"-i", "-",
|
||||||
"-b", "-"
|
"-b", "-"
|
||||||
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
]).stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
|
||||||
|
|
|
@ -24,7 +24,7 @@ pub fn spawn(
|
||||||
let (v_in, v_out) = mpsc::channel();
|
let (v_in, v_out) = mpsc::channel();
|
||||||
let (a_in, a_out) = mpsc::channel();
|
let (a_in, a_out) = mpsc::channel();
|
||||||
let (err_in_v, err_out) = mpsc::channel();
|
let (err_in_v, err_out) = mpsc::channel();
|
||||||
let err_in_a = err_in_v.clone(); // double for each thread
|
let err_in_a = err_in_v.clone();
|
||||||
let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?;
|
let (v_encoder, a_encoder) = cmds::spawn(metadata, v_target, a_target)?;
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
cmds::handle_encoder(v_encoder, v, v_in, err_in_v);
|
cmds::handle_encoder(v_encoder, v, v_in, err_in_v);
|
||||||
|
|
46
src/main.rs
46
src/main.rs
|
@ -9,63 +9,29 @@ use std::sync::mpsc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
// spawns threads for each subtask, linking input/output channels
|
fn init() -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>>{
|
||||||
// returns an array of error receivers to exit if needed
|
|
||||||
// passes input metadata around for video and audio metadata, makes scaling and resampling
|
|
||||||
// impossible
|
|
||||||
fn init(args: util::HLSArgs) -> Result<[mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4], Box<dyn std::error::Error>> {
|
|
||||||
let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?;
|
let (v_out, a_out, metadata, demux_err_recv) = demux::spawn()?;
|
||||||
let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?;
|
let (raw_v_out, raw_a_out, decode_err_recv) = decode::spawn(v_out, a_out, metadata.clone())?;
|
||||||
// output codecs hardcoded, probably want to pass in via cmd args instead
|
|
||||||
let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?;
|
let (enc_v_out, enc_a_out, encode_err_recv) = encode::spawn(raw_v_out, raw_a_out, metadata.clone(), util::VideoCodec::AV1, util::AudioCodec::OPUS)?;
|
||||||
let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), args)?;
|
let muxer_err_recv = muxer::spawn(enc_v_out, enc_a_out, metadata.clone(), util::HLSArgs{segment_time: 4, segment_prepend: String::from("/vid/tmp/"), max_segments: 4})?;
|
||||||
return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]);
|
return Ok([demux_err_recv, decode_err_recv, encode_err_recv, muxer_err_recv]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// post init loop. Modules are initialized without error, loop and check for errors
|
|
||||||
fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4]) -> Result<(), Box<dyn std::error::Error>> {
|
fn process(error_handles: [mpsc::Receiver<Box<dyn std::error::Error + Send + Sync>>; 4]) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let (mut v_eof, mut a_eof) = (false, false); // proper EOF handling
|
|
||||||
loop {
|
loop {
|
||||||
if v_eof && a_eof {return Ok(())} // if both video and audio streams have ended
|
|
||||||
for i in 0..4 {
|
for i in 0..4 {
|
||||||
match error_handles[i].try_recv() {
|
match error_handles[i].try_recv() {
|
||||||
Ok(err) => {
|
Ok(err) => return Err(err),
|
||||||
// check if EOF error found.
|
Err(mpsc::TryRecvError::Empty) => (),
|
||||||
if let Some(eof_error) = err.downcast_ref::<util::MuxerError>() {
|
|
||||||
match eof_error {
|
|
||||||
util::MuxerError::VideoEOF => {v_eof = true;},
|
|
||||||
util::MuxerError::AudioEOF => {a_eof = true;},
|
|
||||||
_ => return Err(err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(err)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(mpsc::TryRecvError::Empty) => (), // do nothing since nothing was sent and
|
|
||||||
// thread still active
|
|
||||||
Err(err) => return Err(Box::new(util::ThreadError(i))),
|
Err(err) => return Err(Box::new(util::ThreadError(i))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
thread::sleep(Duration::from_millis(100)); // sleep so isn't running constantly for no
|
thread::sleep(Duration::from_millis(100));
|
||||||
// reason
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// gets args
|
|
||||||
// bad, replace with clap impl if meaning to actually use
|
|
||||||
fn parse_args() -> util::HLSArgs {
|
|
||||||
let mut args = std::env::args();
|
|
||||||
args.next(); // skip prog name
|
|
||||||
util::HLSArgs {
|
|
||||||
segment_time: args.next().expect("no time interval").parse().expect("not int"),
|
|
||||||
segment_prepend: args.next().expect("no prepend"),
|
|
||||||
max_segments: args.next().expect("no max list size").parse().expect("not int"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let error_handles = match init(parse_args()) {
|
let error_handles = match init() {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => {dbg!(err); exit(1);}
|
Err(err) => {dbg!(err); exit(1);}
|
||||||
};
|
};
|
||||||
|
|
|
@ -3,8 +3,6 @@ use std::sync::{mpsc, Arc};
|
||||||
use std::time;
|
use std::time;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::cmp::max;
|
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
use crate::util;
|
use crate::util;
|
||||||
use crate::muxer::hls::mp4;
|
use crate::muxer::hls::mp4;
|
||||||
|
@ -16,72 +14,46 @@ pub struct HLSHandler {
|
||||||
args: util::HLSArgs,
|
args: util::HLSArgs,
|
||||||
curr_segment_idx: usize,
|
curr_segment_idx: usize,
|
||||||
curr_segments: Vec<String>,
|
curr_segments: Vec<String>,
|
||||||
curr_media_seq_idx: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HLSHandler {
|
impl HLSHandler {
|
||||||
pub fn data_loop(&mut self) {
|
pub fn data_loop(&mut self) {
|
||||||
self.muxer.spawn_read_loops();
|
self.muxer.spawn_read_loops();
|
||||||
loop {
|
loop {
|
||||||
match self.muxer.get_segment(self.curr_segment_idx, self.args.segment_time) {
|
match self.muxer.get_segment(self.curr_segment_idx) {
|
||||||
None => (),
|
None => (),
|
||||||
Some(segment) => {
|
Some(segment) => {
|
||||||
let del_list = self.new_segment(segment); // making a new segment should push
|
let del_list = self.new_segment(segment);
|
||||||
// out old segments
|
delete_files(del_list);
|
||||||
self.delete_files(del_list);
|
|
||||||
self.dump_playlist();
|
self.dump_playlist();
|
||||||
self.curr_segment_idx += 1;
|
self.curr_segment_idx += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
thread::sleep(time::Duration::from_millis(100)); // don't need to have it constantly
|
thread::sleep(time::Duration::from_millis(100));
|
||||||
// running,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_segment(&mut self, segment: segments::Segment) -> Vec<String> {
|
fn new_segment(&mut self, segment: segments::Segment) -> std::vec::Drain<String> {
|
||||||
segment.dump(); // write out the segment,
|
segment.dump();
|
||||||
self.curr_segments.push(segment.filename); // add its filename to the curr playlist,
|
self.curr_segments.push(segment.filename);
|
||||||
let to_drain = if self.curr_segments.len() > self.args.max_segments {self.curr_segments.len() - self.args.max_segments} else {0}; // and remove segement filenames that are old
|
self.curr_segments.drain(..self.curr_segments.len() - self.args.max_segments)
|
||||||
self.curr_segments.drain(..to_drain).collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn delete_files(&mut self, mut del_list: Vec<String>) {
|
|
||||||
for filename in &del_list {
|
|
||||||
_ = fs::remove_file(filename);
|
|
||||||
self.curr_media_seq_idx += 1; // media sequence must increment per deletion as per HLS
|
|
||||||
// spec
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn dump_playlist(&self) {
|
fn dump_playlist(&self) {
|
||||||
let mut segment_listing = String::new();
|
todo!();
|
||||||
for segment_filename in &self.curr_segments {
|
|
||||||
// each segment has its duration and access URI, should also have Datetime, but neither
|
|
||||||
// really needed nor "simple" with std rust, would need chrono or time or be ready for
|
|
||||||
// headaches
|
|
||||||
segment_listing += format!("\
|
|
||||||
#EXTINF:{},\n\
|
|
||||||
{}\n\
|
|
||||||
", self.args.segment_time, (self.args.segment_prepend.clone() + segment_filename)).as_str();
|
|
||||||
}
|
|
||||||
// full playlist string, see HLS spec for details
|
|
||||||
let playlist_string = format!("\
|
|
||||||
#EXTM3U\n\
|
|
||||||
#EXT-X-VERSION:7\n\
|
|
||||||
#EXT-X-TARGETDURATION:{}\n\
|
|
||||||
#EXT-X-MEDIA-SEQUENCE:{}\n\
|
|
||||||
#EXT-X-MAP:URI=\"{}init.mp4\"\n\
|
|
||||||
{}\
|
|
||||||
#EXT-X-ENDLIST\
|
|
||||||
", self.args.segment_time, self.curr_media_seq_idx, self.args.segment_prepend.clone(), segment_listing);
|
|
||||||
|
|
||||||
let mut f = fs::File::create("stream.m3u8").unwrap();
|
|
||||||
f.write_all(&playlist_string.as_bytes());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>, metadata: Arc<util::Metadata>, args: util::HLSArgs) -> Result<HLSHandler, Box<dyn Error>> {
|
pub fn spawn(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NALUPacket>, err_in: mpsc::Sender<Box<dyn Error + Send + Sync>>, metadata: Arc<util::Metadata>, args: util::HLSArgs) -> Result<HLSHandler, Box<dyn Error>> {
|
||||||
Ok(HLSHandler {muxer: mp4::new_muxer(v, a, metadata, err_in.clone())?, err_in: err_in, args: args, curr_segment_idx: 0, curr_segments: Vec::new(), curr_media_seq_idx: 0})
|
Ok(HLSHandler {muxer: mp4::new_muxer(v, a, metadata, err_in.clone())?, err_in: err_in, args: args, curr_segment_idx: 0, curr_segments: Vec::new()})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn delete_files(mut del_list: std::vec::Drain<String>) {
|
||||||
|
loop {
|
||||||
|
match del_list.next() {
|
||||||
|
Some(filename) => fs::remove_file(filename),
|
||||||
|
None => return
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,14 +1,8 @@
|
||||||
// cursed abomination, not well thought out, but works fine, rust also complicates inheritance
|
|
||||||
// every necessary MP4 atom to create readable files,
|
|
||||||
// lookup the corresponding item in the ISO doc or the quicktime spec (for everything not under
|
|
||||||
// moof), because I am not doing all of that here.
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
|
||||||
use crate::util;
|
use crate::util;
|
||||||
use crate::muxer::hls::mp4::samples;
|
|
||||||
|
|
||||||
pub trait MP4Atom {
|
pub trait MP4Atom {
|
||||||
fn marshall(&self) -> Vec<u8>;
|
fn marshall(&self) -> Vec<u8>;
|
||||||
|
@ -230,7 +224,7 @@ impl Default for TKHD {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
TKHD {
|
TKHD {
|
||||||
version: 0,
|
version: 0,
|
||||||
flags: 3,
|
flags: 1,
|
||||||
create_time: 0,
|
create_time: 0,
|
||||||
modify_time: 0,
|
modify_time: 0,
|
||||||
track_id: 0,
|
track_id: 0,
|
||||||
|
@ -440,7 +434,7 @@ impl Default for VMHD {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
VMHD {
|
VMHD {
|
||||||
version: 0,
|
version: 0,
|
||||||
flags: 1,
|
flags: 0,
|
||||||
mode: 0,
|
mode: 0,
|
||||||
opcolor: [0,0,0],
|
opcolor: [0,0,0],
|
||||||
}
|
}
|
||||||
|
@ -926,7 +920,6 @@ pub fn construct_moov(stsd_v: STSD, stsd_a: STSD, metadata: &Arc<util::Metadata>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// video will always be track 1, audio in trak 2
|
|
||||||
fn new_trak(stsd: STSD, metadata: &Arc<util::Metadata>) -> TRAK {
|
fn new_trak(stsd: STSD, metadata: &Arc<util::Metadata>) -> TRAK {
|
||||||
let mut tkhd = TKHD {..Default::default() };
|
let mut tkhd = TKHD {..Default::default() };
|
||||||
match stsd.entry {
|
match stsd.entry {
|
||||||
|
@ -1021,238 +1014,6 @@ pub fn dump_init_tree(moov: MOOV) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MOOF {
|
|
||||||
mfhd: MFHD,
|
|
||||||
trafs: [TRAF; 2],
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for MOOF {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = self.mfhd.marshall();
|
|
||||||
content.append(&mut self.trafs[0].marshall());
|
|
||||||
content.append(&mut self.trafs[1].marshall());
|
|
||||||
make_box(content, *b"moof")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct MFHD {
|
|
||||||
version: u8,
|
|
||||||
flags: u32,
|
|
||||||
seq_num: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for MFHD {
|
|
||||||
fn default() -> Self {
|
|
||||||
MFHD {
|
|
||||||
version: 0,
|
|
||||||
flags: 0,
|
|
||||||
seq_num: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for MFHD {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = Vec::new();
|
|
||||||
content.push(self.version);
|
|
||||||
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
|
|
||||||
content.extend_from_slice(&self.seq_num.to_be_bytes());
|
|
||||||
make_box(content, *b"mfhd")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TRAF {
|
|
||||||
tfhd: TFHD,
|
|
||||||
tfdt: TFDT,
|
|
||||||
trun: TRUN,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for TRAF {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = self.tfhd.marshall();
|
|
||||||
content.append(&mut self.tfdt.marshall());
|
|
||||||
content.append(&mut self.trun.marshall());
|
|
||||||
make_box(content, *b"traf")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TFHD {
|
|
||||||
version: u8,
|
|
||||||
flags: u32,
|
|
||||||
track_id: u32,
|
|
||||||
default_duration: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for TFHD {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = Vec::new();
|
|
||||||
content.push(self.version);
|
|
||||||
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
|
|
||||||
content.extend_from_slice(&self.track_id.to_be_bytes());
|
|
||||||
content.extend_from_slice(&self.default_duration.to_be_bytes());
|
|
||||||
make_box(content, *b"tfhd")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for TFHD {
|
|
||||||
fn default() -> Self {
|
|
||||||
TFHD {
|
|
||||||
version: 0,
|
|
||||||
flags: 0x00020008,
|
|
||||||
track_id: 0,
|
|
||||||
default_duration: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TFDT {
|
|
||||||
version: u8,
|
|
||||||
flags: u32,
|
|
||||||
start_time: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for TFDT {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = Vec::new();
|
|
||||||
content.push(self.version);
|
|
||||||
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
|
|
||||||
if self.version == 1 {
|
|
||||||
content.extend_from_slice(&self.start_time.to_be_bytes());
|
|
||||||
} else {
|
|
||||||
content.extend_from_slice(&(self.start_time as u32).to_be_bytes());
|
|
||||||
}
|
|
||||||
make_box(content, *b"tfdt")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for TFDT {
|
|
||||||
fn default() -> Self {
|
|
||||||
TFDT {
|
|
||||||
version: 0,
|
|
||||||
flags: 0,
|
|
||||||
start_time: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TRUN {
|
|
||||||
version: u8,
|
|
||||||
flags: u32,
|
|
||||||
num_samples: u32,
|
|
||||||
offset: u32,
|
|
||||||
samples: Vec<SampleData>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for TRUN {
|
|
||||||
fn default() -> Self {
|
|
||||||
TRUN {
|
|
||||||
version: 0,
|
|
||||||
flags: 0x00000601,
|
|
||||||
num_samples: 0,
|
|
||||||
offset: 0,
|
|
||||||
samples: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for TRUN {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = Vec::new();
|
|
||||||
content.push(self.version);
|
|
||||||
content.extend_from_slice(&self.flags.to_be_bytes()[1..]);
|
|
||||||
content.extend_from_slice(&self.num_samples.to_be_bytes());
|
|
||||||
content.extend_from_slice(&self.offset.to_be_bytes());
|
|
||||||
for sample in &self.samples {
|
|
||||||
content.append(&mut sample.marshall());
|
|
||||||
}
|
|
||||||
make_box(content, *b"trun")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SampleData {
|
|
||||||
size: u32,
|
|
||||||
flags: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for SampleData {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = Vec::new();
|
|
||||||
content.extend_from_slice(&self.size.to_be_bytes());
|
|
||||||
content.extend_from_slice(&self.flags.to_be_bytes());
|
|
||||||
return content;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MDAT {
|
|
||||||
pub v_samples: Vec<u8>,
|
|
||||||
pub a_samples: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MP4Atom for MDAT {
|
|
||||||
fn marshall(&self) -> Vec<u8> {
|
|
||||||
let mut content = self.v_samples.clone();
|
|
||||||
content.append(&mut self.a_samples.clone());
|
|
||||||
make_box(content, *b"mdat")
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_moof(v_samples: &Vec<samples::Sample>, default_dur_v: u32, a_samples: &Vec<samples::Sample>, default_dur_a: u32, mdat_v_offset: usize, mdat_a_offset: usize, moof_idx: usize) -> MOOF {
|
|
||||||
// hacky fix, since sticking to a standard format and defaults for segments, only trun is
|
|
||||||
// variable which has a predictable size regardless, moof offset + v and a trun sample table
|
|
||||||
// len and mdat header offset, will break using version 1 atoms for time,
|
|
||||||
// alternative is to somehow record idx for base offset (which breaks MP4atoms impl) to modify
|
|
||||||
// later, or calc full length properly, also complicated
|
|
||||||
let mdat_data_start = 152 + v_samples.len() * 8 + a_samples.len() * 8 + 8;
|
|
||||||
MOOF {
|
|
||||||
mfhd: MFHD { seq_num: moof_idx as u32, ..Default::default() },
|
|
||||||
trafs: [ TRAF {
|
|
||||||
tfhd: TFHD {
|
|
||||||
track_id: 1,
|
|
||||||
default_duration: default_dur_v,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
trun: new_trun(v_samples, mdat_v_offset + mdat_data_start),
|
|
||||||
tfdt: TFDT {
|
|
||||||
start_time: default_dur_v as u64 * (v_samples.len() * (moof_idx - 1)) as u64,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
}, TRAF{
|
|
||||||
tfhd: TFHD {
|
|
||||||
track_id: 2,
|
|
||||||
default_duration: default_dur_a,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
trun: new_trun(a_samples, mdat_a_offset + mdat_data_start),
|
|
||||||
tfdt: TFDT {
|
|
||||||
start_time: default_dur_a as u64 * (a_samples.len() * (moof_idx - 1)) as u64,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
} ],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_trun(samples: &Vec<samples::Sample>, data_offset: usize) -> TRUN {
|
|
||||||
TRUN {
|
|
||||||
num_samples: samples.len() as u32,
|
|
||||||
offset: data_offset as u32,
|
|
||||||
samples: make_sample_table(samples),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_sample_table(samples: &Vec<samples::Sample>) -> Vec<SampleData> {
|
|
||||||
let mut table = Vec::new();
|
|
||||||
for sample in samples {
|
|
||||||
table.push( SampleData {
|
|
||||||
size: sample.size,
|
|
||||||
flags: sample.flags,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
// takes content, applies MP4 atom encapsulation
|
|
||||||
fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
|
fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
|
||||||
let mut head = vec![0u8; 8];
|
let mut head = vec![0u8; 8];
|
||||||
head.splice(..4, ((content.len() + 8) as u32).to_be_bytes());
|
head.splice(..4, ((content.len() + 8) as u32).to_be_bytes());
|
||||||
|
@ -1260,5 +1021,3 @@ fn make_box(mut content: Vec<u8>, box_name: [u8; 4]) -> Vec<u8> {
|
||||||
head.append(&mut content);
|
head.append(&mut content);
|
||||||
return head;
|
return head;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
mod mp4muxer;
|
mod mp4muxer;
|
||||||
pub mod atoms;
|
mod atoms;
|
||||||
pub mod samples;
|
pub mod samples;
|
||||||
|
|
||||||
use std::sync::{mpsc, Arc, Mutex};
|
use std::sync::{mpsc, Arc, Mutex};
|
||||||
|
@ -8,7 +8,6 @@ use std::error::Error;
|
||||||
use crate::util;
|
use crate::util;
|
||||||
|
|
||||||
pub struct MP4Muxer {
|
pub struct MP4Muxer {
|
||||||
// option to allow being taken by threads
|
|
||||||
v: Option<mpsc::Receiver<util::NALUPacket>>,
|
v: Option<mpsc::Receiver<util::NALUPacket>>,
|
||||||
pub v_samples: Arc<samples::SampleQueue>,
|
pub v_samples: Arc<samples::SampleQueue>,
|
||||||
a: Option<mpsc::Receiver<util::NALUPacket>>,
|
a: Option<mpsc::Receiver<util::NALUPacket>>,
|
||||||
|
@ -23,22 +22,15 @@ pub fn new_muxer(v: mpsc::Receiver<util::NALUPacket>, a: mpsc::Receiver<util::NA
|
||||||
a: Some(a),
|
a: Some(a),
|
||||||
metadata: metadata.clone(),
|
metadata: metadata.clone(),
|
||||||
err_in: err_in,
|
err_in: err_in,
|
||||||
// Arc<Mutex<Vec>> for multithread write access
|
|
||||||
v_samples: Arc::new(samples::SampleQueue {
|
v_samples: Arc::new(samples::SampleQueue {
|
||||||
queue: Mutex::new(Vec::new()),
|
queue: Mutex::new(Vec::new()),
|
||||||
default_duration: 1000, // timescale as fps * 1000, each sample then always 1000
|
default_duration: 1000,
|
||||||
// regardless of fps
|
|
||||||
samples_per_sec: metadata.video.framerate as usize,
|
samples_per_sec: metadata.video.framerate as usize,
|
||||||
}),
|
}),
|
||||||
a_samples: Arc::new(samples::SampleQueue {
|
a_samples: Arc::new(samples::SampleQueue {
|
||||||
queue: Mutex::new(Vec::new()),
|
queue: Mutex::new(Vec::new()),
|
||||||
default_duration: metadata.audio.samplerate * 20 / 1000, // timescale set as
|
default_duration: metadata.audio.samplerate * 20 / 1000,
|
||||||
// samplerate, duration per
|
samples_per_sec: 50,
|
||||||
// sample is sample duration
|
|
||||||
// (20ms packet) * samples per
|
|
||||||
// time (samplerate)
|
|
||||||
samples_per_sec: 50, // opus default is 20ms packets (1000/20 = 50), would need to change this if
|
|
||||||
// changing that
|
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
muxer.gen_init()?;
|
muxer.gen_init()?;
|
||||||
|
|
|
@ -11,17 +11,12 @@ impl mp4::MP4Muxer {
|
||||||
pub fn gen_init(&mut self) -> Result<(), Box<dyn Error>> {
|
pub fn gen_init(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
let stsd_v = self.handle_v_cc()?;
|
let stsd_v = self.handle_v_cc()?;
|
||||||
let stsd_a = self.handle_a_cc()?;
|
let stsd_a = self.handle_a_cc()?;
|
||||||
// only need moov for codec declarations, ftyp is uneeded apparently
|
|
||||||
let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata);
|
let init_tree = mp4::atoms::construct_moov(stsd_v, stsd_a, &self.metadata);
|
||||||
mp4::atoms::dump_init_tree(init_tree)?; // write out moov in init.mp4 segment
|
mp4::atoms::dump_init_tree(init_tree)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
|
fn handle_v_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
|
||||||
// handles getting stsd per sample, parsing codec config mostly
|
|
||||||
// and peculiarities per codec
|
|
||||||
// AV1 has the first nalu as is in the samples
|
|
||||||
// also contains info for codec config
|
|
||||||
let v_cc = self.v.as_ref().expect("must be owned at init").recv()?;
|
let v_cc = self.v.as_ref().expect("must be owned at init").recv()?;
|
||||||
return match v_cc.packet_type {
|
return match v_cc.packet_type {
|
||||||
util::NALUPacketType::Video(util::VideoCodec::AV1) => {
|
util::NALUPacketType::Video(util::VideoCodec::AV1) => {
|
||||||
|
@ -33,10 +28,6 @@ impl mp4::MP4Muxer {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
|
fn handle_a_cc(&mut self) -> Result<mp4::atoms::STSD, Box<dyn Error>> {
|
||||||
// handles getting stsd per sample, parsing codec config mostly
|
|
||||||
// and peculiarities per codec
|
|
||||||
// Ogg (Opus) first packet has codec config info, not needed in samples
|
|
||||||
// Second packet has no discernable use for ISOBMF? Also not needed in samples
|
|
||||||
let a_cc = self.a.as_ref().expect("must be owned at init").recv()?;
|
let a_cc = self.a.as_ref().expect("must be owned at init").recv()?;
|
||||||
return match a_cc.packet_type {
|
return match a_cc.packet_type {
|
||||||
util::NALUPacketType::Audio(util::AudioCodec::OPUS) => {
|
util::NALUPacketType::Audio(util::AudioCodec::OPUS) => {
|
||||||
|
@ -57,32 +48,21 @@ impl mp4::MP4Muxer {
|
||||||
let mut i = 1;
|
let mut i = 1;
|
||||||
loop {
|
loop {
|
||||||
match v.recv() {
|
match v.recv() {
|
||||||
// hacky fix for video keyframe sample flags
|
|
||||||
// set at 30 frames per keyframe, would need some way to send the same value to
|
|
||||||
// the encoder as here or attempt to parse the actual OBU (even the header
|
|
||||||
// seems cursed, just use a lib instead)
|
|
||||||
// no clue how those flags are parsed, but they work (0x02 is keyframe, 0x0101
|
|
||||||
// is interframe)
|
|
||||||
Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});},
|
Ok(x) => {v_queue.push(x.packet_data, if i % 30 == 0 {0x02000000} else {0x01010000});},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// send EOF to main for proper shutdown
|
util::thread_freeze(err_in1, Box::new(util::MuxerError::EOF));
|
||||||
util::thread_freeze(err_in1, Box::new(util::MuxerError::VideoEOF));
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
i += 1;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
loop {
|
loop {
|
||||||
match a.recv() {
|
match a.recv() {
|
||||||
// audio should be all keyframes, should use the tfhd default var, but
|
Ok(x) => {a_queue.push(x.packet_data, 0x01010000);},
|
||||||
// more work to decouple video and audio impls
|
|
||||||
Ok(x) => {a_queue.push(x.packet_data, 0x02000000);},
|
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// send EOF for proper shutdown
|
util::thread_freeze(err_in2, Box::new(util::MuxerError::EOF));
|
||||||
util::thread_freeze(err_in2, Box::new(util::MuxerError::AudioEOF));
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,21 +70,8 @@ impl mp4::MP4Muxer {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_segment(&mut self, idx: usize, time: usize) -> Option<segments::Segment> {
|
pub fn get_segment(&mut self, idx: usize) -> Option<segments::Segment> {
|
||||||
// if sample queues are long enough to extract a segment, do so, otherwise return None
|
todo!();
|
||||||
// probably should not be an option, instead loop here until ready to allow flushing at
|
|
||||||
// exit
|
|
||||||
if self.v_samples.segment_ready(time) && self.a_samples.segment_ready(time) {
|
|
||||||
return Some(segments::Segment {
|
|
||||||
filename: idx.to_string() + ".m4s",
|
|
||||||
idx: idx,
|
|
||||||
segment_video: self.v_samples.get_segment_samples(time),
|
|
||||||
sample_duration_v: self.v_samples.default_duration,
|
|
||||||
segment_audio: self.a_samples.get_segment_samples(time),
|
|
||||||
sample_duration_a: self.a_samples.default_duration,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +89,7 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
|
||||||
dref_idx: 1,
|
dref_idx: 1,
|
||||||
width: metadata.video.width as u16,
|
width: metadata.video.width as u16,
|
||||||
height: metadata.video.height as u16,
|
height: metadata.video.height as u16,
|
||||||
resolution: (72u64 << 48) + (72u64 << 16), // 72 dpi, fixed point numbers (72.00 + 72.00)
|
resolution: (72u64 << 48) + (72u64 << 16),
|
||||||
sample_per_frame: 1,
|
sample_per_frame: 1,
|
||||||
encoder_name: String::from("libsvtav1"),
|
encoder_name: String::from("libsvtav1"),
|
||||||
pixel_depth: 24,
|
pixel_depth: 24,
|
||||||
|
@ -137,7 +104,6 @@ fn get_av1_stsd(mut sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::ato
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD {
|
fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms::STSD {
|
||||||
// see opus encapsulation in ISOBMF for details
|
|
||||||
let mut opus_binding = vec![0u8; 11];
|
let mut opus_binding = vec![0u8; 11];
|
||||||
opus_binding[1] = metadata.audio.channels;
|
opus_binding[1] = metadata.audio.channels;
|
||||||
let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap());
|
let pre_skip = u16::from_le_bytes(sample[10..12].try_into().unwrap());
|
||||||
|
@ -155,7 +121,7 @@ fn get_opus_stsd(sample: Vec<u8>, metadata: &Arc<util::Metadata>) -> mp4::atoms:
|
||||||
dref_idx: 1,
|
dref_idx: 1,
|
||||||
channels: metadata.audio.channels as u16,
|
channels: metadata.audio.channels as u16,
|
||||||
sample_size: 16,
|
sample_size: 16,
|
||||||
sample_rate: metadata.audio.samplerate << 16, // fixed point (u16 int + u16 decimals)
|
sample_rate: metadata.audio.samplerate,
|
||||||
codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding}
|
codec_config: mp4::atoms::CodecConfig { atom_name: *b"dOps", data: opus_binding}
|
||||||
};
|
};
|
||||||
return mp4::atoms::STSD {
|
return mp4::atoms::STSD {
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::muxer::hls::segments;
|
|
||||||
|
|
||||||
// forces a common duration, no composition time offset
|
|
||||||
// flags declared per sample as well as size
|
|
||||||
pub struct Sample {
|
pub struct Sample {
|
||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
pub size: u32,
|
pub size: u32,
|
||||||
|
@ -24,13 +20,4 @@ impl SampleQueue {
|
||||||
flags: flags,
|
flags: flags,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn segment_ready(&self, time: usize) -> bool {
|
|
||||||
self.queue.lock().unwrap().len() >= time * self.samples_per_sec
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_segment_samples(&self, time: usize) -> Vec<Sample> {
|
|
||||||
let samples_to_drain = time * self.samples_per_sec;
|
|
||||||
self.queue.lock().unwrap().drain(..samples_to_drain).collect()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,44 +1,15 @@
|
||||||
use std::time;
|
use std::time;
|
||||||
use std::fs::File;
|
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
use crate::muxer::hls::mp4::samples;
|
use crate::muxer::hls::mp4::samples;
|
||||||
use crate::muxer::hls::mp4::atoms;
|
|
||||||
use crate::muxer::hls::mp4::atoms::MP4Atom;
|
|
||||||
|
|
||||||
pub struct Segment {
|
pub struct Segment {
|
||||||
|
segment_idx: usize,
|
||||||
pub filename: String,
|
pub filename: String,
|
||||||
pub idx: usize,
|
segment_video: Vec<samples::Sample>,
|
||||||
pub segment_video: Vec<samples::Sample>,
|
segment_audio: Vec<samples::Sample>,
|
||||||
pub sample_duration_v: u32, // default interval per sample at designated timescale (same for
|
|
||||||
// audio instead below)
|
|
||||||
pub segment_audio: Vec<samples::Sample>,
|
|
||||||
pub sample_duration_a: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Segment {
|
impl Segment {
|
||||||
pub fn dump(&self) {
|
pub fn dump(&self) {
|
||||||
let mdat = self.new_mdat(); // sample full byte length needed for moof generation
|
todo!();
|
||||||
// increment idx (0-indexed) for moofs (1-indexed)
|
|
||||||
let moof = atoms::new_moof(&self.segment_video, self.sample_duration_v, &self.segment_audio, self.sample_duration_a, 0, mdat.v_samples.len(), self.idx + 1);
|
|
||||||
let mut segment_bytes = moof.marshall();
|
|
||||||
segment_bytes.append(&mut mdat.marshall());
|
|
||||||
|
|
||||||
let mut f = File::create(&self.filename).unwrap();
|
|
||||||
f.write_all(&segment_bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_mdat(&self) -> atoms::MDAT {
|
|
||||||
let (mut v_samples, mut a_samples) = (Vec::new(), Vec::new());
|
|
||||||
for sample in &self.segment_video {
|
|
||||||
v_samples.extend_from_slice(sample.data.as_slice());
|
|
||||||
}
|
|
||||||
for sample in &self.segment_audio {
|
|
||||||
a_samples.extend_from_slice(sample.data.as_slice());
|
|
||||||
}
|
|
||||||
atoms::MDAT {
|
|
||||||
v_samples: v_samples,
|
|
||||||
a_samples: a_samples,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
// general use structs, functions and errors go here
|
|
||||||
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
@ -200,8 +198,7 @@ impl fmt::Display for EncoderError {
|
||||||
pub enum MuxerError {
|
pub enum MuxerError {
|
||||||
InvalidCodec,
|
InvalidCodec,
|
||||||
CCParseError,
|
CCParseError,
|
||||||
VideoEOF,
|
EOF,
|
||||||
AudioEOF,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error for MuxerError {}
|
impl Error for MuxerError {}
|
||||||
|
@ -212,6 +209,7 @@ impl fmt::Debug for MuxerError {
|
||||||
match self {
|
match self {
|
||||||
MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"),
|
MuxerError::InvalidCodec => write!(f, "Codec not valid, unhandled"),
|
||||||
MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"),
|
MuxerError::CCParseError => write!(f, "Generic error while parsing codec config box"),
|
||||||
|
MuxerError::EOF => write!(f, "input EOF"),
|
||||||
_ => write!(f, "Error not described yet")
|
_ => write!(f, "Error not described yet")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -224,15 +222,11 @@ impl fmt::Display for MuxerError {
|
||||||
}
|
}
|
||||||
// funcs
|
// funcs
|
||||||
|
|
||||||
// parks a thread and sends an error over the param channel
|
|
||||||
// used to pause threads keeping channels open, so that main can receive the actual error and then
|
|
||||||
// exit, instead of a "RecvError"
|
|
||||||
pub fn thread_freeze(err_sender: mpsc::Sender<Box<dyn Error + Send + Sync>>, err: Box<dyn Error + Send + Sync>) {
|
pub fn thread_freeze(err_sender: mpsc::Sender<Box<dyn Error + Send + Sync>>, err: Box<dyn Error + Send + Sync>) {
|
||||||
err_sender.send(err).unwrap();
|
err_sender.send(err).unwrap();
|
||||||
thread::park();
|
thread::park();
|
||||||
}
|
}
|
||||||
|
|
||||||
// should probably make this a macro instead, eh
|
|
||||||
pub fn blank_vec(n: usize) -> Vec<u8> {
|
pub fn blank_vec(n: usize) -> Vec<u8> {
|
||||||
return vec![0u8; n];
|
return vec![0u8; n];
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue