stream-server/srt/intake.go

81 lines
1.9 KiB
Go
Raw Permalink Normal View History

2023-09-18 11:51:55 +05:00
package srt
import (
"net"
)
type Intake struct {
max_conns int
tunnels []*Tunnel
socket net.PacketConn
buffer []byte
}
func NewIntake(l net.PacketConn, max_conns int) (*Intake) {
intake := new(Intake)
intake.max_conns = max_conns
intake.tunnels = make([]*Tunnel, 0)
2023-09-22 14:58:33 +05:00
intake.buffer = make([]byte, 1500) // each packet is restricted to a max size of 1500
2023-09-18 11:51:55 +05:00
intake.socket = l
return intake
}
func (intake *Intake) NewTunnel(l net.PacketConn, peer net.Addr) (*Tunnel) {
if len(intake.tunnels) < intake.max_conns {
tunnel := new(Tunnel)
tunnel.socket = l
tunnel.peer = peer
2023-09-22 14:58:33 +05:00
tunnel.queue = make(chan []byte, 10) // packet buffer, will cause packet loss if low
intake.tunnels = append(intake.tunnels, tunnel)
2023-09-22 14:58:33 +05:00
go tunnel.Start() // start the tunnel SRT processing
2023-09-18 11:51:55 +05:00
return tunnel
}
return nil
}
func (intake *Intake) get_tunnel(peer net.Addr) (*Tunnel) {
var tunnel *Tunnel
for i := 0; i < len(intake.tunnels); i++ {
2023-09-22 14:58:33 +05:00
// check if tunnels are broken and remove
2023-09-18 11:51:55 +05:00
if intake.tunnels[i].broken {
2023-09-20 15:06:32 +05:00
intake.tunnels[i].Shutdown()
2023-09-18 11:51:55 +05:00
intake.tunnels = append(intake.tunnels[:i], intake.tunnels[i+1:]...)
i--
continue
}
if intake.tunnels[i].peer.String() == peer.String() {
tunnel = intake.tunnels[i]
}
}
2023-09-22 14:58:33 +05:00
// if no tunnel was found, make one
// should be after conclusion handshake, but wanted to keep all protocol
// related actions separate from UDP handling
2023-09-18 11:51:55 +05:00
if tunnel == nil {
tunnel = intake.NewTunnel(intake.socket, peer)
}
return tunnel
}
2023-09-20 15:06:32 +05:00
2023-09-18 11:51:55 +05:00
func (intake *Intake) Read() {
n, peer, err := intake.socket.ReadFrom(intake.buffer)
if err != nil {
2023-09-22 14:58:33 +05:00
return // ignore UDP errors
2023-09-18 11:51:55 +05:00
}
2023-09-22 14:58:33 +05:00
// find the SRT/UDT tunnel corresponding to the given peer
2023-09-18 11:51:55 +05:00
tunnel := intake.get_tunnel(peer)
if tunnel == nil {
return
}
pkt := make([]byte, n)
copy(pkt, intake.buffer[:n])
2023-09-22 14:58:33 +05:00
// send a copy to the corresponding tunnels packet queue if not full
select {
case tunnel.queue <- pkt:
default:
2023-09-21 16:11:27 +05:00
//tunnel.broken = true
}
2023-09-18 11:51:55 +05:00
}