96 lines
2.2 KiB
Go
96 lines
2.2 KiB
Go
package rtmp
|
|
|
|
import (
|
|
"net"
|
|
"encoding/binary"
|
|
"stream_server/rtmp/amf"
|
|
"errors"
|
|
"strings"
|
|
"fmt"
|
|
"os"
|
|
)
|
|
|
|
type ChunkWrapper struct {
|
|
conn net.Conn
|
|
params *ProtocolParams
|
|
open_chnkstrms map[uint32]*ChunkStream
|
|
open_msgs map[uint32]*Message
|
|
chunk_buffs *ChunkBuffers
|
|
}
|
|
|
|
func NewChunkWrapper(conn net.Conn) (chnk_wrp_ptr *ChunkWrapper) {
|
|
chnk_wrp_ptr = new(ChunkWrapper)
|
|
|
|
chnk_wrp_ptr.conn = conn
|
|
chnk_wrp_ptr.params = &ProtocolParams{1024, 512, 0, 0, ""}
|
|
chnk_wrp_ptr.open_chnkstrms = make(map[uint32]*ChunkStream)
|
|
chnk_wrp_ptr.open_msgs = make(map[uint32]*Message)
|
|
buffers := ChunkBuffers{
|
|
make([]byte, 4),
|
|
make([]byte, 4),
|
|
make([]byte, 1),
|
|
make([]byte, 4),
|
|
make([]byte, 1),
|
|
make([]byte, 2),
|
|
}
|
|
chnk_wrp_ptr.chunk_buffs = &buffers
|
|
return
|
|
}
|
|
|
|
func (chnk_wrp_ptr *ChunkWrapper) ReadChunk() (*Message, error) {
|
|
full_msg_ptr, err := ReadChunk(
|
|
chnk_wrp_ptr.conn,
|
|
chnk_wrp_ptr.open_chnkstrms,
|
|
chnk_wrp_ptr.open_msgs,
|
|
chnk_wrp_ptr.params.peer_chunk_size,
|
|
chnk_wrp_ptr.chunk_buffs,
|
|
&(chnk_wrp_ptr.params.curr_read),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return full_msg_ptr, nil
|
|
}
|
|
|
|
func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
|
|
set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk()
|
|
if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 {
|
|
return err
|
|
}
|
|
chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data)
|
|
return nil
|
|
}
|
|
|
|
func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) {
|
|
connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk()
|
|
if err != nil || connect_cmd_msg.msg_type != 20 {
|
|
return err
|
|
}
|
|
amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tcUrl, err := amf_obj.ProcessConnect()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if chnk_wrp_ptr.params.stream_key, err = check_stream_key(tcUrl); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func check_stream_key(tcUrl string) (string, error) {
|
|
err := errors.New("bad stream key")
|
|
tcUrl_split := strings.Split(tcUrl, "/")
|
|
if len(tcUrl_split) != 5 {
|
|
return "", err
|
|
}
|
|
base_dir, _ := os.UserHomeDir()
|
|
stream_dir := base_dir + "/live/" + tcUrl_split[4]
|
|
fileinfo, err := os.Stat(stream_dir)
|
|
if err == nil && fileinfo.IsDir() {
|
|
return tcUrl_split[4], nil
|
|
}
|
|
return "", err
|
|
}
|