diff --git a/rtmp/chunk.go b/rtmp/chunk.go new file mode 100644 index 0000000..28cfef6 --- /dev/null +++ b/rtmp/chunk.go @@ -0,0 +1,140 @@ +package rtmp + +import ( + "net" + "time" + "encoding/binary" +) + +type chnk_stream struct { + timestamp uint32 + last_msg_strm_id uint32 + timedelta uint32 +} + +type message struct { +// msg_strm_id uint32 + data []byte + curr_bytes_read uint32 + timestamp uint32 + msg_type uint8 + msg_len uint32 +} + + +func read_basic_header(conn net.Conn) (uint8, uint32, error) { + fmt_csid_byte := make([]byte, 1) + if _, err := conn.Read(fmt_csid_byte); err != nil { + return 0, 0, err + } + format := uint8(fmt_csid_byte[0] >> 6) + var csid uint32 + switch fmt_csid_byte[0] & 0x3f { + case 0: + csid_true := make([]byte, 1) + if _, err := conn.Read(csid_true); err != nil { + return 0, 0, err + } + csid = uint32(csid_true[0]) + 64 + case 1: + csid_true := make([]byte, 2) + if _, err := conn.Read(csid_true); err != nil { + return 0, 0, err + } + csid = uint32(binary.LittleEndian.Uint16(csid_true)) + 64 + default: + csid = uint32(fmt_csid_byte[0] & 0x3f) + } + return format, csid, nil +} + +func read_message_header_0(conn net.Conn, chnk_stream_ptr *chnk_stream, msg_ptr *message) (error) { + var extended_time bool + timestamp := make([]byte, 4) + if _, err := conn.Read(timestamp[1:]); err != nil { + return err + } + chnk_stream_ptr.timestamp = binary.BigEndian.Uint32(timestamp) + + if chnk_stream_ptr.timestamp ^ 0xffffff == 0 { + extended_time = true + } + + msg_len := make([]byte, 4) + if _, err := conn.Read(msg_len[1:]); err != nil { + return err + } + msg_ptr.msg_len = binary.BigEndian.Uint32(msg_len) + msg_ptr.data = make([]byte, msg_ptr.msg_len) + + msg_typeid := make([]byte, 1) + if _, err := conn.Read(msg_typeid); err != nil { + return err + } + msg_ptr.msg_type = msg_typeid[0] + + msg_streamid := make([]byte, 4) + if _, err := conn.Read(msg_typeid); err != nil { + return err + } + chnk_stream_ptr.last_msg_strm_id = binary.BigEndian.Uint32(msg_streamid) + + if extended_time { + if _, err := conn.Read(timestamp); err != nil { + return err + } + chnk_stream_ptr.timestamp = binary.BigEndian.Uint32(timestamp) + } + msg_ptr.timestamp = chnk_stream_ptr.timestamp + + return nil +} + +func read_chunk_data(conn net.Conn, msg_ptr *message, chnk_size uint32) (error) { + bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read + var buffer_end uint32 + if bytes_left < chnk_size { + buffer_end = bytes_left + msg_ptr.curr_bytes_read + 1 + } else { + buffer_end = chnk_size + msg_ptr.curr_bytes_read + 1 + } + n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end]) + if err != nil { + return err + } + msg_ptr.curr_bytes_read += uint32(n) + return nil + +} + +func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*chnk_stream, open_msgs map[uint32]*message, chnk_size uint32) (*message, error){ + conn.SetDeadline(time.Now().Add(10 * time.Second)) + + format, csid, err := read_basic_header(conn) + if err != nil { + return nil, err + } + + switch format { + case 0: + chnkstream_ptr := new(chnk_stream) + open_chnkstrms[csid] = chnkstream_ptr + msg_ptr := new(message) + + if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr); err != nil { + return nil, err + } + if err := read_chunk_data(conn, msg_ptr, chnk_size); err != nil { + return nil, err + } + if msg_ptr.msg_len > chnk_size { + open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr + return nil, nil + } else { + return msg_ptr, nil + } + } + + conn.SetDeadline(time.Time{}) + return nil, nil +}