Compare commits

...

6 commits

4 changed files with 21 additions and 12 deletions

View file

@ -8,6 +8,8 @@ Uses the std lib http server implementation for the http serving side.
**Not intended for actual use**. The stream key use is not secure and is used to handle directories without a user db system, than to provide auth. Same goes for the SRT passphrase. Also just accepts connections so will get DDOS'd immediately. **Not intended for actual use**. The stream key use is not secure and is used to handle directories without a user db system, than to provide auth. Same goes for the SRT passphrase. Also just accepts connections so will get DDOS'd immediately.
**Update**: main branch has been tested over a network. RTMP should now work fine, SRT "works" with a few modifications. The RTT ping log-buffer slice must be extended depending on network latency (try ~2x latency/10ms, could adjust automatically, but eh). Also will inevitably crash on poor connections due to the lack of DROPSEQ handling, but will work perfectly fine during the initial 3-10s you get.
Limits to a single stream at a time, mostly for the lack of db to handle connections and user information rather than concurrency problems. Limits to a single stream at a time, mostly for the lack of db to handle connections and user information rather than concurrency problems.
Currently always transcodes to vp9 + opus, segments to fragmented mp4. Creates one segment playlist, no manifest. Uses ffmpeg Currently always transcodes to vp9 + opus, segments to fragmented mp4. Creates one segment playlist, no manifest. Uses ffmpeg

View file

@ -1,6 +1,7 @@
package rtmp package rtmp
import ( import (
"io"
"net" "net"
"time" "time"
"encoding/binary" "encoding/binary"
@ -42,18 +43,18 @@ type ChunkBuffers struct {
// reads the initial variable size header that defines the chunk's format and chunkstream id // reads the initial variable size header that defines the chunk's format and chunkstream id
// 5.3.1.1 // 5.3.1.1
func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) { func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.fmt_csid_byte); err != nil {
return return
} }
format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) // get first 2 bits for format 0-3 format = uint8(chunk_bufs_ptr.fmt_csid_byte[0] >> 6) // get first 2 bits for format 0-3
switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { // last 6 bits 0-63 switch chunk_bufs_ptr.fmt_csid_byte[0] & 0x3f { // last 6 bits 0-63
case 0: // csid 0 is invalid, means true csid is the next byte, + 64 for the 6 bits prior case 0: // csid 0 is invalid, means true csid is the next byte, + 64 for the 6 bits prior
if _, err = conn.Read(chunk_bufs_ptr.csid_true[1:]); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.csid_true[1:]); err != nil {
return return
} }
csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64 csid = uint32(chunk_bufs_ptr.csid_true[1]) + 64
case 1: // csid 1 is invalid, means true csid is in the next 2 bytes, reverse order (little endian) and the 64, reconstruct case 1: // csid 1 is invalid, means true csid is in the next 2 bytes, reverse order (little endian) and the 64, reconstruct
if _, err = conn.Read(chunk_bufs_ptr.csid_true); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.csid_true); err != nil {
return return
} }
csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64 csid = uint32(binary.LittleEndian.Uint16(chunk_bufs_ptr.csid_true)) + 64
@ -65,7 +66,7 @@ func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint
func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extended_time bool, err error) { func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extended_time bool, err error) {
chunk_bufs_ptr.time[0] = 0 chunk_bufs_ptr.time[0] = 0
if _, err = conn.Read(chunk_bufs_ptr.time[1:]); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.time[1:]); err != nil {
return return
} }
time = binary.BigEndian.Uint32(chunk_bufs_ptr.time) time = binary.BigEndian.Uint32(chunk_bufs_ptr.time)
@ -76,7 +77,7 @@ func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extend
} }
func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32, err error) { func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_len[1:]); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.msg_len[1:]); err != nil {
return return
} }
msg_len = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_len) msg_len = binary.BigEndian.Uint32(chunk_bufs_ptr.msg_len)
@ -84,7 +85,7 @@ func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32,
} }
func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint8, err error) { func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint8, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_typeid); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.msg_typeid); err != nil {
return return
} }
msg_type = chunk_bufs_ptr.msg_typeid[0] msg_type = chunk_bufs_ptr.msg_typeid[0]
@ -92,7 +93,7 @@ func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint
} }
func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streamid uint32, err error) { func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streamid uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.msg_streamid); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.msg_streamid); err != nil {
return return
} }
msg_streamid = binary.LittleEndian.Uint32(chunk_bufs_ptr.msg_streamid) msg_streamid = binary.LittleEndian.Uint32(chunk_bufs_ptr.msg_streamid)
@ -100,7 +101,7 @@ func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streami
} }
func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, err error) { func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, err error) {
if _, err = conn.Read(chunk_bufs_ptr.time); err != nil { if _, err = io.ReadFull(conn, chunk_bufs_ptr.time); err != nil {
return return
} }
time = binary.BigEndian.Uint32(chunk_bufs_ptr.time) time = binary.BigEndian.Uint32(chunk_bufs_ptr.time)
@ -225,7 +226,7 @@ func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32,
} else { } else {
buffer_end = chnk_size + msg_ptr.curr_bytes_read buffer_end = chnk_size + msg_ptr.curr_bytes_read
} }
n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end]) n, err := io.ReadFull(conn, msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end])
if err != nil { if err != nil {
return 0, err return 0, err
} }

View file

@ -1,6 +1,7 @@
package rtmp package rtmp
import ( import (
"io"
"net" "net"
"time" "time"
"encoding/binary" "encoding/binary"
@ -17,7 +18,7 @@ func DoHandshake(conn net.Conn) (hs_success bool) {
// force handshake to finish in under 15 seconds (aribtrary) or throw an error // force handshake to finish in under 15 seconds (aribtrary) or throw an error
conn.SetDeadline(time.Now().Add(15 * time.Second)) conn.SetDeadline(time.Now().Add(15 * time.Second))
if _, err := conn.Read(C0C1C2); err != nil || C0C1C2[0] != 3 { if _, err := io.ReadFull(conn, C0C1C2[:1+1536]); err != nil || C0C1C2[0] != 3 {
return return
} }
copy(C0C1C2[1:1536], S0S1S2[1+1536:]) copy(C0C1C2[1:1536], S0S1S2[1+1536:])
@ -26,7 +27,7 @@ func DoHandshake(conn net.Conn) (hs_success bool) {
if _, err := conn.Write(S0S1S2); err != nil { // specs say only send S0S1 and wait for C2 before sending S2, obs doesn't care apparently if _, err := conn.Write(S0S1S2); err != nil { // specs say only send S0S1 and wait for C2 before sending S2, obs doesn't care apparently
return return
} }
if _, err := conn.Read(C0C1C2[1+1536:]); err != nil { if _, err := io.ReadFull(conn, C0C1C2[1+1536:]); err != nil {
return return
} }
hs_success = true hs_success = true

View file

@ -147,7 +147,12 @@ func (agent *SRTManager) process_conclusion(packet *Packet) (*Packet) {
resp_packet := agent.create_conclusion_resp() resp_packet := agent.create_conclusion_resp()
if packet.packet_type == HANDSHAKE { if packet.packet_type == HANDSHAKE {
hs_cif := packet.cif.(*HandshakeCIF) hs_cif := packet.cif.(*HandshakeCIF)
if hs_cif.hs_type == 0xffffffff && hs_cif.syn_cookie == agent.syn_cookie {
// allow previous shotgunned induction requests to dissipate
if hs_cif.hs_type != 0xffffffff {
return nil
}
if hs_cif.syn_cookie == agent.syn_cookie {
for _, v := range hs_cif.hs_extensions { for _, v := range hs_cif.hs_extensions {
// force client to add a stream_id for output location // force client to add a stream_id for output location
// to do: add encryption handling // to do: add encryption handling