From e926a3be6d2b9d475dd6b7a5677ec4442033ff74 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 00:23:38 -0500 Subject: [PATCH] work in progress actorizing core.peers and replacing switch worker with per-peer switch-generated lookupTable --- contrib/ansible/genkeys.go | 4 +- src/yggdrasil/api.go | 23 ++-- src/yggdrasil/link.go | 7 +- src/yggdrasil/nodeinfo.go | 4 +- src/yggdrasil/peer.go | 119 ++++++++++-------- src/yggdrasil/router.go | 6 +- src/yggdrasil/switch.go | 244 +++++++++++++++++++------------------ 7 files changed, 221 insertions(+), 186 deletions(-) diff --git a/contrib/ansible/genkeys.go b/contrib/ansible/genkeys.go index 1d7c222d..681431b5 100644 --- a/contrib/ansible/genkeys.go +++ b/contrib/ansible/genkeys.go @@ -12,9 +12,9 @@ import ( "net" "os" + "github.com/cheggaaa/pb/v3" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/cheggaaa/pb/v3" ) var numHosts = flag.Int("hosts", 1, "number of host vars to generate") @@ -30,7 +30,7 @@ type keySet struct { func main() { flag.Parse() - bar := pb.StartNew(*keyTries * 2 + *numHosts) + bar := pb.StartNew(*keyTries*2 + *numHosts) if *numHosts > *keyTries { println("Can't generate less keys than hosts.") diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 4a6ae417..15e2acd6 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -110,7 +110,8 @@ type Session struct { // there is exactly one entry then this node is not connected to any other nodes // and is therefore isolated. func (c *Core) GetPeers() []Peer { - ports := c.peers.ports.Load().(map[switchPort]*peer) + var ports map[switchPort]*peer + phony.Block(&c.peers, func() { ports = c.peers.ports }) var peers []Peer var ps []switchPort for port := range ports { @@ -143,10 +144,14 @@ func (c *Core) GetPeers() []Peer { // isolated or not connected to any peers. func (c *Core) GetSwitchPeers() []SwitchPeer { var switchpeers []SwitchPeer - table := c.switchTable.table.Load().(lookupTable) - peers := c.peers.ports.Load().(map[switchPort]*peer) + var table *lookupTable + var ports map[switchPort]*peer + phony.Block(&c.peers, func() { + table = c.peers.table + ports = c.peers.ports + }) for _, elem := range table.elems { - peer, isIn := peers[elem.port] + peer, isIn := ports[elem.port] if !isIn { continue } @@ -325,8 +330,8 @@ func (c *Core) EncryptionPublicKey() string { // connected to any other nodes (effectively making you the root of a // single-node network). func (c *Core) Coords() []uint64 { - table := c.switchTable.table.Load().(lookupTable) - return wire_coordsBytestoUint64s(table.self.getCoords()) + loc := c.switchTable.getLocator() + return wire_coordsBytestoUint64s(loc.getCoords()) } // Address gets the IPv6 address of the Yggdrasil node. This is always a /128 @@ -490,7 +495,11 @@ func (c *Core) CallPeer(addr string, sintf string) error { // DisconnectPeer disconnects a peer once. This should be specified as a port // number. func (c *Core) DisconnectPeer(port uint64) error { - c.peers.removePeer(switchPort(port)) + c.peers.Act(nil, func() { + if p, isIn := c.peers.ports[switchPort(port)]; isIn { + p.Act(&c.peers, p._removeSelf) + } + }) return nil } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index fb40fc08..fa6563f1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -217,13 +217,16 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + phony.Block(&intf.link.core.peers, func() { + // FIXME don't use phony.Block, it's bad practice, even if it's safe here + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + }) if intf.peer == nil { return errors.New("failed to create peer") } defer func() { // More cleanup can go here - intf.link.core.peers.removePeer(intf.peer.port) + intf.peer.Act(nil, intf.peer._removeSelf) }() intf.peer.out = func(msgs [][]byte) { intf.writer.sendFrom(intf.peer, msgs, false) diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index fc6250d6..745756fe 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -187,9 +187,9 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse } func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { - table := m.core.switchTable.table.Load().(lookupTable) + loc := m.core.switchTable.getLocator() nodeinfo := nodeinfoReqRes{ - SendCoords: table.self.getCoords(), + SendCoords: loc.getCoords(), IsResponse: isResponse, NodeInfo: m._getNodeInfo(), } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 4206857e..7fa2b317 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -6,8 +6,6 @@ package yggdrasil import ( "encoding/hex" - "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -21,17 +19,17 @@ import ( // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { + phony.Inbox core *Core - mutex sync.Mutex // Synchronize writes to atomic - ports atomic.Value //map[switchPort]*peer, use CoW semantics + ports map[switchPort]*peer // use CoW semantics, share updated version with each peer + table *lookupTable // Sent from switch, share updated version with each peer } // Initializes the peers struct. func (ps *peers) init(c *Core) { - ps.mutex.Lock() - defer ps.mutex.Unlock() - ps.putPorts(make(map[switchPort]*peer)) ps.core = c + ps.ports = make(map[switchPort]*peer) + ps.table = new(lookupTable) } func (ps *peers) reconfigure() { @@ -80,16 +78,6 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } -// Atomically gets a map[switchPort]*peer of known peers. -func (ps *peers) getPorts() map[switchPort]*peer { - return ps.ports.Load().(map[switchPort]*peer) -} - -// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time). -func (ps *peers) putPorts(ports map[switchPort]*peer) { - ps.ports.Store(ports) -} - // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox @@ -110,10 +98,31 @@ type peer struct { // The below aren't actually useful internally, they're just gathered for getPeers statistics bytesSent uint64 bytesRecvd uint64 + ports map[switchPort]*peer + table *lookupTable +} + +func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { + ps.Act(from, func() { + ps.table = table + ps._updatePeers() + }) +} + +func (ps *peers) _updatePeers() { + ports := ps.ports + table := ps.table + for _, peer := range ps.ports { + p := peer // peer is mutated during iteration + p.Act(ps, func() { + p.ports = ports + p.table = table + }) + } } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -125,9 +134,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare core: ps.core, intf: intf, } - ps.mutex.Lock() - defer ps.mutex.Unlock() - oldPorts := ps.getPorts() + oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { newPorts[k] = v @@ -139,46 +146,49 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare break } } - ps.putPorts(newPorts) + ps.ports = newPorts + ps._updatePeers() return &p } -// Removes a peer for a given port, if one exists. -func (ps *peers) removePeer(port switchPort) { - if port == 0 { - return - } // Can't remove self peer - phony.Block(&ps.core.router, func() { - ps.core.switchTable.forgetPeer(port) +func (p *peer) _removeSelf() { + p.core.peers.Act(p, func() { + p.core.peers._removePeer(p) }) - ps.mutex.Lock() - oldPorts := ps.getPorts() - p, isIn := oldPorts[port] +} + +// Removes a peer for a given port, if one exists. +func (ps *peers) _removePeer(p *peer) { + if q := ps.ports[p.port]; p.port == 0 || q != p { + return + } // Can't remove self peer or nonexistant peer + ps.core.switchTable.forgetPeer(p.port) + oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { newPorts[k] = v } - delete(newPorts, port) - ps.putPorts(newPorts) - ps.mutex.Unlock() - if isIn { - if p.close != nil { - p.close() - } - close(p.done) + delete(newPorts, p.port) + if p.close != nil { + p.close() } + close(p.done) + ps.ports = newPorts + ps._updatePeers() } // If called, sends a notification to each peer that they should send a new switch message. // Mainly called by the switch after an update. func (ps *peers) sendSwitchMsgs(from phony.Actor) { - ports := ps.getPorts() - for _, p := range ports { - if p.port == 0 { - continue + ps.Act(from, func() { + for _, peer := range ps.ports { + p := peer + if p.port == 0 { + continue + } + p.Act(ps, p._sendSwitchMsg) } - p.Act(from, p._sendSwitchMsg) - } + }) } // This must be launched in a separate goroutine by whatever sets up the peer struct. @@ -236,12 +246,16 @@ func (p *peer) _handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { - table := p.core.switchTable.getTable() - if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + if _, isIn := p.table.elems[p.port]; !isIn && p.port != 0 { // Drop traffic if the peer isn't in the switch return } - p.core.switchTable.packetInFrom(p, packet) + coords := switch_getPacketCoords(packet) + next := p.table.lookup(coords) + if nPeer, isIn := p.ports[next]; isIn { + nPeer.sendPacketsFrom(p, [][]byte{packet}) + } + //p.core.switchTable.packetInFrom(p, packet) } func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { @@ -259,6 +273,7 @@ func (p *peer) _sendPackets(packets [][]byte) { size += len(packet) } p.bytesSent += uint64(size) + // FIXME need to manage queues here or else things can block! p.out(packets) } @@ -335,7 +350,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) { return } if len(msg.Hops) < 1 { - p.core.peers.removePeer(p.port) + p._removeSelf() + return } var loc switchLocator prevKey := msg.Root @@ -346,7 +362,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) { loc.coords = append(loc.coords, hop.Port) bs := getBytesForSig(&hop.Next, &sigMsg) if !crypto.Verify(&prevKey, bs, &hop.Sig) { - p.core.peers.removePeer(p.port) + p._removeSelf() + return } prevKey = hop.Next } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b08a12d3..ac4d655d 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -61,7 +61,11 @@ func (r *router) init(core *Core) { linkType: "self", }, } - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) + var p *peer + phony.Block(&r.core.peers, func() { + // FIXME don't block here! + p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) + }) p.out = func(packets [][]byte) { r.handlePackets(p, packets) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ce5e3db6..33f2a1bd 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,13 +12,12 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - "math/rand" + //"math/rand" "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" + //"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -172,8 +171,6 @@ type switchTable struct { mutex sync.RWMutex // Lock for reads/writes of switchData parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // - updater atomic.Value // *sync.Once - table atomic.Value // lookupTable phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor @@ -190,8 +187,6 @@ func (t *switchTable) init(core *Core) { locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} - t.updater.Store(&sync.Once{}) - t.table.Store(lookupTable{}) t.drop = make(map[crypto.SigPubKey]int64) phony.Block(t, func() { core.config.Mutex.RLock() @@ -204,6 +199,7 @@ func (t *switchTable) init(core *Core) { t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) }) + t.updateTable() } func (t *switchTable) reconfigure() { @@ -254,7 +250,7 @@ func (t *switchTable) cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - t.updater.Store(&sync.Once{}) + t.updateTable() t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} @@ -292,7 +288,7 @@ func (t *switchTable) forgetPeer(port switchPort) { t.mutex.Lock() defer t.mutex.Unlock() delete(t.data.peers, port) - t.updater.Store(&sync.Once{}) + defer t.updateTable() if port != t.parent { return } @@ -528,7 +524,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { - t.updater.Store(&sync.Once{}) + t.updateTable() } return } @@ -566,13 +562,7 @@ func (t *switchTable) updateTable() { time: pinfo.time, } } - t.table.Store(newTable) -} - -// Returns a copy of the atomically-updated table used for switch lookups -func (t *switchTable) getTable() lookupTable { - t.updater.Load().(*sync.Once).Do(t.updateTable) - return t.table.Load().(lookupTable) + t.core.peers.updateTables(nil, &newTable) // TODO not be from nil } // Starts the switch worker @@ -589,6 +579,7 @@ type closerInfo struct { // Return a map of ports onto distance, keeping only ports closer to the destination than this node // If the map is empty (or nil), then no peer is closer +/* func (t *switchTable) getCloser(dest []byte) []closerInfo { table := t.getTable() myDist := table.self.dist(dest) @@ -605,8 +596,10 @@ func (t *switchTable) getCloser(dest []byte) []closerInfo { } return closer } +*/ // Returns true if the peer is closer to the destination than ourself +/* func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { table := t.getTable() if info, isIn := table.elems[port]; isIn { @@ -617,6 +610,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { return false } } +*/ // Get the coords of a packet without decoding func switch_getPacketCoords(packet []byte) []byte { @@ -686,23 +680,26 @@ func (t *lookupTable) lookup(coords []byte) switchPort { // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { - coords := switch_getPacketCoords(packet) - table := t.getTable() - port := table.lookup(coords) - ports := t.core.peers.getPorts() - peer := ports[port] - if peer == nil { - // FIXME hack, if the peer disappeared durring a race then don't buffer - return true, 0 - } - if _, isIdle := idle[port]; isIdle || port == 0 { - // Either no closer peers, or the closest peer is idle - delete(idle, port) - peer.sendPacketsFrom(t, [][]byte{packet}) - return true, port - } - // There's a closer peer, but it's not idle, so buffer it - return false, port + /* + coords := switch_getPacketCoords(packet) + table := t.getTable() + port := table.lookup(coords) + ports := t.core.peers.getPorts() + peer := ports[port] + if peer == nil { + // FIXME hack, if the peer disappeared durring a race then don't buffer + return true, 0 + } + if _, isIdle := idle[port]; isIdle || port == 0 { + // Either no closer peers, or the closest peer is idle + delete(idle, port) + peer.sendPacketsFrom(t, [][]byte{packet}) + return true, port + } + // There's a closer peer, but it's not idle, so buffer it + return false, port + */ + return true, 0 } // Info about a buffered packet @@ -726,52 +723,54 @@ type switch_buffers struct { } func (b *switch_buffers) _cleanup(t *switchTable) { - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) - } - b.size -= buf.size - delete(pbufs, streamID) - } - } - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } - - for b.size > b.totalMaxSize { - // Drop a random queue - target := rand.Uint64() % b.size - var size uint64 // running total + /* for port, pbufs := range b.bufs { for streamID, buf := range pbufs { - size += buf.size - if size < target { - continue - } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(pbufs, streamID) - if len(pbufs) == 0 { - delete(b.bufs, port) + // Remove queues for which we have no next hop + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if len(t.getCloser(coords)) == 0 { + for _, packet := range buf.packets { + util.PutBytes(packet.bytes) } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[streamID] = buf + b.size -= buf.size + delete(pbufs, streamID) } - break + } + if len(pbufs) == 0 { + delete(b.bufs, port) } } - } + + for b.size > b.totalMaxSize { + // Drop a random queue + target := rand.Uint64() % b.size + var size uint64 // running total + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + size += buf.size + if size < target { + continue + } + var packet switch_packetInfo + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + b.size -= uint64(len(packet.bytes)) + util.PutBytes(packet.bytes) + if len(buf.packets) == 0 { + delete(pbufs, streamID) + if len(pbufs) == 0 { + delete(b.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[streamID] = buf + } + break + } + } + } + */ } // Handles incoming idle notifications @@ -779,57 +778,60 @@ func (b *switch_buffers) _cleanup(t *switchTable) { // Returns true if the peer is no longer idle, false if it should be added to the idle list func (t *switchTable) _handleIdle(port switchPort) bool { // TODO? only send packets for which this is the best next hop that isn't currently blocked sending - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - var packets [][]byte - var psize int - t.queues._cleanup(t) - now := time.Now() - pbufs := t.queues.bufs[port] - for psize < 65535 { - var best *string - var bestPriority float64 - for streamID, buf := range pbufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority { - b := streamID // copy since streamID is mutated in the loop - best = &b - bestPriority = priority - } + /* + to := t.core.peers.getPorts()[port] + if to == nil { + return true } - if best != nil { - buf := pbufs[*best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(pbufs, *best) - if len(pbufs) == 0 { - delete(t.queues.bufs, port) + var packets [][]byte + var psize int + t.queues._cleanup(t) + now := time.Now() + pbufs := t.queues.bufs[port] + for psize < 65535 { + var best *string + var bestPriority float64 + for streamID, buf := range pbufs { + // Filter over the streams that this node is closer to + // Keep the one with the smallest queue + packet := buf.packets[0] + priority := float64(now.Sub(packet.time)) / float64(buf.size) + if priority >= bestPriority { + b := streamID // copy since streamID is mutated in the loop + best = &b + bestPriority = priority } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[*best] = buf - } - packets = append(packets, packet.bytes) - psize += len(packet.bytes) - } else { - // Finished finding packets - break + if best != nil { + buf := pbufs[*best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + t.queues.size -= uint64(len(packet.bytes)) + if len(buf.packets) == 0 { + delete(pbufs, *best) + if len(pbufs) == 0 { + delete(t.queues.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[*best] = buf + + } + packets = append(packets, packet.bytes) + psize += len(packet.bytes) + } else { + // Finished finding packets + break + } } - } - if len(packets) > 0 { - to.sendPacketsFrom(t, packets) - return true - } + if len(packets) > 0 { + to.sendPacketsFrom(t, packets) + return true + } + return false + */ return false }