diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index bd7fecec..6d130d81 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } + if err := c.switchTable.start(); err != nil { + c.log.Println("Failed to start switch") + return err + } + if err := c.router.start(); err != nil { c.log.Println("Failed to start router") return err diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f95bfa3d..c029ae9f 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -49,6 +49,7 @@ func (c *Core) Init() { bpub, bpriv := newBoxKeys() spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) + c.switchTable.start() c.router.start() } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad99750a..d5bdc5a3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -229,19 +229,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { // Drop traffic until the peer manages to send us at least one good switchMsg return } - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - return - } // No payload - toPort := p.core.switchTable.lookup(coords) - if toPort == p.port { - return - } - to := p.core.peers.getPorts()[toPort] - if to == nil { - return - } - to.sendPacket(packet) + p.core.switchTable.packetIn <- packet } // This just calls p.out(packet) for now. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d3b28710..d2026aec 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -155,15 +155,16 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { - core *Core - key sigPubKey // Our own key - time time.Time // Time when locator.tstamp was last updated - parent switchPort // Port of whatever peer is our parent, or self if we're root - drop map[sigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData - data switchData - updater atomic.Value //*sync.Once - table atomic.Value //lookupTable + core *Core + key sigPubKey // Our own key + time time.Time // Time when locator.tstamp was last updated + parent switchPort // Port of whatever peer is our parent, or self if we're root + drop map[sigPubKey]int64 // Tstamp associated with a dropped root + mutex sync.RWMutex // Lock for reads/writes of switchData + data switchData + updater atomic.Value //*sync.Once + table atomic.Value //lookupTable + packetIn chan []byte // Incoming packets for the worker to handle } // Initializes the switchTable struct. @@ -177,6 +178,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[sigPubKey]int64) + t.packetIn = make(chan []byte, 1024) } // Safely gets a copy of this node's locator. @@ -438,6 +440,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { return } +//////////////////////////////////////////////////////////////////////////////// + +// The rest of these are related to the switch worker + // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. func (t *switchTable) updateTable() { // WARNING this should only be called from within t.data.updater.Do() @@ -506,3 +512,34 @@ func (t *switchTable) lookup(dest []byte) switchPort { } return best } + +// Starts the switch worker +func (t *switchTable) start() error { + t.core.log.Println("Starting switch") + go t.doWorker() + return nil +} + +func (t *switchTable) handleIn(packet []byte) { + // Get the coords, skipping the first byte (the pType) + _, pTypeLen := wire_decode_uint64(packet) + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { + util_putBytes(packet) + return + } // No payload + toPort := t.lookup(coords) + to := t.core.peers.getPorts()[toPort] + if to == nil { + util_putBytes(packet) + return + } + to.sendPacket(packet) +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { + for packet := range t.packetIn { + t.handleIn(packet) + } +}