mirror of
https://github.com/yggdrasil-network/yggdrasil-go
synced 2024-11-10 15:30:34 +03:00
buffer packets moving from the switch to the router, allow them front drop if there's too many
This commit is contained in:
parent
371b5ca6a2
commit
06df791efc
@ -176,6 +176,7 @@ type switchTable struct {
|
|||||||
admin chan func() // Pass a lambda for the admin socket to query stuff
|
admin chan func() // Pass a lambda for the admin socket to query stuff
|
||||||
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
|
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
|
||||||
queueTotalMaxSize uint64 // Maximum combined size of queues
|
queueTotalMaxSize uint64 // Maximum combined size of queues
|
||||||
|
toRouter chan []byte // Packets to be sent to the router
|
||||||
}
|
}
|
||||||
|
|
||||||
// Minimum allowed total size of switch queues.
|
// Minimum allowed total size of switch queues.
|
||||||
@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) {
|
|||||||
t.idleIn = make(chan switchPort, 1024)
|
t.idleIn = make(chan switchPort, 1024)
|
||||||
t.admin = make(chan func())
|
t.admin = make(chan func())
|
||||||
t.queueTotalMaxSize = SwitchQueueTotalMinSize
|
t.queueTotalMaxSize = SwitchQueueTotalMinSize
|
||||||
|
t.toRouter = make(chan []byte, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Safely gets a copy of this node's locator.
|
// Safely gets a copy of this node's locator.
|
||||||
@ -616,17 +618,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
|
|||||||
// Returns true if the packet has been handled somehow, false if it should be queued
|
// Returns true if the packet has been handled somehow, false if it should be queued
|
||||||
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
|
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
|
||||||
coords := switch_getPacketCoords(packet)
|
coords := switch_getPacketCoords(packet)
|
||||||
ports := t.core.peers.getPorts()
|
|
||||||
closer := t.getCloser(coords)
|
closer := t.getCloser(coords)
|
||||||
if len(closer) == 0 {
|
if len(closer) == 0 {
|
||||||
// TODO? call the router directly, and remove the whole concept of a self peer?
|
// TODO? call the router directly, and remove the whole concept of a self peer?
|
||||||
ports[0].sendPacket(packet)
|
t.toRouter <- packet
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
table := t.getTable()
|
table := t.getTable()
|
||||||
var best *peer
|
var best *peer
|
||||||
var bestDist int
|
var bestDist int
|
||||||
var bestCoordLen int
|
var bestCoordLen int
|
||||||
|
ports := t.core.peers.getPorts()
|
||||||
for port, dist := range closer {
|
for port, dist := range closer {
|
||||||
to := ports[port]
|
to := ports[port]
|
||||||
_, isIdle := idle[port]
|
_, isIdle := idle[port]
|
||||||
@ -775,6 +777,33 @@ func (t *switchTable) handleIdle(port switchPort) bool {
|
|||||||
|
|
||||||
// The switch worker does routing lookups and sends packets to where they need to be
|
// The switch worker does routing lookups and sends packets to where they need to be
|
||||||
func (t *switchTable) doWorker() {
|
func (t *switchTable) doWorker() {
|
||||||
|
sendingToRouter := make(chan []byte, 1)
|
||||||
|
go func() {
|
||||||
|
// Keep sending packets to the router
|
||||||
|
self := t.core.peers.getPorts()[0]
|
||||||
|
for bs := range sendingToRouter {
|
||||||
|
self.sendPacket(bs)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
|
||||||
|
var buf [][]byte
|
||||||
|
for {
|
||||||
|
buf = append(buf, <-t.toRouter)
|
||||||
|
for len(buf) > 0 {
|
||||||
|
select {
|
||||||
|
case bs := <-t.toRouter:
|
||||||
|
buf = append(buf, bs)
|
||||||
|
for len(buf) > 32 {
|
||||||
|
util.PutBytes(buf[0])
|
||||||
|
buf = buf[1:]
|
||||||
|
}
|
||||||
|
case sendingToRouter <- buf[0]:
|
||||||
|
buf = buf[1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
t.queues.switchTable = t
|
t.queues.switchTable = t
|
||||||
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
|
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
|
||||||
idle := make(map[switchPort]struct{}) // this is to deduplicate things
|
idle := make(map[switchPort]struct{}) // this is to deduplicate things
|
||||||
|
Loading…
Reference in New Issue
Block a user