stream-server/rtmp/chunk_wrap.go

206 lines
4.9 KiB
Go

package rtmp
import (
	"encoding/binary"
	"fmt"
	"net"

	"stream_server/rtmp/amf"
)
// ChunkWrapper bundles a network connection with the per-connection state
// that the package-level ReadChunk and WriteChunk functions need, so callers
// can exchange RTMP messages without threading that state through every call.
type ChunkWrapper struct {
	// conn is the connection every chunk is read from and written to.
	conn net.Conn

	// params holds the negotiated protocol values used by the methods of
	// this type (chunk sizes, read counter, transaction id, stream key).
	params *ProtocolParams

	// open_chnkstrms and open_msgs are handed to ReadChunk on every call.
	// Presumably they are keyed by chunk stream id and track state across
	// chunks; confirm against ReadChunk.
	open_chnkstrms map[uint32]*ChunkStream
	open_msgs      map[uint32]*Message

	// chunk_buffs is the set of reusable scratch buffers passed to ReadChunk.
	chunk_buffs *ChunkBuffers
}
// NewChunkWrapper returns a ChunkWrapper bound to conn with freshly allocated
// state: empty chunk-stream and message maps, an outgoing chunk size of 256,
// an assumed peer chunk size of 4096, and six scratch buffers of
// 4, 4, 1, 4, 1 and 2 bytes.
//
// NOTE(review): the RTMP specification's default chunk size is 128 bytes;
// confirm that 256 and 4096 are the intended starting values.
func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) {
	chnk_wrp_ptr = &ChunkWrapper{
		conn: conn,
		params: &ProtocolParams{
			chunk_size:      256,
			peer_chunk_size: 4096,
		},
		open_chnkstrms: make(map[uint32]*ChunkStream),
		open_msgs:      make(map[uint32]*Message),
		// Positional literal: the buffer order must match the field order
		// declared by ChunkBuffers.
		chunk_buffs: &ChunkBuffers{
			make([]byte, 4),
			make([]byte, 4),
			make([]byte, 1),
			make([]byte, 4),
			make([]byte, 1),
			make([]byte, 2),
		},
	}
	return chnk_wrp_ptr
}
// ReadChunk calls the package-level ReadChunk with this wrapper's connection,
// open chunk-stream and message maps, current peer chunk size, scratch
// buffers and read counter.
//
// It returns the message produced by that call, or nil and the error when the
// call fails.
func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) {
	params := chnk_wrp_ptr.params
	msg, err := ReadChunk(
		chnk_wrp_ptr.conn,
		chnk_wrp_ptr.open_chnkstrms,
		chnk_wrp_ptr.open_msgs,
		params.peer_chunk_size,
		chnk_wrp_ptr.chunk_buffs,
		&params.curr_read,
	)
	if err != nil {
		// Never expose a partially populated message alongside an error.
		return nil, err
	}
	return msg, nil
}
// WriteChunk sends msg_ptr on chunk stream csid and message stream msg_strmid
// by delegating to the package-level WriteChunk with this wrapper's
// connection and outgoing chunk size. It returns that call's error, if any.
func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg_ptr *Message) error {
	return WriteChunk(
		chnk_wrp_ptr.conn,
		csid,
		msg_strmid,
		msg_ptr,
		chnk_wrp_ptr.params.chunk_size,
	)
}
// ReadPeerChunkSize reads the next message from the peer, requires it to be
// an RTMP Set Chunk Size message (type 1, 4-byte payload), and stores the
// announced value as the peer chunk size used by later reads.
//
// A failure from ReadChunk is returned unchanged. A missing, unexpected or
// malformed message yields a descriptive error; previously such a message
// was silently ignored and nil was returned, leaving the peer chunk size
// untouched while the caller believed it had been updated.
func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() error {
	msg, err := chnk_wrp_ptr.ReadChunk()
	if err != nil {
		return err
	}
	// NOTE(review): whether ReadChunk can return (nil, nil) is not visible
	// here; guard so that case reports an error instead of panicking.
	if msg == nil {
		return fmt.Errorf("rtmp: expected set chunk size message, got none")
	}
	if msg.msg_type != 1 || msg.msg_len != 4 {
		return fmt.Errorf(
			"rtmp: expected set chunk size message (type 1, length 4), got type %d, length %d",
			msg.msg_type, msg.msg_len,
		)
	}
	// binary.BigEndian.Uint32 panics on fewer than 4 bytes, so check the
	// actual payload rather than trusting the declared length.
	if len(msg.data) < 4 {
		return fmt.Errorf("rtmp: set chunk size payload is %d bytes, want 4", len(msg.data))
	}
	chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(msg.data)
	return nil
}
// ReadConnectCommand reads the next message from the peer, requires it to be
// a command message (type 20), decodes its payload with amf.DecodeAMF and
// passes the result to ProcessConnect.
//
// Errors from ReadChunk, DecodeAMF and ProcessConnect are returned unchanged.
// A missing message or one of another type yields a descriptive error;
// previously it was silently skipped and nil was returned, so the caller
// treated the connect step as done although nothing had been processed.
func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() error {
	msg, err := chnk_wrp_ptr.ReadChunk()
	if err != nil {
		return err
	}
	// NOTE(review): whether ReadChunk can return (nil, nil) is not visible
	// here; guard so that case reports an error instead of panicking.
	if msg == nil {
		return fmt.Errorf("rtmp: expected connect command, got no message")
	}
	if msg.msg_type != 20 {
		return fmt.Errorf("rtmp: expected connect command (type 20), got message type %d", msg.msg_type)
	}

	amf_obj, err := amf.DecodeAMF(&(msg.data))
	if err != nil {
		return err
	}
	return amf_obj.ProcessConnect()
}
// WriteWindowAckSize sends a Window Acknowledgement Size message (type 5)
// announcing a window of 5000000, encoded as a 4-byte big-endian payload,
// on chunk stream 2 and message stream 0. It returns any write error.
func (chnk_wrp_ptr *ChunkWrapper) WriteWindowAckSize() error {
	payload := make([]byte, 4)
	binary.BigEndian.PutUint32(payload, 5000000)

	msg := &Message{
		msg_type: 5,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(2, 0, msg)
}
// WritePeerBandwidth sends a Set Peer Bandwidth message (type 6) on chunk
// stream 2 and message stream 0. The 5-byte payload is the bandwidth 5000000
// as a big-endian uint32 followed by a limit-type byte of 2 (the "dynamic"
// limit type in the RTMP specification). It returns any write error.
func (chnk_wrp_ptr *ChunkWrapper) WritePeerBandwidth() error {
	payload := make([]byte, 5)
	binary.BigEndian.PutUint32(payload[:4], 5000000)
	payload[4] = 2

	msg := &Message{
		msg_type: 6,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(2, 0, msg)
}
// WriteChunkSize sends a Set Chunk Size message (type 1) on chunk stream 2
// and message stream 0, announcing this wrapper's outgoing chunk size as a
// 4-byte big-endian payload. It returns any write error.
func (chnk_wrp_ptr *ChunkWrapper) WriteChunkSize() error {
	payload := make([]byte, 4)
	binary.BigEndian.PutUint32(payload, chnk_wrp_ptr.params.chunk_size)

	msg := &Message{
		msg_type: 1,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(2, 0, msg)
}
// WriteConnectResponse encodes the connect reply with
// amf.EncodeConnectResponse and sends it as a command message (type 20) on
// chunk stream 3 and message stream 0. It returns the encoding error or the
// write error, whichever occurs first.
func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() error {
	payload, err := amf.EncodeConnectResponse()
	if err != nil {
		return err
	}

	msg := &Message{
		msg_type: 20,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(3, 0, msg)
}
// ReadCreateStream reads the next message from the peer, requires it to be a
// command message (type 20), decodes its payload with amf.DecodeAMF and
// passes the result to ProcessCreateStream, which receives a pointer to this
// wrapper's transaction id.
//
// Errors from ReadChunk, DecodeAMF and ProcessCreateStream are returned
// unchanged. A missing message or one of another type yields a descriptive
// error; previously it was silently skipped and nil was returned, leaving
// the transaction id stale while the caller carried on.
func (chnk_wrp_ptr *ChunkWrapper) ReadCreateStream() error {
	msg, err := chnk_wrp_ptr.ReadChunk()
	if err != nil {
		return err
	}
	// NOTE(review): whether ReadChunk can return (nil, nil) is not visible
	// here; guard so that case reports an error instead of panicking.
	if msg == nil {
		return fmt.Errorf("rtmp: expected createStream command, got no message")
	}
	if msg.msg_type != 20 {
		return fmt.Errorf("rtmp: expected createStream command (type 20), got message type %d", msg.msg_type)
	}

	amf_obj, err := amf.DecodeAMF(&(msg.data))
	if err != nil {
		return err
	}
	return amf_obj.ProcessCreateStream(&(chnk_wrp_ptr.params.trans_id))
}
// WriteCreateStreamResponse encodes the createStream reply for the stored
// transaction id with amf.EncodeCreateStreamResponse and sends it as a
// command message (type 20) on chunk stream 3 and message stream 0. It
// returns the encoding error or the write error, whichever occurs first.
func (chnk_wrp_ptr *ChunkWrapper) WriteCreateStreamResponse() error {
	payload, err := amf.EncodeCreateStreamResponse(chnk_wrp_ptr.params.trans_id)
	if err != nil {
		return err
	}

	msg := &Message{
		msg_type: 20,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(3, 0, msg)
}
// ReadPublish reads the next message from the peer, requires it to be a
// command message (type 20), decodes its payload with amf.DecodeAMF and
// passes the result to ProcessPublish, which receives pointers to this
// wrapper's transaction id and stream key.
//
// Errors from ReadChunk, DecodeAMF and ProcessPublish are returned
// unchanged. A missing message or one of another type yields a descriptive
// error; previously it was silently skipped and nil was returned, so the
// transaction id and stream key were never populated although the caller
// saw success.
func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() error {
	msg, err := chnk_wrp_ptr.ReadChunk()
	if err != nil {
		return err
	}
	// NOTE(review): whether ReadChunk can return (nil, nil) is not visible
	// here; guard so that case reports an error instead of panicking.
	if msg == nil {
		return fmt.Errorf("rtmp: expected publish command, got no message")
	}
	if msg.msg_type != 20 {
		return fmt.Errorf("rtmp: expected publish command (type 20), got message type %d", msg.msg_type)
	}

	amf_obj, err := amf.DecodeAMF(&(msg.data))
	if err != nil {
		return err
	}
	return amf_obj.ProcessPublish(&(chnk_wrp_ptr.params.trans_id), &(chnk_wrp_ptr.params.stream_key))
}
// WritePublishResponse encodes the publish reply for the stored transaction
// id with amf.EncodePublishResponse and sends it as a command message
// (type 20) on chunk stream 3 and message stream 1. It returns the encoding
// error or the write error, whichever occurs first.
func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() error {
	payload, err := amf.EncodePublishResponse(chnk_wrp_ptr.params.trans_id)
	if err != nil {
		return err
	}

	msg := &Message{
		msg_type: 20,
		msg_len:  uint32(len(payload)),
		data:     payload,
	}
	return chnk_wrp_ptr.WriteChunk(3, 1, msg)
}