From 88161009e94740511fb25b73dbb5b19c68f00af1 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 24 Aug 2019 12:55:49 -0500 Subject: [PATCH] more peer migration --- src/yggdrasil/link.go | 3 ++- src/yggdrasil/peer.go | 8 +++++++- src/yggdrasil/router.go | 7 ++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 4ce374b4..824afd34 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -387,7 +387,8 @@ func (intf *linkInterface) handler() error { for { msg, err := intf.msgIO.readMsg() if len(msg) > 0 { - intf.peer.handlePacket(msg) + // TODO rewrite this if the link becomes an actor + <-intf.peer.SyncExec(func() { intf.peer._handlePacket(msg) }) } if err != nil { if err != io.EOF { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 851a376a..22db88d1 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -219,9 +219,15 @@ func (p *peer) linkLoop() { } } +func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) { + p.EnqueueFrom(from, func() { + p._handlePacket(packet) + }) +} + // Called to handle incoming packets. // Passes the packet to a handler for that packet type. -func (p *peer) handlePacket(packet []byte) { +func (p *peer) _handlePacket(packet []byte) { // FIXME this is off by stream padding and msg length overhead, should be done in tcp.go atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) pType, pTypeLen := wire_decode_uint64(packet) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 464a4778..adf1b1d4 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -64,11 +64,8 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packets [][]byte) { - // TODO make peers and/or the switch into actors, have them pass themselves as the from field - r.handlePackets(nil, packets) - } - r.out = p.handlePacket // TODO if the peer becomes its own actor, then send a message here + p.out = func(packets [][]byte) { r.handlePackets(p, packets) } + r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)