diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index d91a18ef..358aaeb1 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,15 +1,13 @@ package yggdrasil -/* import ( - "math/rand" + "container/heap" "time" ) -*/ // TODO separate queues per e.g. traffic flow // For now, we put everything in queue -/* + type pqStreamID string type pqPacketInfo struct { @@ -18,15 +16,13 @@ type pqPacketInfo struct { } type pqStream struct { - id string + id pqStreamID infos []pqPacketInfo - size int + size uint64 } -*/ type packetQueue struct { - //streams []pqStream - packets [][]byte + streams []pqStream size uint64 } @@ -36,24 +32,79 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint64(len(packet)) - pool_putBytes(packet) + var longestIdx int + for idx := range q.streams { + if q.streams[idx].size > q.streams[longestIdx].size { + longestIdx = idx + } + } + stream := heap.Remove(q, longestIdx).(pqStream) + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + heap.Push(q, stream) + } + pool_putBytes(info.packet) return true } func (q *packetQueue) push(packet []byte) { - q.packets = append(q.packets, packet) - q.size += uint64(len(packet)) + id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + info := pqPacketInfo{packet: packet, time: time.Now()} + for idx := range q.streams { + if q.streams[idx].id == id { + q.streams[idx].infos = append(q.streams[idx].infos, info) + q.streams[idx].size += uint64(len(packet)) + q.size += uint64(len(packet)) + return + } + } + stream := pqStream{id: id, size: uint64(len(packet))} + stream.infos = append(stream.infos, info) + heap.Push(q, stream) } func (q *packetQueue) pop() ([]byte, bool) { if q.size > 0 { - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint64(len(packet)) - return packet, true + stream := heap.Pop(q).(pqStream) + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + heap.Push(q, stream) + } + return info.packet, true } return nil, false } + +//////////////////////////////////////////////////////////////////////////////// + +// Interface methods for packetQueue to satisfy heap.Interface + +func (q *packetQueue) Len() int { + return len(q.streams) +} + +func (q *packetQueue) Less(i, j int) bool { + return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time) +} + +func (q *packetQueue) Swap(i, j int) { + q.streams[i], q.streams[j] = q.streams[j], q.streams[i] +} + +func (q *packetQueue) Push(x interface{}) { + stream := x.(pqStream) + q.streams = append(q.streams, stream) + q.size += stream.size +} + +func (q *packetQueue) Pop() interface{} { + idx := len(q.streams) - 1 + stream := q.streams[idx] + q.streams = q.streams[:idx] + q.size -= stream.size + return stream +}