stream-server/rtmp/chunk_wrap.go

97 lines
2.2 KiB
Go
Raw Normal View History

package rtmp
import (
"net"
"encoding/binary"
"stream_server/rtmp/amf"
"errors"
"strings"
"fmt"
"os"
)
type ChunkWrapper struct {
conn net.Conn
params *ProtocolParams
open_chnkstrms map[uint32]*ChunkStream
open_msgs map[uint32]*Message
chunk_buffs *ChunkBuffers
}
func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) {
chnk_wrp_ptr = new(ChunkWrapper)
chnk_wrp_ptr.conn = conn
chnk_wrp_ptr.params = &ProtocolParams{1024, 512, 0, 0, ""}
chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream)
chnk_wrp_ptr.open_msgs = make(map[uint32]*Message)
buffers := ChunkBuffers{
make([]byte, 4),
make([]byte, 4),
make([]byte, 1),
make([]byte, 4),
make([]byte, 1),
make([]byte, 2),
}
chnk_wrp_ptr.chunk_buffs = &buffers
return
}
func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) {
full_msg_ptr, err := ReadChunk(
chnk_wrp_ptr.conn,
chnk_wrp_ptr.open_chnkstrms,
chnk_wrp_ptr.open_msgs,
chnk_wrp_ptr.params.peer_chunk_size,
chnk_wrp_ptr.chunk_buffs,
&(chnk_wrp_ptr.params.curr_read),
)
if err != nil {
return nil, err
}
return full_msg_ptr, nil
}
func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 {
return err
}
chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data)
return nil
}
func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) {
connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || connect_cmd_msg.msg_type != 20 {
return err
}
amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data))
if err != nil {
return err
}
tcUrl, err := amf_obj.ProcessConnect()
if err != nil {
return err
}
if chnk_wrp_ptr.params.stream_key, err = check_stream_key(tcUrl); err != nil {
return err
}
return nil
}
func check_stream_key(tcUrl string) (string, error) {
err := errors.New("bad stream key")
tcUrl_split := strings.Split(tcUrl, "/")
if len(tcUrl_split) != 5 {
return "", err
}
base_dir, _ := os.UserHomeDir()
stream_dir := base_dir + "/live/" + tcUrl_split[4]
fileinfo, err := os.Stat(stream_dir)
if err == nil && fileinfo.IsDir() {
return tcUrl_split[4], nil
}
return "", err
}