diff --git a/rtmp/amf/command.go b/rtmp/amf/command.go index 81291bf..925b1ab 100644 --- a/rtmp/amf/command.go +++ b/rtmp/amf/command.go @@ -6,27 +6,22 @@ import ( func (amf_obj_root AMFObj) ProcessConnect() (err error) { err = errors.New("Bad AMF connect command") - if _, ok := amf_obj_root[0]; !ok { - return - } else if command_string, ok := amf_obj_root[0].(string); !ok || command_string != "connect" { + if !check_object(amf_obj_root, 0, "connect") { return } - if _, ok := amf_obj_root[1]; !ok { - return - } else if transac_id_float, ok := amf_obj_root[1].(float64); !ok || transac_id_float != 1.0 { - return + if !check_object(amf_obj_root, 1, 1.0) { + return } + if _, ok := amf_obj_root[2]; !ok { return } else if _, ok := amf_obj_root[2].(AMFObj); !ok { return - } else if _, ok := amf_obj_root[2].(AMFObj)["app"]; !ok { - return - } else if _, ok := amf_obj_root[2].(AMFObj)["app"].(string); !ok { - return - } else if amf_obj_root[2].(AMFObj)["app"].(string) != "live" { + } + + if !check_object(amf_obj_root[2].(AMFObj), "app", "live") { return } err = nil @@ -35,9 +30,7 @@ func (amf_obj_root AMFObj) ProcessConnect() (err error) { func (amf_obj_root AMFObj) ProcessCreateStream(trans_id *float64) (err error) { err = errors.New("Bad AMF create stream") - if _, ok := amf_obj_root[0]; !ok { - return - } else if command_string, ok := amf_obj_root[0].(string); !ok || command_string != "createStream" { + if !check_object(amf_obj_root, 0, "createStream") { return } @@ -53,6 +46,34 @@ func (amf_obj_root AMFObj) ProcessCreateStream(trans_id *float64) (err error) { return } +func (amf_obj_root AMFObj) ProcessPublish(trans_id *float64, stream_key *string) (err error) { + err = errors.New("Bad publish") + if !check_object(amf_obj_root, 0, "publish") { + return + } + if _, ok := amf_obj_root[1]; !ok { + return + } else if transac_id_float, ok := amf_obj_root[1].(float64); ok { + *trans_id = transac_id_float + } else { + return + } + + if _, ok := amf_obj_root[3]; !ok { + return + } else if stream_key_val, ok := amf_obj_root[3].(string); ok { + *stream_key = stream_key_val + } else { + return + } + + if !check_object(amf_obj_root, 4, "live") { + return + } + err = nil + return +} + func EncodeConnectResponse() ([]byte, error) { amf_root_obj := make(AMFObj) amf_root_obj[0] = "_result" @@ -82,3 +103,25 @@ func EncodeCreateStreamResponse(trans_id float64) ([]byte, error) { return Encode(amf_root_obj) } + +func EncodePublishResponse(trans_id float64) ([]byte, error) { + amf_root_obj := make(AMFObj) + amf_root_obj[0] = "onStatus" + amf_root_obj[1] = trans_id + amf_root_obj[2] = nil + amf_root_obj[3] = make(AMFObj) + + amf_event_obj := amf_root_obj[3].(AMFObj) + amf_event_obj["level"] = "status" + amf_event_obj["code"] = "NetStream.Publish.Start" + amf_event_obj["description"] = "Start Publishing" + + return Encode(amf_root_obj) +} + +func check_object(amf_obj AMFObj, key interface{}, target interface{}) (bool) { + if val, ok := amf_obj[key]; ok && val == target{ + return true + } + return false +} diff --git a/rtmp/chunk_wrap.go b/rtmp/chunk_wrap.go index d2ed0b7..399a9b6 100644 --- a/rtmp/chunk_wrap.go +++ b/rtmp/chunk_wrap.go @@ -172,3 +172,35 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteCreateStreamResponse() (error) { } return nil } + +func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() (error) { + publish_cmd, err := chnk_wrp_ptr.ReadChunk() + if err != nil || publish_cmd.msg_type != 20 { + return err + } + amf_obj, err := amf.DecodeAMF(&(publish_cmd.data)) + if err != nil { + return err + } + err = amf_obj.ProcessPublish(&(chnk_wrp_ptr.params.trans_id), &(chnk_wrp_ptr.params.stream_key)) + if err != nil { + return err + } + return nil +} + +func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) { + msg_ptr := new(Message) + msg_ptr.msg_type = 20 + msg_data, err := amf.EncodePublishResponse(chnk_wrp_ptr.params.trans_id) + if err != nil { + return err + } + msg_ptr.msg_len = uint32(len(msg_data)) + msg_ptr.data = msg_data + + if err := chnk_wrp_ptr.WriteChunk(3, 1, msg_ptr); err != nil { + return err + } + return nil +}