package rtmp import ( "net" "encoding/binary" "stream_server/rtmp/amf" "stream_server/rtmp/flv" "errors" ) // meta wrapper for each connection, consolidates a bunch of info // for each individual connection together to make calling reads divorced from // chunking // params -> see server.go // open_chnkstrms + open_msgs + chunk_bufs -> see chunk.go type ChunkWrapper struct { conn net.Conn params *ProtocolParams open_chnkstrms map[uint32]*ChunkStream open_msgs map[uint32]*Message chunk_buffs *ChunkBuffers } // inits the above struct object for a given connection, most values // can be init as the data type default func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) { chnk_wrp_ptr = new(ChunkWrapper) chnk_wrp_ptr.conn = conn chnk_wrp_ptr.params = new(ProtocolParams) chnk_wrp_ptr.params.chunk_size = 256 // since only small messages sent, no real need to be large chnk_wrp_ptr.params.peer_chunk_size = 4096 // can be arbitrary, just set here to OBS default chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream) chnk_wrp_ptr.open_msgs = make(map[uint32]*Message) buffers := ChunkBuffers{ make([]byte, 4), make([]byte, 4), make([]byte, 1), make([]byte, 4), make([]byte, 1), make([]byte, 2), } chnk_wrp_ptr.chunk_buffs = &buffers return } // basic wrapper for ReadChunk from chunk.go. just forwards the relevant data and pointers. func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) { full_msg_ptr, err := ReadChunk( chnk_wrp_ptr.conn, chnk_wrp_ptr.open_chnkstrms, chnk_wrp_ptr.open_msgs, chnk_wrp_ptr.params.peer_chunk_size, chnk_wrp_ptr.chunk_buffs, &(chnk_wrp_ptr.params.curr_read), ) if err != nil { return nil, err } return full_msg_ptr, nil } // basic wrapper for WriteChunk from chunk2.go, see above func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg_ptr *Message) (error) { err := WriteChunk( chnk_wrp_ptr.conn, csid, msg_strmid, msg_ptr, chnk_wrp_ptr.params.chunk_size, ) if err != nil { return err } return nil } // see section 5.4.1 of RTMP spec doc func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 { return errors.New("Bad chunksize msg") } chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data) return nil } // see 7.2.1.1 func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || connect_cmd_msg.msg_type != 20 { return errors.New("Bad connect") } amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data)) if err != nil { return err } err = amf_obj.ProcessConnect() if err != nil { return err } return err } // 5.4.4 func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 5 msg_ptr.data = make([]byte, 4) msg_ptr.msg_len = 4 binary.BigEndian.PutUint32(msg_ptr.data, 5000000) if err := chnk_wrp_ptr.WriteChunk(2, 0, msg_ptr); err != nil { return err } return nil } // 5.4.5 func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 6 msg_ptr.data = make([]byte, 5) msg_ptr.msg_len = 5 binary.BigEndian.PutUint32(msg_ptr.data, 5000000) msg_ptr.data[4] = 2 if err := chnk_wrp_ptr.WriteChunk(2, 0, msg_ptr); err != nil { return err } return nil } // 5.4.1 func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 1 msg_ptr.data = make([]byte, 4) msg_ptr.msg_len = 4 binary.BigEndian.PutUint32(msg_ptr.data, chnk_wrp_ptr.params.chunk_size) if err := chnk_wrp_ptr.WriteChunk(2, 0, msg_ptr); err != nil { return err } return nil } // you get the drill func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 20 msg_data, err := amf.EncodeConnectResponse() if err != nil { return err } msg_ptr.msg_len = uint32(len(msg_data)) msg_ptr.data = msg_data if err := chnk_wrp_ptr.WriteChunk(3, 0, msg_ptr); err != nil { return err } return nil } func (chnk_wrp_ptr *ChunkWrapper) ReadCreateStream() (error) { create_stream_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || create_stream_msg.msg_type != 20 { return errors.New("Bad create stream") } amf_obj, err := amf.DecodeAMF(&(create_stream_msg.data)) if err != nil { return err } err = amf_obj.ProcessCreateStream(&(chnk_wrp_ptr.params.trans_id)) if err != nil { return err } return nil } func (chnk_wrp_ptr *ChunkWrapper) WriteCreateStreamResponse() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 20 msg_data, err := amf.EncodeCreateStreamResponse(chnk_wrp_ptr.params.trans_id) if err != nil { return err } msg_ptr.msg_len = uint32(len(msg_data)) msg_ptr.data = msg_data if err := chnk_wrp_ptr.WriteChunk(3, 0, msg_ptr); err != nil { return err } return nil } func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() (error) { publish_cmd, err := chnk_wrp_ptr.ReadChunk() if err != nil || publish_cmd.msg_type != 20 { return errors.New("Bad publish command") } amf_obj, err := amf.DecodeAMF(&(publish_cmd.data)) if err != nil { return err } err = amf_obj.ProcessPublish(&(chnk_wrp_ptr.params.trans_id), &(chnk_wrp_ptr.params.stream_key)) if err != nil { return err } return nil } func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) { msg_ptr := new(Message) msg_ptr.msg_type = 20 msg_data, err := amf.EncodePublishResponse(chnk_wrp_ptr.params.trans_id) if err != nil { return err } msg_ptr.msg_len = uint32(len(msg_data)) msg_ptr.data = msg_data if err := chnk_wrp_ptr.WriteChunk(3, 1, msg_ptr); err != nil { return err } return nil } // first actual data packet, write to output pipe func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) { metadata, err := chnk_wrp_ptr.ReadChunk() if err != nil || metadata.msg_type != 18 { return errors.New("Bad metadata msg") } if err = writer.WriteMetadataTag(&(metadata.data)); err != nil { return err } return nil }