From fc4560076cf06850d4a03eb347dc3147d3bb1c65 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Fri, 18 Aug 2023 13:24:54 +0500 Subject: [PATCH] metadata handling, errors for invalid message type but successful read, flv byte write index removed since unneeded --- rtmp/chunk_wrap.go | 21 +++++++++++++++++---- rtmp/data_loop.go | 5 ++++- rtmp/flv/writer.go | 25 +++++++++++++++++++------ 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/rtmp/chunk_wrap.go b/rtmp/chunk_wrap.go index 399a9b6..71130c7 100644 --- a/rtmp/chunk_wrap.go +++ b/rtmp/chunk_wrap.go @@ -4,6 +4,8 @@ import ( "net" "encoding/binary" "stream_server/rtmp/amf" + "stream_server/rtmp/flv" + "errors" ) type ChunkWrapper struct { @@ -67,7 +69,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 { - return err + return errors.New("Bad chunksize msg") } chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data) return nil @@ -76,7 +78,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || connect_cmd_msg.msg_type != 20 { - return err + return errors.New("Bad connect") } amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data)) if err != nil { @@ -144,7 +146,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadCreateStream() (error) { create_stream_msg, err := chnk_wrp_ptr.ReadChunk() if err != nil || create_stream_msg.msg_type != 20 { - return err + return errors.New("Bad create stream") } amf_obj, err := amf.DecodeAMF(&(create_stream_msg.data)) if err != nil { @@ -176,7 +178,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteCreateStreamResponse() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() (error) { publish_cmd, err := chnk_wrp_ptr.ReadChunk() if err != nil || publish_cmd.msg_type != 20 { - return err + return errors.New("Bad publish command") } amf_obj, err := amf.DecodeAMF(&(publish_cmd.data)) if err != nil { @@ -204,3 +206,14 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) { } return nil } + +func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) { + metadata, err := chnk_wrp_ptr.ReadChunk() + if err != nil || metadata.msg_type != 18 { + return errors.New("Bad metadata msg") + } + if err = writer.WriteMetadataTag(&(metadata.data)); err != nil { + return err + } + return nil +} diff --git a/rtmp/data_loop.go b/rtmp/data_loop.go index 39c31a0..e889944 100644 --- a/rtmp/data_loop.go +++ b/rtmp/data_loop.go @@ -7,9 +7,13 @@ import ( func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) + defer file_writer.Close() if err != nil { return } + if err = chnk_wrp_ptr.ReadMetadata(file_writer); err != nil { + return + } for { p, err := chnk_wrp_ptr.ReadChunk() if err != nil { @@ -19,5 +23,4 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { fmt.Println(p.msg_type, p.msg_len) } } - file_writer.Close() } diff --git a/rtmp/flv/writer.go b/rtmp/flv/writer.go index e8a0364..e70d842 100644 --- a/rtmp/flv/writer.go +++ b/rtmp/flv/writer.go @@ -3,11 +3,11 @@ package flv import ( "io" "os" + "encoding/binary" ) type FLVWriter struct { - W io.Writer - curr_offset int + w io.Writer } func NewFLVWriter(stream_dir string) (*FLVWriter, error) { @@ -17,7 +17,7 @@ func NewFLVWriter(stream_dir string) (*FLVWriter, error) { if err != nil { return nil, err } - writer.W = filepipe + writer.w = filepipe if err = writer.write_flv_header(); err != nil { return nil, err } @@ -25,7 +25,21 @@ func NewFLVWriter(stream_dir string) (*FLVWriter, error) { } func (writer *FLVWriter) Close() (error) { - return writer.W.(*os.File).Close() + return writer.w.(*os.File).Close() +} + +func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) { + uint24_buf := make([]byte, 4) + tag_header := make([]byte, 11) + tag_header[0] = 18 + binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:]))) + copy(tag_header[1:4], uint24_buf[1:]) + + _, err = writer.w.Write(tag_header) + if err == nil { + _, err = writer.w.Write((*data)[16:]) + } + return } func (writer *FLVWriter) write_flv_header() (err error) { @@ -35,8 +49,7 @@ func (writer *FLVWriter) write_flv_header() (err error) { header[4] = 5 header[8] = 9 - n, err := writer.W.Write(header) - writer.curr_offset += n + _, err = writer.w.Write(header) return }