package rtmp import ( "net" "time" "encoding/binary" ) type chnk_stream struct { timestamp uint32 last_msg_strm_id uint32 timedelta uint32 } type message struct { // msg_strm_id uint32 data []byte curr_bytes_read uint32 timestamp uint32 msg_type uint8 msg_len uint32 } func OpenStreamsMapInit() (map[uint32]*chnk_stream, map[uint32]*message) { open_chnkstrms := make(map[uint32]*chnk_stream) open_msgs := make(map[uint32]*message) return open_chnkstrms, open_msgs } func read_basic_header(conn net.Conn) (uint8, uint32, error) { fmt_csid_byte := make([]byte, 1) if _, err := conn.Read(fmt_csid_byte); err != nil { return 0, 0, err } format := uint8(fmt_csid_byte[0] >> 6) var csid uint32 switch fmt_csid_byte[0] & 0x3f { case 0: csid_true := make([]byte, 1) if _, err := conn.Read(csid_true); err != nil { return 0, 0, err } csid = uint32(csid_true[0]) + 64 case 1: csid_true := make([]byte, 2) if _, err := conn.Read(csid_true); err != nil { return 0, 0, err } csid = uint32(binary.LittleEndian.Uint16(csid_true)) + 64 default: csid = uint32(fmt_csid_byte[0] & 0x3f) } return format, csid, nil } func read_message_header_0(conn net.Conn, chnk_stream_ptr *chnk_stream, msg_ptr *message) (error) { var extended_time bool timestamp := make([]byte, 4) if _, err := conn.Read(timestamp[1:]); err != nil { return err } chnk_stream_ptr.timestamp = binary.BigEndian.Uint32(timestamp) if chnk_stream_ptr.timestamp ^ 0xffffff == 0 { extended_time = true } msg_len := make([]byte, 4) if _, err := conn.Read(msg_len[1:]); err != nil { return err } msg_ptr.msg_len = binary.BigEndian.Uint32(msg_len) msg_ptr.data = make([]byte, msg_ptr.msg_len) msg_typeid := make([]byte, 1) if _, err := conn.Read(msg_typeid); err != nil { return err } msg_ptr.msg_type = msg_typeid[0] msg_streamid := make([]byte, 4) if _, err := conn.Read(msg_streamid); err != nil { return err } chnk_stream_ptr.last_msg_strm_id = binary.BigEndian.Uint32(msg_streamid) if extended_time { if _, err := conn.Read(timestamp); err != nil { return err } chnk_stream_ptr.timestamp = binary.BigEndian.Uint32(timestamp) } msg_ptr.timestamp = chnk_stream_ptr.timestamp return nil } func read_chunk_data(conn net.Conn, msg_ptr *message, chnk_size uint32) (error) { bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read var buffer_end uint32 if bytes_left < chnk_size { buffer_end = bytes_left + msg_ptr.curr_bytes_read } else { buffer_end = chnk_size + msg_ptr.curr_bytes_read } n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end]) if err != nil { return err } msg_ptr.curr_bytes_read += uint32(n) return nil } func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*chnk_stream, open_msgs map[uint32]*message, chnk_size uint32) (*message, error){ conn.SetDeadline(time.Now().Add(10 * time.Second)) format, csid, err := read_basic_header(conn) if err != nil { return nil, err } switch format { case 0: chnkstream_ptr := new(chnk_stream) open_chnkstrms[csid] = chnkstream_ptr msg_ptr := new(message) if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr); err != nil { return nil, err } if err := read_chunk_data(conn, msg_ptr, chnk_size); err != nil { return nil, err } if msg_ptr.msg_len > chnk_size { open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr return nil, nil } else { return msg_ptr, nil } } conn.SetDeadline(time.Time{}) return nil, nil }