From 184dac910a2cc91e5ef3729c5a4623667fff0e9c Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Mon, 18 Sep 2023 11:51:55 +0500 Subject: [PATCH] Basic UDP tunneling intake --- main.go | 6 ++++- srt/intake.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++ srt/packet.go | 8 +++++++ srt/server.go | 21 +++++++++++++++++ srt/tunnel.go | 12 ++++++++++ 5 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 srt/intake.go create mode 100644 srt/packet.go create mode 100644 srt/server.go create mode 100644 srt/tunnel.go diff --git a/main.go b/main.go index a1620f2..f540bf6 100644 --- a/main.go +++ b/main.go @@ -3,14 +3,16 @@ package main import ( "stream_server/rtmp" "stream_server/http" + "stream_server/srt" ) const ( SRVTYPE_RTMP uint8 = iota + SRVTYPE_SRT ) func main() { - err := NewIngestServer(SRVTYPE_RTMP, "7878") + err := NewIngestServer(SRVTYPE_SRT, "7878") if err != nil { panic(err) } @@ -27,6 +29,8 @@ func NewIngestServer(srvr_type uint8, port string) (error) { switch srvr_type { case 0: err = rtmp.NewServer(port) + case 1: + err = srt.NewServer(port) } return err } diff --git a/srt/intake.go b/srt/intake.go new file mode 100644 index 0000000..ab0b0e1 --- /dev/null +++ b/srt/intake.go @@ -0,0 +1,65 @@ +package srt + +import ( + "net" + "fmt" +) + +type Intake struct { + max_conns int + tunnels []*Tunnel + socket net.PacketConn + buffer []byte +} + +func NewIntake(l net.PacketConn, max_conns int) (*Intake) { + intake := new(Intake) + intake.max_conns = max_conns + intake.tunnels = make([]*Tunnel, 0) + intake.buffer = make([]byte, 1500) + intake.socket = l + + return intake +} + +func (intake *Intake) NewTunnel(l net.PacketConn, peer net.Addr) (*Tunnel) { + if len(intake.tunnels) < intake.max_conns { + tunnel := new(Tunnel) + tunnel.socket = l + tunnel.peer = peer + tunnel.queue = make(chan *Packet, 10) + return tunnel + } + return nil +} + +func (intake *Intake) get_tunnel(peer net.Addr) (*Tunnel) { + var tunnel *Tunnel + for i := 0; i < len(intake.tunnels); i++ { + if intake.tunnels[i].broken { + intake.tunnels = append(intake.tunnels[:i], intake.tunnels[i+1:]...) + i-- + continue + } + if intake.tunnels[i].peer.String() == peer.String() { + tunnel = intake.tunnels[i] + } + } + if tunnel == nil { + tunnel = intake.NewTunnel(intake.socket, peer) + } + return tunnel +} + +func (intake *Intake) Read() { + n, peer, err := intake.socket.ReadFrom(intake.buffer) + if err != nil { + return + } + tunnel := intake.get_tunnel(peer) + if tunnel == nil { + return + } + fmt.Println(string(intake.buffer[:n])) + tunnel.queue <- &(Packet{intake.buffer[:n]}) +} diff --git a/srt/packet.go b/srt/packet.go new file mode 100644 index 0000000..512f481 --- /dev/null +++ b/srt/packet.go @@ -0,0 +1,8 @@ +package srt + +import ( +) + +type Packet struct { + raw []byte +} diff --git a/srt/server.go b/srt/server.go new file mode 100644 index 0000000..556d1b6 --- /dev/null +++ b/srt/server.go @@ -0,0 +1,21 @@ +package srt + +import ( + "net" +) + +func NewServer(port string) (error) { + l, err := net.ListenPacket("udp", ":" + port) + if err != nil { + return err + } + go start(l) + return nil +} + +func start(l net.PacketConn) { + intake := NewIntake(l, 1) + for { + intake.Read() + } +} diff --git a/srt/tunnel.go b/srt/tunnel.go new file mode 100644 index 0000000..e2427ee --- /dev/null +++ b/srt/tunnel.go @@ -0,0 +1,12 @@ +package srt + +import ( + "net" +) + +type Tunnel struct { + socket net.PacketConn + peer net.Addr + queue chan *Packet + broken bool +}