more switch migration

Arceliar 2019-08-24 15:22:46 -05:00
parent 555b4c18d4
commit 998c76fd8c
5 changed files with 42 additions and 38 deletions
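Context for the hunks below (not part of the original commit message): this change continues moving switchTable state off dedicated worker channels (packetIn, idleIn) and onto the table's embedded phony.Actor, so callers hand the switch closures to run on its own goroutine (EnqueueFrom for fire-and-forget, SyncExec where the caller waits), and queues/idle are only ever touched from that goroutine. A minimal, self-contained sketch of that pattern follows; it hand-rolls a toy actor rather than using the phony API, and every name in it is illustrative, not from the codebase.

```go
package main

import (
	"fmt"
	"sync"
)

// actor runs closures one at a time on a single goroutine, so any state it
// "owns" needs no mutex. Hand-rolled stand-in for the embedded phony.Actor.
type actor struct {
	once  sync.Once
	queue chan func()
}

func (a *actor) run() {
	a.queue = make(chan func(), 32)
	go func() {
		for f := range a.queue {
			f()
		}
	}()
}

// enqueue is the fire-and-forget path, loosely analogous to EnqueueFrom.
func (a *actor) enqueue(f func()) {
	a.once.Do(a.run)
	a.queue <- f
}

// syncExec blocks until f has run, loosely analogous to <-t.SyncExec(...).
func (a *actor) syncExec(f func()) {
	done := make(chan struct{})
	a.enqueue(func() { f(); close(done) })
	<-done
}

// switchLike mirrors the idea that switchTable.queues and .idle are
// "not atomic so ONLY use through the actor".
type switchLike struct {
	actor
	queues map[string]int // toy per-stream counters
}

func main() {
	s := &switchLike{}
	s.syncExec(func() { s.queues = make(map[string]int) }) // like init's SyncExec block
	s.enqueue(func() { s.queues["stream"]++ })             // like packetInFrom / _idleIn
	s.syncExec(func() { fmt.Println(s.queues["stream"]) }) // prints 1
}
```

With that shape, the doWorker select loop in the last hunk no longer needs its packetIn/idleIn cases: peers and links enqueue work on the switchTable directly.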

View File

@@ -191,7 +191,7 @@ func (c *Core) GetSwitchQueues() SwitchQueues {
 			Size: switchTable.queues.size,
 			HighestCount: uint64(switchTable.queues.maxbufs),
 			HighestSize: switchTable.queues.maxsize,
-			MaximumSize: switchTable.queueTotalMaxSize,
+			MaximumSize: switchTable.queues.totalMaxSize,
 		}
 		for k, v := range switchTable.queues.bufs {
 			nexthop := switchTable.bestPortForCoords([]byte(k))

View File

@@ -174,7 +174,9 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState,
 	c.config.Mutex.RLock()
 	if c.config.Current.SwitchOptions.MaxTotalQueueSize >= SwitchQueueTotalMinSize {
-		c.switchTable.queueTotalMaxSize = c.config.Current.SwitchOptions.MaxTotalQueueSize
+		c.switchTable.doAdmin(func() {
+			c.switchTable.queues.totalMaxSize = c.config.Current.SwitchOptions.MaxTotalQueueSize
+		})
 	}
 	c.config.Mutex.RUnlock()

View File

@@ -318,7 +318,9 @@ func (intf *linkInterface) handler() error {
 			isAlive = true
 			if !isReady {
 				// (Re-)enable in the switch
-				intf.link.core.switchTable.idleIn <- intf.peer.port
+				intf.link.core.switchTable.EnqueueFrom(nil, func() {
+					intf.link.core.switchTable._idleIn(intf.peer.port)
+				})
 				isReady = true
 			}
 			if gotMsg && !sendTimerRunning {
@@ -355,7 +357,9 @@ func (intf *linkInterface) handler() error {
 				isReady = false
 			} else {
 				// Keep enabled in the switch
-				intf.link.core.switchTable.idleIn <- intf.peer.port
+				intf.link.core.switchTable.EnqueueFrom(nil, func() {
+					intf.link.core.switchTable._idleIn(intf.peer.port)
+				})
 				isReady = true
 			}
 		case <-sendBlocked.C:

View File

@@ -242,7 +242,7 @@ func (p *peer) _handleTraffic(packet []byte, pTypeLen int) {
 		// Drop traffic if the peer isn't in the switch
 		return
 	}
-	p.core.switchTable.packetIn <- packet
+	p.core.switchTable.packetInFrom(p, packet)
 }
 
 func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) {

View File

@@ -164,22 +164,19 @@ type switchData struct {
 
 // All the information stored by the switch.
 type switchTable struct {
 	core *Core
 	reconfigure chan chan error
 	key crypto.SigPubKey // Our own key
 	time time.Time // Time when locator.tstamp was last updated
 	drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
 	mutex sync.RWMutex // Lock for reads/writes of switchData
 	parent switchPort // Port of whatever peer is our parent, or self if we're root
 	data switchData //
 	updater atomic.Value // *sync.Once
 	table atomic.Value // lookupTable
 	phony.Actor // Owns the below
-	packetIn chan []byte // Incoming packets for the worker to handle
-	idleIn chan switchPort // Incoming idle notifications from peer links
-	queues switch_buffers // Queues - not atomic so ONLY use through admin chan
-	queueTotalMaxSize uint64 // Maximum combined size of queues
-	idle map[switchPort]time.Time // idle peers
+	queues switch_buffers // Queues - not atomic so ONLY use through the actor
+	idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor
 }
 
 // Minimum allowed total size of switch queues.
@@ -197,12 +194,11 @@ func (t *switchTable) init(core *Core) {
 	t.updater.Store(&sync.Once{})
 	t.table.Store(lookupTable{})
 	t.drop = make(map[crypto.SigPubKey]int64)
-	t.packetIn = make(chan []byte, 1024)
-	t.idleIn = make(chan switchPort, 1024)
-	t.queueTotalMaxSize = SwitchQueueTotalMinSize
-	t.idle = make(map[switchPort]time.Time)
-	t.queues.switchTable = t
-	t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
+	<-t.SyncExec(func() {
+		t.queues.totalMaxSize = SwitchQueueTotalMinSize
+		t.queues.bufs = make(map[string]switch_buffer)
+		t.idle = make(map[switchPort]time.Time)
+	})
 }
 
 // Safely gets a copy of this node's locator.
@@ -727,12 +723,12 @@ type switch_buffer struct {
 }
 
 type switch_buffers struct {
-	switchTable *switchTable
+	totalMaxSize uint64
 	bufs map[string]switch_buffer // Buffers indexed by StreamID
 	size uint64 // Total size of all buffers, in bytes
 	maxbufs int
 	maxsize uint64
 	closer []closerInfo // Scratch space
 }
 
 func (b *switch_buffers) _cleanup(t *switchTable) {
@@ -749,7 +745,7 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
 		}
 	}
 
-	for b.size > b.switchTable.queueTotalMaxSize {
+	for b.size > b.totalMaxSize {
 		// Drop a random queue
 		target := rand.Uint64() % b.size
 		var size uint64 // running total
@@ -828,6 +824,12 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
 	return false
 }
 
+func (t *switchTable) packetInFrom(from phony.IActor, bytes []byte) {
+	t.EnqueueFrom(from, func() {
+		t._packetIn(bytes)
+	})
+}
+
 func (t *switchTable) _packetIn(bytes []byte) {
 	// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
 	if !t._handleIn(bytes, t.idle) {
@@ -868,10 +870,6 @@ func (t *switchTable) doWorker() {
 	for {
 		//t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
 		select {
-		case bytes := <-t.packetIn:
-			<-t.SyncExec(func() { t._packetIn(bytes) })
-		case port := <-t.idleIn:
-			<-t.SyncExec(func() { t._idleIn(port) })
 		case e := <-t.reconfigure:
 			e <- nil
 		}