package srt import ( "math" "sort" "io" ) type Datum struct { seq_num uint32 timestamp uint32 data []byte next *Datum } // linked chain of datums, specifically to store continuous segments // when a packet is missed, start and end can be used to generate lost // packet reports, and easily link when missing is received // 1..2..3..4 6..7..8..9 // chain_1 chain_2 // nack: get 5 type DatumLink struct { queued int // remove eventually, was to be used for ACK recv rate calcs, not needed root *Datum end *Datum } // data type and function to allow sorting so order can be ignored during // linking since each is sequential on outset type chains []*DatumLink func (c chains) Len() (int) { return len(c) } func (c chains) Swap(i, j int) { c[i], c[j] = c[j], c[i] } // chain_1 is less then chain_2 when chain_1 ends before chain_2 starts func (c chains) Less(i, j int) (bool) { x_1 := c[i].end.seq_num x_2 := c[j].root.seq_num if serial_less(x_1, x_2, 31) { return true } return false } type DatumStorage struct { main *DatumLink offshoots chains } // append new packet to end of buffer func (buffer *DatumLink) NewDatum(pkt *Packet) { datum := new(Datum) datum.seq_num = pkt.header_info.(*DataHeader).seq_num datum.timestamp = pkt.timestamp datum.data = pkt.cif.([]byte) buffer.queued += 1 buffer.end.next = datum buffer.end = datum } // create a new datumlink with root and end at the given packet func NewDatumLink(pkt *Packet) (*DatumLink) { buffer := new(DatumLink) root_datum := new(Datum) root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num root_datum.timestamp = pkt.timestamp root_datum.data = pkt.cif.([]byte) buffer.root = root_datum buffer.end = root_datum buffer.queued = 1 return buffer } // initialize storage with the given data packet, in the main chain func NewDatumStorage(packet *Packet) (*DatumStorage) { storage := new(DatumStorage) storage.main = NewDatumLink(packet) return storage } // purge all packets in the main chain except the last for future linkage func (storage *DatumStorage) Expunge(output io.WriteCloser) (error) { curr_datum := storage.main.root seq_num_end := storage.main.end.seq_num for curr_datum.seq_num != seq_num_end { _, err := output.Write(curr_datum.data) if err != nil { return err } curr_datum = curr_datum.next } storage.main.root = curr_datum return nil } // check if the given sequence number should already be inside the given buffer func (buffer *DatumLink) Holds(new_seq_num uint32) (bool) { start := buffer.root.seq_num end := buffer.end.seq_num if new_seq_num == start || new_seq_num == end { return true } if buffer.Before(new_seq_num) || buffer.After(new_seq_num) { return false } return true } // check if the given seq num lies before the given buffer starts // buffer is After seq num? func (buffer *DatumLink) After(new_seq_num uint32) (bool) { start := buffer.root.seq_num if serial_less(new_seq_num, start, 31) { return true } return false } // check if the given seq num lies after the given buffer starts // buffer is Before seq num? func (buffer *DatumLink) Before(new_seq_num uint32) (bool) { end := buffer.end.seq_num if serial_less(end, new_seq_num, 31) { return true } return false } // add the given packet to the appropriate buffer or add a new offshoot buffer func (storage *DatumStorage) NewDatum(pkt *Packet) { new_pkt_num := pkt.header_info.(*DataHeader).seq_num prev_num := (new_pkt_num - 1) % uint32(1 << 31) if storage.main.end.seq_num == prev_num { storage.main.NewDatum(pkt) } else if storage.main.Holds(new_pkt_num) { return } else { oldest := storage.main.root.seq_num if serial_less(new_pkt_num, oldest, 31) { return } for _, v := range storage.offshoots { if v.end.seq_num == prev_num { v.NewDatum(pkt) return } else if v.Holds(new_pkt_num) { return } } new_link := NewDatumLink(pkt) storage.offshoots = append(storage.offshoots, new_link) } } // Link 2 buffers to each other func (buffer *DatumLink) Link(buffer_next *DatumLink) { buffer.end.next = buffer_next.root buffer.end = buffer_next.end buffer.queued += buffer_next.queued } // check if the start of the buffer_next is sequentially next to the end of buffer // or it is too late to get the true next packets to link and the packets must be linked // given a 31-bit wrap around func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink, curr_time uint32) (bool) { seq_1 := buffer.end.seq_num seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31)) if buffer_next.root.seq_num == seq_2 || buffer.end.timestamp + 500000 < curr_time { buffer.Link(buffer_next) return true } return false } // Given the current storage state, check what packets need to added to fully link main func (storage *DatumStorage) GenNACKCIF() (*NACKCIF, bool) { if len(storage.offshoots) == 0 { return nil, false } cif := new(NACKCIF) init_range := new(pckts_range) init_range.start = (storage.main.end.seq_num + 1) % (1 << 31) init_range.end = (storage.offshoots[0].root.seq_num - 1) % (1 << 31) cif.lost_pkts = append(cif.lost_pkts, init_range) for i := 0; i < len(storage.offshoots) - 1; i++ { new_range := new(pckts_range) new_range.start = (storage.offshoots[i].end.seq_num + 1) % (1 << 31) new_range.end = (storage.offshoots[i + 1].root.seq_num - 1) % (1 << 31) cif.lost_pkts = append(cif.lost_pkts, new_range) } return cif, true } // Try to relink all chains wherever possible or necessary due to TLPKTDROP func (storage *DatumStorage) Relink(curr_time uint32) { sort.Sort(storage.offshoots) buffer := storage.main i := 0 for i < len(storage.offshoots) { if check_append_serial_next(buffer, storage.offshoots[i], curr_time) { storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...) // nuke the chain since it is not needed anymore } else { buffer = storage.offshoots[i] i++ } } } // check if a is less than b under serial arithmetic (modulo operations) func serial_less(a uint32, b uint32, bits int) (bool) { if (a < b && b-a < (1 << (bits - 1))) || (a > b && a-b > (1 << (bits - 1))) { return true } return false }