mirror of
https://github.com/yggdrasil-network/yggdrasil-go
synced 2024-11-13 00:40:24 +03:00
work-in-progress faster queue logic
This commit is contained in:
parent
28d6e3e605
commit
761ae531cb
@ -1,10 +1,15 @@
|
|||||||
package yggdrasil
|
package yggdrasil
|
||||||
|
|
||||||
|
/*
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
*/
|
||||||
|
|
||||||
|
// TODO separate queues per e.g. traffic flow
|
||||||
|
// For now, we put everything in queue
|
||||||
|
/*
|
||||||
type pqStreamID string
|
type pqStreamID string
|
||||||
|
|
||||||
type pqPacketInfo struct {
|
type pqPacketInfo struct {
|
||||||
@ -13,13 +18,15 @@ type pqPacketInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type pqStream struct {
|
type pqStream struct {
|
||||||
|
id string
|
||||||
infos []pqPacketInfo
|
infos []pqPacketInfo
|
||||||
size uint64
|
size int
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
// TODO separate queues per e.g. traffic flow
|
|
||||||
type packetQueue struct {
|
type packetQueue struct {
|
||||||
streams map[pqStreamID]pqStream
|
//streams []pqStream
|
||||||
|
packets [][]byte
|
||||||
size uint64
|
size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,83 +36,23 @@ func (q *packetQueue) drop() bool {
|
|||||||
if q.size == 0 {
|
if q.size == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// select a random stream, odds based on stream size
|
packet := q.packets[0]
|
||||||
offset := rand.Uint64() % q.size
|
q.packets = q.packets[1:]
|
||||||
var worst pqStreamID
|
|
||||||
var size uint64
|
|
||||||
for id, stream := range q.streams {
|
|
||||||
worst = id
|
|
||||||
size += stream.size
|
|
||||||
if size >= offset {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// drop the oldest packet from the stream
|
|
||||||
worstStream := q.streams[worst]
|
|
||||||
packet := worstStream.infos[0].packet
|
|
||||||
worstStream.infos = worstStream.infos[1:]
|
|
||||||
worstStream.size -= uint64(len(packet))
|
|
||||||
q.size -= uint64(len(packet))
|
q.size -= uint64(len(packet))
|
||||||
pool_putBytes(packet)
|
pool_putBytes(packet)
|
||||||
// save the modified stream to queues
|
|
||||||
if len(worstStream.infos) > 0 {
|
|
||||||
q.streams[worst] = worstStream
|
|
||||||
} else {
|
|
||||||
delete(q.streams, worst)
|
|
||||||
}
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *packetQueue) push(packet []byte) {
|
func (q *packetQueue) push(packet []byte) {
|
||||||
if q.streams == nil {
|
q.packets = append(q.packets, packet)
|
||||||
q.streams = make(map[pqStreamID]pqStream)
|
|
||||||
}
|
|
||||||
// get stream
|
|
||||||
id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
|
|
||||||
stream := q.streams[id]
|
|
||||||
// update stream
|
|
||||||
stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()})
|
|
||||||
stream.size += uint64(len(packet))
|
|
||||||
// save update to queues
|
|
||||||
q.streams[id] = stream
|
|
||||||
q.size += uint64(len(packet))
|
q.size += uint64(len(packet))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *packetQueue) pop() ([]byte, bool) {
|
func (q *packetQueue) pop() ([]byte, bool) {
|
||||||
if len(q.streams) > 0 {
|
if q.size > 0 {
|
||||||
// get the stream that uses the least bandwidth
|
packet := q.packets[0]
|
||||||
now := time.Now()
|
q.packets = q.packets[1:]
|
||||||
var best pqStreamID
|
|
||||||
for id := range q.streams {
|
|
||||||
best = id
|
|
||||||
break // get a random ID to start
|
|
||||||
}
|
|
||||||
bestStream := q.streams[best]
|
|
||||||
bestSize := float64(bestStream.size)
|
|
||||||
bestAge := now.Sub(bestStream.infos[0].time).Seconds()
|
|
||||||
for id, stream := range q.streams {
|
|
||||||
thisSize := float64(stream.size)
|
|
||||||
thisAge := now.Sub(stream.infos[0].time).Seconds()
|
|
||||||
// cross multiply to avoid division by zero issues
|
|
||||||
if bestSize*thisAge > thisSize*bestAge {
|
|
||||||
// bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth
|
|
||||||
best = id
|
|
||||||
bestStream = stream
|
|
||||||
bestSize = thisSize
|
|
||||||
bestAge = thisAge
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// get the oldest packet from the best stream
|
|
||||||
packet := bestStream.infos[0].packet
|
|
||||||
bestStream.infos = bestStream.infos[1:]
|
|
||||||
bestStream.size -= uint64(len(packet))
|
|
||||||
q.size -= uint64(len(packet))
|
q.size -= uint64(len(packet))
|
||||||
// save the modified stream to queues
|
|
||||||
if len(bestStream.infos) > 0 {
|
|
||||||
q.streams[best] = bestStream
|
|
||||||
} else {
|
|
||||||
delete(q.streams, best)
|
|
||||||
}
|
|
||||||
return packet, true
|
return packet, true
|
||||||
}
|
}
|
||||||
return nil, false
|
return nil, false
|
||||||
|
Loading…
Reference in New Issue
Block a user