work-in-progress on buffering overhaul

This commit is contained in:
Arceliar 2020-05-16 17:07:47 -05:00
parent dc128121e5
commit 052de98f12
5 changed files with 202 additions and 111 deletions

View File

@ -123,10 +123,10 @@ func (c *Core) GetPeers() []Peer {
var info Peer
phony.Block(p, func() {
info = Peer{
Endpoint: p.intf.name,
Endpoint: p.intf.name(),
BytesSent: p.bytesSent,
BytesRecvd: p.bytesRecvd,
Protocol: p.intf.info.linkType,
Protocol: p.intf.interfaceType(),
Port: uint64(port),
Uptime: time.Since(p.firstSeen),
}
@ -163,8 +163,8 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
BytesSent: peer.bytesSent,
BytesRecvd: peer.bytesRecvd,
Port: uint64(elem.port),
Protocol: peer.intf.info.linkType,
Endpoint: peer.intf.info.remote,
Protocol: peer.intf.interfaceType(),
Endpoint: peer.intf.remote(),
}
copy(info.PublicKey[:], peer.box[:])
})

View File

@ -47,7 +47,7 @@ type linkInterfaceMsgIO interface {
}
type linkInterface struct {
name string
lname string
link *link
peer *peer
msgIO linkInterfaceMsgIO
@ -125,7 +125,7 @@ func (l *link) listen(uri string) error {
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) {
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
intf := linkInterface{
name: name,
lname: name,
link: l,
msgIO: msgIO,
info: linkInfo{
@ -178,7 +178,7 @@ func (intf *linkInterface) handler() error {
}
base := version_getBaseMetadata()
if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer {
intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
intf.link.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
return errors.New("failed to connect: wrong version")
}
// Check if we're authorized to connect to this key / IP
@ -217,23 +217,9 @@ func (intf *linkInterface) handler() error {
intf.link.mutex.Unlock()
// Create peer
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
out := func(msgs [][]byte) {
// nil to prevent it from blocking if the link is somehow frozen
// this is safe because another packet won't be sent until the link notifies
// the peer that it's ready for one
intf.writer.sendFrom(nil, msgs, false)
}
linkOut := func(bs []byte) {
// nil to prevent it from blocking if the link is somehow frozen
// FIXME this is hypothetically not safe, the peer shouldn't be sending
// additional packets until this one finishes, otherwise this could leak
// memory if writing happens slower than link packets are generated...
// that seems unlikely, so it's a lesser evil than deadlocking for now
intf.writer.sendFrom(nil, [][]byte{bs}, true)
}
phony.Block(&intf.link.core.peers, func() {
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut)
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf)
})
if intf.peer == nil {
return errors.New("failed to create peer")
@ -275,6 +261,58 @@ func (intf *linkInterface) handler() error {
////////////////////////////////////////////////////////////////////////////////
// linkInterface needs to match the peerInterface type needed by the peers
func (intf *linkInterface) out(bss [][]byte) {
intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen
// this is safe because another packet won't be sent until the link notifies
// the peer that it's ready for one
intf.writer.sendFrom(nil, bss, false)
})
}
func (intf *linkInterface) linkOut(bs []byte) {
intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen
// FIXME this is hypothetically not safe, the peer shouldn't be sending
// additional packets until this one finishes, otherwise this could leak
// memory if writing happens slower than link packets are generated...
// that seems unlikely, so it's a lesser evil than deadlocking for now
intf.writer.sendFrom(nil, [][]byte{bs}, true)
})
}
func (intf *linkInterface) notifyQueued(seq uint64) {
// This is the part where we want non-nil 'from' fields
intf.Act(intf.peer, func() {
if !intf.isIdle {
intf.peer.dropFromQueue(intf, seq)
}
})
}
func (intf *linkInterface) close() {
intf.Act(nil, func() { intf.msgIO.close() })
}
func (intf *linkInterface) name() string {
return intf.lname
}
func (intf *linkInterface) local() string {
return intf.info.local
}
func (intf *linkInterface) remote() string {
return intf.info.remote
}
func (intf *linkInterface) interfaceType() string {
return intf.info.linkType
}
////////////////////////////////////////////////////////////////////////////////
const (
sendTime = 1 * time.Second // How long to wait before deciding a send is blocked
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send

View File

@ -4,9 +4,6 @@ import (
"time"
)
// TODO take max size from config
const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB
type pqStreamID string
type pqPacketInfo struct {
@ -25,46 +22,50 @@ type packetQueue struct {
size uint64
}
func (q *packetQueue) cleanup() {
for q.size > MAX_PACKET_QUEUE_SIZE {
// TODO? drop from a random stream
// odds proportional to size? bandwidth?
// always using the worst is exploitable -> flood 1 packet per random stream
// find the stream that's using the most bandwidth
now := time.Now()
var worst pqStreamID
for id := range q.streams {
// drop will remove a packet from the queue, returning it to the pool
// returns true if a packet was removed, false otherwise
func (q *packetQueue) drop() bool {
if q.size == 0 {
return false
}
// TODO? drop from a random stream
// odds proportional to size? bandwidth?
// always using the worst is exploitable -> flood 1 packet per random stream
// find the stream that's using the most bandwidth
now := time.Now()
var worst pqStreamID
for id := range q.streams {
worst = id
break // get a random ID to start
}
worstStream := q.streams[worst]
worstSize := float64(worstStream.size)
worstAge := now.Sub(worstStream.infos[0].time).Seconds()
for id, stream := range q.streams {
thisSize := float64(stream.size)
thisAge := now.Sub(stream.infos[0].time).Seconds()
// cross multiply to avoid division by zero issues
if worstSize*thisAge < thisSize*worstAge {
// worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth
worst = id
break // get a random ID to start
}
worstStream := q.streams[worst]
worstSize := float64(worstStream.size)
worstAge := now.Sub(worstStream.infos[0].time).Seconds()
for id, stream := range q.streams {
thisSize := float64(stream.size)
thisAge := now.Sub(stream.infos[0].time).Seconds()
// cross multiply to avoid division by zero issues
if worstSize*thisAge < thisSize*worstAge {
// worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth
worst = id
worstStream = stream
worstSize = thisSize
worstAge = thisAge
}
}
// Drop the oldest packet from the worst stream
packet := worstStream.infos[0].packet
worstStream.infos = worstStream.infos[1:]
worstStream.size -= uint64(len(packet))
q.size -= uint64(len(packet))
pool_putBytes(packet)
// save the modified stream to queues
if len(worstStream.infos) > 0 {
q.streams[worst] = worstStream
} else {
delete(q.streams, worst)
worstStream = stream
worstSize = thisSize
worstAge = thisAge
}
}
// Drop the oldest packet from the worst stream
packet := worstStream.infos[0].packet
worstStream.infos = worstStream.infos[1:]
worstStream.size -= uint64(len(packet))
q.size -= uint64(len(packet))
pool_putBytes(packet)
// save the modified stream to queues
if len(worstStream.infos) > 0 {
q.streams[worst] = worstStream
} else {
delete(q.streams, worst)
}
return true
}
func (q *packetQueue) push(packet []byte) {
@ -80,7 +81,6 @@ func (q *packetQueue) push(packet []byte) {
// save update to queues
q.streams[id] = stream
q.size += uint64(len(packet))
q.cleanup()
}
func (q *packetQueue) pop() ([]byte, bool) {

View File

@ -77,29 +77,38 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
return ps.core.config.Current.AllowedEncryptionPublicKeys
}
type peerInterface interface {
out([][]byte)
linkOut([]byte)
notifyQueued(uint64)
close()
// These next ones are only used by the API
name() string
local() string
remote() string
interfaceType() string
}
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct {
phony.Inbox
core *Core
intf *linkInterface
intf peerInterface
port switchPort
box crypto.BoxPubKey
sig crypto.SigPubKey
shared crypto.BoxSharedKey
linkShared crypto.BoxSharedKey
endpoint string
firstSeen time.Time // To track uptime for getPeers
linkOut func([]byte) // used for protocol traffic (bypasses the switch)
dinfo *dhtInfo // used to keep the DHT working
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
done (chan struct{}) // closed to exit the linkLoop
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
firstSeen time.Time // To track uptime for getPeers
dinfo *dhtInfo // used to keep the DHT working
// The below aren't actually useful internally, they're just gathered for getPeers statistics
bytesSent uint64
bytesRecvd uint64
ports map[switchPort]*peer
table *lookupTable
queue packetQueue
seq uint64 // this and idle are used to detect when to drop packets from queue
idle bool
}
@ -123,19 +132,15 @@ func (ps *peers) _updatePeers() {
}
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer {
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer {
now := time.Now()
p := peer{box: *box,
core: ps.core,
intf: intf,
sig: *sig,
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
linkShared: *linkShared,
firstSeen: now,
done: make(chan struct{}),
close: closer,
core: ps.core,
intf: intf,
out: out,
linkOut: linkOut,
}
oldPorts := ps.ports
newPorts := make(map[switchPort]*peer)
@ -172,10 +177,7 @@ func (ps *peers) _removePeer(p *peer) {
newPorts[k] = v
}
delete(newPorts, p.port)
if p.close != nil {
p.close()
}
close(p.done)
p.intf.close()
ps.ports = newPorts
ps._updatePeers()
}
@ -295,12 +297,26 @@ func (p *peer) _handleIdle() {
}
if len(packets) > 0 {
p.bytesSent += uint64(size)
p.out(packets)
p.intf.out(packets)
} else {
p.idle = true
}
}
func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
p.Act(from, func() {
switch {
case seq != p.seq:
case p.queue.drop():
p.intf.notifyQueued(p.seq)
}
if seq != p.seq {
return
}
})
}
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues.
func (p *peer) _sendLinkPacket(packet []byte) {
@ -316,7 +332,7 @@ func (p *peer) _sendLinkPacket(packet []byte) {
Payload: bs,
}
packet = linkPacket.encode()
p.linkOut(packet)
p.intf.linkOut(packet)
}
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.

View File

@ -45,6 +45,8 @@ type router struct {
nodeinfo nodeinfo
searches searches
sessions sessions
intf routerInterface
peer *peer
table *lookupTable // has a copy of our locator
}
@ -53,28 +55,17 @@ func (r *router) init(core *Core) {
r.core = core
r.addr = *address.AddrForNodeID(&r.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.dht.nodeID)
self := linkInterface{
name: "(self)",
info: linkInfo{
local: "(self)",
remote: "(self)",
linkType: "self",
},
}
var p *peer
peerOut := func(packets [][]byte) {
r.handlePackets(p, packets)
r.Act(p, func() {
// after the router handle the packets, notify the peer that it's ready for more
p.Act(r, p._handleIdle)
})
}
r.intf.router = r
phony.Block(&r.core.peers, func() {
// FIXME don't block here!
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil)
r.peer = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &r.intf)
})
p.Act(r, p._handleIdle)
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
r.peer.Act(r, r.peer._handleIdle)
r.out = func(bs []byte) {
r.intf.Act(r, func() {
r.peer.handlePacketFrom(&r.intf, bs)
})
}
r.nodeinfo.init(r.core)
r.core.config.Mutex.RLock()
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)
@ -123,15 +114,6 @@ func (r *router) start() error {
return nil
}
// In practice, the switch will call this with 1 packet
func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
r.Act(from, func() {
for _, packet := range packets {
r._handlePacket(packet)
}
})
}
// Insert a peer info into the dht, TODO? make the dht a separate actor
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
r.Act(from, func() {
@ -275,3 +257,58 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
req.SendPermPub = *fromKey
r.nodeinfo.handleNodeInfo(r, &req)
}
////////////////////////////////////////////////////////////////////////////////
// routerInterface is a helper that implements peerInterface
type routerInterface struct {
phony.Inbox
router *router
busy bool
}
func (intf *routerInterface) out(bss [][]byte) {
intf.Act(intf.router.peer, func() {
intf.router.Act(intf, func() {
for _, bs := range bss {
intf.router._handlePacket(bs)
}
// we may block due to the above
// so we send a message to ourself, that we'd handle after unblocking
// that message tells us to tell the interface that we're finally idle again
intf.router.Act(nil, func() {
intf.Act(intf.router, intf._handleIdle)
})
intf.Act(intf.router, intf._handleBusy)
})
})
}
func (intf *routerInterface) _handleBusy() {
intf.busy = true
}
func (intf *routerInterface) _handleIdle() {
intf.busy = false
intf.router.peer.Act(intf, intf.router.peer._handleIdle)
}
func (intf *routerInterface) linkOut(_ []byte) {}
func (intf *routerInterface) notifyQueued(seq uint64) {
intf.Act(intf.router.peer, func() {
if intf.busy {
intf.router.peer.dropFromQueue(intf, seq)
}
})
}
func (intf *routerInterface) close() {}
func (intf *routerInterface) name() string { return "(self)" }
func (intf *routerInterface) local() string { return "(self)" }
func (intf *routerInterface) remote() string { return "(self)" }
func (intf *routerInterface) interfaceType() string { return "self" }