more peer migration

This commit is contained in:
Arceliar 2019-08-24 12:55:49 -05:00
parent 775fb535dc
commit 88161009e9
3 changed files with 11 additions and 7 deletions

View File

@ -387,7 +387,8 @@ func (intf *linkInterface) handler() error {
for { for {
msg, err := intf.msgIO.readMsg() msg, err := intf.msgIO.readMsg()
if len(msg) > 0 { if len(msg) > 0 {
intf.peer.handlePacket(msg) // TODO rewrite this if the link becomes an actor
<-intf.peer.SyncExec(func() { intf.peer._handlePacket(msg) })
} }
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {

View File

@ -219,9 +219,15 @@ func (p *peer) linkLoop() {
} }
} }
func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) {
p.EnqueueFrom(from, func() {
p._handlePacket(packet)
})
}
// Called to handle incoming packets. // Called to handle incoming packets.
// Passes the packet to a handler for that packet type. // Passes the packet to a handler for that packet type.
func (p *peer) handlePacket(packet []byte) { func (p *peer) _handlePacket(packet []byte) {
// FIXME this is off by stream padding and msg length overhead, should be done in tcp.go // FIXME this is off by stream padding and msg length overhead, should be done in tcp.go
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) atomic.AddUint64(&p.bytesRecvd, uint64(len(packet)))
pType, pTypeLen := wire_decode_uint64(packet) pType, pTypeLen := wire_decode_uint64(packet)

View File

@ -64,11 +64,8 @@ func (r *router) init(core *Core) {
}, },
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packets [][]byte) { p.out = func(packets [][]byte) { r.handlePackets(p, packets) }
// TODO make peers and/or the switch into actors, have them pass themselves as the from field r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
r.handlePackets(nil, packets)
}
r.out = p.handlePacket // TODO if the peer becomes its own actor, then send a message here
r.nodeinfo.init(r.core) r.nodeinfo.init(r.core)
r.core.config.Mutex.RLock() r.core.config.Mutex.RLock()
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy) r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)