From a7c8be4d69b644dce77b7caa06d40c511a09bbb5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 5 Jul 2018 23:07:01 -0500 Subject: [PATCH] base backpressure decisions on queue size in bytes, instead of packet counts --- src/yggdrasil/switch.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 5b72620c..e7b25a29 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -595,7 +595,7 @@ type switch_packetInfo struct { // Used to keep track of buffered packets type switch_buffer struct { packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large - count uint64 // Total queue size, including dropped packets + size uint64 // Total queue size in bytes } func (b *switch_buffer) dropTimedOut() { @@ -603,8 +603,10 @@ func (b *switch_buffer) dropTimedOut() { const timeout = 25 * time.Millisecond now := time.Now() for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout { - util_putBytes(b.packets[0].bytes) - b.packets = b.packets[1:] + var packet switch_packetInfo + packet, b.packets = b.packets[0], b.packets[1:] + b.size -= uint64(len(packet.bytes)) + util_putBytes(packet.bytes) } } @@ -629,9 +631,9 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer buffs[streamID] = buf packet := buf.packets[0] coords := switch_getPacketCoords(packet.bytes) - if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) { + if (bestSize == 0 || buf.size < bestSize) && t.portIsCloser(coords, port) { best = streamID - bestSize = buf.count + bestSize = buf.size } } if bestSize != 0 { @@ -639,7 +641,7 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.count-- + buf.size -= uint64(len(packet.bytes)) if len(buf.packets) == 0 { delete(buffs, best) } else { @@ -658,16 +660,16 @@ func (t *switchTable) doWorker() { idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { - case packet := <-t.packetIn: + case bytes := <-t.packetIn: // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t.handleIn(packet, idle) { + if !t.handleIn(bytes, idle) { // There's nobody free to take it right now, so queue it for later - streamID := switch_getPacketStreamID(packet) + packet := switch_packetInfo{bytes, time.Now()} + streamID := switch_getPacketStreamID(packet.bytes) buf := buffs[streamID] buf.dropTimedOut() - pinfo := switch_packetInfo{packet, time.Now()} - buf.packets = append(buf.packets, pinfo) - buf.count++ + buf.packets = append(buf.packets, packet) + buf.size += uint64(len(packet.bytes)) buffs[streamID] = buf } case port := <-t.idleIn: