2017-12-29 07:16:20 +03:00
package yggdrasil
// This part constructs a spanning tree of the network
// It routes packets based on distance on the spanning tree
// In general, this is *not* equivalent to routing on the tree
// It falls back to the tree in the worst case, but it can take shortcuts too
// This is the part that makes routing reasonably efficient on scale-free graphs
// TODO document/comment everything in a lot more detail
// TODO? use a pre-computed lookup table (python version had this)
2018-06-07 22:24:02 +03:00
// A little annoying to do with constant changes from backpressure
2017-12-29 07:16:20 +03:00
import "time"
2018-05-27 21:37:35 +03:00
import "sort"
2017-12-29 07:16:20 +03:00
import "sync"
import "sync/atomic"
//import "fmt"
// Timing parameters used by the switch's root-maintenance logic.
const (
	// switch_timeout is how long the root's timestamp may go without being
	// refreshed before cleanRoot treats the root as timed out.
	switch_timeout = time.Minute
	// switch_updateInterval is how long we wait, while we are the root
	// ourselves, before cleanRoot issues a fresh timestamp.
	switch_updateInterval = switch_timeout / 2
	// switch_throttle is the minimum time between timestamp-only updates
	// accepted from the current parent in unlockedHandleMsg.
	switch_throttle = switch_updateInterval / 2
)
2017-12-29 07:16:20 +03:00
// You should be able to provide crypto signatures for this
// 1 signature per coord, from the *sender* to that coord
// E.g. A->B->C has sigA(A->B) and sigB(A->B->C)
type switchLocator struct {
2018-01-05 01:37:51 +03:00
root sigPubKey
tstamp int64
coords [ ] switchPort
2017-12-29 07:16:20 +03:00
}
func firstIsBetter ( first , second * sigPubKey ) bool {
2018-01-05 01:37:51 +03:00
// Higher TreeID is better
ftid := getTreeID ( first )
stid := getTreeID ( second )
for idx := 0 ; idx < len ( ftid ) ; idx ++ {
if ftid [ idx ] == stid [ idx ] {
continue
}
return ftid [ idx ] > stid [ idx ]
}
// Edge case, when comparing identical IDs
return false
2017-12-29 07:16:20 +03:00
}
func ( l * switchLocator ) clone ( ) switchLocator {
2018-01-05 01:37:51 +03:00
// Used to create a deep copy for use in messages
// Copy required because we need to mutate coords before sending
// (By appending the port from us to the destination)
loc := * l
loc . coords = make ( [ ] switchPort , len ( l . coords ) , len ( l . coords ) + 1 )
copy ( loc . coords , l . coords )
return loc
2017-12-29 07:16:20 +03:00
}
func ( l * switchLocator ) dist ( dest [ ] byte ) int {
2018-01-05 01:37:51 +03:00
// Returns distance (on the tree) from these coords
offset := 0
fdc := 0
for {
if fdc >= len ( l . coords ) {
break
}
coord , length := wire_decode_uint64 ( dest [ offset : ] )
if length == 0 {
break
}
if l . coords [ fdc ] != switchPort ( coord ) {
break
}
fdc ++
offset += length
}
dist := len ( l . coords [ fdc : ] )
for {
_ , length := wire_decode_uint64 ( dest [ offset : ] )
if length == 0 {
break
}
dist ++
offset += length
}
return dist
2017-12-29 07:16:20 +03:00
}
func ( l * switchLocator ) getCoords ( ) [ ] byte {
2018-01-05 01:37:51 +03:00
bs := make ( [ ] byte , 0 , len ( l . coords ) )
for _ , coord := range l . coords {
c := wire_encode_uint64 ( uint64 ( coord ) )
bs = append ( bs , c ... )
}
return bs
2017-12-29 07:16:20 +03:00
}
func ( x * switchLocator ) isAncestorOf ( y * switchLocator ) bool {
2018-01-05 01:37:51 +03:00
if x . root != y . root {
return false
}
if len ( x . coords ) > len ( y . coords ) {
return false
}
for idx := range x . coords {
if x . coords [ idx ] != y . coords [ idx ] {
return false
}
}
return true
2017-12-29 07:16:20 +03:00
}
type peerInfo struct {
2018-01-05 01:37:51 +03:00
key sigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree
time time . Time // Time this node was last seen
firstSeen time . Time
port switchPort // Interface number of this peer
2018-06-07 22:13:31 +03:00
msg switchMsg // The wire switchMsg used
2017-12-29 07:16:20 +03:00
}
// switchPort numbers a peer interface on a node; locator coords are sequences
// of these. Port 0 is used for "self" (see switchTable.parent and lookup).
type switchPort uint64
type tableElem struct {
2018-05-17 01:48:53 +03:00
port switchPort
2018-05-16 07:57:00 +03:00
locator switchLocator
2017-12-29 07:16:20 +03:00
}
type lookupTable struct {
2018-01-05 01:37:51 +03:00
self switchLocator
2018-01-19 03:48:34 +03:00
elems [ ] tableElem
2017-12-29 07:16:20 +03:00
}
type switchData struct {
2018-01-05 01:37:51 +03:00
// All data that's mutable and used by exported Table methods
// To be read/written with atomic.Value Store/Load calls
locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map [ switchPort ] peerInfo
2018-06-07 21:56:11 +03:00
msg * switchMsg
2017-12-29 07:16:20 +03:00
}
type switchTable struct {
2018-01-05 01:37:51 +03:00
core * Core
key sigPubKey // Our own key
time time . Time // Time when locator.tstamp was last updated
parent switchPort // Port of whatever peer is our parent, or self if we're root
drop map [ sigPubKey ] int64 // Tstamp associated with a dropped root
mutex sync . RWMutex // Lock for reads/writes of switchData
data switchData
updater atomic . Value //*sync.Once
table atomic . Value //lookupTable
2017-12-29 07:16:20 +03:00
}
func ( t * switchTable ) init ( core * Core , key sigPubKey ) {
2018-01-05 01:37:51 +03:00
now := time . Now ( )
t . core = core
t . key = key
locator := switchLocator { root : key , tstamp : now . Unix ( ) }
peers := make ( map [ switchPort ] peerInfo )
t . data = switchData { locator : locator , peers : peers }
t . updater . Store ( & sync . Once { } )
2018-01-19 03:48:34 +03:00
t . table . Store ( lookupTable { } )
2018-01-05 01:37:51 +03:00
t . drop = make ( map [ sigPubKey ] int64 )
2018-05-28 00:13:37 +03:00
}
2017-12-29 07:16:20 +03:00
func ( t * switchTable ) getLocator ( ) switchLocator {
2018-01-05 01:37:51 +03:00
t . mutex . RLock ( )
defer t . mutex . RUnlock ( )
return t . data . locator . clone ( )
2017-12-29 07:16:20 +03:00
}
2018-06-07 07:10:33 +03:00
func ( t * switchTable ) doMaintenance ( ) {
2018-01-05 01:37:51 +03:00
// Periodic maintenance work to keep things internally consistent
t . mutex . Lock ( ) // Write lock
defer t . mutex . Unlock ( ) // Release lock when we're done
t . cleanRoot ( )
t . cleanDropped ( )
2017-12-29 07:16:20 +03:00
}
func ( t * switchTable ) cleanRoot ( ) {
2018-01-05 01:37:51 +03:00
// TODO rethink how this is done?...
// Get rid of the root if it looks like its timed out
now := time . Now ( )
doUpdate := false
//fmt.Println("DEBUG clean root:", now.Sub(t.time))
if now . Sub ( t . time ) > switch_timeout {
//fmt.Println("root timed out", t.data.locator)
dropped := t . data . peers [ t . parent ]
dropped . time = t . time
t . drop [ t . data . locator . root ] = t . data . locator . tstamp
doUpdate = true
//t.core.log.Println("DEBUG: switch root timeout", len(t.drop))
}
// Or, if we're better than our root, root ourself
if firstIsBetter ( & t . key , & t . data . locator . root ) {
//fmt.Println("root is worse than us", t.data.locator.Root)
doUpdate = true
//t.core.log.Println("DEBUG: switch root replace with self", t.data.locator.Root)
}
// Or, if we are the root, possibly update our timestamp
if t . data . locator . root == t . key &&
2018-02-18 08:57:24 +03:00
now . Sub ( t . time ) > switch_updateInterval {
2018-01-05 01:37:51 +03:00
//fmt.Println("root is self and old, updating", t.data.locator.Root)
doUpdate = true
}
if doUpdate {
t . parent = switchPort ( 0 )
t . time = now
if t . data . locator . root != t . key {
t . data . seq ++
t . updater . Store ( & sync . Once { } )
select {
case t . core . router . reset <- struct { } { } :
default :
}
}
t . data . locator = switchLocator { root : t . key , tstamp : now . Unix ( ) }
2018-06-07 08:16:47 +03:00
t . core . peers . sendSwitchMsgs ( )
2018-01-05 01:37:51 +03:00
}
2017-12-29 07:16:20 +03:00
}
2018-06-07 07:23:16 +03:00
func ( t * switchTable ) removePeer ( port switchPort ) {
delete ( t . data . peers , port )
t . updater . Store ( & sync . Once { } )
2018-06-07 18:58:24 +03:00
// TODO if parent, find a new peer to use as parent instead
2018-06-08 00:49:51 +03:00
for _ , info := range t . data . peers {
t . unlockedHandleMsg ( & info . msg , info . port )
}
2017-12-29 07:16:20 +03:00
}
func ( t * switchTable ) cleanDropped ( ) {
2018-01-27 02:30:51 +03:00
// TODO? only call this after root changes, not periodically
2018-01-05 01:37:51 +03:00
for root := range t . drop {
if ! firstIsBetter ( & root , & t . data . locator . root ) {
delete ( t . drop , root )
}
}
2017-12-29 07:16:20 +03:00
}
2018-06-07 21:56:11 +03:00
// switchMsg is the tree announcement exchanged between peers: a root, the
// root's timestamp, and the chain of hops leading away from that root.
type switchMsg struct {
	// Root is the key of the root this announcement descends from.
	Root sigPubKey
	// TStamp is the root's timestamp.
	TStamp int64
	// Hops lists, in order from the root, each hop taken so far.
	Hops []switchMsgHop
}
// switchMsgHop is a single step in a switchMsg's path.
type switchMsgHop struct {
	// Port is the port this hop was sent out of; it becomes one coord of
	// the receiver's locator.
	Port switchPort
	// Next is the key of the node this hop leads to.
	Next sigPubKey
	// Sig is the signature accompanying this hop.
	Sig sigBytes
}
func ( t * switchTable ) getMsg ( ) * switchMsg {
t . mutex . RLock ( )
defer t . mutex . RUnlock ( )
if t . parent == 0 {
return & switchMsg { Root : t . key , TStamp : t . data . locator . tstamp }
} else if parent , isIn := t . data . peers [ t . parent ] ; isIn {
2018-06-07 22:13:31 +03:00
msg := parent . msg
2018-06-07 21:56:11 +03:00
msg . Hops = append ( [ ] switchMsgHop ( nil ) , msg . Hops ... )
return & msg
} else {
return nil
}
}
2018-06-09 01:33:16 +03:00
// checkRoot reports whether the root information in msg is acceptable.
// It returns false if it's a dropped root, not a better root, or has an
// older timestamp, and true otherwise.
// Used elsewhere to keep inserting peers into the dht only if root info is OK.
func (t *switchTable) checkRoot(msg *switchMsg) bool {
	t.mutex.RLock()
	defer t.mutex.RUnlock()

	// A root we dropped is rejected until it shows a newer timestamp.
	if droppedAt, wasDropped := t.drop[msg.Root]; wasDropped && droppedAt >= msg.TStamp {
		return false
	}
	// A strictly better root is always acceptable.
	if firstIsBetter(&msg.Root, &t.data.locator.root) {
		return true
	}
	// Not better and not our root: it must be a worse one.
	if t.data.locator.root != msg.Root {
		return false
	}
	// Same root as ours: accept unless its timestamp is older than ours.
	return t.data.locator.tstamp <= msg.TStamp
}
2018-06-07 22:13:31 +03:00
func ( t * switchTable ) handleMsg ( msg * switchMsg , fromPort switchPort ) {
2018-01-05 01:37:51 +03:00
t . mutex . Lock ( )
defer t . mutex . Unlock ( )
2018-06-08 00:49:51 +03:00
t . unlockedHandleMsg ( msg , fromPort )
}
func ( t * switchTable ) unlockedHandleMsg ( msg * switchMsg , fromPort switchPort ) {
// TODO directly use a switchMsg instead of switchMessage + sigs
2018-01-05 01:37:51 +03:00
now := time . Now ( )
2018-06-07 22:13:31 +03:00
// Set up the sender peerInfo
var sender peerInfo
sender . locator . root = msg . Root
sender . locator . tstamp = msg . TStamp
prevKey := msg . Root
for _ , hop := range msg . Hops {
2018-06-07 21:56:11 +03:00
// Build locator and signatures
var sig sigInfo
sig . next = hop . Next
sig . sig = hop . Sig
2018-06-07 22:13:31 +03:00
sender . locator . coords = append ( sender . locator . coords , hop . Port )
sender . key = prevKey
2018-06-07 21:56:11 +03:00
prevKey = hop . Next
}
2018-06-07 22:13:31 +03:00
sender . msg = * msg
2018-01-05 01:37:51 +03:00
oldSender , isIn := t . data . peers [ fromPort ]
if ! isIn {
oldSender . firstSeen = now
}
2018-06-07 22:13:31 +03:00
sender . firstSeen = oldSender . firstSeen
sender . port = fromPort
sender . time = now
// Decide what to do
2018-01-05 01:37:51 +03:00
equiv := func ( x * switchLocator , y * switchLocator ) bool {
if x . root != y . root {
return false
}
if len ( x . coords ) != len ( y . coords ) {
return false
}
for idx := range x . coords {
if x . coords [ idx ] != y . coords [ idx ] {
return false
}
}
return true
}
doUpdate := false
2018-06-07 22:13:31 +03:00
if ! equiv ( & sender . locator , & oldSender . locator ) {
2018-01-05 01:37:51 +03:00
doUpdate = true
2018-06-07 18:58:24 +03:00
//sender.firstSeen = now // TODO? uncomment to prevent flapping?
2018-01-05 01:37:51 +03:00
}
t . data . peers [ fromPort ] = sender
updateRoot := false
oldParent , isIn := t . data . peers [ t . parent ]
noParent := ! isIn
noLoop := func ( ) bool {
2018-06-07 22:13:31 +03:00
for idx := 0 ; idx < len ( msg . Hops ) - 1 ; idx ++ {
if msg . Hops [ idx ] . Next == t . core . sigPub {
2018-01-05 01:37:51 +03:00
return false
}
}
2018-06-07 22:13:31 +03:00
if sender . locator . root == t . core . sigPub {
2018-01-05 01:37:51 +03:00
return false
}
return true
} ( )
sTime := now . Sub ( sender . firstSeen )
pTime := oldParent . time . Sub ( oldParent . firstSeen ) + switch_timeout
// Really want to compare sLen/sTime and pLen/pTime
// Cross multiplied to avoid divide-by-zero
2018-06-07 22:13:31 +03:00
cost := len ( sender . locator . coords ) * int ( pTime . Seconds ( ) )
2018-01-05 01:37:51 +03:00
pCost := len ( t . data . locator . coords ) * int ( sTime . Seconds ( ) )
2018-06-07 22:13:31 +03:00
dropTstamp , isIn := t . drop [ sender . locator . root ]
2018-01-05 01:37:51 +03:00
// Here be dragons
switch {
case ! noLoop : // do nothing
2018-06-07 22:13:31 +03:00
case isIn && dropTstamp >= sender . locator . tstamp : // do nothing
case firstIsBetter ( & sender . locator . root , & t . data . locator . root ) :
2018-01-05 01:37:51 +03:00
updateRoot = true
2018-06-07 22:13:31 +03:00
case t . data . locator . root != sender . locator . root : // do nothing
case t . data . locator . tstamp > sender . locator . tstamp : // do nothing
2018-01-05 01:37:51 +03:00
case noParent :
updateRoot = true
case cost < pCost :
updateRoot = true
2018-02-18 08:57:24 +03:00
case sender . port != t . parent : // do nothing
2018-06-07 22:13:31 +03:00
case ! equiv ( & sender . locator , & t . data . locator ) :
2018-02-18 08:57:24 +03:00
updateRoot = true
case now . Sub ( t . time ) < switch_throttle : // do nothing
2018-06-07 22:13:31 +03:00
case sender . locator . tstamp > t . data . locator . tstamp :
2018-01-05 01:37:51 +03:00
updateRoot = true
}
if updateRoot {
2018-06-07 22:13:31 +03:00
if ! equiv ( & sender . locator , & t . data . locator ) {
2018-01-05 01:37:51 +03:00
doUpdate = true
t . data . seq ++
select {
case t . core . router . reset <- struct { } { } :
default :
}
2018-05-16 07:57:00 +03:00
//t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords)
2018-01-05 01:37:51 +03:00
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
}
2018-06-07 22:13:31 +03:00
if t . data . locator . tstamp != sender . locator . tstamp {
2018-01-05 01:37:51 +03:00
t . time = now
}
2018-06-07 22:13:31 +03:00
t . data . locator = sender . locator
2018-01-05 01:37:51 +03:00
t . parent = sender . port
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
2018-06-07 08:16:47 +03:00
t . core . peers . sendSwitchMsgs ( )
2018-01-05 01:37:51 +03:00
}
if doUpdate {
t . updater . Store ( & sync . Once { } )
}
return
2017-12-29 07:16:20 +03:00
}
func ( t * switchTable ) updateTable ( ) {
2018-01-05 01:37:51 +03:00
// WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table
// Instead of checking distance for every destination every time
// Array of structs, indexed by first coord that differs from self
// Each struct has stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest
t . mutex . RLock ( )
defer t . mutex . RUnlock ( )
newTable := lookupTable {
self : t . data . locator . clone ( ) ,
2018-01-19 03:48:34 +03:00
elems : make ( [ ] tableElem , 0 , len ( t . data . peers ) ) ,
2018-01-05 01:37:51 +03:00
}
for _ , pinfo := range t . data . peers {
//if !pinfo.forward { continue }
2018-02-18 08:14:23 +03:00
if pinfo . locator . root != newTable . self . root {
continue
}
2018-01-05 01:37:51 +03:00
loc := pinfo . locator . clone ( )
loc . coords = loc . coords [ : len ( loc . coords ) - 1 ] // Remove the them->self link
2018-01-19 03:48:34 +03:00
newTable . elems = append ( newTable . elems , tableElem {
2018-01-05 01:37:51 +03:00
locator : loc ,
2018-05-17 01:48:53 +03:00
port : pinfo . port ,
2018-01-19 03:48:34 +03:00
} )
2018-01-05 01:37:51 +03:00
}
2018-05-27 21:37:35 +03:00
sort . SliceStable ( newTable . elems , func ( i , j int ) bool {
return t . data . peers [ newTable . elems [ i ] . port ] . firstSeen . Before ( t . data . peers [ newTable . elems [ j ] . port ] . firstSeen )
} )
2018-01-05 01:37:51 +03:00
t . table . Store ( newTable )
2017-12-29 07:16:20 +03:00
}
2018-06-08 04:29:22 +03:00
func ( t * switchTable ) lookup ( dest [ ] byte ) switchPort {
2018-01-05 01:37:51 +03:00
t . updater . Load ( ) . ( * sync . Once ) . Do ( t . updateTable )
table := t . table . Load ( ) . ( lookupTable )
2018-06-08 04:29:22 +03:00
myDist := table . self . dist ( dest )
2018-06-08 04:18:13 +03:00
if myDist == 0 {
2018-06-08 04:29:22 +03:00
return 0
2018-01-05 01:37:51 +03:00
}
2018-05-27 21:37:35 +03:00
// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
ports := t . core . peers . getPorts ( )
var best switchPort
bestCost := int64 ( ^ uint64 ( 0 ) >> 1 )
2018-01-19 03:48:34 +03:00
for _ , info := range table . elems {
2018-06-08 04:29:22 +03:00
dist := info . locator . dist ( dest )
2018-01-05 01:37:51 +03:00
if ! ( dist < myDist ) {
continue
}
2018-05-27 21:37:35 +03:00
p , isIn := ports [ info . port ]
if ! isIn {
continue
}
cost := int64 ( dist ) + p . getQueueSize ( )
if cost < bestCost {
2018-01-19 03:48:34 +03:00
best = info . port
2018-05-27 21:37:35 +03:00
bestCost = cost
2018-01-05 01:37:51 +03:00
}
}
2018-06-08 00:49:51 +03:00
//t.core.log.Println("DEBUG: sending to", best, "cost", bestCost)
2018-06-08 04:29:22 +03:00
return best
2017-12-29 07:16:20 +03:00
}
////////////////////////////////////////////////////////////////////////////////
//Signature stuff
type sigInfo struct {
2018-01-05 01:37:51 +03:00
next sigPubKey
sig sigBytes
2017-12-29 07:16:20 +03:00
}
////////////////////////////////////////////////////////////////////////////////