work in progress actorizing core.peers and replacing switch worker with per-peer switch-generated lookupTable

This commit is contained in:
Arceliar 2020-03-29 00:23:38 -05:00
parent 16309d2862
commit e926a3be6d
7 changed files with 221 additions and 186 deletions

View File

@ -12,9 +12,9 @@ import (
"net" "net"
"os" "os"
"github.com/cheggaaa/pb/v3"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/cheggaaa/pb/v3"
) )
var numHosts = flag.Int("hosts", 1, "number of host vars to generate") var numHosts = flag.Int("hosts", 1, "number of host vars to generate")
@ -30,7 +30,7 @@ type keySet struct {
func main() { func main() {
flag.Parse() flag.Parse()
bar := pb.StartNew(*keyTries * 2 + *numHosts) bar := pb.StartNew(*keyTries*2 + *numHosts)
if *numHosts > *keyTries { if *numHosts > *keyTries {
println("Can't generate less keys than hosts.") println("Can't generate less keys than hosts.")

View File

@ -110,7 +110,8 @@ type Session struct {
// there is exactly one entry then this node is not connected to any other nodes // there is exactly one entry then this node is not connected to any other nodes
// and is therefore isolated. // and is therefore isolated.
func (c *Core) GetPeers() []Peer { func (c *Core) GetPeers() []Peer {
ports := c.peers.ports.Load().(map[switchPort]*peer) var ports map[switchPort]*peer
phony.Block(&c.peers, func() { ports = c.peers.ports })
var peers []Peer var peers []Peer
var ps []switchPort var ps []switchPort
for port := range ports { for port := range ports {
@ -143,10 +144,14 @@ func (c *Core) GetPeers() []Peer {
// isolated or not connected to any peers. // isolated or not connected to any peers.
func (c *Core) GetSwitchPeers() []SwitchPeer { func (c *Core) GetSwitchPeers() []SwitchPeer {
var switchpeers []SwitchPeer var switchpeers []SwitchPeer
table := c.switchTable.table.Load().(lookupTable) var table *lookupTable
peers := c.peers.ports.Load().(map[switchPort]*peer) var ports map[switchPort]*peer
phony.Block(&c.peers, func() {
table = c.peers.table
ports = c.peers.ports
})
for _, elem := range table.elems { for _, elem := range table.elems {
peer, isIn := peers[elem.port] peer, isIn := ports[elem.port]
if !isIn { if !isIn {
continue continue
} }
@ -325,8 +330,8 @@ func (c *Core) EncryptionPublicKey() string {
// connected to any other nodes (effectively making you the root of a // connected to any other nodes (effectively making you the root of a
// single-node network). // single-node network).
func (c *Core) Coords() []uint64 { func (c *Core) Coords() []uint64 {
table := c.switchTable.table.Load().(lookupTable) loc := c.switchTable.getLocator()
return wire_coordsBytestoUint64s(table.self.getCoords()) return wire_coordsBytestoUint64s(loc.getCoords())
} }
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128 // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
@ -490,7 +495,11 @@ func (c *Core) CallPeer(addr string, sintf string) error {
// DisconnectPeer disconnects a peer once. This should be specified as a port // DisconnectPeer disconnects a peer once. This should be specified as a port
// number. // number.
func (c *Core) DisconnectPeer(port uint64) error { func (c *Core) DisconnectPeer(port uint64) error {
c.peers.removePeer(switchPort(port)) c.peers.Act(nil, func() {
if p, isIn := c.peers.ports[switchPort(port)]; isIn {
p.Act(&c.peers, p._removeSelf)
}
})
return nil return nil
} }

View File

@ -217,13 +217,16 @@ func (intf *linkInterface) handler() error {
intf.link.mutex.Unlock() intf.link.mutex.Unlock()
// Create peer // Create peer
shared := crypto.GetSharedKey(myLinkPriv, &meta.link) shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) phony.Block(&intf.link.core.peers, func() {
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
})
if intf.peer == nil { if intf.peer == nil {
return errors.New("failed to create peer") return errors.New("failed to create peer")
} }
defer func() { defer func() {
// More cleanup can go here // More cleanup can go here
intf.link.core.peers.removePeer(intf.peer.port) intf.peer.Act(nil, intf.peer._removeSelf)
}() }()
intf.peer.out = func(msgs [][]byte) { intf.peer.out = func(msgs [][]byte) {
intf.writer.sendFrom(intf.peer, msgs, false) intf.writer.sendFrom(intf.peer, msgs, false)

View File

@ -187,9 +187,9 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
} }
func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable) loc := m.core.switchTable.getLocator()
nodeinfo := nodeinfoReqRes{ nodeinfo := nodeinfoReqRes{
SendCoords: table.self.getCoords(), SendCoords: loc.getCoords(),
IsResponse: isResponse, IsResponse: isResponse,
NodeInfo: m._getNodeInfo(), NodeInfo: m._getNodeInfo(),
} }

View File

@ -6,8 +6,6 @@ package yggdrasil
import ( import (
"encoding/hex" "encoding/hex"
"sync"
"sync/atomic"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
@ -21,17 +19,17 @@ import (
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. // In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
type peers struct { type peers struct {
phony.Inbox
core *Core core *Core
mutex sync.Mutex // Synchronize writes to atomic ports map[switchPort]*peer // use CoW semantics, share updated version with each peer
ports atomic.Value //map[switchPort]*peer, use CoW semantics table *lookupTable // Sent from switch, share updated version with each peer
} }
// Initializes the peers struct. // Initializes the peers struct.
func (ps *peers) init(c *Core) { func (ps *peers) init(c *Core) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c ps.core = c
ps.ports = make(map[switchPort]*peer)
ps.table = new(lookupTable)
} }
func (ps *peers) reconfigure() { func (ps *peers) reconfigure() {
@ -80,16 +78,6 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
return ps.core.config.Current.AllowedEncryptionPublicKeys return ps.core.config.Current.AllowedEncryptionPublicKeys
} }
// Atomically gets a map[switchPort]*peer of known peers.
func (ps *peers) getPorts() map[switchPort]*peer {
return ps.ports.Load().(map[switchPort]*peer)
}
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct { type peer struct {
phony.Inbox phony.Inbox
@ -110,10 +98,31 @@ type peer struct {
// The below aren't actually useful internally, they're just gathered for getPeers statistics // The below aren't actually useful internally, they're just gathered for getPeers statistics
bytesSent uint64 bytesSent uint64
bytesRecvd uint64 bytesRecvd uint64
ports map[switchPort]*peer
table *lookupTable
}
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
ps.Act(from, func() {
ps.table = table
ps._updatePeers()
})
}
func (ps *peers) _updatePeers() {
ports := ps.ports
table := ps.table
for _, peer := range ps.ports {
p := peer // peer is mutated during iteration
p.Act(ps, func() {
p.ports = ports
p.table = table
})
}
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
now := time.Now() now := time.Now()
p := peer{box: *box, p := peer{box: *box,
sig: *sig, sig: *sig,
@ -125,9 +134,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
core: ps.core, core: ps.core,
intf: intf, intf: intf,
} }
ps.mutex.Lock() oldPorts := ps.ports
defer ps.mutex.Unlock()
oldPorts := ps.getPorts()
newPorts := make(map[switchPort]*peer) newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts { for k, v := range oldPorts {
newPorts[k] = v newPorts[k] = v
@ -139,46 +146,49 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
break break
} }
} }
ps.putPorts(newPorts) ps.ports = newPorts
ps._updatePeers()
return &p return &p
} }
// Removes a peer for a given port, if one exists. func (p *peer) _removeSelf() {
func (ps *peers) removePeer(port switchPort) { p.core.peers.Act(p, func() {
if port == 0 { p.core.peers._removePeer(p)
return
} // Can't remove self peer
phony.Block(&ps.core.router, func() {
ps.core.switchTable.forgetPeer(port)
}) })
ps.mutex.Lock() }
oldPorts := ps.getPorts()
p, isIn := oldPorts[port] // Removes a peer for a given port, if one exists.
func (ps *peers) _removePeer(p *peer) {
if q := ps.ports[p.port]; p.port == 0 || q != p {
return
} // Can't remove self peer or nonexistant peer
ps.core.switchTable.forgetPeer(p.port)
oldPorts := ps.ports
newPorts := make(map[switchPort]*peer) newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts { for k, v := range oldPorts {
newPorts[k] = v newPorts[k] = v
} }
delete(newPorts, port) delete(newPorts, p.port)
ps.putPorts(newPorts) if p.close != nil {
ps.mutex.Unlock() p.close()
if isIn {
if p.close != nil {
p.close()
}
close(p.done)
} }
close(p.done)
ps.ports = newPorts
ps._updatePeers()
} }
// If called, sends a notification to each peer that they should send a new switch message. // If called, sends a notification to each peer that they should send a new switch message.
// Mainly called by the switch after an update. // Mainly called by the switch after an update.
func (ps *peers) sendSwitchMsgs(from phony.Actor) { func (ps *peers) sendSwitchMsgs(from phony.Actor) {
ports := ps.getPorts() ps.Act(from, func() {
for _, p := range ports { for _, peer := range ps.ports {
if p.port == 0 { p := peer
continue if p.port == 0 {
continue
}
p.Act(ps, p._sendSwitchMsg)
} }
p.Act(from, p._sendSwitchMsg) })
}
} }
// This must be launched in a separate goroutine by whatever sets up the peer struct. // This must be launched in a separate goroutine by whatever sets up the peer struct.
@ -236,12 +246,16 @@ func (p *peer) _handlePacket(packet []byte) {
// Called to handle traffic or protocolTraffic packets. // Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
func (p *peer) _handleTraffic(packet []byte) { func (p *peer) _handleTraffic(packet []byte) {
table := p.core.switchTable.getTable() if _, isIn := p.table.elems[p.port]; !isIn && p.port != 0 {
if _, isIn := table.elems[p.port]; !isIn && p.port != 0 {
// Drop traffic if the peer isn't in the switch // Drop traffic if the peer isn't in the switch
return return
} }
p.core.switchTable.packetInFrom(p, packet) coords := switch_getPacketCoords(packet)
next := p.table.lookup(coords)
if nPeer, isIn := p.ports[next]; isIn {
nPeer.sendPacketsFrom(p, [][]byte{packet})
}
//p.core.switchTable.packetInFrom(p, packet)
} }
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
@ -259,6 +273,7 @@ func (p *peer) _sendPackets(packets [][]byte) {
size += len(packet) size += len(packet)
} }
p.bytesSent += uint64(size) p.bytesSent += uint64(size)
// FIXME need to manage queues here or else things can block!
p.out(packets) p.out(packets)
} }
@ -335,7 +350,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
return return
} }
if len(msg.Hops) < 1 { if len(msg.Hops) < 1 {
p.core.peers.removePeer(p.port) p._removeSelf()
return
} }
var loc switchLocator var loc switchLocator
prevKey := msg.Root prevKey := msg.Root
@ -346,7 +362,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
loc.coords = append(loc.coords, hop.Port) loc.coords = append(loc.coords, hop.Port)
bs := getBytesForSig(&hop.Next, &sigMsg) bs := getBytesForSig(&hop.Next, &sigMsg)
if !crypto.Verify(&prevKey, bs, &hop.Sig) { if !crypto.Verify(&prevKey, bs, &hop.Sig) {
p.core.peers.removePeer(p.port) p._removeSelf()
return
} }
prevKey = hop.Next prevKey = hop.Next
} }

View File

@ -61,7 +61,11 @@ func (r *router) init(core *Core) {
linkType: "self", linkType: "self",
}, },
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) var p *peer
phony.Block(&r.core.peers, func() {
// FIXME don't block here!
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
})
p.out = func(packets [][]byte) { r.handlePackets(p, packets) } p.out = func(packets [][]byte) { r.handlePackets(p, packets) }
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
r.nodeinfo.init(r.core) r.nodeinfo.init(r.core)

View File

@ -12,13 +12,12 @@ package yggdrasil
// A little annoying to do with constant changes from backpressure // A little annoying to do with constant changes from backpressure
import ( import (
"math/rand" //"math/rand"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util" //"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -172,8 +171,6 @@ type switchTable struct {
mutex sync.RWMutex // Lock for reads/writes of switchData mutex sync.RWMutex // Lock for reads/writes of switchData
parent switchPort // Port of whatever peer is our parent, or self if we're root parent switchPort // Port of whatever peer is our parent, or self if we're root
data switchData // data switchData //
updater atomic.Value // *sync.Once
table atomic.Value // lookupTable
phony.Inbox // Owns the below phony.Inbox // Owns the below
queues switch_buffers // Queues - not atomic so ONLY use through the actor queues switch_buffers // Queues - not atomic so ONLY use through the actor
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
@ -190,8 +187,6 @@ func (t *switchTable) init(core *Core) {
locator := switchLocator{root: t.key, tstamp: now.Unix()} locator := switchLocator{root: t.key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo) peers := make(map[switchPort]peerInfo)
t.data = switchData{locator: locator, peers: peers} t.data = switchData{locator: locator, peers: peers}
t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{})
t.drop = make(map[crypto.SigPubKey]int64) t.drop = make(map[crypto.SigPubKey]int64)
phony.Block(t, func() { phony.Block(t, func() {
core.config.Mutex.RLock() core.config.Mutex.RLock()
@ -204,6 +199,7 @@ func (t *switchTable) init(core *Core) {
t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.queues.bufs = make(map[switchPort]map[string]switch_buffer)
t.idle = make(map[switchPort]struct{}) t.idle = make(map[switchPort]struct{})
}) })
t.updateTable()
} }
func (t *switchTable) reconfigure() { func (t *switchTable) reconfigure() {
@ -254,7 +250,7 @@ func (t *switchTable) cleanRoot() {
t.time = now t.time = now
if t.data.locator.root != t.key { if t.data.locator.root != t.key {
t.data.seq++ t.data.seq++
t.updater.Store(&sync.Once{}) t.updateTable()
t.core.router.reset(nil) t.core.router.reset(nil)
} }
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
@ -292,7 +288,7 @@ func (t *switchTable) forgetPeer(port switchPort) {
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
delete(t.data.peers, port) delete(t.data.peers, port)
t.updater.Store(&sync.Once{}) defer t.updateTable()
if port != t.parent { if port != t.parent {
return return
} }
@ -528,7 +524,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
t.core.peers.sendSwitchMsgs(t) t.core.peers.sendSwitchMsgs(t)
} }
if true || doUpdate { if true || doUpdate {
t.updater.Store(&sync.Once{}) t.updateTable()
} }
return return
} }
@ -566,13 +562,7 @@ func (t *switchTable) updateTable() {
time: pinfo.time, time: pinfo.time,
} }
} }
t.table.Store(newTable) t.core.peers.updateTables(nil, &newTable) // TODO not be from nil
}
// Returns a copy of the atomically-updated table used for switch lookups
func (t *switchTable) getTable() lookupTable {
t.updater.Load().(*sync.Once).Do(t.updateTable)
return t.table.Load().(lookupTable)
} }
// Starts the switch worker // Starts the switch worker
@ -589,6 +579,7 @@ type closerInfo struct {
// Return a map of ports onto distance, keeping only ports closer to the destination than this node // Return a map of ports onto distance, keeping only ports closer to the destination than this node
// If the map is empty (or nil), then no peer is closer // If the map is empty (or nil), then no peer is closer
/*
func (t *switchTable) getCloser(dest []byte) []closerInfo { func (t *switchTable) getCloser(dest []byte) []closerInfo {
table := t.getTable() table := t.getTable()
myDist := table.self.dist(dest) myDist := table.self.dist(dest)
@ -605,8 +596,10 @@ func (t *switchTable) getCloser(dest []byte) []closerInfo {
} }
return closer return closer
} }
*/
// Returns true if the peer is closer to the destination than ourself // Returns true if the peer is closer to the destination than ourself
/*
func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
table := t.getTable() table := t.getTable()
if info, isIn := table.elems[port]; isIn { if info, isIn := table.elems[port]; isIn {
@ -617,6 +610,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
return false return false
} }
} }
*/
// Get the coords of a packet without decoding // Get the coords of a packet without decoding
func switch_getPacketCoords(packet []byte) []byte { func switch_getPacketCoords(packet []byte) []byte {
@ -686,23 +680,26 @@ func (t *lookupTable) lookup(coords []byte) switchPort {
// Either send it to ourself, or to the first idle peer that's free // Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued // Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) {
coords := switch_getPacketCoords(packet) /*
table := t.getTable() coords := switch_getPacketCoords(packet)
port := table.lookup(coords) table := t.getTable()
ports := t.core.peers.getPorts() port := table.lookup(coords)
peer := ports[port] ports := t.core.peers.getPorts()
if peer == nil { peer := ports[port]
// FIXME hack, if the peer disappeared durring a race then don't buffer if peer == nil {
return true, 0 // FIXME hack, if the peer disappeared durring a race then don't buffer
} return true, 0
if _, isIdle := idle[port]; isIdle || port == 0 { }
// Either no closer peers, or the closest peer is idle if _, isIdle := idle[port]; isIdle || port == 0 {
delete(idle, port) // Either no closer peers, or the closest peer is idle
peer.sendPacketsFrom(t, [][]byte{packet}) delete(idle, port)
return true, port peer.sendPacketsFrom(t, [][]byte{packet})
} return true, port
// There's a closer peer, but it's not idle, so buffer it }
return false, port // There's a closer peer, but it's not idle, so buffer it
return false, port
*/
return true, 0
} }
// Info about a buffered packet // Info about a buffered packet
@ -726,52 +723,54 @@ type switch_buffers struct {
} }
func (b *switch_buffers) _cleanup(t *switchTable) { func (b *switch_buffers) _cleanup(t *switchTable) {
for port, pbufs := range b.bufs { /*
for streamID, buf := range pbufs {
// Remove queues for which we have no next hop
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets {
util.PutBytes(packet.bytes)
}
b.size -= buf.size
delete(pbufs, streamID)
}
}
if len(pbufs) == 0 {
delete(b.bufs, port)
}
}
for b.size > b.totalMaxSize {
// Drop a random queue
target := rand.Uint64() % b.size
var size uint64 // running total
for port, pbufs := range b.bufs { for port, pbufs := range b.bufs {
for streamID, buf := range pbufs { for streamID, buf := range pbufs {
size += buf.size // Remove queues for which we have no next hop
if size < target { packet := buf.packets[0]
continue coords := switch_getPacketCoords(packet.bytes)
} if len(t.getCloser(coords)) == 0 {
var packet switch_packetInfo for _, packet := range buf.packets {
packet, buf.packets = buf.packets[0], buf.packets[1:] util.PutBytes(packet.bytes)
buf.size -= uint64(len(packet.bytes))
b.size -= uint64(len(packet.bytes))
util.PutBytes(packet.bytes)
if len(buf.packets) == 0 {
delete(pbufs, streamID)
if len(pbufs) == 0 {
delete(b.bufs, port)
} }
} else { b.size -= buf.size
// Need to update the map, since buf was retrieved by value delete(pbufs, streamID)
pbufs[streamID] = buf
} }
break }
if len(pbufs) == 0 {
delete(b.bufs, port)
} }
} }
}
for b.size > b.totalMaxSize {
// Drop a random queue
target := rand.Uint64() % b.size
var size uint64 // running total
for port, pbufs := range b.bufs {
for streamID, buf := range pbufs {
size += buf.size
if size < target {
continue
}
var packet switch_packetInfo
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
b.size -= uint64(len(packet.bytes))
util.PutBytes(packet.bytes)
if len(buf.packets) == 0 {
delete(pbufs, streamID)
if len(pbufs) == 0 {
delete(b.bufs, port)
}
} else {
// Need to update the map, since buf was retrieved by value
pbufs[streamID] = buf
}
break
}
}
}
*/
} }
// Handles incoming idle notifications // Handles incoming idle notifications
@ -779,57 +778,60 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
// Returns true if the peer is no longer idle, false if it should be added to the idle list // Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) _handleIdle(port switchPort) bool { func (t *switchTable) _handleIdle(port switchPort) bool {
// TODO? only send packets for which this is the best next hop that isn't currently blocked sending // TODO? only send packets for which this is the best next hop that isn't currently blocked sending
to := t.core.peers.getPorts()[port] /*
if to == nil { to := t.core.peers.getPorts()[port]
return true if to == nil {
} return true
var packets [][]byte
var psize int
t.queues._cleanup(t)
now := time.Now()
pbufs := t.queues.bufs[port]
for psize < 65535 {
var best *string
var bestPriority float64
for streamID, buf := range pbufs {
// Filter over the streams that this node is closer to
// Keep the one with the smallest queue
packet := buf.packets[0]
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority >= bestPriority {
b := streamID // copy since streamID is mutated in the loop
best = &b
bestPriority = priority
}
} }
if best != nil { var packets [][]byte
buf := pbufs[*best] var psize int
var packet switch_packetInfo t.queues._cleanup(t)
// TODO decide if this should be LIFO or FIFO now := time.Now()
packet, buf.packets = buf.packets[0], buf.packets[1:] pbufs := t.queues.bufs[port]
buf.size -= uint64(len(packet.bytes)) for psize < 65535 {
t.queues.size -= uint64(len(packet.bytes)) var best *string
if len(buf.packets) == 0 { var bestPriority float64
delete(pbufs, *best) for streamID, buf := range pbufs {
if len(pbufs) == 0 { // Filter over the streams that this node is closer to
delete(t.queues.bufs, port) // Keep the one with the smallest queue
packet := buf.packets[0]
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority >= bestPriority {
b := streamID // copy since streamID is mutated in the loop
best = &b
bestPriority = priority
} }
} else {
// Need to update the map, since buf was retrieved by value
pbufs[*best] = buf
} }
packets = append(packets, packet.bytes) if best != nil {
psize += len(packet.bytes) buf := pbufs[*best]
} else { var packet switch_packetInfo
// Finished finding packets // TODO decide if this should be LIFO or FIFO
break packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 {
delete(pbufs, *best)
if len(pbufs) == 0 {
delete(t.queues.bufs, port)
}
} else {
// Need to update the map, since buf was retrieved by value
pbufs[*best] = buf
}
packets = append(packets, packet.bytes)
psize += len(packet.bytes)
} else {
// Finished finding packets
break
}
} }
} if len(packets) > 0 {
if len(packets) > 0 { to.sendPacketsFrom(t, packets)
to.sendPacketsFrom(t, packets) return true
return true }
} return false
*/
return false return false
} }