From 05087ccbb3d531ff9656ed337c671eba3102fd40 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Thu, 10 Aug 2023 15:28:43 +0500 Subject: [PATCH] Export ChunkBuffers, track bytes read in total --- rtmp/chunk.go | 60 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/rtmp/chunk.go b/rtmp/chunk.go index 870aec4..3029ca9 100644 --- a/rtmp/chunk.go +++ b/rtmp/chunk.go @@ -22,7 +22,7 @@ type Message struct { msg_len uint32 } -type chunk_bufs struct { +type ChunkBuffers struct { time []byte msg_len []byte msg_typeid []byte @@ -31,8 +31,8 @@ type chunk_bufs struct { csid_true []byte } -func new_chunk_bufs() (*chunk_bufs) { - new_chunk_bufs := chunk_bufs{ +func new_chunk_bufs() (*ChunkBuffers) { + new_chunk_bufs := ChunkBuffers{ make([]byte, 4), make([]byte, 4), make([]byte, 1), @@ -43,14 +43,14 @@ func new_chunk_bufs() (*chunk_bufs) { return &new_chunk_bufs } -func OpenStreamsMapInit() (map[uint32]*ChunkStream, map[uint32]*Message, *chunk_bufs) { +func OpenStreamsMapInit() (map[uint32]*ChunkStream, map[uint32]*Message, *ChunkBuffers) { open_chnkstrms := make(map[uint32]*ChunkStream) open_msgs := make(map[uint32]*Message) chunk_bufs_ptr := new_chunk_bufs() return open_chnkstrms, open_msgs, chunk_bufs_ptr } -func read_basic_header(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (format uint8, csid uint32, err error) { +func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil { return } @@ -72,7 +72,7 @@ func read_basic_header(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (format uint8, return } -func read_time(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, extended_time bool, err error) { +func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extended_time bool, err error) { chunk_bufs_ptr.time[0] = 0 if _, err = conn.Read(chunk_bufs_ptr.time[1:]); err != nil { return @@ -84,7 +84,7 @@ func read_time(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, extended return } -func read_msg_len(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_len uint32, err error) { +func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_len[1:]); err != nil { return } @@ -92,7 +92,7 @@ func read_msg_len(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_len uint32, er return } -func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_type uint8, err error) { +func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint8, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_typeid); err != nil { return } @@ -100,7 +100,7 @@ func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_type uint8, return } -func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_streamid uint32, err error) { +func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streamid uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_streamid); err != nil { return } @@ -108,7 +108,7 @@ func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_streamid return } -func read_time_extd(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, err error) { +func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.time); err != nil { return } @@ -116,7 +116,7 @@ func read_time_extd(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, err return } -func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) { +func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr) @@ -153,7 +153,7 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } -func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) { +func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error @@ -186,7 +186,7 @@ func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } -func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) { +func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error @@ -212,7 +212,7 @@ func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr return nil } -func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (error) { +func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) { bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read var buffer_end uint32 if bytes_left < chnk_size { @@ -222,16 +222,16 @@ func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (error) } n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end]) if err != nil { - return err + return 0, err } msg_ptr.curr_bytes_read += uint32(n) - return nil + return uint32(n), nil } -func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *chunk_bufs) (*Message, error){ +func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){ conn.SetDeadline(time.Now().Add(10 * time.Second)) - + var chnkstream_ptr *ChunkStream var msg_ptr *Message @@ -239,6 +239,13 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs if err != nil { return nil, err } + if csid < 64 { + *n_read += 1 + } else if csid < 320 { + *n_read += 2 + } else { + *n_read += 3 + } switch format { case 0: @@ -249,6 +256,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } + *n_read += 11 + if msg_ptr.timestamp > 0xffffff { + *n_read += 4 + } case 1: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) @@ -256,6 +267,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs if err := read_message_header_1(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } + *n_read += 7 + if msg_ptr.timestamp > 0xffffff { + *n_read += 4 + } case 2: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) @@ -263,15 +278,20 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs if err := read_message_header_2(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } + *n_read += 3 + if msg_ptr.timestamp > 0xffffff { + *n_read += 4 + } case 3: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = open_msgs[chnkstream_ptr.last_msg_strm_id] } - - if err := read_chunk_data(conn, msg_ptr, chnk_size); err != nil { + n, err := read_chunk_data(conn, msg_ptr, chnk_size) + if err != nil { return nil, err } + *n_read += n conn.SetDeadline(time.Time{})