From 699666695e36f878f2de9317389c8d843bbdcf84 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Mon, 21 Aug 2023 20:25:54 +0500 Subject: [PATCH] Comments --- http/server.go | 12 +++++------ rtmp/amf/command.go | 6 ++++++ rtmp/amf/decode.go | 23 +++++++++++++++++---- rtmp/amf/encode.go | 2 ++ rtmp/chunk.go | 49 ++++++++++++++++++++++++++++++++++++--------- rtmp/chunk2.go | 18 +++++++++++------ rtmp/chunk_wrap.go | 20 ++++++++++++++++-- rtmp/connect.go | 14 +++++++++++++ rtmp/data_loop.go | 16 ++++++++------- rtmp/flv/writer.go | 29 ++++++++++++++++++--------- rtmp/handshake.go | 6 ++++-- rtmp/server.go | 10 ++++++--- 12 files changed, 155 insertions(+), 50 deletions(-) diff --git a/http/server.go b/http/server.go index 537f109..0bb826d 100644 --- a/http/server.go +++ b/http/server.go @@ -17,10 +17,10 @@ func NewServer(port string) (error) { } func server_setup(server *http.ServeMux) { - server.HandleFunc("/live/", serve_main_page) - server.HandleFunc("/list/", serve_live_playlist) - server.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static/")))) - server.HandleFunc("/vid/", serve_vid_segment) + server.HandleFunc("/live/", serve_main_page) // main page with the video element, separate for each streamkey + server.HandleFunc("/list/", serve_live_playlist) // uri returns the playlist file for the given stream key + server.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static/")))) // returns static files (eventually for the local hls player implementation) + server.HandleFunc("/vid/", serve_vid_segment) // serves the appropriate static video segment } func serve_vid_segment(w http.ResponseWriter, r *http.Request) { @@ -43,8 +43,8 @@ func serve_live_playlist(w http.ResponseWriter, r *http.Request) { } func serve_main_page(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Origin", "*") // bad jank for hls.js loading from cdn. cross-site js bad. page := template.Must(template.ParseFiles("static/index.html")) stream_user := strings.TrimPrefix(r.URL.Path, "/live/") - page.Execute(w, stream_user) + page.Execute(w, stream_user) // literally only to define the uri for playlist loading, should be a script but eh } diff --git a/rtmp/amf/command.go b/rtmp/amf/command.go index 925b1ab..46abdb6 100644 --- a/rtmp/amf/command.go +++ b/rtmp/amf/command.go @@ -4,6 +4,9 @@ import ( "errors" ) +// most are hard-coded checks to make sure the received AMF0 command is what was received + + func (amf_obj_root AMFObj) ProcessConnect() (err error) { err = errors.New("Bad AMF connect command") if !check_object(amf_obj_root, 0, "connect") { @@ -74,6 +77,8 @@ func (amf_obj_root AMFObj) ProcessPublish(trans_id *float64, stream_key *string) return } +// Encodes do the reverse, pacakge some data into the corresponding AMF0 message to send + func EncodeConnectResponse() ([]byte, error) { amf_root_obj := make(AMFObj) amf_root_obj[0] = "_result" @@ -119,6 +124,7 @@ func EncodePublishResponse(trans_id float64) ([]byte, error) { return Encode(amf_root_obj) } +// helper function, checks if the given key exists for the object and checks if the matching value is the same as the target func check_object(amf_obj AMFObj, key interface{}, target interface{}) (bool) { if val, ok := amf_obj[key]; ok && val == target{ return true diff --git a/rtmp/amf/decode.go b/rtmp/amf/decode.go index 7a524a0..7929598 100644 --- a/rtmp/amf/decode.go +++ b/rtmp/amf/decode.go @@ -6,13 +6,15 @@ import ( "math" ) -type AMFObj map[interface{}]interface{} +type AMFObj map[interface{}]interface{} // interface mostly so root obj can be included as an index of ints +// always a msg type of 20 for AMF0 func DecodeAMF(data *[]byte) (AMFObj, error) { var byte_idx uint32 - var root_obj_idx int + var root_obj_idx int // top level object can be referred to as int amf_root_obj := make(AMFObj) for { + // read the next thing and store it under the given root index top_level_obj, err := read_next(data, &byte_idx) if err != nil { return amf_root_obj, err @@ -20,13 +22,14 @@ func DecodeAMF(data *[]byte) (AMFObj, error) { amf_root_obj[root_obj_idx] = top_level_obj root_obj_idx += 1 - if byte_idx == uint32(len(*data)) { + if byte_idx == uint32(len(*data)) { // check if full message read break } } return amf_root_obj, nil } +// generic wrapper to read n bytes, what I tried to avoid with ReadChunk in rtmp func read_bytes(data *[]byte, byte_idx *uint32, n uint32) ([]byte, error) { if int(*byte_idx + n) > len(*data) { return make([]byte, 0), errors.New("Read goes past end") @@ -36,6 +39,8 @@ func read_bytes(data *[]byte, byte_idx *uint32, n uint32) ([]byte, error) { return read_slice, nil } +// assuming the next thing is a number, read it +// format is 8 bytes encoded big-endian as float64 func read_number(data *[]byte, byte_idx *uint32) (float64, error) { float_bytes, err := read_bytes(data, byte_idx, 8) if err != nil { @@ -44,6 +49,8 @@ func read_number(data *[]byte, byte_idx *uint32) (float64, error) { return math.Float64frombits(binary.BigEndian.Uint64(float_bytes)), nil } +// assuming next is boolean, read +// format is 1 byte, 0 for false 1 for true func read_bool(data *[]byte, byte_idx *uint32) (bool, error) { bool_byte, err := read_bytes(data, byte_idx, 1) if err != nil { @@ -55,6 +62,8 @@ func read_bool(data *[]byte, byte_idx *uint32) (bool, error) { return bool_byte[0] != 0, nil } +// assuming next is string, read +// format is 2 bytes for string len n, next n bytes are ascii chars func read_string(data *[]byte, byte_idx *uint32) (string, error) { string_len, err := read_bytes(data, byte_idx, 2) if err != nil { @@ -67,6 +76,9 @@ func read_string(data *[]byte, byte_idx *uint32) (string, error) { return string(string_bytes), err } +// assuming next is an object, read +// format is always a string for a key, then a marker to determine next data type then data +// ends with an empty string for a key followed by a 9 func read_object(data *[]byte, byte_idx *uint32) (AMFObj, error) { root_obj := make(AMFObj) for { @@ -91,6 +103,9 @@ func read_object(data *[]byte, byte_idx *uint32) (AMFObj, error) { } +// read the next thing +// read the next byte to determine the data type, then call one of the above to read +// defined as AMF0 spec func read_next(data *[]byte, byte_idx *uint32) (interface{}, error) { data_type, err := read_bytes(data, byte_idx, 1) var next_obj interface{} @@ -106,7 +121,7 @@ func read_next(data *[]byte, byte_idx *uint32) (interface{}, error) { next_obj, err = read_string(data, byte_idx) case 3: next_obj, err = read_object(data, byte_idx) - case 5: + case 5: // null marker, no extra empty bytes, just skip to next object return nil, nil default: return nil, errors.New("Unhandled data type") diff --git a/rtmp/amf/encode.go b/rtmp/amf/encode.go index 0e9c7a4..dd696c8 100644 --- a/rtmp/amf/encode.go +++ b/rtmp/amf/encode.go @@ -6,6 +6,8 @@ import ( "errors" ) +// reverse of the decode in amf/decode,go +// see that file for what most of this means func Encode(amf_root_obj AMFObj) ([]byte, error) { tmp_buffer := make([]byte, 1024) bytes_encoded := 0 diff --git a/rtmp/chunk.go b/rtmp/chunk.go index 4b72a75..430aaa3 100644 --- a/rtmp/chunk.go +++ b/rtmp/chunk.go @@ -6,6 +6,8 @@ import ( "encoding/binary" ) +// data intake object, only needed since most of the work is +// just keeping track of the previous values for higher chunk formats to reuse type ChunkStream struct { timestamp uint32 last_msg_strm_id uint32 @@ -14,6 +16,8 @@ type ChunkStream struct { timedelta uint32 } +// actual message object, metadata from the chunk header +// data either spans the chunk or split across multiple with format 3 type Message struct { data []byte curr_bytes_read uint32 @@ -22,6 +26,9 @@ type Message struct { msg_len uint32 } +// just here because I got tired of constantly assigning byte buffers and didn't want to +// abstract it, plus helped figuring out how to use pointers and memory allocation on +// restricted memory type ChunkBuffers struct { time []byte msg_len []byte @@ -31,23 +38,25 @@ type ChunkBuffers struct { csid_true []byte } +// reads the initial variable size header that defines the chunk's format and chunkstream id +// 5.3.1.1 func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil { return } - format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) - switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { - case 0: + format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) // get first 2 bits for format 0-3 + switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { // last 6 bits 0-63 + case 0: // csid 0 is invalid, means true csid is the next byte, + 64 for the 6 bits prior if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil { return } csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64 - case 1: + case 1: // csid 1 is invalid, means true csid is in the next 2 bytes, reverse order (little endian) and the 64, reconstruct if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil { return } csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64 - default: + default: // by default true csid was read csid = uint32(chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f) } return @@ -97,9 +106,13 @@ func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, e return } +// msg header format 0, all data, typically for the start of a new chunkstream or reset, or all data changes +// 5.3.1.2.1 func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error + // first bit of data is time in the first 3 bytes, if max value, defined as extended time + // true time value is at end of header (technically after but its still effectively in the header) chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr) if err != nil { return err @@ -110,7 +123,7 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return err } msg_ptr.data = make([]byte, msg_ptr.msg_len) - chnk_stream_ptr.last_msg_len = msg_ptr.msg_len + chnk_stream_ptr.last_msg_len = msg_ptr.msg_len // assign data for different formats msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr) if err != nil { @@ -134,6 +147,10 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } +// format 1, typically for the 2nd message in a previously init chunkstream with most other data changed +// no longer time but timedelta. timestamp for the message is computed from the stored timestamp when the stream was opened +// otherwise the same except the msg_stream_id is reused (unchanged) +// 5.3.1.2.2 func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error @@ -167,6 +184,8 @@ func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } +// format 2, typically for when format 1 can be used but even the message len and type are identical +// 5.3.1.2.3 func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error @@ -193,6 +212,8 @@ func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } +// read the actual chunkdata given the space for the message that has been constructed using the chunk headers and +// configured peer chunk size func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) { bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read var buffer_end uint32 @@ -210,6 +231,8 @@ func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, } +// wrapper function to handle all formats and chunkstreams returns a pointer to a Message (can be nil if not completed) +// n_read was there to handle ack win, but doesn't really seem to matter at all func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){ conn.SetDeadline(time.Now().Add(10 * time.Second)) @@ -230,9 +253,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs switch format { case 0: + // format 0, assume new chunkstream which will override and reset any previous chunkstream regardless chnkstream_ptr = new(ChunkStream) - open_chnkstrms[csid] = chnkstream_ptr - msg_ptr = new(Message) + open_chnkstrms[csid] = chnkstream_ptr // assign the newly made chunkstream to the map of those currently open + msg_ptr = new(Message) // cannot be a continued message since the stream just started if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err @@ -242,6 +266,8 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs *n_read += 4 } case 1: + // format 1, chunkstream with this csid MUST have been opened before, but message is just starting since + // message metadata is being declared chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) @@ -253,6 +279,7 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs *n_read += 4 } case 2: + // format 2, same as 1 chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) @@ -264,6 +291,8 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs *n_read += 4 } case 3: + // format 3 has no header, only happens when either a previous message over the csid was too large for the chunksize + // or a new message with the same exact metadata including timedelta is being sent. chnkstream_ptr = open_chnkstrms[csid] msg_prev, ok := open_msgs[chnkstream_ptr.last_msg_strm_id] if !ok { @@ -287,10 +316,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs conn.SetDeadline(time.Time{}) - if msg_ptr.curr_bytes_read < msg_ptr.msg_len { + if msg_ptr.curr_bytes_read < msg_ptr.msg_len { // if the full message was not read, save it as an open message for the next chunk open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr return nil, nil - } else { + } else { // else a full message was read, remove it from the list of open messages if it was there at all and return it delete(open_msgs, chnkstream_ptr.last_msg_strm_id) return msg_ptr, nil } diff --git a/rtmp/chunk2.go b/rtmp/chunk2.go index 513b9fd..1a27fb2 100644 --- a/rtmp/chunk2.go +++ b/rtmp/chunk2.go @@ -6,6 +6,7 @@ import ( "time" ) +// counterpart to the read_basic_header from chunk.go, basically the reverse of that func write_basic_header(chunk_buffer *[]byte, format uint8, csid uint32) { if csid < 64 { (*chunk_buffer)[0] = (format << 6) + uint8(csid) @@ -18,25 +19,30 @@ func write_basic_header(chunk_buffer *[]byte, format uint8, csid uint32) { } } +// only consider writing format 0 for new, or format 3 for cut off. As the server only has to send +// protocol messages, no real point optimizing for a demo func write_message_header_0(chunk_buffer *[]byte, msg_strmid uint32, msg_ptr *Message, basic_header_size uint32) { - msg_len_inter := make([]byte, 4) + msg_len_inter := make([]byte, 4) // uint32 to uint24 hack binary.BigEndian.PutUint32(msg_len_inter, msg_ptr.msg_len) - copy((*chunk_buffer)[basic_header_size + 3:basic_header_size + 6], msg_len_inter[1:]) + copy((*chunk_buffer)[basic_header_size + 3:basic_header_size + 6], msg_len_inter[1:]) // skip timestamp since it will always be 0 for protocol and control msgs (*chunk_buffer)[basic_header_size + 6] = msg_ptr.msg_type binary.LittleEndian.PutUint32((*chunk_buffer)[basic_header_size + 7:basic_header_size + 11], msg_strmid) } +// write the actual data func write_chunk_data(chunk_buffer *[]byte, data []byte, bytes_buffered *uint32, chunk_data_size uint32, header_size uint32) { copy((*chunk_buffer)[header_size:header_size+chunk_data_size], data[*bytes_buffered:*bytes_buffered + chunk_data_size]) *bytes_buffered += chunk_data_size } +// chunk writing wrapper func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message, chunk_size uint32) (error) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - var i uint32 = 0 - for i < msg_ptr.msg_len { + var i uint32 = 0 // track number of bytes written + for i < msg_ptr.msg_len { // until message is fully sent var chunk_data_size uint32 + // check if remaining message needs to be chunked or not if msg_ptr.msg_len - i > chunk_size { chunk_data_size = chunk_size } else { @@ -53,7 +59,7 @@ func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message, var chunk_buffer []byte var header_size uint32 - if i == 0 { + if i == 0 { // if chunkstream is to be opened/reset (format 0 = message header) chunk_buffer = make([]byte, basic_header_size + 11 + chunk_data_size) write_basic_header(&chunk_buffer, 0, csid) write_message_header_0(&chunk_buffer, msg_strmid, msg_ptr, basic_header_size) @@ -64,7 +70,7 @@ func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message, header_size = basic_header_size } write_chunk_data(&chunk_buffer, msg_ptr.data, &i, chunk_data_size, header_size) - if _, err := conn.Write(chunk_buffer); err != nil { + if _, err := conn.Write(chunk_buffer); err != nil { // expunge buffer over network connection return err } } diff --git a/rtmp/chunk_wrap.go b/rtmp/chunk_wrap.go index 71130c7..87ec728 100644 --- a/rtmp/chunk_wrap.go +++ b/rtmp/chunk_wrap.go @@ -8,6 +8,11 @@ import ( "errors" ) +// meta wrapper for each connection, consolidates a bunch of info +// for each individual connection together to make calling reads divorced from +// chunking +// params -> see server.go +// open_chnkstrms + open_msgs + chunk_bufs -> see chunk.go type ChunkWrapper struct { conn net.Conn params *ProtocolParams @@ -16,13 +21,15 @@ type ChunkWrapper struct { chunk_buffs *ChunkBuffers } +// inits the above struct object for a given connection, most values +// can be init as the data type default func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) { chnk_wrp_ptr = new(ChunkWrapper) chnk_wrp_ptr.conn = conn chnk_wrp_ptr.params = new(ProtocolParams) - chnk_wrp_ptr.params.chunk_size = 256 - chnk_wrp_ptr.params.peer_chunk_size = 4096 + chnk_wrp_ptr.params.chunk_size = 256 // since only small messages sent, no real need to be large + chnk_wrp_ptr.params.peer_chunk_size = 4096 // can be arbitrary, just set here to OBS default chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream) chnk_wrp_ptr.open_msgs = make(map[uint32]*Message) buffers := ChunkBuffers{ @@ -37,6 +44,7 @@ func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) { return } +// basic wrapper for ReadChunk from chunk.go. just forwards the relevant data and pointers. func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) { full_msg_ptr, err := ReadChunk( chnk_wrp_ptr.conn, @@ -52,6 +60,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) { return full_msg_ptr, nil } +// basic wrapper for WriteChunk from chunk2.go, see above func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg_ptr *Message) (error) { err := WriteChunk( chnk_wrp_ptr.conn, @@ -66,6 +75,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg return nil } +// see section 5.4.1 of RTMP spec doc func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 { @@ -75,6 +85,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { return nil } +// see 7.2.1.1 func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || connect_cmd_msg.msg_type != 20 { @@ -91,6 +102,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { return err } +// 5.4.4 func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 5 @@ -103,6 +115,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) { return nil } +// 5.4.5 func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 6 @@ -116,6 +129,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) { return nil } +// 5.4.1 func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 1 @@ -128,6 +142,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) { return nil } +// you get the drill func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 20 @@ -207,6 +222,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) { return nil } +// first actual data packet, write to output pipe func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) { metadata, err := chnk_wrp_ptr.ReadChunk() if err != nil || metadata.msg_type != 18 { diff --git a/rtmp/connect.go b/rtmp/connect.go index 834efdf..1cc5cbc 100644 --- a/rtmp/connect.go +++ b/rtmp/connect.go @@ -4,6 +4,8 @@ import ( "os" ) +// series of messages sent from the client and responses +// window ack and peer bw seem to be completely useless to obs func NegotiateConnect(chnk_wrp_ptr *ChunkWrapper) (bool) { if err := chnk_wrp_ptr.ReadPeerChunkSize(); err != nil { return false @@ -26,7 +28,13 @@ func NegotiateConnect(chnk_wrp_ptr *ChunkWrapper) (bool) { return true } +// next sequence func CreateStream(chnk_wrp_ptr *ChunkWrapper) (bool) { + // first 2 messages are "releaseStream" and "FCPublish" + // no idea why the second exists since the next command is Publish + // the first has no docs since flash server docs are gone + // assuming it just signals a delete on any streams the current connection had + // it can be ignored for a pure stream intake server if _, err := chnk_wrp_ptr.ReadChunk(); err != nil { return false } @@ -42,6 +50,8 @@ func CreateStream(chnk_wrp_ptr *ChunkWrapper) (bool) { return true } +// Publish command handling, this is where stream keys are exchanged which means +// auth checking happens here func PublishAsKey(chnk_wrp_ptr *ChunkWrapper) (bool) { if err := chnk_wrp_ptr.ReadPublish(); err != nil { return false @@ -55,6 +65,10 @@ func PublishAsKey(chnk_wrp_ptr *ChunkWrapper) (bool) { return true } +// auth checking. again db would be the goal, for now just using a dir system +// pre generate a directory under ~/live/___/. Set the stream key to the folder name +// and this just checks if the folder exists (like a known public key) +// not secure at all but works for a demo func check_stream_key(stream_key string) (bool) { base_dir, _ := os.UserHomeDir() if fileinfo, err := os.Stat(base_dir + "/live/" + stream_key); err == nil && fileinfo.IsDir() { diff --git a/rtmp/data_loop.go b/rtmp/data_loop.go index c911158..18c822e 100644 --- a/rtmp/data_loop.go +++ b/rtmp/data_loop.go @@ -8,12 +8,13 @@ import ( ) func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { - StreamCleanup(chnk_wrp_ptr.params.stream_key, 0) - file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) + StreamCleanup(chnk_wrp_ptr.params.stream_key, 0) // remove any unwanted media files still left over + file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) // create a file writer (techincally a pipe to ffmpeg) defer file_writer.Close() if err != nil { return } + // first data message must always be metadata if err = chnk_wrp_ptr.ReadMetadata(file_writer); err != nil { return } @@ -21,13 +22,13 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { msg_ptr, err := chnk_wrp_ptr.ReadChunk() if err != nil { return - } else if msg_ptr == nil { - continue + } else if msg_ptr == nil { // if message was not fully read (mostly first video chunk) + continue // skip until full message } switch msg_ptr.msg_type { case 20: - return - case 8, 9: + return // should try to process and check specifically for unpublish, works about the same either way + case 8, 9: // audio or video data if err = file_writer.WriteMediaTag(&(msg_ptr.data), msg_ptr.timestamp, msg_ptr.msg_type); err != nil { return } @@ -35,9 +36,10 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { } } +// find all files in given directory and delete, simple as func StreamCleanup(stream_key string, delay time.Duration) { time.Sleep(delay * time.Second) - base_dir, _ := os.UserHomeDir() + base_dir, _ := os.UserHomeDir() // why would this ever need error handling? if it throws a problem something went very wrong somewhere anyway stream_dir := base_dir + "/live/" + stream_key leftover_files, _ := filepath.Glob(stream_dir + "/*") diff --git a/rtmp/flv/writer.go b/rtmp/flv/writer.go index a2703c6..3b5f60a 100644 --- a/rtmp/flv/writer.go +++ b/rtmp/flv/writer.go @@ -8,12 +8,14 @@ import ( ) type FLVWriter struct { - w io.WriteCloser + w io.WriteCloser // abstraction as a wrapper to the actual transcoder command } func NewFLVWriter(stream_key string) (*FLVWriter, error) { base_dir, _ := os.UserHomeDir() writer := new(FLVWriter) + // spawn ffmpeg as a transcoder + hls segmenter + // most are generic commands, added a prefix to each segment url to make serving it over http easier transcoder := exec.Command( "ffmpeg", "-probesize", "500", @@ -28,14 +30,14 @@ func NewFLVWriter(stream_key string) (*FLVWriter, error) { "-hls_flags", "+program_date_time", "stream.m3u8", ) - transcoder.Dir = base_dir + "/live/" + stream_key + "/" - flvpipe, err := transcoder.StdinPipe() - transcoder.Start() + transcoder.Dir = base_dir + "/live/" + stream_key + "/" // shift to the appropriate dir for the given stream key + flvpipe, err := transcoder.StdinPipe() // give control over the ffmpeg input as a stdin pipe (will push in flv packets as they come) + transcoder.Start() // spawn ffmpeg if err != nil { return nil, err } writer.w = flvpipe - if err = writer.write_flv_header(); err != nil { + if err = writer.write_flv_header(); err != nil { // init its header now, might as well return nil, err } return writer, nil @@ -45,11 +47,13 @@ func (writer *FLVWriter) Close() (error) { return writer.w.Close() } +// first data tag, AMF0 message but FLV spec requires the encoded data +// just requires skipping the RTMP specific bits, then just writing as is func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) { uint24_buf := make([]byte, 4) tag_header := make([]byte, 11) - tag_header[0] = 18 - binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:]))) + tag_header[0] = 18 // RTMP msg_type 18 + binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:]))) // "@setDataFrame", null, actual data copy(tag_header[1:4], uint24_buf[1:]) if _, err = writer.w.Write(tag_header); err != nil { @@ -66,6 +70,7 @@ func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) { return } +// write the actual audio + video data, same as metadata except no skipping func (writer *FLVWriter) WriteMediaTag(data *[]byte, timestamp uint32, media_type uint8) (err error) { uint24_buf := make([]byte, 4) tag_header := make([]byte, 11) @@ -91,13 +96,17 @@ func (writer *FLVWriter) WriteMediaTag(data *[]byte, timestamp uint32, media_typ return } +// FLV file header always 9 bytes + 4 bytes for first ghost data tag func (writer *FLVWriter) write_flv_header() (err error) { header := make([]byte, 13) copy(header[:3], "FLV") - header[3] = 1 - header[4] = 5 - header[8] = 9 + header[3] = 1 // FLV version, only 1 is ever used + header[4] = 5 // 1 and 4 -> audio OR video | 5 -> audio + video + header[8] = 9 // len of header + // flv spec requires each tag + tag_len. defines a ghost tag at start with len 0 + // likely for player seeking? + _, err = writer.w.Write(header) return } diff --git a/rtmp/handshake.go b/rtmp/handshake.go index fcc60d5..8be432a 100644 --- a/rtmp/handshake.go +++ b/rtmp/handshake.go @@ -6,6 +6,7 @@ import ( "encoding/binary" ) +// see adobe specs for RTMP func DoHandshake(conn net.Conn) (hs_success bool) { C0C1C2 := make([]byte, 1536*2 + 1) S0S1S2 := make([]byte, 1536*2 + 1) @@ -13,6 +14,7 @@ func DoHandshake(conn net.Conn) (hs_success bool) { S0S1S2[0] = 3 binary.BigEndian.PutUint32(S0S1S2[1:1+4], uint32(time.Now().UnixMilli())) + // force handshake to finish in under 15 seconds (aribtrary) or throw an error conn.SetDeadline(time.Now().Add(15 * time.Second)) if _, err := conn.Read(C0C1C2); err != nil || C0C1C2[0] != 3 { @@ -21,13 +23,13 @@ func DoHandshake(conn net.Conn) (hs_success bool) { copy(C0C1C2[1:1536], S0S1S2[1+1536:]) binary.BigEndian.PutUint32(S0S1S2[1+1536+4:1+1536+8], uint32(time.Now().UnixMilli())) - if _, err := conn.Write(S0S1S2); err != nil { + if _, err := conn.Write(S0S1S2); err != nil { // specs say only send S0S1 and wait for C2 before sending S2, obs doesn't care apparently return } if _, err := conn.Read(C0C1C2[1+1536:]); err != nil { return } hs_success = true - conn.SetDeadline(time.Time{}) + conn.SetDeadline(time.Time{}) // remove deadline for next function return } diff --git a/rtmp/server.go b/rtmp/server.go index 4ff6216..3bf1c74 100644 --- a/rtmp/server.go +++ b/rtmp/server.go @@ -4,6 +4,7 @@ import ( "net" ) +// meta-struct, holding together params for each streaing peer type ProtocolParams struct { peer_chunk_size uint32 chunk_size uint32 @@ -23,6 +24,9 @@ func NewServer(port string) (error) { } func start(l net.Listener) { + // bool to block connections while a stream is currently active + // should have it on a per user basis instead, but adding a db + // system would make this even more bloated stream_live := false for { conn, err := l.Accept() @@ -41,7 +45,7 @@ func handle_conn(conn net.Conn, stream_live *bool) { defer conn.Close() defer func(a *bool) { *a = false - }(stream_live) + }(stream_live) // flip the stream state back when done if !DoHandshake(conn) { return } @@ -55,6 +59,6 @@ func handle_conn(conn net.Conn, stream_live *bool) { if !PublishAsKey(chunk_wrapper) { return } - HandleDataLoop(chunk_wrapper) - go StreamCleanup(chunk_wrapper.params.stream_key, 60) + HandleDataLoop(chunk_wrapper) // no error handle since the connection ends either way + go StreamCleanup(chunk_wrapper.params.stream_key, 60) // remove any media files left 1 min after stream ends }