TLPKTDROP, serial arithmetic refactoring
This commit is contained in:
parent
c2d9ebbeb4
commit
1cc2a1e3f4
3 changed files with 37 additions and 19 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
type Datum struct {
|
type Datum struct {
|
||||||
seq_num uint32
|
seq_num uint32
|
||||||
|
timestamp uint32
|
||||||
data []byte
|
data []byte
|
||||||
next *Datum
|
next *Datum
|
||||||
}
|
}
|
||||||
|
@ -31,8 +32,7 @@ func (c chains) Swap(i, j int) {
|
||||||
func (c chains) Less(i, j int) (bool) {
|
func (c chains) Less(i, j int) (bool) {
|
||||||
x_1 := c[i].end.seq_num
|
x_1 := c[i].end.seq_num
|
||||||
x_2 := c[j].root.seq_num
|
x_2 := c[j].root.seq_num
|
||||||
serial_add_limit := uint32(math.Pow(2, 30))
|
if serial_less(x_1, x_2, 31) {
|
||||||
if (x_1 < x_2 && x_2 - x_1 < serial_add_limit) || (x_1 > x_2 && x_1 - x_2 > serial_add_limit) {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -46,8 +46,8 @@ type DatumStorage struct {
|
||||||
func (buffer *DatumLink) NewDatum(pkt *Packet) {
|
func (buffer *DatumLink) NewDatum(pkt *Packet) {
|
||||||
datum := new(Datum)
|
datum := new(Datum)
|
||||||
datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
||||||
|
datum.timestamp = pkt.timestamp
|
||||||
datum.data = pkt.cif.([]byte)
|
datum.data = pkt.cif.([]byte)
|
||||||
|
|
||||||
buffer.queued += 1
|
buffer.queued += 1
|
||||||
buffer.end.next = datum
|
buffer.end.next = datum
|
||||||
buffer.end = datum
|
buffer.end = datum
|
||||||
|
@ -57,6 +57,7 @@ func NewDatumLink(pkt *Packet) (*DatumLink) {
|
||||||
buffer := new(DatumLink)
|
buffer := new(DatumLink)
|
||||||
root_datum := new(Datum)
|
root_datum := new(Datum)
|
||||||
root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
root_datum.seq_num = pkt.header_info.(*DataHeader).seq_num
|
||||||
|
root_datum.timestamp = pkt.timestamp
|
||||||
root_datum.data = pkt.cif.([]byte)
|
root_datum.data = pkt.cif.([]byte)
|
||||||
|
|
||||||
buffer.root = root_datum
|
buffer.root = root_datum
|
||||||
|
@ -85,38 +86,51 @@ func (storage *DatumStorage) Expunge(output io.WriteCloser) (error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *DatumLink) InRange(new_seq_num uint32) (bool) {
|
func (buffer *DatumLink) Holds(new_seq_num uint32) (bool) {
|
||||||
start := buffer.root.seq_num
|
start := buffer.root.seq_num
|
||||||
end := buffer.end.seq_num
|
end := buffer.end.seq_num
|
||||||
if new_seq_num == start || new_seq_num == end {
|
if new_seq_num == start || new_seq_num == end {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !((start < new_seq_num && new_seq_num - start < (1 << 30)) || (start > new_seq_num && start - new_seq_num > (1 << 30))) {
|
if buffer.Before(new_seq_num) || buffer.After(new_seq_num) {
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !((new_seq_num < end && end - new_seq_num < (1 << 30)) || (new_seq_num > end && new_seq_num - end > (1 << 30))) {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (buffer *DatumLink) After(new_seq_num uint32) (bool) {
|
||||||
|
start := buffer.root.seq_num
|
||||||
|
if serial_less(new_seq_num, start, 31) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (buffer *DatumLink) Before(new_seq_num uint32) (bool) {
|
||||||
|
end := buffer.end.seq_num
|
||||||
|
if serial_less(end, new_seq_num, 31) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (storage *DatumStorage) NewDatum(pkt *Packet) {
|
func (storage *DatumStorage) NewDatum(pkt *Packet) {
|
||||||
new_pkt_num := pkt.header_info.(*DataHeader).seq_num
|
new_pkt_num := pkt.header_info.(*DataHeader).seq_num
|
||||||
prev_num := (new_pkt_num - 1) % uint32(1 << 31)
|
prev_num := (new_pkt_num - 1) % uint32(1 << 31)
|
||||||
if storage.main.end.seq_num == prev_num {
|
if storage.main.end.seq_num == prev_num {
|
||||||
storage.main.NewDatum(pkt)
|
storage.main.NewDatum(pkt)
|
||||||
} else if storage.main.InRange(new_pkt_num) {
|
} else if storage.main.Holds(new_pkt_num) {
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
oldest := storage.main.root.seq_num
|
oldest := storage.main.root.seq_num
|
||||||
if (new_pkt_num < oldest && oldest - new_pkt_num < (1 << 30)) || (new_pkt_num > oldest && new_pkt_num - oldest > (1 << 30)) {
|
if serial_less(new_pkt_num, oldest, 31) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, v := range storage.offshoots {
|
for _, v := range storage.offshoots {
|
||||||
if v.end.seq_num == prev_num {
|
if v.end.seq_num == prev_num {
|
||||||
v.NewDatum(pkt)
|
v.NewDatum(pkt)
|
||||||
return
|
return
|
||||||
} else if v.InRange(new_pkt_num) {
|
} else if v.Holds(new_pkt_num) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,10 +145,10 @@ func (buffer *DatumLink) Link(buffer_next *DatumLink) {
|
||||||
buffer.queued += buffer_next.queued
|
buffer.queued += buffer_next.queued
|
||||||
}
|
}
|
||||||
|
|
||||||
func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink) (bool) {
|
func check_append_serial_next(buffer *DatumLink, buffer_next *DatumLink, curr_time uint32) (bool) {
|
||||||
seq_1 := buffer.end.seq_num
|
seq_1 := buffer.end.seq_num
|
||||||
seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31))
|
seq_2 := (seq_1 + 1) % uint32(math.Pow(2, 31))
|
||||||
if buffer_next.root.seq_num == seq_2 {
|
if buffer_next.root.seq_num == seq_2 || buffer.end.timestamp + 500000 < curr_time {
|
||||||
buffer.Link(buffer_next)
|
buffer.Link(buffer_next)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -159,12 +173,12 @@ func (storage *DatumStorage) GenNACKCIF() (*NACKCIF, bool) {
|
||||||
return cif, true
|
return cif, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *DatumStorage) Relink() {
|
func (storage *DatumStorage) Relink(curr_time uint32) {
|
||||||
sort.Sort(storage.offshoots)
|
sort.Sort(storage.offshoots)
|
||||||
buffer := storage.main
|
buffer := storage.main
|
||||||
i := 0
|
i := 0
|
||||||
for i < len(storage.offshoots) {
|
for i < len(storage.offshoots) {
|
||||||
if check_append_serial_next(buffer, storage.offshoots[i]) {
|
if check_append_serial_next(buffer, storage.offshoots[i], curr_time) {
|
||||||
storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...)
|
storage.offshoots = append(storage.offshoots[:i], storage.offshoots[i + 1:]...)
|
||||||
} else {
|
} else {
|
||||||
buffer = storage.offshoots[i]
|
buffer = storage.offshoots[i]
|
||||||
|
@ -173,3 +187,10 @@ func (storage *DatumStorage) Relink() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func serial_less(a uint32, b uint32, bits int) (bool) {
|
||||||
|
if (a < b && b-a < (1 << (bits - 1))) || (a > b && a-b > (1 << (bits - 1))) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package srt
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -144,10 +143,8 @@ func marshall_ctrl_packet(packet *Packet, header []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func marshall_nack_cif(data *NACKCIF) ([]byte) {
|
func marshall_nack_cif(data *NACKCIF) ([]byte) {
|
||||||
fmt.Println("new NAK")
|
|
||||||
var loss_list_bytes []byte
|
var loss_list_bytes []byte
|
||||||
for _, pkts := range data.lost_pkts {
|
for _, pkts := range data.lost_pkts {
|
||||||
fmt.Println(pkts)
|
|
||||||
if pkts.start == pkts.end {
|
if pkts.start == pkts.end {
|
||||||
curr_bytes := make([]byte, 4)
|
curr_bytes := make([]byte, 4)
|
||||||
binary.BigEndian.PutUint32(curr_bytes, pkts.start)
|
binary.BigEndian.PutUint32(curr_bytes, pkts.start)
|
||||||
|
|
|
@ -265,7 +265,7 @@ func (agent *SRTManager) handle_data_storage(packet *Packet) {
|
||||||
agent.storage.NewDatum(packet)
|
agent.storage.NewDatum(packet)
|
||||||
}
|
}
|
||||||
if len(agent.storage.offshoots) != 0 {
|
if len(agent.storage.offshoots) != 0 {
|
||||||
agent.storage.Relink()
|
agent.storage.Relink(packet.timestamp)
|
||||||
}
|
}
|
||||||
agent.storage.Expunge(agent.output)
|
agent.storage.Expunge(agent.output)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue