From 0c054b67c0716342177136b54e86098452169a22 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Fri, 11 Aug 2023 13:09:36 +0500 Subject: [PATCH] connect object processing, stream key checking --- rtmp/amf/decode.go | 38 ++++++++++++++++++++++++++++++++++---- rtmp/chunk_wrap.go | 27 ++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/rtmp/amf/decode.go b/rtmp/amf/decode.go index d90b501..053e69e 100644 --- a/rtmp/amf/decode.go +++ b/rtmp/amf/decode.go @@ -1,15 +1,45 @@ package amf -import "fmt" +import ( + "errors" +) type AMFObj map[interface{}]interface{} func DecodeAMF(data *[]byte) (AMFObj, error) { obj := make(AMFObj) test := make(AMFObj) - fmt.Println("a") + obj[0] = "connect" + obj[1] = float64(1) obj[2] = test - fmt.Println(obj) - test["tcUrl"] = "test" + test["tcUrl"] = "rtmp://test-domain.nil/Desktop/tmp" return obj, nil } + +func (amf_obj_root AMFObj) ProcessConnect() (string, error) { + err := errors.New("Bad AMF connect command") + if _, ok := amf_obj_root[0]; !ok { + return "", err + } else if command_string, ok := amf_obj_root[0].(string); !ok || command_string != "connect" { + return "", err + } + + if _, ok := amf_obj_root[1]; !ok { + return "", err + } else if transac_id_float, ok := amf_obj_root[1].(float64); !ok || transac_id_float != 1.0 { + return "", err + } + + if _, ok := amf_obj_root[2]; !ok { + return "", err + } else if _, ok := amf_obj_root[2].(AMFObj); !ok { + return "", err + } else if _, ok := amf_obj_root[2].(AMFObj)["tcUrl"]; !ok { + return "", err + } else if tcUrl, ok := amf_obj_root[2].(AMFObj)["tcUrl"].(string); ok { + return tcUrl, nil + } else { + return "", err + } + +} diff --git a/rtmp/chunk_wrap.go b/rtmp/chunk_wrap.go index 490eb42..ff46e52 100644 --- a/rtmp/chunk_wrap.go +++ b/rtmp/chunk_wrap.go @@ -4,6 +4,10 @@ import ( "net" "encoding/binary" "stream_server/rtmp/amf" + "errors" + "strings" + "fmt" + "os" ) type ChunkWrapper struct { @@ -66,6 +70,27 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { if err != nil { return err } - chnk_wrp_ptr.params.stream_key = amf_obj[2].(amf.AMFObj)["tcUrl"].(string) + tcUrl, err := amf_obj.ProcessConnect() + if err != nil { + return err + } + if chnk_wrp_ptr.params.stream_key, err = check_stream_key(tcUrl); err != nil { + return err + } return nil } + +func check_stream_key(tcUrl string) (string, error) { + err := errors.New("bad stream key") + tcUrl_split := strings.Split(tcUrl, "/") + if len(tcUrl_split) != 5 { + return "", err + } + base_dir, _ := os.UserHomeDir() + stream_dir := base_dir + "/live/" + tcUrl_split[4] + fileinfo, err := os.Stat(stream_dir) + if err == nil && fileinfo.IsDir() { + return tcUrl_split[4], nil + } + return "", err +}