diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 1b611af2..bf6b9194 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -176,6 +176,7 @@ type switchTable struct { admin chan func() // Pass a lambda for the admin socket to query stuff queues switch_buffers // Queues - not atomic so ONLY use through admin chan queueTotalMaxSize uint64 // Maximum combined size of queues + toRouter chan []byte // Packets to be sent to the router } // Minimum allowed total size of switch queues. @@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) { t.idleIn = make(chan switchPort, 1024) t.admin = make(chan func()) t.queueTotalMaxSize = SwitchQueueTotalMinSize + t.toRouter = make(chan []byte, 1) } // Safely gets a copy of this node's locator. @@ -616,17 +618,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { coords := switch_getPacketCoords(packet) - ports := t.core.peers.getPorts() closer := t.getCloser(coords) if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer? - ports[0].sendPacket(packet) + t.toRouter <- packet return true } table := t.getTable() var best *peer var bestDist int var bestCoordLen int + ports := t.core.peers.getPorts() for port, dist := range closer { to := ports[port] _, isIdle := idle[port] @@ -775,6 +777,33 @@ func (t *switchTable) handleIdle(port switchPort) bool { // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { + sendingToRouter := make(chan []byte, 1) + go func() { + // Keep sending packets to the router + self := t.core.peers.getPorts()[0] + for bs := range sendingToRouter { + self.sendPacket(bs) + } + }() + go func() { + // Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer + var buf [][]byte + for { + buf = append(buf, <-t.toRouter) + for len(buf) > 0 { + select { + case bs := <-t.toRouter: + buf = append(buf, bs) + for len(buf) > 32 { + util.PutBytes(buf[0]) + buf = buf[1:] + } + case sendingToRouter <- buf[0]: + buf = buf[1:] + } + } + } + }() t.queues.switchTable = t t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things