stream-server/rtmp/chunk.go

286 lines
7 KiB
Go
Raw Normal View History

2023-08-10 01:02:54 +05:00
package rtmp
import (
"net"
"time"
"encoding/binary"
)
type chnk_stream struct {
timestamp uint32
last_msg_strm_id uint32
2023-08-10 13:09:26 +05:00
last_msg_len uint32
last_msg_type uint8
2023-08-10 01:02:54 +05:00
timedelta uint32
}
type message struct {
data []byte
curr_bytes_read uint32
timestamp uint32
msg_type uint8
msg_len uint32
}
type chunk_bufs struct {
time []byte
msg_len []byte
msg_typeid []byte
msg_streamid []byte
fmt_csid_byte []byte
csid_true []byte
}
func new_chunk_bufs() (*chunk_bufs) {
new_chunk_bufs := chunk_bufs{
make([]byte, 4),
make([]byte, 4),
make([]byte, 1),
make([]byte, 4),
make([]byte, 1),
make([]byte, 2),
}
return &new_chunk_bufs
}
func OpenStreamsMapInit() (map[uint32]*chnk_stream, map[uint32]*message, *chunk_bufs) {
open_chnkstrms := make(map[uint32]*chnk_stream)
open_msgs := make(map[uint32]*message)
chunk_bufs_ptr := new_chunk_bufs()
return open_chnkstrms, open_msgs, chunk_bufs_ptr
}
2023-08-10 01:02:54 +05:00
func read_basic_header(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (format uint8, csid uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil {
return
2023-08-10 01:02:54 +05:00
}
format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6)
switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f {
2023-08-10 01:02:54 +05:00
case 0:
if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil {
return
2023-08-10 01:02:54 +05:00
}
csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64
2023-08-10 01:02:54 +05:00
case 1:
if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil {
return
2023-08-10 01:02:54 +05:00
}
csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64
2023-08-10 01:02:54 +05:00
default:
csid = uint32(chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f)
2023-08-10 01:02:54 +05:00
}
return
2023-08-10 01:02:54 +05:00
}
func read_time(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, extended_time bool, err error) {
chunk_bufs_ptr.time[0] = 0
if _, err = conn.Read(chunk_bufs_ptr.time[1:]); err != nil {
return
2023-08-10 01:02:54 +05:00
}
time = binary.BigEndian.Uint32(chunk_bufs_ptr.time)
if time ^ 0xffffff == 0 {
2023-08-10 01:02:54 +05:00
extended_time = true
}
return
}
func read_msg_len(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_len uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_len[1:]); err != nil {
return
}
msg_len = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_len)
return
}
2023-08-10 01:02:54 +05:00
func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_type uint8, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_typeid); err != nil {
return
}
msg_type = chunk_bufs_ptr.msg_typeid[0]
return
}
func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_streamid uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_streamid); err != nil {
return
}
msg_streamid = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_streamid)
return
}
func read_time_extd(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.time); err != nil {
return
}
time = binary.BigEndian.Uint32(chunk_bufs_ptr.time)
return
}
func read_message_header_0(conn net.Conn, chnk_stream_ptr *chnk_stream, msg_ptr *message, chunk_bufs_ptr *chunk_bufs) (error) {
var extended_time bool
var err error
chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr)
if err != nil {
return err
}
msg_ptr.msg_len, err = read_msg_len(conn, chunk_bufs_ptr)
if err != nil {
2023-08-10 01:02:54 +05:00
return err
}
msg_ptr.data = make([]byte, msg_ptr.msg_len)
2023-08-10 13:09:26 +05:00
chnk_stream_ptr.last_msg_len = msg_ptr.msg_len
2023-08-10 01:02:54 +05:00
msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr)
if err != nil {
2023-08-10 01:02:54 +05:00
return err
}
2023-08-10 13:09:26 +05:00
chnk_stream_ptr.last_msg_type = msg_ptr.msg_type
2023-08-10 01:02:54 +05:00
chnk_stream_ptr.last_msg_strm_id, err = read_msg_streamid(conn, chunk_bufs_ptr)
if err != nil {
2023-08-10 01:02:54 +05:00
return err
}
if extended_time {
chnk_stream_ptr.timestamp, err = read_time_extd(conn, chunk_bufs_ptr)
if err != nil {
2023-08-10 01:02:54 +05:00
return err
}
}
msg_ptr.timestamp = chnk_stream_ptr.timestamp
return nil
}
func read_message_header_1(conn net.Conn, chnk_stream_ptr *chnk_stream, msg_ptr *message, chunk_bufs_ptr *chunk_bufs) (error) {
var extended_time bool
var err error
chnk_stream_ptr.timedelta, extended_time, err = read_time(conn, chunk_bufs_ptr)
if err != nil {
return err
}
msg_ptr.msg_len, err = read_msg_len(conn, chunk_bufs_ptr)
if err != nil {
return err
}
msg_ptr.data = make([]byte, msg_ptr.msg_len)
2023-08-10 13:09:26 +05:00
chnk_stream_ptr.last_msg_len = msg_ptr.msg_len
msg_ptr.msg_type, err = read_msg_typeid(conn, chunk_bufs_ptr)
if err != nil {
return err
}
2023-08-10 13:09:26 +05:00
chnk_stream_ptr.last_msg_type = msg_ptr.msg_type
if extended_time {
chnk_stream_ptr.timedelta, err = read_time_extd(conn, chunk_bufs_ptr)
if err != nil {
return err
}
}
chnk_stream_ptr.timestamp += chnk_stream_ptr.timedelta
msg_ptr.timestamp = chnk_stream_ptr.timestamp
return nil
}
2023-08-10 13:09:26 +05:00
func read_message_header_2(conn net.Conn, chnk_stream_ptr *chnk_stream, msg_ptr *message, chunk_bufs_ptr *chunk_bufs) (error) {
var extended_time bool
var err error
chnk_stream_ptr.timedelta, extended_time, err = read_time(conn, chunk_bufs_ptr)
if err != nil {
return err
}
msg_ptr.msg_len = chnk_stream_ptr.last_msg_len
msg_ptr.data = make([]byte, msg_ptr.msg_len)
msg_ptr.msg_type = chnk_stream_ptr.last_msg_type
if extended_time {
chnk_stream_ptr.timedelta, err = read_time_extd(conn, chunk_bufs_ptr)
if err != nil {
return err
}
}
chnk_stream_ptr.timestamp += chnk_stream_ptr.timedelta
msg_ptr.timestamp = chnk_stream_ptr.timestamp
return nil
}
2023-08-10 01:02:54 +05:00
func read_chunk_data(conn net.Conn, msg_ptr *message, chnk_size uint32) (error) {
bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read
var buffer_end uint32
if bytes_left < chnk_size {
buffer_end = bytes_left + msg_ptr.curr_bytes_read
2023-08-10 01:02:54 +05:00
} else {
buffer_end = chnk_size + msg_ptr.curr_bytes_read
2023-08-10 01:02:54 +05:00
}
n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end])
if err != nil {
return err
}
msg_ptr.curr_bytes_read += uint32(n)
return nil
}
func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*chnk_stream, open_msgs map[uint32]*message, chnk_size uint32, chunk_bufs_ptr *chunk_bufs) (*message, error){
2023-08-10 01:02:54 +05:00
conn.SetDeadline(time.Now().Add(10 * time.Second))
var chnkstream_ptr *chnk_stream
var msg_ptr *message
2023-08-10 01:02:54 +05:00
format, csid, err := read_basic_header(conn, chunk_bufs_ptr)
2023-08-10 01:02:54 +05:00
if err != nil {
return nil, err
}
switch format {
case 0:
chnkstream_ptr = new(chnk_stream)
2023-08-10 01:02:54 +05:00
open_chnkstrms[csid] = chnkstream_ptr
msg_ptr = new(message)
2023-08-10 01:02:54 +05:00
if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
2023-08-10 01:02:54 +05:00
return nil, err
}
case 1:
chnkstream_ptr = open_chnkstrms[csid]
msg_ptr = new(message)
if err := read_message_header_1(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
2023-08-10 01:02:54 +05:00
return nil, err
}
2023-08-10 13:09:26 +05:00
case 2:
chnkstream_ptr = open_chnkstrms[csid]
msg_ptr = new(message)
if err := read_message_header_2(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
return nil, err
}
}
if err := read_chunk_data(conn, msg_ptr, chnk_size); err != nil {
return nil, err
}
if msg_ptr.curr_bytes_read < msg_ptr.msg_len {
open_msgs[chnkstream_ptr.last_msg_strm_id] = msg_ptr
return nil, nil
} else {
delete(open_msgs, chnkstream_ptr.last_msg_strm_id)
return msg_ptr, nil
2023-08-10 01:02:54 +05:00
}
conn.SetDeadline(time.Time{})
return nil, nil
}