diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a0ce5d87..dc61892f 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -62,7 +62,6 @@ type linkInterface struct { keepAliveTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - isIdle bool // True if the peer actor knows the link is idle isSending bool // True between a notifySending and a notifySent blocked bool // True if we've blocked the peer in the switch } @@ -279,7 +278,7 @@ func (intf *linkInterface) out(bss [][]byte) { // nil to prevent it from blocking if the link is somehow frozen // this is safe because another packet won't be sent until the link notifies // the peer that it's ready for one - intf.writer.sendFrom(nil, bss, false) + intf.writer.sendFrom(nil, bss) }) } @@ -290,7 +289,7 @@ func (intf *linkInterface) linkOut(bs []byte) { // additional packets until this one finishes, otherwise this could leak // memory if writing happens slower than link packets are generated... // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) + intf.writer.sendFrom(nil, [][]byte{bs}) }) } @@ -332,11 +331,8 @@ const ( ) // notify the intf that we're currently sending -func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { +func (intf *linkInterface) notifySending(size int) { intf.Act(&intf.writer, func() { - if !isLinkTraffic { - intf.isIdle = false - } intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() @@ -365,13 +361,18 @@ func (intf *linkInterface) notifyBlockedSend() { } // notify the intf that we've finished sending, returning the peer to the switch -func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { +func (intf *linkInterface) notifySent(size int) { intf.Act(&intf.writer, func() { - intf.sendTimer.Stop() - intf.sendTimer = nil - if !isLinkTraffic { - intf._notifyIdle() + if intf.sendTimer != nil { + intf.sendTimer.Stop() + intf.sendTimer = nil } + if intf.keepAliveTimer != nil { + // TODO? unset this when we start sending, not when we finish... + intf.keepAliveTimer.Stop() + intf.keepAliveTimer = nil + } + intf._notifyIdle() intf.isSending = false if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) @@ -381,10 +382,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { // Notify the peer that we're ready for more traffic func (intf *linkInterface) _notifyIdle() { - if !intf.isIdle { - intf.isIdle = true - intf.peer.Act(intf, intf.peer._handleIdle) - } + intf.peer.Act(intf, intf.peer._handleIdle) } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds @@ -416,8 +414,8 @@ func (intf *linkInterface) notifyRead(size int) { intf.stallTimer.Stop() intf.stallTimer = nil } - if size > 0 && intf.stallTimer == nil { - intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + if size > 0 && intf.keepAliveTimer == nil { + intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } if intf.blocked { intf.blocked = false @@ -432,7 +430,7 @@ func (intf *linkInterface) notifyDoKeepAlive() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil - intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic } }) } @@ -446,7 +444,7 @@ type linkWriter struct { closed bool } -func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { +func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) { w.Act(from, func() { if w.closed { return @@ -455,9 +453,9 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool for _, bs := range bss { size += len(bs) } - w.intf.notifySending(size, isLinkTraffic) + w.intf.notifySending(size) w.worker <- bss - w.intf.notifySent(size, isLinkTraffic) + w.intf.notifySent(size) }) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 339ea5a7..3cfc0b4f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -317,7 +317,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { if seq == p.seq { p.drop = true - p.max = p.queue.size + p.max = p.queue.size + streamMsgSize } }) }