stream-server/srt/tunnel.go

73 lines
1.5 KiB
Go
Raw Normal View History

2023-09-18 11:51:55 +05:00
package srt
import (
"net"
"fmt"
2023-09-18 11:51:55 +05:00
)
type Tunnel struct {
socket net.PacketConn
peer net.Addr
queue chan []byte
2023-09-18 11:51:55 +05:00
broken bool
state *SRTManager
}
func (tunnel *Tunnel) Start() {
defer func(a *bool) {
if r := recover(); r != nil {
fmt.Println(r)
}
*a = true
2023-09-22 14:58:33 +05:00
}(&(tunnel.broken)) // force mark tunnel for deletion if any error occurs
tunnel.state = NewSRTManager(tunnel.socket)
2023-09-22 14:58:33 +05:00
// central tunnel loop, read incoming, process and generate response
// write response if any
for {
packet, err := tunnel.ReadPacket()
if err != nil {
2023-09-21 12:40:16 +05:00
fmt.Println(err)
tunnel.broken = true
}
response, err := tunnel.state.Process(packet)
if err != nil {
2023-09-20 15:06:32 +05:00
fmt.Println(err)
tunnel.broken = true
}
if response != nil {
tunnel.WritePacket(response)
}
}
}
2023-09-22 14:58:33 +05:00
// send a shutdown command, for use when tunnel gets broken
// not ideal but works
2023-09-20 15:06:32 +05:00
func (tunnel *Tunnel) Shutdown() {
if tunnel.state != nil && tunnel.state.state > 1 {
packet := tunnel.state.create_basic_header()
packet.packet_type = SHUTDOWN
info := new(ControlHeader)
info.ctrl_type = 5
packet.header_info = info
tunnel.WritePacket(packet)
2023-09-21 16:11:27 +05:00
if tunnel.state.output != nil {
tunnel.state.output.Close()
}
2023-09-20 15:06:32 +05:00
}
}
func (tunnel *Tunnel) WritePacket(packet *Packet) {
buffer, err := MarshallPacket(packet, tunnel.state)
if err != nil {
tunnel.broken = true
2023-09-20 15:06:32 +05:00
fmt.Println(err)
return
}
tunnel.socket.WriteTo(buffer, tunnel.peer)
}
func (tunnel *Tunnel) ReadPacket() (*Packet, error) {
2023-09-22 14:58:33 +05:00
packet := <- tunnel.queue // blocking read, should add timeout here
return ParsePacket(packet)
2023-09-18 11:51:55 +05:00
}