diff --git a/srt/data_chain.go b/srt/data_chain.go index 9576b73..94e34fc 100644 --- a/srt/data_chain.go +++ b/srt/data_chain.go @@ -8,6 +8,7 @@ import ( type Datum struct { seq_num uint32 + timestamp uint32 data []byte next *Datum } @@ -31,8 +32,7 @@ func (c chains) Swap(i, j int) { func (c chains) Less(i, j int) (bool) { x_1 := c[i].end.seq_num x_2 := c[j].root.seq_num - serial_add_limit := uint32(math.Pow(2, 30)) - if (x_1 < x_2 && x_2 - x_1 < serial_add_limit) || (x_1 > x_2 && x_1 - x_2 > serial_add_limit) { + if serial_less(x_1, x_2, 31) { return true } return false @@ -46,8 +46,8 @@ type DatumStorage struct { func (buffer *DatumLink) NewDatum(pkt *Packet) { datum := new(Datum) datum.seq_num = pkt.header_info.(*DataHeader).seq_num + datum.timestamp = pkt.timestamp datum.data = pkt.cif.([]byte) - buffer.queued += 1 buffer.end.next = datum buffer.end = datum @@ -57,6 +57,7 @@ func NewDatumLink(pkt *Packet) (*DatumLink) { buffer := new(DatumLink) root_datum := new(Datum) root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num + root_datum.timestamp = pkt.timestamp root_datum.data = pkt.cif.([]byte) buffer.root = root_datum @@ -85,38 +86,51 @@ func (storage *DatumStorage) Expunge(output io.WriteCloser) (error) { return nil } -func (buffer *DatumLink) InRange(new_seq_num uint32) (bool) { +func (buffer *DatumLink) Holds(new_seq_num uint32) (bool) { start := buffer.root.seq_num end := buffer.end.seq_num if new_seq_num == start || new_seq_num == end { return true } - if !((start < new_seq_num && new_seq_num - start < (1 << 30)) || (start > new_seq_num && start - new_seq_num > (1 << 30))) { - return false - } - if !((new_seq_num < end && end - new_seq_num < (1 << 30)) || (new_seq_num > end && new_seq_num - end > (1 << 30))) { + if buffer.Before(new_seq_num) || buffer.After(new_seq_num) { return false } return true } +func (buffer *DatumLink) After(new_seq_num uint32) (bool) { + start := buffer.root.seq_num + if serial_less(new_seq_num, start, 31) { + return true + } + return false +} + +func (buffer *DatumLink) Before(new_seq_num uint32) (bool) { + end := buffer.end.seq_num + if serial_less(end, new_seq_num, 31) { + return true + } + return false +} + func (storage *DatumStorage) NewDatum(pkt *Packet) { new_pkt_num := pkt.header_info.(*DataHeader).seq_num prev_num := (new_pkt_num - 1) % uint32(1 << 31) if storage.main.end.seq_num == prev_num { storage.main.NewDatum(pkt) - } else if storage.main.InRange(new_pkt_num) { + } else if storage.main.Holds(new_pkt_num) { return } else { oldest := storage.main.root.seq_num - if (new_pkt_num < oldest && oldest - new_pkt_num < (1 << 30)) || (new_pkt_num > oldest && new_pkt_num - oldest > (1 << 30)) { + if serial_less(new_pkt_num, oldest, 31) { return } for _, v := range storage.offshoots { if v.end.seq_num == prev_num { v.NewDatum(pkt) return - } else if v.InRange(new_pkt_num) { + } else if v.Holds(new_pkt_num) { return } } @@ -131,10 +145,10 @@ func (buffer *DatumLink) Link(buffer_next *DatumLink) { buffer.queued += buffer_next.queued } -func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink) (bool) { +func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink, curr_time uint32) (bool) { seq_1 := buffer.end.seq_num seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31)) - if buffer_next.root.seq_num == seq_2 { + if buffer_next.root.seq_num == seq_2 || buffer.end.timestamp + 500000 < curr_time { buffer.Link(buffer_next) return true } @@ -159,12 +173,12 @@ func (storage *DatumStorage) GenNACKCIF() (*NACKCIF, bool) { return cif, true } -func (storage *DatumStorage) Relink() { +func (storage *DatumStorage) Relink(curr_time uint32) { sort.Sort(storage.offshoots) buffer := storage.main i := 0 for i < len(storage.offshoots) { - if check_append_serial_next(buffer, storage.offshoots[i]) { + if check_append_serial_next(buffer, storage.offshoots[i], curr_time) { storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...) } else { buffer = storage.offshoots[i] @@ -173,3 +187,10 @@ func (storage *DatumStorage) Relink() { } } +func serial_less(a uint32, b uint32, bits int) (bool) { + if (a < b && b-a < (1 << (bits - 1))) || (a > b && a-b > (1 << (bits - 1))) { + return true + } + return false +} + diff --git a/srt/packet.go b/srt/packet.go index 936a875..d958e0e 100644 --- a/srt/packet.go +++ b/srt/packet.go @@ -3,7 +3,6 @@ package srt import ( "errors" "encoding/binary" - "fmt" ) const ( @@ -144,10 +143,8 @@ func marshall_ctrl_packet(packet *Packet, header []byte) ([]byte, error) { } func marshall_nack_cif(data *NACKCIF) ([]byte) { - fmt.Println("new NAK") var loss_list_bytes []byte for _, pkts := range data.lost_pkts { - fmt.Println(pkts) if pkts.start == pkts.end { curr_bytes := make([]byte, 4) binary.BigEndian.PutUint32(curr_bytes, pkts.start) diff --git a/srt/protocol.go b/srt/protocol.go index 4522bda..819ab38 100644 --- a/srt/protocol.go +++ b/srt/protocol.go @@ -265,7 +265,7 @@ func (agent *SRTManager) handle_data_storage(packet *Packet) { agent.storage.NewDatum(packet) } if len(agent.storage.offshoots) != 0 { - agent.storage.Relink() + agent.storage.Relink(packet.timestamp) } agent.storage.Expunge(agent.output) }