From 498bc395e2ed8adc55813f8aeda24f2827c3d4d3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 24 Aug 2019 14:56:33 -0500 Subject: [PATCH] start migrating switch to the actor model --- src/yggdrasil/switch.go | 117 ++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 70 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 0be01240..7f3c7a78 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -19,6 +19,8 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) const ( @@ -172,12 +174,13 @@ type switchTable struct { data switchData // updater atomic.Value // *sync.Once table atomic.Value // lookupTable + phony.Actor // Owns the below packetIn chan []byte // Incoming packets for the worker to handle idleIn chan switchPort // Incoming idle notifications from peer links admin chan func() // Pass a lambda for the admin socket to query stuff queues switch_buffers // Queues - not atomic so ONLY use through admin chan queueTotalMaxSize uint64 // Maximum combined size of queues - toRouter chan []byte // Packets to be sent to the router + idle map[switchPort]time.Time // idle peers } // Minimum allowed total size of switch queues. @@ -199,7 +202,7 @@ func (t *switchTable) init(core *Core) { t.idleIn = make(chan switchPort, 1024) t.admin = make(chan func()) t.queueTotalMaxSize = SwitchQueueTotalMinSize - t.toRouter = make(chan []byte, 1) + t.idle = make(map[switchPort]time.Time) } // Safely gets a copy of this node's locator. @@ -653,12 +656,13 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) bool { +func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]time.Time) bool { coords := switch_getPacketCoords(packet) closer := t.getCloser(coords) if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer? - t.toRouter <- packet + self := t.core.peers.getPorts()[0] + self.sendPacketsFrom(t, [][]byte{packet}) return true } var best *peer @@ -731,7 +735,7 @@ type switch_buffers struct { closer []closerInfo // Scratch space } -func (b *switch_buffers) cleanup(t *switchTable) { +func (b *switch_buffers) _cleanup(t *switchTable) { for streamID, buf := range b.bufs { // Remove queues for which we have no next hop packet := buf.packets[0] @@ -773,14 +777,14 @@ func (b *switch_buffers) cleanup(t *switchTable) { // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort) bool { +func (t *switchTable) _handleIdle(port switchPort) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var packets [][]byte var psize int - t.queues.cleanup(t) + t.queues._cleanup(t) now := time.Now() for psize < 65535 { var best string @@ -824,79 +828,52 @@ func (t *switchTable) handleIdle(port switchPort) bool { return false } -// The switch worker does routing lookups and sends packets to where they need to be -func (t *switchTable) doWorker() { - sendingToRouter := make(chan []byte, 1) - go func() { - // Keep sending packets to the router - self := t.core.peers.getPorts()[0] - for bs := range sendingToRouter { - // TODO remove this ugly mess of goroutines if/when the switch becomes an actor - <-self.SyncExec(func() { self._sendPackets([][]byte{bs}) }) +func (t *switchTable) _packetIn(bytes []byte) { + // Try to send it somewhere (or drop it if it's corrupt or at a dead end) + if !t._handleIn(bytes, t.idle) { + // There's nobody free to take it right now, so queue it for later + packet := switch_packetInfo{bytes, time.Now()} + streamID := switch_getPacketStreamID(packet.bytes) + buf, bufExists := t.queues.bufs[streamID] + buf.packets = append(buf.packets, packet) + buf.size += uint64(len(packet.bytes)) + t.queues.size += uint64(len(packet.bytes)) + // Keep a track of the max total queue size + if t.queues.size > t.queues.maxsize { + t.queues.maxsize = t.queues.size } - }() - go func() { - // Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer - var buf [][]byte - var size int - for { - bs := <-t.toRouter - size += len(bs) - buf = append(buf, bs) - for len(buf) > 0 { - select { - case bs := <-t.toRouter: - size += len(bs) - buf = append(buf, bs) - for size > int(t.queueTotalMaxSize) { - size -= len(buf[0]) - util.PutBytes(buf[0]) - buf = buf[1:] - } - case sendingToRouter <- buf[0]: - size -= len(buf[0]) - buf = buf[1:] - } + t.queues.bufs[streamID] = buf + if !bufExists { + // Keep a track of the max total queue count. Only recalculate this + // when the queue is new because otherwise repeating len(dict) might + // cause unnecessary processing overhead + if len(t.queues.bufs) > t.queues.maxbufs { + t.queues.maxbufs = len(t.queues.bufs) } } - }() + t.queues._cleanup(t) + } +} + +func (t *switchTable) _idleIn(port switchPort) { + // Try to find something to send to this peer + if !t._handleIdle(port) { + // Didn't find anything ready to send yet, so stay idle + t.idle[port] = time.Now() + } +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { t.queues.switchTable = t t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) - idle := make(map[switchPort]time.Time) // this is to deduplicate things for { //t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) select { case bytes := <-t.packetIn: - // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t.handleIn(bytes, idle) { - // There's nobody free to take it right now, so queue it for later - packet := switch_packetInfo{bytes, time.Now()} - streamID := switch_getPacketStreamID(packet.bytes) - buf, bufExists := t.queues.bufs[streamID] - buf.packets = append(buf.packets, packet) - buf.size += uint64(len(packet.bytes)) - t.queues.size += uint64(len(packet.bytes)) - // Keep a track of the max total queue size - if t.queues.size > t.queues.maxsize { - t.queues.maxsize = t.queues.size - } - t.queues.bufs[streamID] = buf - if !bufExists { - // Keep a track of the max total queue count. Only recalculate this - // when the queue is new because otherwise repeating len(dict) might - // cause unnecessary processing overhead - if len(t.queues.bufs) > t.queues.maxbufs { - t.queues.maxbufs = len(t.queues.bufs) - } - } - t.queues.cleanup(t) - } + <-t.SyncExec(func() { t._packetIn(bytes) }) case port := <-t.idleIn: - // Try to find something to send to this peer - if !t.handleIdle(port) { - // Didn't find anything ready to send yet, so stay idle - idle[port] = time.Now() - } + <-t.SyncExec(func() { t._idleIn(port) }) case f := <-t.admin: f() case e := <-t.reconfigure: