This commit is contained in:
Muaz Ahmad 2023-08-21 20:25:54 +05:00
parent 21fc4a5816
commit 699666695e
12 changed files with 155 additions and 50 deletions

View file

@ -17,10 +17,10 @@ func NewServer(port string) (error) {
} }
func server_setup(server *http.ServeMux) { func server_setup(server *http.ServeMux) {
server.HandleFunc("/live/", serve_main_page) server.HandleFunc("/live/", serve_main_page) // main page with the video element, separate for each streamkey
server.HandleFunc("/list/", serve_live_playlist) server.HandleFunc("/list/", serve_live_playlist) // uri returns the playlist file for the given stream key
server.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static/")))) server.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static/")))) // returns static files (eventually for the local hls player implementation)
server.HandleFunc("/vid/", serve_vid_segment) server.HandleFunc("/vid/", serve_vid_segment) // serves the appropriate static video segment
} }
func serve_vid_segment(w http.ResponseWriter, r *http.Request) { func serve_vid_segment(w http.ResponseWriter, r *http.Request) {
@ -43,8 +43,8 @@ func serve_live_playlist(w http.ResponseWriter, r *http.Request) {
} }
func serve_main_page(w http.ResponseWriter, r *http.Request) { func serve_main_page(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*") // bad jank for hls.js loading from cdn. cross-site js bad.
page := template.Must(template.ParseFiles("static/index.html")) page := template.Must(template.ParseFiles("static/index.html"))
stream_user := strings.TrimPrefix(r.URL.Path, "/live/") stream_user := strings.TrimPrefix(r.URL.Path, "/live/")
page.Execute(w, stream_user) page.Execute(w, stream_user) // literally only to define the uri for playlist loading, should be a script but eh
} }

View file

@ -4,6 +4,9 @@ import (
"errors" "errors"
) )
// most are hard-coded checks to make sure the received AMF0 command is what was received
func (amf_obj_root AMFObj) ProcessConnect() (err error) { func (amf_obj_root AMFObj) ProcessConnect() (err error) {
err = errors.New("Bad AMF connect command") err = errors.New("Bad AMF connect command")
if !check_object(amf_obj_root, 0, "connect") { if !check_object(amf_obj_root, 0, "connect") {
@ -74,6 +77,8 @@ func (amf_obj_root AMFObj) ProcessPublish(trans_id *float64, stream_key *string)
return return
} }
// Encodes do the reverse, pacakge some data into the corresponding AMF0 message to send
func EncodeConnectResponse() ([]byte, error) { func EncodeConnectResponse() ([]byte, error) {
amf_root_obj := make(AMFObj) amf_root_obj := make(AMFObj)
amf_root_obj[0] = "_result" amf_root_obj[0] = "_result"
@ -119,6 +124,7 @@ func EncodePublishResponse(trans_id float64) ([]byte, error) {
return Encode(amf_root_obj) return Encode(amf_root_obj)
} }
// helper function, checks if the given key exists for the object and checks if the matching value is the same as the target
func check_object(amf_obj AMFObj, key interface{}, target interface{}) (bool) { func check_object(amf_obj AMFObj, key interface{}, target interface{}) (bool) {
if val, ok := amf_obj[key]; ok && val == target{ if val, ok := amf_obj[key]; ok && val == target{
return true return true

View file

@ -6,13 +6,15 @@ import (
"math" "math"
) )
type AMFObj map[interface{}]interface{} type AMFObj map[interface{}]interface{} // interface mostly so root obj can be included as an index of ints
// always a msg type of 20 for AMF0
func DecodeAMF(data *[]byte) (AMFObj, error) { func DecodeAMF(data *[]byte) (AMFObj, error) {
var byte_idx uint32 var byte_idx uint32
var root_obj_idx int var root_obj_idx int // top level object can be referred to as int
amf_root_obj := make(AMFObj) amf_root_obj := make(AMFObj)
for { for {
// read the next thing and store it under the given root index
top_level_obj, err := read_next(data, &byte_idx) top_level_obj, err := read_next(data, &byte_idx)
if err != nil { if err != nil {
return amf_root_obj, err return amf_root_obj, err
@ -20,13 +22,14 @@ func DecodeAMF(data *[]byte) (AMFObj, error) {
amf_root_obj[root_obj_idx] = top_level_obj amf_root_obj[root_obj_idx] = top_level_obj
root_obj_idx += 1 root_obj_idx += 1
if byte_idx == uint32(len(*data)) { if byte_idx == uint32(len(*data)) { // check if full message read
break break
} }
} }
return amf_root_obj, nil return amf_root_obj, nil
} }
// generic wrapper to read n bytes, what I tried to avoid with ReadChunk in rtmp
func read_bytes(data *[]byte, byte_idx *uint32, n uint32) ([]byte, error) { func read_bytes(data *[]byte, byte_idx *uint32, n uint32) ([]byte, error) {
if int(*byte_idx + n) > len(*data) { if int(*byte_idx + n) > len(*data) {
return make([]byte, 0), errors.New("Read goes past end") return make([]byte, 0), errors.New("Read goes past end")
@ -36,6 +39,8 @@ func read_bytes(data *[]byte, byte_idx *uint32, n uint32) ([]byte, error) {
return read_slice, nil return read_slice, nil
} }
// assuming the next thing is a number, read it
// format is 8 bytes encoded big-endian as float64
func read_number(data *[]byte, byte_idx *uint32) (float64, error) { func read_number(data *[]byte, byte_idx *uint32) (float64, error) {
float_bytes, err := read_bytes(data, byte_idx, 8) float_bytes, err := read_bytes(data, byte_idx, 8)
if err != nil { if err != nil {
@ -44,6 +49,8 @@ func read_number(data *[]byte, byte_idx *uint32) (float64, error) {
return math.Float64frombits(binary.BigEndian.Uint64(float_bytes)), nil return math.Float64frombits(binary.BigEndian.Uint64(float_bytes)), nil
} }
// assuming next is boolean, read
// format is 1 byte, 0 for false 1 for true
func read_bool(data *[]byte, byte_idx *uint32) (bool, error) { func read_bool(data *[]byte, byte_idx *uint32) (bool, error) {
bool_byte, err := read_bytes(data, byte_idx, 1) bool_byte, err := read_bytes(data, byte_idx, 1)
if err != nil { if err != nil {
@ -55,6 +62,8 @@ func read_bool(data *[]byte, byte_idx *uint32) (bool, error) {
return bool_byte[0] != 0, nil return bool_byte[0] != 0, nil
} }
// assuming next is string, read
// format is 2 bytes for string len n, next n bytes are ascii chars
func read_string(data *[]byte, byte_idx *uint32) (string, error) { func read_string(data *[]byte, byte_idx *uint32) (string, error) {
string_len, err := read_bytes(data, byte_idx, 2) string_len, err := read_bytes(data, byte_idx, 2)
if err != nil { if err != nil {
@ -67,6 +76,9 @@ func read_string(data *[]byte, byte_idx *uint32) (string, error) {
return string(string_bytes), err return string(string_bytes), err
} }
// assuming next is an object, read
// format is always a string for a key, then a marker to determine next data type then data
// ends with an empty string for a key followed by a 9
func read_object(data *[]byte, byte_idx *uint32) (AMFObj, error) { func read_object(data *[]byte, byte_idx *uint32) (AMFObj, error) {
root_obj := make(AMFObj) root_obj := make(AMFObj)
for { for {
@ -91,6 +103,9 @@ func read_object(data *[]byte, byte_idx *uint32) (AMFObj, error) {
} }
// read the next thing
// read the next byte to determine the data type, then call one of the above to read
// defined as AMF0 spec
func read_next(data *[]byte, byte_idx *uint32) (interface{}, error) { func read_next(data *[]byte, byte_idx *uint32) (interface{}, error) {
data_type, err := read_bytes(data, byte_idx, 1) data_type, err := read_bytes(data, byte_idx, 1)
var next_obj interface{} var next_obj interface{}
@ -106,7 +121,7 @@ func read_next(data *[]byte, byte_idx *uint32) (interface{}, error) {
next_obj, err = read_string(data, byte_idx) next_obj, err = read_string(data, byte_idx)
case 3: case 3:
next_obj, err = read_object(data, byte_idx) next_obj, err = read_object(data, byte_idx)
case 5: case 5: // null marker, no extra empty bytes, just skip to next object
return nil, nil return nil, nil
default: default:
return nil, errors.New("Unhandled data type") return nil, errors.New("Unhandled data type")

View file

@ -6,6 +6,8 @@ import (
"errors" "errors"
) )
// reverse of the decode in amf/decode,go
// see that file for what most of this means
func Encode(amf_root_obj AMFObj) ([]byte, error) { func Encode(amf_root_obj AMFObj) ([]byte, error) {
tmp_buffer := make([]byte, 1024) tmp_buffer := make([]byte, 1024)
bytes_encoded := 0 bytes_encoded := 0

View file

@ -6,6 +6,8 @@ import (
"encoding/binary" "encoding/binary"
) )
// data intake object, only needed since most of the work is
// just keeping track of the previous values for higher chunk formats to reuse
type ChunkStream struct { type ChunkStream struct {
timestamp uint32 timestamp uint32
last_msg_strm_id uint32 last_msg_strm_id uint32
@ -14,6 +16,8 @@ type ChunkStream struct {
timedelta uint32 timedelta uint32
} }
// actual message object, metadata from the chunk header
// data either spans the chunk or split across multiple with format 3
type Message struct { type Message struct {
data []byte data []byte
curr_bytes_read uint32 curr_bytes_read uint32
@ -22,6 +26,9 @@ type Message struct {
msg_len uint32 msg_len uint32
} }
// just here because I got tired of constantly assigning byte buffers and didn't want to
// abstract it, plus helped figuring out how to use pointers and memory allocation on
// restricted memory
type ChunkBuffers struct { type ChunkBuffers struct {
time []byte time []byte
msg_len []byte msg_len []byte
@ -31,23 +38,25 @@ type ChunkBuffers struct {
csid_true []byte csid_true []byte
} }
// reads the initial variable size header that defines the chunk's format and chunkstream id
// 5.3.1.1
func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) { func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil { if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil {
return return
} }
format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) // get first 2 bits for format 0-3
switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { // last 6 bits 0-63
case 0: case 0: // csid 0 is invalid, means true csid is the next byte, + 64 for the 6 bits prior
if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil { if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil {
return return
} }
csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64 csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64
case 1: case 1: // csid 1 is invalid, means true csid is in the next 2 bytes, reverse order (little endian) and the 64, reconstruct
if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil { if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil {
return return
} }
csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64 csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64
default: default: // by default true csid was read
csid = uint32(chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f) csid = uint32(chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f)
} }
return return
@ -97,9 +106,13 @@ func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, e
return return
} }
// msg header format 0, all data, typically for the start of a new chunkstream or reset, or all data changes
// 5.3.1.2.1
func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
var extended_time bool var extended_time bool
var err error var err error
// first bit of data is time in the first 3 bytes, if max value, defined as extended time
// true time value is at end of header (technically after but its still effectively in the header)
chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr) chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr)
if err != nil { if err != nil {
return err return err
@ -110,7 +123,7 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
return err return err
} }
msg_ptr.data = make([]byte, msg_ptr.msg_len) msg_ptr.data = make([]byte, msg_ptr.msg_len)
chnk_stream_ptr.last_msg_len = msg_ptr.msg_len chnk_stream_ptr.last_msg_len = msg_ptr.msg_len // assign data for different formats
msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr) msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr)
if err != nil { if err != nil {
@ -134,6 +147,10 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
return nil return nil
} }
// format 1, typically for the 2nd message in a previously init chunkstream with most other data changed
// no longer time but timedelta. timestamp for the message is computed from the stored timestamp when the stream was opened
// otherwise the same except the msg_stream_id is reused (unchanged)
// 5.3.1.2.2
func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
var extended_time bool var extended_time bool
var err error var err error
@ -167,6 +184,8 @@ func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
return nil return nil
} }
// format 2, typically for when format 1 can be used but even the message len and type are identical
// 5.3.1.2.3
func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
var extended_time bool var extended_time bool
var err error var err error
@ -193,6 +212,8 @@ func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
return nil return nil
} }
// read the actual chunkdata given the space for the message that has been constructed using the chunk headers and
// configured peer chunk size
func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) { func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) {
bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read
var buffer_end uint32 var buffer_end uint32
@ -210,6 +231,8 @@ func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32,
} }
// wrapper function to handle all formats and chunkstreams returns a pointer to a Message (can be nil if not completed)
// n_read was there to handle ack win, but doesn't really seem to matter at all
func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){
conn.SetDeadline(time.Now().Add(10 * time.Second)) conn.SetDeadline(time.Now().Add(10 * time.Second))
@ -230,9 +253,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
switch format { switch format {
case 0: case 0:
// format 0, assume new chunkstream which will override and reset any previous chunkstream regardless
chnkstream_ptr = new(ChunkStream) chnkstream_ptr = new(ChunkStream)
open_chnkstrms[csid] = chnkstream_ptr open_chnkstrms[csid] = chnkstream_ptr // assign the newly made chunkstream to the map of those currently open
msg_ptr = new(Message) msg_ptr = new(Message) // cannot be a continued message since the stream just started
if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
return nil, err return nil, err
@ -242,6 +266,8 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
*n_read += 4 *n_read += 4
} }
case 1: case 1:
// format 1, chunkstream with this csid MUST have been opened before, but message is just starting since
// message metadata is being declared
chnkstream_ptr = open_chnkstrms[csid] chnkstream_ptr = open_chnkstrms[csid]
msg_ptr = new(Message) msg_ptr = new(Message)
@ -253,6 +279,7 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
*n_read += 4 *n_read += 4
} }
case 2: case 2:
// format 2, same as 1
chnkstream_ptr = open_chnkstrms[csid] chnkstream_ptr = open_chnkstrms[csid]
msg_ptr = new(Message) msg_ptr = new(Message)
@ -264,6 +291,8 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
*n_read += 4 *n_read += 4
} }
case 3: case 3:
// format 3 has no header, only happens when either a previous message over the csid was too large for the chunksize
// or a new message with the same exact metadata including timedelta is being sent.
chnkstream_ptr = open_chnkstrms[csid] chnkstream_ptr = open_chnkstrms[csid]
msg_prev, ok := open_msgs[chnkstream_ptr.last_msg_strm_id] msg_prev, ok := open_msgs[chnkstream_ptr.last_msg_strm_id]
if !ok { if !ok {
@ -287,10 +316,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
conn.SetDeadline(time.Time{}) conn.SetDeadline(time.Time{})
if msg_ptr.curr_bytes_read < msg_ptr.msg_len { if msg_ptr.curr_bytes_read < msg_ptr.msg_len { // if the full message was not read, save it as an open message for the next chunk
open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr
return nil, nil return nil, nil
} else { } else { // else a full message was read, remove it from the list of open messages if it was there at all and return it
delete(open_msgs, chnkstream_ptr.last_msg_strm_id) delete(open_msgs, chnkstream_ptr.last_msg_strm_id)
return msg_ptr, nil return msg_ptr, nil
} }

View file

@ -6,6 +6,7 @@ import (
"time" "time"
) )
// counterpart to the read_basic_header from chunk.go, basically the reverse of that
func write_basic_header(chunk_buffer *[]byte, format uint8, csid uint32) { func write_basic_header(chunk_buffer *[]byte, format uint8, csid uint32) {
if csid < 64 { if csid < 64 {
(*chunk_buffer)[0] = (format << 6) + uint8(csid) (*chunk_buffer)[0] = (format << 6) + uint8(csid)
@ -18,25 +19,30 @@ func write_basic_header(chunk_buffer *[]byte, format uint8, csid uint32) {
} }
} }
// only consider writing format 0 for new, or format 3 for cut off. As the server only has to send
// protocol messages, no real point optimizing for a demo
func write_message_header_0(chunk_buffer *[]byte, msg_strmid uint32, msg_ptr *Message, basic_header_size uint32) { func write_message_header_0(chunk_buffer *[]byte, msg_strmid uint32, msg_ptr *Message, basic_header_size uint32) {
msg_len_inter := make([]byte, 4) msg_len_inter := make([]byte, 4) // uint32 to uint24 hack
binary.BigEndian.PutUint32(msg_len_inter, msg_ptr.msg_len) binary.BigEndian.PutUint32(msg_len_inter, msg_ptr.msg_len)
copy((*chunk_buffer)[basic_header_size + 3:basic_header_size + 6], msg_len_inter[1:]) copy((*chunk_buffer)[basic_header_size + 3:basic_header_size + 6], msg_len_inter[1:]) // skip timestamp since it will always be 0 for protocol and control msgs
(*chunk_buffer)[basic_header_size + 6] = msg_ptr.msg_type (*chunk_buffer)[basic_header_size + 6] = msg_ptr.msg_type
binary.LittleEndian.PutUint32((*chunk_buffer)[basic_header_size + 7:basic_header_size + 11], msg_strmid) binary.LittleEndian.PutUint32((*chunk_buffer)[basic_header_size + 7:basic_header_size + 11], msg_strmid)
} }
// write the actual data
func write_chunk_data(chunk_buffer *[]byte, data []byte, bytes_buffered *uint32, chunk_data_size uint32, header_size uint32) { func write_chunk_data(chunk_buffer *[]byte, data []byte, bytes_buffered *uint32, chunk_data_size uint32, header_size uint32) {
copy((*chunk_buffer)[header_size:header_size+chunk_data_size], data[*bytes_buffered:*bytes_buffered + chunk_data_size]) copy((*chunk_buffer)[header_size:header_size+chunk_data_size], data[*bytes_buffered:*bytes_buffered + chunk_data_size])
*bytes_buffered += chunk_data_size *bytes_buffered += chunk_data_size
} }
// chunk writing wrapper
func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message, chunk_size uint32) (error) { func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message, chunk_size uint32) (error) {
conn.SetDeadline(time.Now().Add(10 * time.Second)) conn.SetDeadline(time.Now().Add(10 * time.Second))
var i uint32 = 0 var i uint32 = 0 // track number of bytes written
for i < msg_ptr.msg_len { for i < msg_ptr.msg_len { // until message is fully sent
var chunk_data_size uint32 var chunk_data_size uint32
// check if remaining message needs to be chunked or not
if msg_ptr.msg_len - i > chunk_size { if msg_ptr.msg_len - i > chunk_size {
chunk_data_size = chunk_size chunk_data_size = chunk_size
} else { } else {
@ -53,7 +59,7 @@ func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message,
var chunk_buffer []byte var chunk_buffer []byte
var header_size uint32 var header_size uint32
if i == 0 { if i == 0 { // if chunkstream is to be opened/reset (format 0 = message header)
chunk_buffer = make([]byte, basic_header_size + 11 + chunk_data_size) chunk_buffer = make([]byte, basic_header_size + 11 + chunk_data_size)
write_basic_header(&chunk_buffer, 0, csid) write_basic_header(&chunk_buffer, 0, csid)
write_message_header_0(&chunk_buffer, msg_strmid, msg_ptr, basic_header_size) write_message_header_0(&chunk_buffer, msg_strmid, msg_ptr, basic_header_size)
@ -64,7 +70,7 @@ func WriteChunk(conn net.Conn, csid uint32, msg_strmid uint32, msg_ptr *Message,
header_size = basic_header_size header_size = basic_header_size
} }
write_chunk_data(&chunk_buffer, msg_ptr.data, &i, chunk_data_size, header_size) write_chunk_data(&chunk_buffer, msg_ptr.data, &i, chunk_data_size, header_size)
if _, err := conn.Write(chunk_buffer); err != nil { if _, err := conn.Write(chunk_buffer); err != nil { // expunge buffer over network connection
return err return err
} }
} }

View file

@ -8,6 +8,11 @@ import (
"errors" "errors"
) )
// meta wrapper for each connection, consolidates a bunch of info
// for each individual connection together to make calling reads divorced from
// chunking
// params -> see server.go
// open_chnkstrms + open_msgs + chunk_bufs -> see chunk.go
type ChunkWrapper struct { type ChunkWrapper struct {
conn net.Conn conn net.Conn
params *ProtocolParams params *ProtocolParams
@ -16,13 +21,15 @@ type ChunkWrapper struct {
chunk_buffs *ChunkBuffers chunk_buffs *ChunkBuffers
} }
// inits the above struct object for a given connection, most values
// can be init as the data type default
func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) { func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) {
chnk_wrp_ptr = new(ChunkWrapper) chnk_wrp_ptr = new(ChunkWrapper)
chnk_wrp_ptr.conn = conn chnk_wrp_ptr.conn = conn
chnk_wrp_ptr.params = new(ProtocolParams) chnk_wrp_ptr.params = new(ProtocolParams)
chnk_wrp_ptr.params.chunk_size = 256 chnk_wrp_ptr.params.chunk_size = 256 // since only small messages sent, no real need to be large
chnk_wrp_ptr.params.peer_chunk_size = 4096 chnk_wrp_ptr.params.peer_chunk_size = 4096 // can be arbitrary, just set here to OBS default
chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream) chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream)
chnk_wrp_ptr.open_msgs = make(map[uint32]*Message) chnk_wrp_ptr.open_msgs = make(map[uint32]*Message)
buffers := ChunkBuffers{ buffers := ChunkBuffers{
@ -37,6 +44,7 @@ func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) {
return return
} }
// basic wrapper for ReadChunk from chunk.go. just forwards the relevant data and pointers.
func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) { func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) {
full_msg_ptr, err := ReadChunk( full_msg_ptr, err := ReadChunk(
chnk_wrp_ptr.conn, chnk_wrp_ptr.conn,
@ -52,6 +60,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) {
return full_msg_ptr, nil return full_msg_ptr, nil
} }
// basic wrapper for WriteChunk from chunk2.go, see above
func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg_ptr *Message) (error) { func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg_ptr *Message) (error) {
err := WriteChunk( err := WriteChunk(
chnk_wrp_ptr.conn, chnk_wrp_ptr.conn,
@ -66,6 +75,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg
return nil return nil
} }
// see section 5.4.1 of RTMP spec doc
func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk() set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 { if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 {
@ -75,6 +85,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
return nil return nil
} }
// see 7.2.1.1
func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) {
connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk() connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || connect_cmd_msg.msg_type != 20 { if err != nil || connect_cmd_msg.msg_type != 20 {
@ -91,6 +102,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) {
return err return err
} }
// 5.4.4
func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) { func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) {
msg_ptr := new(Message) msg_ptr := new(Message)
msg_ptr.msg_type = 5 msg_ptr.msg_type = 5
@ -103,6 +115,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) {
return nil return nil
} }
// 5.4.5
func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) { func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) {
msg_ptr := new(Message) msg_ptr := new(Message)
msg_ptr.msg_type = 6 msg_ptr.msg_type = 6
@ -116,6 +129,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) {
return nil return nil
} }
// 5.4.1
func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) { func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) {
msg_ptr := new(Message) msg_ptr := new(Message)
msg_ptr.msg_type = 1 msg_ptr.msg_type = 1
@ -128,6 +142,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) {
return nil return nil
} }
// you get the drill
func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) { func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) {
msg_ptr := new(Message) msg_ptr := new(Message)
msg_ptr.msg_type = 20 msg_ptr.msg_type = 20
@ -207,6 +222,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) {
return nil return nil
} }
// first actual data packet, write to output pipe
func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) {
metadata, err := chnk_wrp_ptr.ReadChunk() metadata, err := chnk_wrp_ptr.ReadChunk()
if err != nil || metadata.msg_type != 18 { if err != nil || metadata.msg_type != 18 {

View file

@ -4,6 +4,8 @@ import (
"os" "os"
) )
// series of messages sent from the client and responses
// window ack and peer bw seem to be completely useless to obs
func NegotiateConnect(chnk_wrp_ptr *ChunkWrapper) (bool) { func NegotiateConnect(chnk_wrp_ptr *ChunkWrapper) (bool) {
if err := chnk_wrp_ptr.ReadPeerChunkSize(); err != nil { if err := chnk_wrp_ptr.ReadPeerChunkSize(); err != nil {
return false return false
@ -26,7 +28,13 @@ func NegotiateConnect(chnk_wrp_ptr *ChunkWrapper) (bool) {
return true return true
} }
// next sequence
func CreateStream(chnk_wrp_ptr *ChunkWrapper) (bool) { func CreateStream(chnk_wrp_ptr *ChunkWrapper) (bool) {
// first 2 messages are "releaseStream" and "FCPublish"
// no idea why the second exists since the next command is Publish
// the first has no docs since flash server docs are gone
// assuming it just signals a delete on any streams the current connection had
// it can be ignored for a pure stream intake server
if _, err := chnk_wrp_ptr.ReadChunk(); err != nil { if _, err := chnk_wrp_ptr.ReadChunk(); err != nil {
return false return false
} }
@ -42,6 +50,8 @@ func CreateStream(chnk_wrp_ptr *ChunkWrapper) (bool) {
return true return true
} }
// Publish command handling, this is where stream keys are exchanged which means
// auth checking happens here
func PublishAsKey(chnk_wrp_ptr *ChunkWrapper) (bool) { func PublishAsKey(chnk_wrp_ptr *ChunkWrapper) (bool) {
if err := chnk_wrp_ptr.ReadPublish(); err != nil { if err := chnk_wrp_ptr.ReadPublish(); err != nil {
return false return false
@ -55,6 +65,10 @@ func PublishAsKey(chnk_wrp_ptr *ChunkWrapper) (bool) {
return true return true
} }
// auth checking. again db would be the goal, for now just using a dir system
// pre generate a directory under ~/live/___/. Set the stream key to the folder name
// and this just checks if the folder exists (like a known public key)
// not secure at all but works for a demo
func check_stream_key(stream_key string) (bool) { func check_stream_key(stream_key string) (bool) {
base_dir, _ := os.UserHomeDir() base_dir, _ := os.UserHomeDir()
if fileinfo, err := os.Stat(base_dir + "/live/" + stream_key); err == nil && fileinfo.IsDir() { if fileinfo, err := os.Stat(base_dir + "/live/" + stream_key); err == nil && fileinfo.IsDir() {

View file

@ -8,12 +8,13 @@ import (
) )
func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) {
StreamCleanup(chnk_wrp_ptr.params.stream_key, 0) StreamCleanup(chnk_wrp_ptr.params.stream_key, 0) // remove any unwanted media files still left over
file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) // create a file writer (techincally a pipe to ffmpeg)
defer file_writer.Close() defer file_writer.Close()
if err != nil { if err != nil {
return return
} }
// first data message must always be metadata
if err = chnk_wrp_ptr.ReadMetadata(file_writer); err != nil { if err = chnk_wrp_ptr.ReadMetadata(file_writer); err != nil {
return return
} }
@ -21,13 +22,13 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) {
msg_ptr, err := chnk_wrp_ptr.ReadChunk() msg_ptr, err := chnk_wrp_ptr.ReadChunk()
if err != nil { if err != nil {
return return
} else if msg_ptr == nil { } else if msg_ptr == nil { // if message was not fully read (mostly first video chunk)
continue continue // skip until full message
} }
switch msg_ptr.msg_type { switch msg_ptr.msg_type {
case 20: case 20:
return return // should try to process and check specifically for unpublish, works about the same either way
case 8, 9: case 8, 9: // audio or video data
if err = file_writer.WriteMediaTag(&(msg_ptr.data), msg_ptr.timestamp, msg_ptr.msg_type); err != nil { if err = file_writer.WriteMediaTag(&(msg_ptr.data), msg_ptr.timestamp, msg_ptr.msg_type); err != nil {
return return
} }
@ -35,9 +36,10 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) {
} }
} }
// find all files in given directory and delete, simple as
func StreamCleanup(stream_key string, delay time.Duration) { func StreamCleanup(stream_key string, delay time.Duration) {
time.Sleep(delay * time.Second) time.Sleep(delay * time.Second)
base_dir, _ := os.UserHomeDir() base_dir, _ := os.UserHomeDir() // why would this ever need error handling? if it throws a problem something went very wrong somewhere anyway
stream_dir := base_dir + "/live/" + stream_key stream_dir := base_dir + "/live/" + stream_key
leftover_files, _ := filepath.Glob(stream_dir + "/*") leftover_files, _ := filepath.Glob(stream_dir + "/*")

View file

@ -8,12 +8,14 @@ import (
) )
type FLVWriter struct { type FLVWriter struct {
w io.WriteCloser w io.WriteCloser // abstraction as a wrapper to the actual transcoder command
} }
func NewFLVWriter(stream_key string) (*FLVWriter, error) { func NewFLVWriter(stream_key string) (*FLVWriter, error) {
base_dir, _ := os.UserHomeDir() base_dir, _ := os.UserHomeDir()
writer := new(FLVWriter) writer := new(FLVWriter)
// spawn ffmpeg as a transcoder + hls segmenter
// most are generic commands, added a prefix to each segment url to make serving it over http easier
transcoder := exec.Command( transcoder := exec.Command(
"ffmpeg", "ffmpeg",
"-probesize", "500", "-probesize", "500",
@ -28,14 +30,14 @@ func NewFLVWriter(stream_key string) (*FLVWriter, error) {
"-hls_flags", "+program_date_time", "-hls_flags", "+program_date_time",
"stream.m3u8", "stream.m3u8",
) )
transcoder.Dir = base_dir + "/live/" + stream_key + "/" transcoder.Dir = base_dir + "/live/" + stream_key + "/" // shift to the appropriate dir for the given stream key
flvpipe, err := transcoder.StdinPipe() flvpipe, err := transcoder.StdinPipe() // give control over the ffmpeg input as a stdin pipe (will push in flv packets as they come)
transcoder.Start() transcoder.Start() // spawn ffmpeg
if err != nil { if err != nil {
return nil, err return nil, err
} }
writer.w = flvpipe writer.w = flvpipe
if err = writer.write_flv_header(); err != nil { if err = writer.write_flv_header(); err != nil { // init its header now, might as well
return nil, err return nil, err
} }
return writer, nil return writer, nil
@ -45,11 +47,13 @@ func (writer *FLVWriter) Close() (error) {
return writer.w.Close() return writer.w.Close()
} }
// first data tag, AMF0 message but FLV spec requires the encoded data
// just requires skipping the RTMP specific bits, then just writing as is
func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) { func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) {
uint24_buf := make([]byte, 4) uint24_buf := make([]byte, 4)
tag_header := make([]byte, 11) tag_header := make([]byte, 11)
tag_header[0] = 18 tag_header[0] = 18 // RTMP msg_type 18
binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:]))) binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:]))) // "@setDataFrame", null, actual data
copy(tag_header[1:4], uint24_buf[1:]) copy(tag_header[1:4], uint24_buf[1:])
if _, err = writer.w.Write(tag_header); err != nil { if _, err = writer.w.Write(tag_header); err != nil {
@ -66,6 +70,7 @@ func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) {
return return
} }
// write the actual audio + video data, same as metadata except no skipping
func (writer *FLVWriter) WriteMediaTag(data *[]byte, timestamp uint32, media_type uint8) (err error) { func (writer *FLVWriter) WriteMediaTag(data *[]byte, timestamp uint32, media_type uint8) (err error) {
uint24_buf := make([]byte, 4) uint24_buf := make([]byte, 4)
tag_header := make([]byte, 11) tag_header := make([]byte, 11)
@ -91,12 +96,16 @@ func (writer *FLVWriter) WriteMediaTag(data *[]byte, timestamp uint32, media_typ
return return
} }
// FLV file header always 9 bytes + 4 bytes for first ghost data tag
func (writer *FLVWriter) write_flv_header() (err error) { func (writer *FLVWriter) write_flv_header() (err error) {
header := make([]byte, 13) header := make([]byte, 13)
copy(header[:3], "FLV") copy(header[:3], "FLV")
header[3] = 1 header[3] = 1 // FLV version, only 1 is ever used
header[4] = 5 header[4] = 5 // 1 and 4 -> audio OR video | 5 -> audio + video
header[8] = 9 header[8] = 9 // len of header
// flv spec requires each tag + tag_len. defines a ghost tag at start with len 0
// likely for player seeking?
_, err = writer.w.Write(header) _, err = writer.w.Write(header)
return return

View file

@ -6,6 +6,7 @@ import (
"encoding/binary" "encoding/binary"
) )
// see adobe specs for RTMP
func DoHandshake(conn net.Conn) (hs_success bool) { func DoHandshake(conn net.Conn) (hs_success bool) {
C0C1C2 := make([]byte, 1536*2 + 1) C0C1C2 := make([]byte, 1536*2 + 1)
S0S1S2 := make([]byte, 1536*2 + 1) S0S1S2 := make([]byte, 1536*2 + 1)
@ -13,6 +14,7 @@ func DoHandshake(conn net.Conn) (hs_success bool) {
S0S1S2[0] = 3 S0S1S2[0] = 3
binary.BigEndian.PutUint32(S0S1S2[1:1+4], uint32(time.Now().UnixMilli())) binary.BigEndian.PutUint32(S0S1S2[1:1+4], uint32(time.Now().UnixMilli()))
// force handshake to finish in under 15 seconds (aribtrary) or throw an error
conn.SetDeadline(time.Now().Add(15 * time.Second)) conn.SetDeadline(time.Now().Add(15 * time.Second))
if _, err := conn.Read(C0C1C2); err != nil || C0C1C2[0] != 3 { if _, err := conn.Read(C0C1C2); err != nil || C0C1C2[0] != 3 {
@ -21,13 +23,13 @@ func DoHandshake(conn net.Conn) (hs_success bool) {
copy(C0C1C2[1:1536], S0S1S2[1+1536:]) copy(C0C1C2[1:1536], S0S1S2[1+1536:])
binary.BigEndian.PutUint32(S0S1S2[1+1536+4:1+1536+8], uint32(time.Now().UnixMilli())) binary.BigEndian.PutUint32(S0S1S2[1+1536+4:1+1536+8], uint32(time.Now().UnixMilli()))
if _, err := conn.Write(S0S1S2); err != nil { if _, err := conn.Write(S0S1S2); err != nil { // specs say only send S0S1 and wait for C2 before sending S2, obs doesn't care apparently
return return
} }
if _, err := conn.Read(C0C1C2[1+1536:]); err != nil { if _, err := conn.Read(C0C1C2[1+1536:]); err != nil {
return return
} }
hs_success = true hs_success = true
conn.SetDeadline(time.Time{}) conn.SetDeadline(time.Time{}) // remove deadline for next function
return return
} }

View file

@ -4,6 +4,7 @@ import (
"net" "net"
) )
// meta-struct, holding together params for each streaing peer
type ProtocolParams struct { type ProtocolParams struct {
peer_chunk_size uint32 peer_chunk_size uint32
chunk_size uint32 chunk_size uint32
@ -23,6 +24,9 @@ func NewServer(port string) (error) {
} }
func start(l net.Listener) { func start(l net.Listener) {
// bool to block connections while a stream is currently active
// should have it on a per user basis instead, but adding a db
// system would make this even more bloated
stream_live := false stream_live := false
for { for {
conn, err := l.Accept() conn, err := l.Accept()
@ -41,7 +45,7 @@ func handle_conn(conn net.Conn, stream_live *bool) {
defer conn.Close() defer conn.Close()
defer func(a *bool) { defer func(a *bool) {
*a = false *a = false
}(stream_live) }(stream_live) // flip the stream state back when done
if !DoHandshake(conn) { if !DoHandshake(conn) {
return return
} }
@ -55,6 +59,6 @@ func handle_conn(conn net.Conn, stream_live *bool) {
if !PublishAsKey(chunk_wrapper) { if !PublishAsKey(chunk_wrapper) {
return return
} }
HandleDataLoop(chunk_wrapper) HandleDataLoop(chunk_wrapper) // no error handle since the connection ends either way
go StreamCleanup(chunk_wrapper.params.stream_key, 60) go StreamCleanup(chunk_wrapper.params.stream_key, 60) // remove any media files left 1 min after stream ends
} }