simplify switch parent selection and minor source routing improvements

This commit is contained in:
Arceliar 2020-11-09 19:01:11 -06:00
parent 144d42c773
commit 428789f24c
3 changed files with 19 additions and 67 deletions

View File

@ -94,7 +94,6 @@ func (t *dht) reset() {
t.ping(info, nil) t.ping(info, nil)
} }
} }
t.reqs = make(map[dhtReqKey]time.Time)
t.table = make(map[crypto.NodeID]*dhtInfo) t.table = make(map[crypto.NodeID]*dhtInfo)
t.imp = nil t.imp = nil
} }

View File

@ -92,13 +92,15 @@ func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool {
if !bytes.Equal(sinfo.coords, p.Coords) { if !bytes.Equal(sinfo.coords, p.Coords) {
// allocate enough space for additional coords // allocate enough space for additional coords
sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...)
} path := switch_reverseCoordBytes(rpath)
sinfo.time = time.Now() sinfo.path = append(sinfo.path[:0], path...)
sinfo.tstamp = p.Tstamp defer sinfo._sendPingPong(false, nil)
if p.IsPong { } else if p.IsPong {
path := switch_reverseCoordBytes(rpath) path := switch_reverseCoordBytes(rpath)
sinfo.path = append(sinfo.path[:0], path...) sinfo.path = append(sinfo.path[:0], path...)
} }
sinfo.time = time.Now()
sinfo.tstamp = p.Tstamp
sinfo.reset = false sinfo.reset = false
defer func() { recover() }() // Recover if the below panics defer func() { recover() }() // Recover if the below panics
select { select {
@ -423,6 +425,8 @@ func (ss *sessions) reset() {
sinfo := _sinfo // So we can safely put it in a closure sinfo := _sinfo // So we can safely put it in a closure
sinfo.Act(ss.router, func() { sinfo.Act(ss.router, func() {
sinfo.reset = true sinfo.reset = true
sinfo._sendPingPong(false, sinfo.path)
sinfo._sendPingPong(false, nil)
}) })
} }
} }

View File

@ -20,10 +20,9 @@ import (
) )
const ( const (
switch_timeout = time.Minute switch_timeout = time.Minute
switch_updateInterval = switch_timeout / 2 switch_updateInterval = switch_timeout / 2
switch_throttle = switch_updateInterval / 2 switch_throttle = switch_updateInterval / 2
switch_faster_threshold = 240 //Number of switch updates before switching to a faster parent
) )
// The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it. // The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it.
@ -136,15 +135,14 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool {
// Information about a peer, used by the switch to build the tree and eventually make routing decisions. // Information about a peer, used by the switch to build the tree and eventually make routing decisions.
type peerInfo struct { type peerInfo struct {
key crypto.SigPubKey // ID of this peer key crypto.SigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree degree uint64 // Self-reported degree
time time.Time // Time this node was last seen time time.Time // Time this node was last seen
faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower port switchPort // Interface number of this peer
port switchPort // Interface number of this peer msg switchMsg // The wire switchMsg used
msg switchMsg // The wire switchMsg used readBlock bool // True if the link notified us of a read that blocked too long
readBlock bool // True if the link notified us of a read that blocked too long writeBlock bool // True of the link notified us of a write that blocked too long
writeBlock bool // True of the link notified us of a write that blocked too long
} }
func (pinfo *peerInfo) blocked() bool { func (pinfo *peerInfo) blocked() bool {
@ -427,37 +425,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
doUpdate := false doUpdate := false
oldSender := t.data.peers[fromPort] oldSender := t.data.peers[fromPort]
if !equiv(&sender.locator, &oldSender.locator) { if !equiv(&sender.locator, &oldSender.locator) {
// Reset faster info, we'll start refilling it right after this
sender.faster = nil
doUpdate = true doUpdate = true
} }
// Update the matrix of peer "faster" thresholds
if reprocessing { if reprocessing {
sender.faster = oldSender.faster
sender.time = oldSender.time sender.time = oldSender.time
sender.readBlock = oldSender.readBlock sender.readBlock = oldSender.readBlock
sender.writeBlock = oldSender.writeBlock sender.writeBlock = oldSender.writeBlock
} else {
sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
for port, peer := range t.data.peers {
if port == fromPort {
continue
} else if sender.locator.root != peer.locator.root || sender.locator.tstamp > peer.locator.tstamp {
// We were faster than this node, so increment, as long as we don't overflow because of it
if oldSender.faster[peer.port] < switch_faster_threshold {
sender.faster[port] = oldSender.faster[peer.port] + 1
} else {
sender.faster[port] = switch_faster_threshold
}
} else {
// Slower than this node, penalize (more than the reward amount)
if oldSender.faster[port] > 1 {
sender.faster[port] = oldSender.faster[peer.port] - 2
} else {
sender.faster[port] = 0
}
}
}
} }
if sender.blocked() != oldSender.blocked() { if sender.blocked() != oldSender.blocked() {
doUpdate = true doUpdate = true
@ -496,35 +469,11 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
case noParent: case noParent:
// We currently have no working parent, and at this point in the switch statement, anything is better than nothing. // We currently have no working parent, and at this point in the switch statement, anything is better than nothing.
updateRoot = true updateRoot = true
case sender.faster[t.parent] >= switch_faster_threshold:
// The is reliably faster than the current parent.
updateRoot = true
case !sender.blocked() && oldParent.blocked(): case !sender.blocked() && oldParent.blocked():
// Replace a blocked parent // Replace a blocked parent
updateRoot = true updateRoot = true
case reprocessing && sender.blocked() && !oldParent.blocked(): case reprocessing && sender.blocked() && !oldParent.blocked():
// Don't replace an unblocked parent when reprocessing // Don't replace an unblocked parent when reprocessing
case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
// The sender seems to be reliably faster than the current parent, so switch to them instead.
updateRoot = true
case sender.port != t.parent:
// Ignore further cases if the sender isn't our parent.
case !reprocessing && !equiv(&sender.locator, &t.data.locator):
// Special case:
// If coords changed, then we need to penalize this node somehow, to prevent flapping.
// First, reset all faster-related info to 0.
// Then, de-parent the node and reprocess all messages to find a new parent.
t.parent = 0
for _, peer := range t.data.peers {
if peer.port == sender.port {
continue
}
t._handleMsg(&peer.msg, peer.port, true)
}
// Process the sender last, to avoid keeping them as a parent if at all possible.
t._handleMsg(&sender.msg, sender.port, true)
case now.Sub(t.time) < switch_throttle:
// We've already gotten an update from this root recently, so ignore this one to avoid flooding.
case sender.locator.tstamp > t.data.locator.tstamp: case sender.locator.tstamp > t.data.locator.tstamp:
// The timestamp was updated, so we need to update locally and send to our peers. // The timestamp was updated, so we need to update locally and send to our peers.
updateRoot = true updateRoot = true