Export ChunkBuffers, track bytes read in total
This commit is contained in:
parent
fabcecae43
commit
05087ccbb3
1 changed files with 40 additions and 20 deletions
|
@ -22,7 +22,7 @@ type Message struct {
|
|||
msg_len uint32
|
||||
}
|
||||
|
||||
type chunk_bufs struct {
|
||||
type ChunkBuffers struct {
|
||||
time []byte
|
||||
msg_len []byte
|
||||
msg_typeid []byte
|
||||
|
@ -31,8 +31,8 @@ type chunk_bufs struct {
|
|||
csid_true []byte
|
||||
}
|
||||
|
||||
func new_chunk_bufs() (*chunk_bufs) {
|
||||
new_chunk_bufs := chunk_bufs{
|
||||
func new_chunk_bufs() (*ChunkBuffers) {
|
||||
new_chunk_bufs := ChunkBuffers{
|
||||
make([]byte, 4),
|
||||
make([]byte, 4),
|
||||
make([]byte, 1),
|
||||
|
@ -43,14 +43,14 @@ func new_chunk_bufs() (*chunk_bufs) {
|
|||
return &new_chunk_bufs
|
||||
}
|
||||
|
||||
func OpenStreamsMapInit() (map[uint32]*ChunkStream, map[uint32]*Message, *chunk_bufs) {
|
||||
func OpenStreamsMapInit() (map[uint32]*ChunkStream, map[uint32]*Message, *ChunkBuffers) {
|
||||
open_chnkstrms := make(map[uint32]*ChunkStream)
|
||||
open_msgs := make(map[uint32]*Message)
|
||||
chunk_bufs_ptr := new_chunk_bufs()
|
||||
return open_chnkstrms, open_msgs, chunk_bufs_ptr
|
||||
}
|
||||
|
||||
func read_basic_header(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (format uint8, csid uint32, err error) {
|
||||
func read_basic_header(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (format uint8, csid uint32, err error) {
|
||||
if _, err = conn.Read(chunk_bufs_ptr.fmt_csid_byte); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func read_basic_header(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (format uint8,
|
|||
return
|
||||
}
|
||||
|
||||
func read_time(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, extended_time bool, err error) {
|
||||
func read_time(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, extended_time bool, err error) {
|
||||
chunk_bufs_ptr.time[0] = 0
|
||||
if _, err = conn.Read(chunk_bufs_ptr.time[1:]); err != nil {
|
||||
return
|
||||
|
@ -84,7 +84,7 @@ func read_time(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, extended
|
|||
return
|
||||
}
|
||||
|
||||
func read_msg_len(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_len uint32, err error) {
|
||||
func read_msg_len(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_len uint32, err error) {
|
||||
if _, err = conn.Read(chunk_bufs_ptr.msg_len[1:]); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ func read_msg_len(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_len uint32, er
|
|||
return
|
||||
}
|
||||
|
||||
func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_type uint8, err error) {
|
||||
func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_type uint8, err error) {
|
||||
if _, err = conn.Read(chunk_bufs_ptr.msg_typeid); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ func read_msg_typeid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_type uint8,
|
|||
return
|
||||
}
|
||||
|
||||
func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_streamid uint32, err error) {
|
||||
func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (msg_streamid uint32, err error) {
|
||||
if _, err = conn.Read(chunk_bufs_ptr.msg_streamid); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func read_msg_streamid(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (msg_streamid
|
|||
return
|
||||
}
|
||||
|
||||
func read_time_extd(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, err error) {
|
||||
func read_time_extd(conn net.Conn, chunk_bufs_ptr *ChunkBuffers) (time uint32, err error) {
|
||||
if _, err = conn.Read(chunk_bufs_ptr.time); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ func read_time_extd(conn net.Conn, chunk_bufs_ptr *chunk_bufs) (time uint32, err
|
|||
return
|
||||
}
|
||||
|
||||
func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) {
|
||||
func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
|
||||
var extended_time bool
|
||||
var err error
|
||||
chnk_stream_ptr.timestamp, extended_time, err = read_time(conn, chunk_bufs_ptr)
|
||||
|
@ -153,7 +153,7 @@ func read_message_header_0(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
|
|||
return nil
|
||||
}
|
||||
|
||||
func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) {
|
||||
func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
|
||||
var extended_time bool
|
||||
var err error
|
||||
|
||||
|
@ -186,7 +186,7 @@ func read_message_header_1(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
|
|||
return nil
|
||||
}
|
||||
|
||||
func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *chunk_bufs) (error) {
|
||||
func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr *Message, chunk_bufs_ptr *ChunkBuffers) (error) {
|
||||
var extended_time bool
|
||||
var err error
|
||||
|
||||
|
@ -212,7 +212,7 @@ func read_message_header_2(conn net.Conn, chnk_stream_ptr *ChunkStream, msg_ptr
|
|||
return nil
|
||||
}
|
||||
|
||||
func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (error) {
|
||||
func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (uint32, error) {
|
||||
bytes_left := msg_ptr.msg_len - msg_ptr.curr_bytes_read
|
||||
var buffer_end uint32
|
||||
if bytes_left < chnk_size {
|
||||
|
@ -222,16 +222,16 @@ func read_chunk_data(conn net.Conn, msg_ptr *Message, chnk_size uint32) (error)
|
|||
}
|
||||
n, err := conn.Read(msg_ptr.data[msg_ptr.curr_bytes_read:buffer_end])
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
msg_ptr.curr_bytes_read += uint32(n)
|
||||
return nil
|
||||
return uint32(n), nil
|
||||
|
||||
}
|
||||
|
||||
func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *chunk_bufs) (*Message, error){
|
||||
func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs map[uint32]*Message, chnk_size uint32, chunk_bufs_ptr *ChunkBuffers, n_read *uint32) (*Message, error){
|
||||
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
|
||||
var chnkstream_ptr *ChunkStream
|
||||
var msg_ptr *Message
|
||||
|
||||
|
@ -239,6 +239,13 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if csid < 64 {
|
||||
*n_read += 1
|
||||
} else if csid < 320 {
|
||||
*n_read += 2
|
||||
} else {
|
||||
*n_read += 3
|
||||
}
|
||||
|
||||
switch format {
|
||||
case 0:
|
||||
|
@ -249,6 +256,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
|
|||
if err := read_message_header_0(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*n_read += 11
|
||||
if msg_ptr.timestamp > 0xffffff {
|
||||
*n_read += 4
|
||||
}
|
||||
case 1:
|
||||
chnkstream_ptr = open_chnkstrms[csid]
|
||||
msg_ptr = new(Message)
|
||||
|
@ -256,6 +267,10 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
|
|||
if err := read_message_header_1(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*n_read += 7
|
||||
if msg_ptr.timestamp > 0xffffff {
|
||||
*n_read += 4
|
||||
}
|
||||
case 2:
|
||||
chnkstream_ptr = open_chnkstrms[csid]
|
||||
msg_ptr = new(Message)
|
||||
|
@ -263,15 +278,20 @@ func ReadChunk(conn net.Conn, open_chnkstrms map[uint32]*ChunkStream, open_msgs
|
|||
if err := read_message_header_2(conn, chnkstream_ptr, msg_ptr, chunk_bufs_ptr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*n_read += 3
|
||||
if msg_ptr.timestamp > 0xffffff {
|
||||
*n_read += 4
|
||||
}
|
||||
case 3:
|
||||
chnkstream_ptr = open_chnkstrms[csid]
|
||||
msg_ptr = open_msgs[chnkstream_ptr.last_msg_strm_id]
|
||||
}
|
||||
|
||||
|
||||
if err := read_chunk_data(conn, msg_ptr, chnk_size); err != nil {
|
||||
n, err := read_chunk_data(conn, msg_ptr, chnk_size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*n_read += n
|
||||
|
||||
conn.SetDeadline(time.Time{})
|
||||
|
||||
|
|
Loading…
Reference in a new issue