stream-server/srt/protocol.go

146 lines
3.1 KiB
Go

package srt
import (
"time"
"net"
"crypto/sha256"
"fmt"
"errors"
)
// Handshake phases stored in SRTManager.state. Process dispatches on them:
// an accepted induction request moves the manager to CONCLUSION, and an
// accepted conclusion request moves it to DATA_LOOP.
const (
	// INDUCTION is the zero value, so a freshly created manager starts here.
	INDUCTION = uint8(iota)
	// CONCLUSION means the induction reply was built and the conclusion
	// request is awaited.
	CONCLUSION
	// DATA_LOOP means the handshake finished.
	DATA_LOOP
)
// SRTManager holds the handshake state for one SRT peer and builds the
// handshake replies that are returned from Process.
type SRTManager struct {
	// state is the current handshake phase: INDUCTION, CONCLUSION or DATA_LOOP.
	state uint8
	// init is the creation time; outgoing packet timestamps are the
	// microseconds elapsed since it.
	init time.Time
	// syn_cookie is written by make_syn_cookie, copied into the induction
	// reply and compared against the cookie of the conclusion request.
	syn_cookie uint32
	// socket is the local packet connection; its local address is reported
	// in handshake replies.
	socket net.PacketConn
	// ctrl_sock_peer is the socket id taken from the peer's induction
	// request and used as dest_sock on outgoing packets.
	ctrl_sock_peer uint32
}
// NewSRTManager returns a manager bound to the packet connection l.
//
// The creation time is recorded as the origin for packet timestamps. All
// other fields keep their zero values, so the manager starts in the
// INDUCTION state.
func NewSRTManager(l net.PacketConn) *SRTManager {
	return &SRTManager{
		init:   time.Now(),
		socket: l,
	}
}
// create_basic_header allocates a Packet carrying the fields common to every
// outgoing packet: the destination socket id learned from the peer and a
// timestamp in microseconds since the manager was created (truncated to
// 32 bits).
func (agent *SRTManager) create_basic_header() *Packet {
	elapsed := time.Since(agent.init)

	header := new(Packet)
	header.dest_sock = agent.ctrl_sock_peer
	header.timestamp = uint32(elapsed.Microseconds())
	return header
}
// create_induction_resp builds the handshake packet sent in reply to a
// peer's induction request.
//
// The reply carries handshake version 5, handshake type 1 (induction), the
// manager's current SYN cookie and the local socket address. The address is
// only filled in when the socket reports a UDP local address; otherwise
// peer_ip is left zeroed instead of panicking.
func (agent *SRTManager) create_induction_resp() *Packet {
	packet := agent.create_basic_header()
	packet.packet_type = HANDSHAKE
	packet.header_info = new(ControlHeader)

	cif := new(HandshakeCIF)
	cif.version = 5
	// 0x4a17 is the SRT magic code expected in an induction reply
	// (per the SRT handshake specification).
	cif.ext_field = 0x4a17
	cif.hs_type = 1
	cif.syn_cookie = agent.syn_cookie
	cif.sock_id = 1
	cif.mtu = 1500
	cif.max_flow = 8192

	// A net.PacketConn is not necessarily UDP, and the IP may be 4 or 16
	// bytes long; copy never writes past the end of peer_ip.
	if agent.socket != nil {
		if local, ok := agent.socket.LocalAddr().(*net.UDPAddr); ok {
			copy(cif.peer_ip[:], local.IP)
		}
	}

	packet.cif = cif
	return packet
}
// make_syn_cookie derives the SYN cookie for peer and stores it in
// agent.syn_cookie.
//
// Cookie layout (most significant bit first):
//   - 5 bits: the current 64-second time bucket modulo 32
//   - 3 bits: zero
//   - 24 bits: the last three bytes of SHA-256(peer.String() + bucket)
//
// The cookie is computed from scratch on every call. Previously the old
// value of agent.syn_cookie leaked into the top bits, so calling this twice
// for the same peer and time bucket produced different cookies.
func (agent *SRTManager) make_syn_cookie(peer net.Addr) {
	// Unix seconds >> 6 gives a counter that advances every 64 seconds.
	bucket := uint32(time.Now().Unix()) >> 6
	digest := sha256.Sum256([]byte(peer.String() + fmt.Sprintf("%d", bucket)))

	cookie := (bucket % 32) << 3
	for _, b := range digest[29:] {
		cookie = cookie<<8 + uint32(b)
	}
	agent.syn_cookie = cookie
}
// process_induction handles a packet received while the manager is in the
// INDUCTION state.
//
// When packet is a handshake whose CIF has handshake type 1 (induction), the
// peer's socket id is remembered, the state advances to CONCLUSION and the
// induction reply is returned. Any other packet, including a nil packet or
// one whose CIF is not a *HandshakeCIF, is ignored and nil is returned.
func (agent *SRTManager) process_induction(packet *Packet) *Packet {
	if packet == nil || packet.packet_type != HANDSHAKE {
		return nil
	}
	// The packet comes from the network, so the CIF type must be checked
	// rather than asserted; an unchecked assertion would panic.
	hs_cif, ok := packet.cif.(*HandshakeCIF)
	if !ok || hs_cif == nil || hs_cif.hs_type != 1 {
		return nil
	}

	agent.state = CONCLUSION
	agent.ctrl_sock_peer = hs_cif.sock_id
	return agent.create_induction_resp()
}
// create_conclusion_resp builds the handshake packet sent in reply to a
// peer's conclusion request.
//
// The reply carries handshake version 5, handshake type 0xffffffff
// (conclusion, the same value process_conclusion expects from the peer), the
// local socket address and a single handshake extension of type 2 holding an
// HSEMSG with the flags, version and latency values set below.
//
// The local address is only filled in when the socket reports a UDP local
// address; otherwise peer_ip is left zeroed instead of panicking.
func (agent *SRTManager) create_conclusion_resp() *Packet {
	packet := agent.create_basic_header()
	packet.packet_type = HANDSHAKE
	packet.header_info = new(ControlHeader)

	cif := new(HandshakeCIF)
	cif.version = 5
	cif.ext_field = 0x1
	// The reply to a conclusion request is itself a conclusion handshake;
	// leaving hs_type at its zero value would announce a different type.
	cif.hs_type = 0xffffffff
	cif.sock_id = 1
	cif.mtu = 1500
	cif.max_flow = 8192

	// A net.PacketConn is not necessarily UDP, and the IP may be 4 or 16
	// bytes long; copy never writes past the end of peer_ip.
	if agent.socket != nil {
		if local, ok := agent.socket.LocalAddr().(*net.UDPAddr); ok {
			copy(cif.peer_ip[:], local.IP)
		}
	}

	hs_msg := new(HSEMSG)
	hs_msg.flags = uint32(0x01 | 0x02 | 0x04 | 0x08 | 0x20)
	hs_msg.version = uint32(0x00010000)
	hs_msg.recv_delay = 120
	hs_msg.send_delay = 120

	hs_ext := new(HandshakeExtension)
	hs_ext.ext_type = 2
	hs_ext.ext_len = 12
	hs_ext.ext_contents = hs_msg
	cif.hs_extensions = append(cif.hs_extensions, hs_ext)

	packet.cif = cif
	return packet
}
// process_conclusion handles a packet received while the manager is in the
// CONCLUSION state.
//
// When packet is a handshake whose CIF has handshake type 0xffffffff
// (conclusion) and echoes the manager's SYN cookie, the state advances to
// DATA_LOOP and the conclusion reply is returned. Any other packet,
// including a nil packet or one whose CIF is not a *HandshakeCIF, is ignored
// and nil is returned.
func (agent *SRTManager) process_conclusion(packet *Packet) *Packet {
	if packet == nil || packet.packet_type != HANDSHAKE {
		return nil
	}
	// The packet comes from the network, so the CIF type must be checked
	// rather than asserted; an unchecked assertion would panic.
	hs_cif, ok := packet.cif.(*HandshakeCIF)
	if !ok || hs_cif == nil {
		return nil
	}
	if hs_cif.hs_type != 0xffffffff || hs_cif.syn_cookie != agent.syn_cookie {
		return nil
	}

	agent.state = DATA_LOOP
	return agent.create_conclusion_resp()
}
// Process feeds one received packet into the handshake state machine.
//
// It returns the reply to send back, or a nil packet when there is nothing
// to send. An error is returned only when the manager is in a state that has
// no handler.
func (agent *SRTManager) Process(packet *Packet) (*Packet, error) {
	var (
		reply *Packet
		err   error
	)

	switch phase := agent.state; phase {
	case INDUCTION:
		reply = agent.process_induction(packet)
	case CONCLUSION:
		reply = agent.process_conclusion(packet)
	case DATA_LOOP:
		// Data packets are not handled yet; the packet is only printed.
		fmt.Println(packet)
	default:
		err = errors.New("State not implemented")
	}

	return reply, err
}