metadata handling, errors for invalid message type but successful read, flv byte write index removed since unneeded

This commit is contained in:
Muaz Ahmad 2023-08-18 13:24:54 +05:00
parent cbf1492b0e
commit fc4560076c
3 changed files with 40 additions and 11 deletions

View file

@ -4,6 +4,8 @@ import (
"net" "net"
"encoding/binary" "encoding/binary"
"stream_server/rtmp/amf" "stream_server/rtmp/amf"
"stream_server/rtmp/flv"
"errors"
) )
type ChunkWrapper struct { type ChunkWrapper struct {
@ -67,7 +69,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteChunk(csid uint32, msg_strmid uint32, msg
func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk() set_chunk_size_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 { if err != nil || set_chunk_size_msg.msg_type != 1 || set_chunk_size_msg.msg_len != 4 {
return err return errors.New("Bad chunksize msg")
} }
chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data) chnk_wrp_ptr.params.peer_chunk_size = binary.BigEndian.Uint32(set_chunk_size_msg.data)
return nil return nil
@ -76,7 +78,7 @@ func (chnk_wrp_ptr *ChunkWrapper) ReadPeerChunkSize() (error) {
func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadConnectCommand() (error) {
connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk() connect_cmd_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || connect_cmd_msg.msg_type != 20 { if err != nil || connect_cmd_msg.msg_type != 20 {
return err return errors.New("Bad connect")
} }
amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data)) amf_obj, err := amf.DecodeAMF(&(connect_cmd_msg.data))
if err != nil { if err != nil {
@ -144,7 +146,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteConnectResponse() (error) {
func (chnk_wrp_ptr *ChunkWrapper) ReadCreateStream() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadCreateStream() (error) {
create_stream_msg, err := chnk_wrp_ptr.ReadChunk() create_stream_msg, err := chnk_wrp_ptr.ReadChunk()
if err != nil || create_stream_msg.msg_type != 20 { if err != nil || create_stream_msg.msg_type != 20 {
return err return errors.New("Bad create stream")
} }
amf_obj, err := amf.DecodeAMF(&(create_stream_msg.data)) amf_obj, err := amf.DecodeAMF(&(create_stream_msg.data))
if err != nil { if err != nil {
@ -176,7 +178,7 @@ func (chnk_wrp_ptr *ChunkWrapper) WriteCreateStreamResponse() (error) {
func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() (error) { func (chnk_wrp_ptr *ChunkWrapper) ReadPublish() (error) {
publish_cmd, err := chnk_wrp_ptr.ReadChunk() publish_cmd, err := chnk_wrp_ptr.ReadChunk()
if err != nil || publish_cmd.msg_type != 20 { if err != nil || publish_cmd.msg_type != 20 {
return err return errors.New("Bad publish command")
} }
amf_obj, err := amf.DecodeAMF(&(publish_cmd.data)) amf_obj, err := amf.DecodeAMF(&(publish_cmd.data))
if err != nil { if err != nil {
@ -204,3 +206,14 @@ func (chnk_wrp_ptr *ChunkWrapper) WritePublishResponse() (error) {
} }
return nil return nil
} }
func (chnk_wrp_ptr *ChunkWrapper) ReadMetadata(writer *flv.FLVWriter) (error) {
metadata, err := chnk_wrp_ptr.ReadChunk()
if err != nil || metadata.msg_type != 18 {
return errors.New("Bad metadata msg")
}
if err = writer.WriteMetadataTag(&(metadata.data)); err != nil {
return err
}
return nil
}

View file

@ -7,9 +7,13 @@ import (
func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) { func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) {
file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key) file_writer, err := flv.NewFLVWriter(chnk_wrp_ptr.params.stream_key)
defer file_writer.Close()
if err != nil { if err != nil {
return return
} }
if err = chnk_wrp_ptr.ReadMetadata(file_writer); err != nil {
return
}
for { for {
p, err := chnk_wrp_ptr.ReadChunk() p, err := chnk_wrp_ptr.ReadChunk()
if err != nil { if err != nil {
@ -19,5 +23,4 @@ func HandleDataLoop(chnk_wrp_ptr *ChunkWrapper) {
fmt.Println(p.msg_type, p.msg_len) fmt.Println(p.msg_type, p.msg_len)
} }
} }
file_writer.Close()
} }

View file

@ -3,11 +3,11 @@ package flv
import ( import (
"io" "io"
"os" "os"
"encoding/binary"
) )
type FLVWriter struct { type FLVWriter struct {
W io.Writer w io.Writer
curr_offset int
} }
func NewFLVWriter(stream_dir string) (*FLVWriter, error) { func NewFLVWriter(stream_dir string) (*FLVWriter, error) {
@ -17,7 +17,7 @@ func NewFLVWriter(stream_dir string) (*FLVWriter, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
writer.W = filepipe writer.w = filepipe
if err = writer.write_flv_header(); err != nil { if err = writer.write_flv_header(); err != nil {
return nil, err return nil, err
} }
@ -25,7 +25,21 @@ func NewFLVWriter(stream_dir string) (*FLVWriter, error) {
} }
func (writer *FLVWriter) Close() (error) { func (writer *FLVWriter) Close() (error) {
return writer.W.(*os.File).Close() return writer.w.(*os.File).Close()
}
func (writer *FLVWriter) WriteMetadataTag(data *[]byte) (err error) {
uint24_buf := make([]byte, 4)
tag_header := make([]byte, 11)
tag_header[0] = 18
binary.BigEndian.PutUint32(uint24_buf, uint32(len((*data)[16:])))
copy(tag_header[1:4], uint24_buf[1:])
_, err = writer.w.Write(tag_header)
if err == nil {
_, err = writer.w.Write((*data)[16:])
}
return
} }
func (writer *FLVWriter) write_flv_header() (err error) { func (writer *FLVWriter) write_flv_header() (err error) {
@ -35,8 +49,7 @@ func (writer *FLVWriter) write_flv_header() (err error) {
header[4] = 5 header[4] = 5
header[8] = 9 header[8] = 9
n, err := writer.W.Write(header) _, err = writer.w.Write(header)
writer.curr_offset += n
return return
} }