more peer migration

This commit is contained in:
Arceliar 2019-08-24 13:15:29 -05:00
parent ecd23ce9fc
commit 034fece33f
2 changed files with 21 additions and 33 deletions

View File

@ -110,9 +110,9 @@ type peer struct {
endpoint string
firstSeen time.Time // To track uptime for getPeers
linkOut (chan []byte) // used for protocol traffic (to bypass queues)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo (chan *dhtInfo) // used to keep the DHT working
dinfo *dhtInfo // used to keep the DHT working
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
done (chan struct{}) // closed to exit the linkLoop
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
}
@ -124,8 +124,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
linkShared: *linkShared,
firstSeen: now,
doSend: make(chan struct{}, 1),
dinfo: make(chan *dhtInfo, 1),
done: make(chan struct{}),
close: closer,
core: ps.core,
intf: intf,
@ -170,29 +169,19 @@ func (ps *peers) removePeer(port switchPort) {
if p.close != nil {
p.close()
}
close(p.doSend)
close(p.done)
}
}
// If called, sends a notification to each peer that they should send a new switch message.
// Mainly called by the switch after an update.
func (ps *peers) sendSwitchMsgs() {
func (ps *peers) sendSwitchMsgs(from phony.IActor) {
ports := ps.getPorts()
for _, p := range ports {
if p.port == 0 {
continue
}
p.doSendSwitchMsgs()
}
}
// If called, sends a notification to the peer's linkLoop to trigger a switchMsg send.
// Mainly called by sendSwitchMsgs or during linkLoop startup.
func (p *peer) doSendSwitchMsgs() {
defer func() { recover() }() // In case there's a race with close(p.doSend)
select {
case p.doSend <- struct{}{}:
default:
p.EnqueueFrom(from, p._sendSwitchMsg)
}
}
@ -201,24 +190,23 @@ func (p *peer) doSendSwitchMsgs() {
func (p *peer) linkLoop() {
tick := time.NewTicker(time.Second)
defer tick.Stop()
p.doSendSwitchMsgs()
var dinfo *dhtInfo
<-p.SyncExec(p._sendSwitchMsg) // Startup message
for {
select {
case _, ok := <-p.doSend:
if !ok {
return
}
<-p.SyncExec(p._sendSwitchMsg)
case dinfo = <-p.dinfo:
case <-p.done:
return
case _ = <-tick.C:
if dinfo != nil {
<-p.SyncExec(func() { p.core.router.insertPeer(p, dinfo) })
}
<-p.SyncExec(p._updateDHT)
}
}
}
func (p *peer) _updateDHT() {
if p.dinfo != nil {
p.core.router.insertPeer(p, p.dinfo)
}
}
func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) {
p.EnqueueFrom(from, func() {
p._handlePacket(packet)
@ -366,16 +354,16 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
p.core.switchTable.handleMsg(&msg, p.port)
if !p.core.switchTable.checkRoot(&msg) {
// Bad switch message
p.dinfo <- nil
p.dinfo = nil
return
}
// Pass a mesage to the dht informing it that this peer (still) exists
loc.coords = loc.coords[:len(loc.coords)-1]
dinfo := dhtInfo{
p.dinfo = &dhtInfo{
key: p.box,
coords: loc.getCoords(),
}
p.dinfo <- &dinfo
p._updateDHT()
}
// This generates the bytes that we sign or check the signature of for a switchMsg.

View File

@ -248,7 +248,7 @@ func (t *switchTable) cleanRoot() {
t.core.router.reset(nil)
}
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t.core.peers.sendSwitchMsgs()
t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor
}
}
@ -515,7 +515,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
}
t.data.locator = sender.locator
t.parent = sender.port
t.core.peers.sendSwitchMsgs()
t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor
}
if doUpdate {
t.updater.Store(&sync.Once{})