// Source: stream-server/srt/data_chain.go (Go, 197 lines, 4.4 KiB).

package srt
import (
"math"
"sort"
2023-09-21 16:11:27 +05:00
"io"
2023-09-20 15:06:32 +05:00
)
// Datum is one buffered data packet and a node of a singly linked chain.
// NewDatumLink and DatumLink.NewDatum fill its fields from a Packet.
type Datum struct {
	// seq_num is the packet sequence number taken from the data header; the
	// rest of this file compares it with 31-bit serial arithmetic.
	seq_num uint32
	// timestamp is copied verbatim from the packet.
	timestamp uint32
	// data is the packet's cif asserted to raw bytes.
	data []byte
	// next is the following datum in the chain, nil for the last one.
	next *Datum
}
// DatumLink is a chain of Datum nodes tracked by its first and last node.
type DatumLink struct {
	// queued counts the datums added to the chain (set to 1 on creation,
	// bumped by NewDatum, summed by Link).
	queued int
	// root is the first datum of the chain, end the last one.
	root, end *Datum
}
// chains is a list of DatumLink chains. Together with its Len, Swap and Less
// methods it satisfies sort.Interface, which Relink uses to order offshoots.
type chains []*DatumLink
// Len reports the number of chains in the list (part of sort.Interface).
func (c chains) Len() int { return len(c) }
// Swap exchanges the chains at positions i and j (part of sort.Interface).
func (c chains) Swap(i, j int) {
	held := c[i]
	c[i] = c[j]
	c[j] = held
}
func (c chains) Less(i, j int) (bool) {
x_1 := c[i].end.seq_num
x_2 := c[j].root.seq_num
if serial_less(x_1, x_2, 31) {
2023-09-20 15:06:32 +05:00
return true
}
return false
}
// DatumStorage groups one main chain with a list of offshoot chains.
type DatumStorage struct {
// main is the chain created from the first packet; Expunge writes from it
// and Relink merges offshoots into it.
main *DatumLink
// offshoots holds the chains NewDatum creates for packets that neither
// continue main nor any existing offshoot.
offshoots chains
}
func (buffer *DatumLink) NewDatum(pkt *Packet) {
datum := new(Datum)
datum.seq_num = pkt.header_info.(*DataHeader).seq_num
datum.timestamp = pkt.timestamp
2023-09-20 15:06:32 +05:00
datum.data = pkt.cif.([]byte)
buffer.queued += 1
buffer.end.next = datum
buffer.end = datum
}
func NewDatumLink(pkt *Packet) (*DatumLink) {
buffer := new(DatumLink)
root_datum := new(Datum)
root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num
root_datum.timestamp = pkt.timestamp
2023-09-20 15:06:32 +05:00
root_datum.data = pkt.cif.([]byte)
buffer.root = root_datum
buffer.end = root_datum
buffer.queued = 1
return buffer
}
// NewDatumStorage creates a storage whose main chain is seeded with packet
// and which has no offshoots yet.
func NewDatumStorage(packet *Packet) *DatumStorage {
	return &DatumStorage{main: NewDatumLink(packet)}
}
2023-09-21 16:11:27 +05:00
// Expunge writes the payload of every datum in the main chain except the
// last one to output, in chain order, and drops each datum from the chain as
// soon as it has been written.
//
// The tail datum is always kept, so the main chain never becomes empty.
// output is only written to; it is not closed here.
//
// On a write error the error is returned unchanged and the chain root is left
// at the datum whose write failed, so a later call resumes there instead of
// re-sending data that already went out. queued is decremented for every
// datum that leaves the chain.
func (storage *DatumStorage) Expunge(output io.WriteCloser) error {
	chain := storage.main
	// The tail is identified by the sequence number captured up front.
	last := chain.end.seq_num
	for chain.root.seq_num != last {
		if _, err := output.Write(chain.root.data); err != nil {
			return err
		}
		// Only unlink after a successful write.
		chain.root = chain.root.next
		chain.queued--
	}
	return nil
}
func (buffer *DatumLink) Holds(new_seq_num uint32) (bool) {
2023-09-21 16:11:27 +05:00
start := buffer.root.seq_num
end := buffer.end.seq_num
if new_seq_num == start || new_seq_num == end {
return true
}
if buffer.Before(new_seq_num) || buffer.After(new_seq_num) {
2023-09-21 16:11:27 +05:00
return false
}
return true
}
// After reports whether the whole chain comes after new_seq_num, that is,
// whether new_seq_num precedes the chain's first sequence number under
// 31-bit serial arithmetic.
func (buffer *DatumLink) After(new_seq_num uint32) bool {
	return serial_less(new_seq_num, buffer.root.seq_num, 31)
}
// Before reports whether the whole chain comes before new_seq_num, that is,
// whether the chain's last sequence number precedes new_seq_num under
// 31-bit serial arithmetic.
func (buffer *DatumLink) Before(new_seq_num uint32) bool {
	return serial_less(buffer.end.seq_num, new_seq_num, 31)
}
2023-09-20 15:06:32 +05:00
func (storage *DatumStorage) NewDatum(pkt *Packet) {
2023-09-21 16:11:27 +05:00
new_pkt_num := pkt.header_info.(*DataHeader).seq_num
prev_num := (new_pkt_num - 1) % uint32(1 << 31)
2023-09-20 15:06:32 +05:00
if storage.main.end.seq_num == prev_num {
storage.main.NewDatum(pkt)
} else if storage.main.Holds(new_pkt_num) {
2023-09-21 16:11:27 +05:00
return
2023-09-20 15:06:32 +05:00
} else {
2023-09-21 16:11:27 +05:00
oldest := storage.main.root.seq_num
if serial_less(new_pkt_num, oldest, 31) {
2023-09-21 16:11:27 +05:00
return
}
2023-09-20 15:06:32 +05:00
for _, v := range storage.offshoots {
if v.end.seq_num == prev_num {
v.NewDatum(pkt)
2023-09-21 16:11:27 +05:00
return
} else if v.Holds(new_pkt_num) {
2023-09-21 16:11:27 +05:00
return
2023-09-20 15:06:32 +05:00
}
}
2023-09-21 16:11:27 +05:00
new_link := NewDatumLink(pkt)
storage.offshoots = append(storage.offshoots, new_link)
2023-09-20 15:06:32 +05:00
}
}
// Link splices buffer_next onto the end of this chain: the current tail is
// pointed at buffer_next's first datum, the tail moves to buffer_next's last
// datum, and buffer_next's queued count is added to this chain's.
// buffer_next itself is left unmodified.
func (buffer *DatumLink) Link(buffer_next *DatumLink) {
	tail := buffer.end
	tail.next = buffer_next.root
	buffer.end = buffer_next.end
	buffer.queued = buffer.queued + buffer_next.queued
}
func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink, curr_time uint32) (bool) {
2023-09-20 15:06:32 +05:00
seq_1 := buffer.end.seq_num
seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31))
if buffer_next.root.seq_num == seq_2 || buffer.end.timestamp + 500000 < curr_time {
2023-09-20 15:06:32 +05:00
buffer.Link(buffer_next)
return true
}
return false
}
2023-09-21 16:11:27 +05:00
// GenNACKCIF builds a NACK control information field listing the sequence
// ranges missing between consecutive chains.
//
// With no offshoots there is nothing to report and (nil, false) is returned.
// Otherwise one range is emitted per offshoot, in slice order: from the
// number after the previous chain's tail (main for the first offshoot) to the
// number before the offshoot's root, both modulo 2^31.
//
// The offshoots are used in their current order and are not sorted here, so
// the ranges are only meaningful when the caller has ordered them beforehand
// (Relink does so).
func (storage *DatumStorage) GenNACKCIF() (*NACKCIF, bool) {
	if len(storage.offshoots) == 0 {
		return nil, false
	}
	cif := new(NACKCIF)
	prev := storage.main
	for _, link := range storage.offshoots {
		gap := new(pckts_range)
		gap.start = (prev.end.seq_num + 1) % (1 << 31)
		gap.end = (link.root.seq_num - 1) % (1 << 31)
		cif.lost_pkts = append(cif.lost_pkts, gap)
		prev = link
	}
	return cif, true
}
func (storage *DatumStorage) Relink(curr_time uint32) {
2023-09-20 15:06:32 +05:00
sort.Sort(storage.offshoots)
buffer := storage.main
i := 0
for i < len(storage.offshoots) {
if check_append_serial_next(buffer, storage.offshoots[i], curr_time) {
2023-09-20 15:06:32 +05:00
storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...)
} else {
buffer = storage.offshoots[i]
i++
}
}
}
// serial_less reports whether a precedes b in a circular sequence space of
// the given bit width (serial number arithmetic).
//
// a precedes b when it is numerically smaller and less than half the space
// away, or numerically larger and more than half the space away (b has
// wrapped). Equal values and values exactly half the space apart yield false.
func serial_less(a uint32, b uint32, bits int) bool {
	half := uint32(1) << (bits - 1)
	switch {
	case a < b:
		return b-a < half
	case a > b:
		return a-b > half
	default:
		return false
	}
}