package rtmp import ( "net" "time" "encoding/binary" ) type ChunkStream struct { timestamp uint32 last_msg_strm_id uint32 last_msg_len uint32 last_msg_type uint8 timedelta uint32 } type Message struct { data []byte curr_bytes_read uint32 timestamp uint32 msg_type uint8 msg_len uint32 } type ChunkBuffers struct { time []byte msg_len []byte msg_typeid []byte msg_streamid []byte fmt_csid_byte []byte csid_true []byte } func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil { return } format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { case 0: if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil { return } csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64 case 1: if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil { return } csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64 default: csid = uint32(chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f) } return } func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extended_time bool, err error) { chunk_bufs_ptr.time[0] = 0 if _, err = conn.Read(chunk_bufs_ptr.time[1:]); err != nil { return } time = binary.BigEndian.Uint32(chunk_bufs_ptr.time) if time ^ 0xffffff == 0 { extended_time = true } return } func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_len[1:]); err != nil { return } msg_len = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_len) return } func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint8, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_typeid); err != nil { return } msg_type = chunk_bufs_ptr.msg_typeid[0] return } func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streamid uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.msg_streamid); err != nil { return } msg_streamid = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_streamid) return } func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, err error) { if _, err = conn.Read(chunk_bufs_ptr.time); err != nil { return } time = binary.BigEndian.Uint32(chunk_bufs_ptr.time) return } func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr) if err != nil { return err } msg_ptr.msg_len, err = read_msg_len(conn, chunk_bufs_ptr) if err != nil { return err } msg_ptr.data = make([]byte, msg_ptr.msg_len) chnk_stream_ptr.last_msg_len = msg_ptr.msg_len msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr) if err != nil { return err } chnk_stream_ptr.last_msg_type = msg_ptr.msg_type chnk_stream_ptr.last_msg_strm_id, err = read_msg_streamid(conn, chunk_bufs_ptr) if err != nil { return err } if extended_time { chnk_stream_ptr.timestamp, err = read_time_extd(conn, chunk_bufs_ptr) if err != nil { return err } } msg_ptr.timestamp = chnk_stream_ptr.timestamp return nil } func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error chnk_stream_ptr.timedelta, extended_time, err = read_time(conn, chunk_bufs_ptr) if err != nil { return err } msg_ptr.msg_len, err = read_msg_len(conn, chunk_bufs_ptr) if err != nil { return err } msg_ptr.data = make([]byte, msg_ptr.msg_len) chnk_stream_ptr.last_msg_len = msg_ptr.msg_len msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr) if err != nil { return err } chnk_stream_ptr.last_msg_type = msg_ptr.msg_type if extended_time { chnk_stream_ptr.timedelta, err = read_time_extd(conn, chunk_bufs_ptr) if err != nil { return err } } chnk_stream_ptr.timestamp += chnk_stream_ptr.timedelta msg_ptr.timestamp = chnk_stream_ptr.timestamp return nil } func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) { var extended_time bool var err error chnk_stream_ptr.timedelta, extended_time, err = read_time(conn, chunk_bufs_ptr) if err != nil { return err } msg_ptr.msg_len = chnk_stream_ptr.last_msg_len msg_ptr.data = make([]byte, msg_ptr.msg_len) msg_ptr.msg_type = chnk_stream_ptr.last_msg_type if extended_time { chnk_stream_ptr.timedelta, err = read_time_extd(conn, chunk_bufs_ptr) if err != nil { return err } } chnk_stream_ptr.timestamp += chnk_stream_ptr.timedelta msg_ptr.timestamp = chnk_stream_ptr.timestamp return nil } func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) { bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read var buffer_end uint32 if bytes_left < chnk_size { buffer_end = bytes_left + msg_ptr.curr_bytes_read } else { buffer_end = chnk_size + msg_ptr.curr_bytes_read } n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end]) if err != nil { return 0, err } msg_ptr.curr_bytes_read += uint32(n) return uint32(n), nil } func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){ conn.SetDeadline(time.Now().Add(10 * time.Second)) var chnkstream_ptr *ChunkStream var msg_ptr *Message format, csid, err := read_basic_header(conn, chunk_bufs_ptr) if err != nil { return nil, err } if csid < 64 { *n_read += 1 } else if csid < 320 { *n_read += 2 } else { *n_read += 3 } switch format { case 0: chnkstream_ptr = new(ChunkStream) open_chnkstrms[csid] = chnkstream_ptr msg_ptr = new(Message) if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } *n_read += 11 if msg_ptr.timestamp > 0xffffff { *n_read += 4 } case 1: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) if err := read_message_header_1(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } *n_read += 7 if msg_ptr.timestamp > 0xffffff { *n_read += 4 } case 2: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = new(Message) if err := read_message_header_2(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil { return nil, err } *n_read += 3 if msg_ptr.timestamp > 0xffffff { *n_read += 4 } case 3: chnkstream_ptr = open_chnkstrms[csid] msg_ptr = open_msgs[chnkstream_ptr.last_msg_strm_id] } n, err := read_chunk_data(conn, msg_ptr, chnk_size) if err != nil { return nil, err } *n_read += n conn.SetDeadline(time.Time{}) if msg_ptr.curr_bytes_read < msg_ptr.msg_len { open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr return nil, nil } else { delete(open_msgs, chnkstream_ptr.last_msg_strm_id) return msg_ptr, nil } }