mirror of https://github.com/yggdrasil-network/yggdrasil-go (synced 2024-11-10 07:20:39 +03:00)

commit 52a0027aea
parent 988f4ad265

    switch refactoring, setup for a better approximation of local backpressure
@@ -12,7 +12,6 @@ package yggdrasil
 // A little annoying to do with constant changes from backpressure
 
 import (
-    "sort"
     "sync"
     "sync/atomic"
     "time"
@@ -139,7 +138,7 @@ type tableElem struct {
 // This is the subset of the information about all peers needed to make routing decisions, and it is stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
 type lookupTable struct {
     self  switchLocator
-    elems []tableElem
+    elems map[switchPort]tableElem
 }
 
 // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
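The elems field changes from a sorted slice to a map keyed by switchPort, trading the firstSeen-ordered iteration for direct lookups by port. A minimal sketch of the access pattern this enables (the helper name findByPort is illustrative, not part of the commit):

    // With the map layout, a port's entry is a single lookup; the old
    // slice layout needed a linear scan or the sort this commit removes.
    func findByPort(elems map[switchPort]tableElem, port switchPort) (tableElem, bool) {
        info, isIn := elems[port]
        return info, isIn
    }

The new portIsCloser check added later in this diff relies on exactly this one-lookup access.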
@@ -162,9 +161,10 @@ type switchTable struct {
     drop     map[sigPubKey]int64 // Tstamp associated with a dropped root
     mutex    sync.RWMutex        // Lock for reads/writes of switchData
     data     switchData
     updater  atomic.Value //*sync.Once
     table    atomic.Value //lookupTable
     packetIn chan []byte // Incoming packets for the worker to handle
+    idleIn   chan switchPort // Incoming idle notifications from peer links
 }
 
 // Initializes the switchTable struct.
@@ -179,6 +179,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
     t.table.Store(lookupTable{})
     t.drop = make(map[sigPubKey]int64)
     t.packetIn = make(chan []byte, 1024)
+    t.idleIn = make(chan switchPort, 1024)
 }
 
 // Safely gets a copy of this node's locator.
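The idleIn channel is the other half of the new backpressure signal: peer links are expected to report when their send queues drain. This commit only creates the channel; a hedged sketch of how a peer's writer goroutine might feed it (notifyIdle is hypothetical and not part of this commit):

    // Hypothetical: signal the switch worker that this port's queue
    // drained, without ever blocking the peer's writer goroutine.
    func notifyIdle(idleIn chan<- switchPort, port switchPort) {
        select {
        case idleIn <- port:
            // The worker will add this port to its idle set.
        default:
            // Channel full; dropping is tolerable here, because the
            // worker currently rebuilds the idle set on each packet.
        }
    }

The buffer of 1024 matches packetIn, so notifications should rarely be dropped in practice.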
@@ -458,7 +459,7 @@ func (t *switchTable) updateTable() {
     defer t.mutex.RUnlock()
     newTable := lookupTable{
         self:  t.data.locator.clone(),
-        elems: make([]tableElem, 0, len(t.data.peers)),
+        elems: make(map[switchPort]tableElem, len(t.data.peers)),
     }
     for _, pinfo := range t.data.peers {
         //if !pinfo.forward { continue }
@@ -467,17 +468,20 @@ func (t *switchTable) updateTable() {
         }
         loc := pinfo.locator.clone()
         loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
-        newTable.elems = append(newTable.elems, tableElem{
+        newTable.elems[pinfo.port] = tableElem{
             locator: loc,
             port:    pinfo.port,
-        })
+        }
     }
-    sort.SliceStable(newTable.elems, func(i, j int) bool {
-        return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
-    })
     t.table.Store(newTable)
 }
 
+// Returns a copy of the atomically-updated table used for switch lookups
+func (t *switchTable) getTable() lookupTable {
+    t.updater.Load().(*sync.Once).Do(t.updateTable)
+    return t.table.Load().(lookupTable)
+}
+
 // This does the switch layer lookups that decide how to route traffic.
 // Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree.
 // Traffic must be routed to a node that is closer to the destination via the metric space distance.
||||||
@ -485,8 +489,7 @@ func (t *switchTable) updateTable() {
|
|||||||
// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
|
// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
|
||||||
// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
|
// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
|
||||||
func (t *switchTable) lookup(dest []byte) switchPort {
|
func (t *switchTable) lookup(dest []byte) switchPort {
|
||||||
t.updater.Load().(*sync.Once).Do(t.updateTable)
|
table := t.getTable()
|
||||||
table := t.table.Load().(lookupTable)
|
|
||||||
myDist := table.self.dist(dest)
|
myDist := table.self.dist(dest)
|
||||||
if myDist == 0 {
|
if myDist == 0 {
|
||||||
return 0
|
return 0
|
||||||
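The comments above describe where this refactor is headed: penalize each candidate next hop by its queue size, but only among peers whose true tree distance beats our own. A hedged sketch of that selection rule (queueSize is a hypothetical stand-in for whatever per-port queue metric eventually exists; this commit itself only selects among idle ports):

    // Sketch of backpressure-weighted greedy forwarding: among peers
    // strictly closer to dest on the tree, pick the one minimizing
    // tree distance plus local queue length.
    func bestPort(table lookupTable, dest []byte, queueSize func(switchPort) int) switchPort {
        myDist := table.self.dist(dest)
        best := switchPort(0) // port 0 is the self port
        bestCost := -1
        for port, info := range table.elems {
            dist := info.locator.dist(dest)
            if dist >= myDist {
                continue // constraint: must make real progress on the tree
            }
            cost := dist + queueSize(port)
            if bestCost < 0 || cost < bestCost {
                best, bestCost = port, cost
            }
        }
        return best // falls back to 0 (self) if nobody is closer
    }

This keeps the loop-freedom of greedy tree routing (the real distance must strictly decrease at every hop) while letting congestion steer traffic between otherwise-equivalent peers.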
@@ -520,7 +523,7 @@ func (t *switchTable) start() error {
     return nil
 }
 
-func (t *switchTable) handleIn(packet []byte) {
+func (t *switchTable) handleIn_old(packet []byte) {
     // Get the coords, skipping the first byte (the pType)
     _, pTypeLen := wire_decode_uint64(packet)
     coords, coordLen := wire_decode_coords(packet[pTypeLen:])
@@ -537,9 +540,89 @@ func (t *switchTable) handleIn(packet []byte) {
     to.sendPacket(packet)
 }
 
-// The switch worker does routing lookups and sends packets to where they need to be
-func (t *switchTable) doWorker() {
-    for packet := range t.packetIn {
-        t.handleIn(packet)
+// Check if a packet should go to the self node
+// This means there's no node closer to the destination than us
+// This is mainly used to identify packets addressed to us, or that hit a blackhole
+func (t *switchTable) selfIsClosest(dest []byte) bool {
+    table := t.getTable()
+    myDist := table.self.dist(dest)
+    if myDist == 0 {
+        // Skip the iteration step if it's impossible to be closer
+        return true
+    }
+    for _, info := range table.elems {
+        dist := info.locator.dist(dest)
+        if dist < myDist {
+            return false
+        }
+    }
+    return true
+}
+
+// Returns true if the peer is closer to the destination than ourself
+func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
+    table := t.getTable()
+    if info, isIn := table.elems[port]; isIn {
+        theirDist := info.locator.dist(dest)
+        myDist := table.self.dist(dest)
+        return theirDist < myDist
+    } else {
+        return false
+    }
+}
+
+// Handle an incoming packet
+// Either send it to ourself, or to the first idle peer that's free
+func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
+    // Get the coords, skipping the first byte (the pType)
+    _, pTypeLen := wire_decode_uint64(packet)
+    coords, coordLen := wire_decode_coords(packet[pTypeLen:])
+    if coordLen >= len(packet) {
+        util_putBytes(packet)
+        return true
+    } // No payload
+    ports := t.core.peers.getPorts()
+    if t.selfIsClosest(coords) {
+        ports[0].sendPacket(packet)
+        return true
+    }
+    for port := range idle {
+        if to := ports[port]; to != nil {
+            if t.portIsCloser(coords, port) {
+                delete(idle, port)
+                to.sendPacket(packet)
+                return true
+            }
+        }
+    }
+    // Didn't find anyone idle to send it to
+    return false
+}
+
+// The switch worker does routing lookups and sends packets to where they need to be
+func (t *switchTable) doWorker() {
+    var packets [][]byte // Should really be a linked list
+    idle := make(map[switchPort]struct{}) // this is to deduplicate things
+    for {
+        select {
+        case packet := <-t.packetIn:
+            idle = make(map[switchPort]struct{})
+            for port := range t.getTable().elems {
+                idle[port] = struct{}{}
+            }
+            // TODO correctly fill idle, so the above can be removed
+            if !t.handleIn(packet, idle) {
+                // There's nobody free to take it now, so queue it
+                packets = append(packets, packet)
+                for len(packets) > 32 {
+                    util_putBytes(packets[0])
+                    packets = packets[1:]
+                }
+            }
+        case port := <-t.idleIn:
+            // TODO the part that loops over packets and finds something to send
+            // Didn't find anything to send, so add this port to the idle list
+            idle[port] = struct{}{}
+        }
     }
 }
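The idleIn case is deliberately a stub: the TODO marks where a newly idle port should first try to drain the local queue before being parked in the idle set. A hedged sketch of what that missing piece might look like (handleIdle is hypothetical; a later commit may do this differently):

    // Hypothetical handler for the idleIn case: offer queued packets
    // to the newly idle port, and only mark it idle if nothing fits.
    func (t *switchTable) handleIdle(port switchPort, packets *[][]byte, idle map[switchPort]struct{}) {
        if to := t.core.peers.getPorts()[port]; to != nil {
            for i, packet := range *packets {
                // Same coords parsing as handleIn
                _, pTypeLen := wire_decode_uint64(packet)
                coords, _ := wire_decode_coords(packet[pTypeLen:])
                if t.portIsCloser(coords, port) {
                    *packets = append((*packets)[:i], (*packets)[i+1:]...)
                    to.sendPacket(packet)
                    return
                }
            }
        }
        // Didn't find anything to send, so add this port to the idle list
        idle[port] = struct{}{}
    }

With something like that in place, the 32-packet FIFO in doWorker acts as a small elastic buffer: packets wait briefly for a closer peer to go idle, and the oldest are dropped once the buffer overflows.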