2023-09-20 15:06:32 +05:00
|
|
|
package srt
|
|
|
|
|
|
|
|
import (
|
|
|
|
"math"
|
|
|
|
"sort"
|
2023-09-21 16:11:27 +05:00
|
|
|
"io"
|
2023-09-20 15:06:32 +05:00
|
|
|
)
|
|
|
|
|
|
|
|
type Datum struct {
|
|
|
|
seq_num uint32
|
2023-09-22 11:22:13 +05:00
|
|
|
timestamp uint32
|
2023-09-20 15:06:32 +05:00
|
|
|
data []byte
|
|
|
|
next *Datum
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// linked chain of datums, specifically to store continuous segments
|
|
|
|
// when a packet is missed, start and end can be used to generate lost
|
|
|
|
// packet reports, and easily link when missing is received
|
|
|
|
// 1..2..3..4 6..7..8..9
|
|
|
|
// chain_1 chain_2
|
|
|
|
// nack: get 5
|
2023-09-20 15:06:32 +05:00
|
|
|
type DatumLink struct {
|
2023-09-22 14:58:33 +05:00
|
|
|
queued int // remove eventually, was to be used for ACK recv rate calcs, not needed
|
2023-09-20 15:06:32 +05:00
|
|
|
root *Datum
|
|
|
|
end *Datum
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// data type and function to allow sorting so order can be ignored during
|
|
|
|
// linking since each is sequential on outset
|
2023-09-20 15:06:32 +05:00
|
|
|
type chains []*DatumLink
|
|
|
|
|
|
|
|
func (c chains) Len() (int) {
|
|
|
|
return len(c)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c chains) Swap(i, j int) {
|
|
|
|
c[i], c[j] = c[j], c[i]
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// chain_1 is less then chain_2 when chain_1 ends before chain_2 starts
|
2023-09-20 15:06:32 +05:00
|
|
|
func (c chains) Less(i, j int) (bool) {
|
|
|
|
x_1 := c[i].end.seq_num
|
|
|
|
x_2 := c[j].root.seq_num
|
2023-09-22 11:22:13 +05:00
|
|
|
if serial_less(x_1, x_2, 31) {
|
2023-09-20 15:06:32 +05:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
type DatumStorage struct {
|
|
|
|
main *DatumLink
|
|
|
|
offshoots chains
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// append new packet to end of buffer
|
2023-09-20 15:06:32 +05:00
|
|
|
func (buffer *DatumLink) NewDatum(pkt *Packet) {
|
|
|
|
datum := new(Datum)
|
|
|
|
datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
2023-09-22 11:22:13 +05:00
|
|
|
datum.timestamp = pkt.timestamp
|
2023-09-20 15:06:32 +05:00
|
|
|
datum.data = pkt.cif.([]byte)
|
|
|
|
buffer.queued += 1
|
|
|
|
buffer.end.next = datum
|
|
|
|
buffer.end = datum
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// create a new datumlink with root and end at the given packet
|
2023-09-20 15:06:32 +05:00
|
|
|
func NewDatumLink(pkt *Packet) (*DatumLink) {
|
|
|
|
buffer := new(DatumLink)
|
|
|
|
root_datum := new(Datum)
|
|
|
|
root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
2023-09-22 11:22:13 +05:00
|
|
|
root_datum.timestamp = pkt.timestamp
|
2023-09-20 15:06:32 +05:00
|
|
|
root_datum.data = pkt.cif.([]byte)
|
|
|
|
|
|
|
|
buffer.root = root_datum
|
|
|
|
buffer.end = root_datum
|
|
|
|
buffer.queued = 1
|
|
|
|
return buffer
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// initialize storage with the given data packet, in the main chain
|
2023-09-20 15:06:32 +05:00
|
|
|
func NewDatumStorage(packet *Packet) (*DatumStorage) {
|
|
|
|
storage := new(DatumStorage)
|
|
|
|
storage.main = NewDatumLink(packet)
|
|
|
|
return storage
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// purge all packets in the main chain except the last for future linkage
|
2023-09-21 16:11:27 +05:00
|
|
|
func (storage *DatumStorage) Expunge(output io.WriteCloser) (error) {
|
|
|
|
curr_datum := storage.main.root
|
|
|
|
seq_num_end := storage.main.end.seq_num
|
|
|
|
for curr_datum.seq_num != seq_num_end {
|
|
|
|
_, err := output.Write(curr_datum.data)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
curr_datum = curr_datum.next
|
|
|
|
}
|
|
|
|
storage.main.root = curr_datum
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// check if the given sequence number should already be inside the given buffer
|
2023-09-22 11:22:13 +05:00
|
|
|
func (buffer *DatumLink) Holds(new_seq_num uint32) (bool) {
|
2023-09-21 16:11:27 +05:00
|
|
|
start := buffer.root.seq_num
|
|
|
|
end := buffer.end.seq_num
|
|
|
|
if new_seq_num == start || new_seq_num == end {
|
|
|
|
return true
|
|
|
|
}
|
2023-09-22 11:22:13 +05:00
|
|
|
if buffer.Before(new_seq_num) || buffer.After(new_seq_num) {
|
2023-09-21 16:11:27 +05:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// check if the given seq num lies before the given buffer starts
|
|
|
|
// buffer is After seq num?
|
2023-09-22 11:22:13 +05:00
|
|
|
func (buffer *DatumLink) After(new_seq_num uint32) (bool) {
|
|
|
|
start := buffer.root.seq_num
|
|
|
|
if serial_less(new_seq_num, start, 31) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// check if the given seq num lies after the given buffer starts
|
|
|
|
// buffer is Before seq num?
|
2023-09-22 11:22:13 +05:00
|
|
|
func (buffer *DatumLink) Before(new_seq_num uint32) (bool) {
|
|
|
|
end := buffer.end.seq_num
|
|
|
|
if serial_less(end, new_seq_num, 31) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// add the given packet to the appropriate buffer or add a new offshoot buffer
|
2023-09-20 15:06:32 +05:00
|
|
|
func (storage *DatumStorage) NewDatum(pkt *Packet) {
|
2023-09-21 16:11:27 +05:00
|
|
|
new_pkt_num := pkt.header_info.(*DataHeader).seq_num
|
|
|
|
prev_num := (new_pkt_num - 1) % uint32(1 << 31)
|
2023-09-20 15:06:32 +05:00
|
|
|
if storage.main.end.seq_num == prev_num {
|
|
|
|
storage.main.NewDatum(pkt)
|
2023-09-22 11:22:13 +05:00
|
|
|
} else if storage.main.Holds(new_pkt_num) {
|
2023-09-21 16:11:27 +05:00
|
|
|
return
|
2023-09-20 15:06:32 +05:00
|
|
|
} else {
|
2023-09-21 16:11:27 +05:00
|
|
|
oldest := storage.main.root.seq_num
|
2023-09-22 11:22:13 +05:00
|
|
|
if serial_less(new_pkt_num, oldest, 31) {
|
2023-09-21 16:11:27 +05:00
|
|
|
return
|
|
|
|
}
|
2023-09-20 15:06:32 +05:00
|
|
|
for _, v := range storage.offshoots {
|
|
|
|
if v.end.seq_num == prev_num {
|
|
|
|
v.NewDatum(pkt)
|
2023-09-21 16:11:27 +05:00
|
|
|
return
|
2023-09-22 11:22:13 +05:00
|
|
|
} else if v.Holds(new_pkt_num) {
|
2023-09-21 16:11:27 +05:00
|
|
|
return
|
2023-09-20 15:06:32 +05:00
|
|
|
}
|
|
|
|
}
|
2023-09-21 16:11:27 +05:00
|
|
|
new_link := NewDatumLink(pkt)
|
|
|
|
storage.offshoots = append(storage.offshoots, new_link)
|
2023-09-20 15:06:32 +05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// Link 2 buffers to each other
|
2023-09-20 15:06:32 +05:00
|
|
|
func (buffer *DatumLink) Link(buffer_next *DatumLink) {
|
|
|
|
buffer.end.next = buffer_next.root
|
|
|
|
buffer.end = buffer_next.end
|
|
|
|
buffer.queued += buffer_next.queued
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// check if the start of the buffer_next is sequentially next to the end of buffer
|
|
|
|
// or it is too late to get the true next packets to link and the packets must be linked
|
|
|
|
// given a 31-bit wrap around
|
2023-09-22 11:22:13 +05:00
|
|
|
func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink, curr_time uint32) (bool) {
|
2023-09-20 15:06:32 +05:00
|
|
|
seq_1 := buffer.end.seq_num
|
|
|
|
seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31))
|
2023-09-22 11:22:13 +05:00
|
|
|
if buffer_next.root.seq_num == seq_2 || buffer.end.timestamp + 500000 < curr_time {
|
2023-09-20 15:06:32 +05:00
|
|
|
buffer.Link(buffer_next)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// Given the current storage state, check what packets need to added to fully link main
|
2023-09-21 16:11:27 +05:00
|
|
|
func (storage *DatumStorage) GenNACKCIF() (*NACKCIF, bool) {
|
|
|
|
if len(storage.offshoots) == 0 {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
cif := new(NACKCIF)
|
|
|
|
init_range := new(pckts_range)
|
|
|
|
init_range.start = (storage.main.end.seq_num + 1) % (1 << 31)
|
|
|
|
init_range.end = (storage.offshoots[0].root.seq_num - 1) % (1 << 31)
|
|
|
|
cif.lost_pkts = append(cif.lost_pkts, init_range)
|
|
|
|
for i := 0; i < len(storage.offshoots) - 1; i++ {
|
|
|
|
new_range := new(pckts_range)
|
|
|
|
new_range.start = (storage.offshoots[i].end.seq_num + 1) % (1 << 31)
|
|
|
|
new_range.end = (storage.offshoots[i + 1].root.seq_num - 1) % (1 << 31)
|
|
|
|
cif.lost_pkts = append(cif.lost_pkts, new_range)
|
|
|
|
}
|
|
|
|
return cif, true
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// Try to relink all chains wherever possible or necessary due to TLPKTDROP
|
2023-09-22 11:22:13 +05:00
|
|
|
func (storage *DatumStorage) Relink(curr_time uint32) {
|
2023-09-20 15:06:32 +05:00
|
|
|
sort.Sort(storage.offshoots)
|
|
|
|
buffer := storage.main
|
|
|
|
i := 0
|
|
|
|
for i < len(storage.offshoots) {
|
2023-09-22 11:22:13 +05:00
|
|
|
if check_append_serial_next(buffer, storage.offshoots[i], curr_time) {
|
2023-09-22 14:58:33 +05:00
|
|
|
storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...) // nuke the chain since it is not needed anymore
|
2023-09-20 15:06:32 +05:00
|
|
|
} else {
|
|
|
|
buffer = storage.offshoots[i]
|
|
|
|
i++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-22 14:58:33 +05:00
|
|
|
// check if a is less than b under serial arithmetic (modulo operations)
|
2023-09-22 11:22:13 +05:00
|
|
|
func serial_less(a uint32, b uint32, bits int) (bool) {
|
|
|
|
if (a < b && b-a < (1 << (bits - 1))) || (a > b && a-b > (1 << (bits - 1))) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|