From bd3eaefb72675473ca46a0c0b943ffc9ac2ffdee Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Aug 2019 22:55:17 -0500 Subject: [PATCH] more link migration --- src/yggdrasil/debug.go | 7 +- src/yggdrasil/link.go | 497 ++++++++++------------------------------ src/yggdrasil/peer.go | 12 +- src/yggdrasil/switch.go | 4 +- 4 files changed, 135 insertions(+), 385 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c6f98ca0..1cc7eae4 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -554,7 +554,8 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.start() goWorkers := func(source, dest *peer) { - source.linkOut = make(chan []byte, 1) + linkOut := make(chan []byte, 1) + source.linkOut = func(bs []byte) { linkOut <- bs } send := make(chan []byte, 1) source.out = func(bss [][]byte) { for _, bs := range bss { @@ -566,7 +567,7 @@ func DEBUG_simLinkPeers(p, q *peer) { var packets [][]byte for { select { - case packet := <-source.linkOut: + case packet := <-linkOut: packets = append(packets, packet) continue case packet := <-send: @@ -583,7 +584,7 @@ func DEBUG_simLinkPeers(p, q *peer) { continue } select { - case packet := <-source.linkOut: + case packet := <-linkOut: packets = append(packets, packet) case packet := <-send: packets = append(packets, packet) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 5489c015..b4ca38c4 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -46,23 +46,23 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO - info linkInfo - incoming bool - force bool - closed chan struct{} - reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch - writer linkWriter // Writes packets, notifies this linkInterface - phony.Inbox // Protects the below - sendTimer *time.Timer // Fires to signal that sending is blocked - stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen - recvTimer *time.Timer // Fires to send keep-alive traffic - closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - inSwitch bool // True if the switch is tracking this link - stalled bool // True if we haven't been receiving any response traffic + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + incoming bool + force bool + closed chan struct{} + reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch + writer linkWriter // Writes packets, notifies this linkInterface + phony.Inbox // Protects the below + sendTimer *time.Timer // Fires to signal that sending is blocked + stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen + recvTimer *time.Timer // Fires to send keep-alive traffic + closeTimer *time.Timer // Fires when the link has been idle so long we need to close it + inSwitch bool // True if the switch is tracking this link + stalled bool // True if we haven't been receiving any response traffic } func (l *link) init(c *Core) error { @@ -133,9 +133,9 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } - intf.writer.intf = &intf - intf.reader.intf = &intf - intf.reader.err = make(chan error) + intf.writer.intf = &intf + intf.reader.intf = &intf + intf.reader.err = make(chan error) return &intf, nil } @@ -199,7 +199,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Lock() delete(intf.link.interfaces, intf.info) intf.link.mutex.Unlock() - //close(intf.closed) + close(intf.closed) }() intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) } @@ -214,427 +214,184 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.link.core.peers.removePeer(intf.peer.port) }() - // Finish setting up the peer struct - /* - out := make(chan [][]byte, 1) - defer close(out) intf.peer.out = func(msgs [][]byte) { - defer func() { recover() }() - out <- msgs + intf.writer.sendFrom(intf.peer, msgs, false) + } + intf.peer.linkOut = func(bs []byte) { + intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) } - */ - intf.peer.out = func(msgs [][]byte) { - intf.writer.sendFrom(intf.peer, msgs, false) - } - intf.peer.linkOut = make(chan []byte, 1) - go func() { - // TODO fix this - for bs := range intf.peer.linkOut { - intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) - } - }() themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - // Start the link loop + // Start things go intf.peer.start() - // Start the writer - /* - signalReady := make(chan struct{}, 1) - signalSent := make(chan bool, 1) - sendAck := make(chan struct{}, 1) - sendBlocked := time.NewTimer(time.Second) - defer util.TimerStop(sendBlocked) - util.TimerStop(sendBlocked) - go func() { - defer close(signalReady) - defer close(signalSent) - interval := 4 * time.Second - tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp - defer util.TimerStop(tcpTimer) - send := func(bss [][]byte) { - sendBlocked.Reset(time.Second) - size, _ := intf.msgIO.writeMsgs(bss) - util.TimerStop(sendBlocked) - select { - case signalSent <- size > 0: - default: - } - } - for { - // First try to send any link protocol traffic - select { - case msg := <-intf.peer.linkOut: - send([][]byte{msg}) - continue - default: - } - // No protocol traffic to send, so reset the timer - util.TimerStop(tcpTimer) - tcpTimer.Reset(interval) - // Now block until something is ready or the timer triggers keepalive traffic - select { - case <-tcpTimer.C: - intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send([][]byte{nil}) - case <-sendAck: - intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send([][]byte{nil}) - case msg := <-intf.peer.linkOut: - send([][]byte{msg}) - case msgs, ok := <-out: - if !ok { - return - } - send(msgs) - for _, msg := range msgs { - util.PutBytes(msg) - } - select { - case signalReady <- struct{}{}: - default: - } - //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s", - // strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } - } - }() - //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - // Used to enable/disable activity in the switch - signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive - defer close(signalAlive) - ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved - go func() { - var isAlive bool - var isReady bool - var sendTimerRunning bool - var recvTimerRunning bool - recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?... - sendTime := time.Second - sendTimer := time.NewTimer(sendTime) - defer util.TimerStop(sendTimer) - recvTimer := time.NewTimer(recvTime) - defer util.TimerStop(recvTimer) - closeTimer := time.NewTimer(closeTime) - defer util.TimerStop(closeTimer) - for { - //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", - // strings.ToUpper(intf.info.linkType), themString, intf.info.local, - // isAlive, isReady, sendTimerRunning, recvTimerRunning) - select { - case gotMsg, ok := <-signalAlive: - if !ok { - return - } - util.TimerStop(closeTimer) - closeTimer.Reset(closeTime) - util.TimerStop(recvTimer) - recvTimerRunning = false - isAlive = true - if !isReady { - // (Re-)enable in the switch - intf.link.core.switchTable.RecvFrom(nil, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) - isReady = true - } - if gotMsg && !sendTimerRunning { - // We got a message - // Start a timer, if it expires then send a 0-sized ack to let them know we're alive - util.TimerStop(sendTimer) - sendTimer.Reset(sendTime) - sendTimerRunning = true - } - if !gotMsg { - intf.link.core.log.Tracef("Received ack from %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } - case sentMsg, ok := <-signalSent: - // Stop any running ack timer - if !ok { - return - } - util.TimerStop(sendTimer) - sendTimerRunning = false - if sentMsg && !recvTimerRunning { - // We sent a message - // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem - util.TimerStop(recvTimer) - recvTimer.Reset(recvTime) - recvTimerRunning = true - } - case _, ok := <-signalReady: - if !ok { - return - } - if !isAlive { - // Disable in the switch - isReady = false - } else { - // Keep enabled in the switch - intf.link.core.switchTable.RecvFrom(nil, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) - isReady = true - } - case <-sendBlocked.C: - // We blocked while trying to send something - isReady = false - intf.link.core.switchTable.blockPeer(intf.peer.port) - case <-sendTimer.C: - // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive - select { - case sendAck <- struct{}{}: - default: - } - case <-recvTimer.C: - // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding - isAlive = false - intf.link.core.switchTable.blockPeer(intf.peer.port) - case <-closeTimer.C: - // We haven't received anything in a really long time, so things have died at the switch level and then some... - // Just close the connection at this point... - select { - case ret <- errors.New("timeout"): - default: - } - intf.msgIO.close() - } - } - }() - // Run reader loop - var helper phony.Inbox - done := make(chan struct{}) - var helperFunc func() - helperFunc = func() { - // The helper reads in a loop and sends to the peer - // It loops by sending itself a message, which can be delayed by backpressure - // So if the peer is busy, backpressure will pause reading until the peer catches up - msg, err := intf.msgIO.readMsg() - if len(msg) > 0 { - // TODO rewrite this if the link becomes an actor - intf.peer.handlePacketFrom(&helper, msg) - } - if err != nil { - if err != io.EOF { - select { - case ret <- err: - default: - } - } - close(done) - return - } - select { - case signalAlive <- len(msg) > 0: - default: - } - // Now try to read again - helper.RecvFrom(nil, helperFunc) - } - - // Start the read loop - helper.RecvFrom(nil, helperFunc) - <-done // Wait for the helper to exit - //////////////////////////////////////////////////////////////////////////////// - // Remember to set `err` to something useful before returning - select { - case err = <-ret: + intf.reader.RecvFrom(nil, intf.reader._read) + // Wait for the reader to finish + err = <-intf.reader.err + if err != nil { intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) - default: - err = nil + } else { intf.link.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } - */ - // Start the reader - intf.reader.RecvFrom(nil, intf.reader._read) - // Wait for the reader to finish - err = <- intf.reader.err - if err != nil { - intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) - } else { - intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } return err } //////////////////////////////////////////////////////////////////////////////// - -/* - phony.Inbox // Protects the below - sendTimer *time.Timer // Fires to signal that sending is blocked - stallTimer *time.Time // Fires to signal that no incoming traffic (including keep-alive) has been seen - recvTimer *time.Timer // Fires to send keep-alive traffic - closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - inSwitch bool // True if the switch is tracking this link - stalled bool // True if we haven't been receiving any response traffic -*/ - const ( - sendBlockedTime = time.Second // How long to wait before deciding a send is blocked - keepAliveTime = 2*time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send - stallTime = 6*time.Second // How long to wait for response traffic before deciding the connection has stalled - closeTime = 2*switch_timeout // How long to wait before closing the link + sendBlockedTime = time.Second // How long to wait before deciding a send is blocked + keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send + stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled + closeTime = 2 * switch_timeout // How long to wait before closing the link ) // notify the intf that we're currently sending func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { - intf.RecvFrom(nil, func() { - if !isLinkTraffic && size > 0 { - intf.inSwitch = false - } - intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) - intf._cancelRecvTimer() - }) + intf.RecvFrom(nil, func() { + if !isLinkTraffic && size > 0 { + intf.inSwitch = false + } + intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) + intf._cancelRecvTimer() + }) } // we just sent something, so cancel any pending timer to send keep-alive traffic func (intf *linkInterface) _cancelRecvTimer() { - intf.RecvFrom(nil, func() { - if intf.recvTimer != nil { - intf.recvTimer.Stop() - intf.recvTimer = nil - } - }) + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + } + }) } // called by an AfterFunc if we appear to have timed out func (intf *linkInterface) notifyBlockedSend() { - intf.RecvFrom(nil, func() { - if intf.sendTimer != nil { - //As far as we know, we're still trying to send, and the timer fired. + intf.RecvFrom(nil, func() { + if intf.sendTimer != nil { + //As far as we know, we're still trying to send, and the timer fired. intf.link.core.switchTable.blockPeer(intf.peer.port) - } - }) + } + }) } // notify the intf that we've finished sending, returning the peer to the switch func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { - intf.RecvFrom(nil, func() { - intf.sendTimer.Stop() - intf.sendTimer = nil - if !isLinkTraffic { - intf._notifySwitch() - } - if size > 0 && intf.stallTimer == nil { - intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) - } - }) + intf.RecvFrom(nil, func() { + intf.sendTimer.Stop() + intf.sendTimer = nil + if !isLinkTraffic { + intf._notifySwitch() + } + if size > 0 && intf.stallTimer == nil { + intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) + } + }) } // Notify the switch that we're ready for more traffic, assuming we're not in a stalled state func (intf *linkInterface) _notifySwitch() { - if !intf.inSwitch && !intf.stalled { - intf.inSwitch = true - intf.link.core.switchTable.RecvFrom(intf, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) - } + if !intf.inSwitch && !intf.stalled { + intf.inSwitch = true + intf.link.core.switchTable.RecvFrom(intf, func() { + intf.link.core.switchTable._idleIn(intf.peer.port) + }) + } } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds func (intf *linkInterface) notifyStalled() { - intf.RecvFrom(nil, func() { - if intf.stallTimer != nil { - intf.stallTimer = nil - intf.stalled = true + intf.RecvFrom(nil, func() { + if intf.stallTimer != nil { + intf.stallTimer = nil + intf.stalled = true intf.link.core.switchTable.blockPeer(intf.peer.port) - } - }) + } + }) } - // reset the close timer func (intf *linkInterface) notifyReading(from phony.Actor) { - intf.RecvFrom(from, func() { - if intf.closeTimer != nil { - intf.closeTimer.Stop() - } - intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() }) - }) + intf.RecvFrom(from, func() { + if intf.closeTimer != nil { + intf.closeTimer.Stop() + } + intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() }) + }) } // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) { - intf.RecvFrom(from, func() { - intf.link.core.log.Printf("DEBUG notifyReadFrom: inSwitch %v, stalled %v\n", intf.inSwitch, intf.stalled) - if intf.stallTimer != nil { - intf.stallTimer.Stop() - intf.stallTimer = nil - } - intf.stalled = false - intf._notifySwitch() - if size > 0 && intf.recvTimer == nil { - intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) - } - }) + intf.RecvFrom(from, func() { + if intf.stallTimer != nil { + intf.stallTimer.Stop() + intf.stallTimer = nil + } + intf.stalled = false + intf._notifySwitch() + if size > 0 && intf.recvTimer == nil { + intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + } + }) } // We need to send keep-alive traffic now func (intf *linkInterface) notifyDoKeepAlive() { - intf.RecvFrom(nil, func() { - if intf.recvTimer != nil { - intf.recvTimer.Stop() - intf.recvTimer = nil - intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic - } - }) + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + } + }) } //////////////////////////////////////////////////////////////////////////////// type linkWriter struct { - phony.Inbox - intf *linkInterface + phony.Inbox + intf *linkInterface } func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { - w.RecvFrom(from, func() { - var size int - for _, bs := range bss { - size += len(bs) - } - w.intf.notifySending(size, isLinkTraffic) - w.intf.msgIO.writeMsgs(bss) - w.intf.notifySent(size, isLinkTraffic) - w.intf.link.core.log.Println("DEBUG: wrote something, size:", size, "isLinkTraffic:", isLinkTraffic) - }) + w.RecvFrom(from, func() { + var size int + for _, bs := range bss { + size += len(bs) + } + w.intf.notifySending(size, isLinkTraffic) + w.intf.msgIO.writeMsgs(bss) + w.intf.notifySent(size, isLinkTraffic) + }) } //////////////////////////////////////////////////////////////////////////////// type linkReader struct { - phony.Inbox - intf *linkInterface - err chan error + phony.Inbox + intf *linkInterface + err chan error } func (r *linkReader) _read() { - r.intf.notifyReading(r) - msg, err := r.intf.msgIO.readMsg() - r.intf.link.core.log.Println("DEBUG read something") - r.intf.notifyReadFrom(r, len(msg)) - if len(msg) > 0 { - r.intf.peer.handlePacketFrom(r, msg) + r.intf.notifyReading(r) + msg, err := r.intf.msgIO.readMsg() + r.intf.notifyReadFrom(r, len(msg)) + if len(msg) > 0 { + r.intf.peer.handlePacketFrom(r, msg) + } + if err != nil { + if err != io.EOF { + r.err <- err } - if err != nil { - if err != io.EOF { - r.err<-err - } - close(r.err) - return - } - // Now try to read again - r.RecvFrom(nil, r._read) + close(r.err) + return + } + // Now try to read again + r.RecvFrom(nil, r._read) } - diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 50fb03f8..4cf0068b 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -103,7 +103,7 @@ type peer struct { linkShared crypto.BoxSharedKey endpoint string firstSeen time.Time // To track uptime for getPeers - linkOut (chan []byte) // used for protocol traffic (to bypass queues) + linkOut func([]byte) // used for protocol traffic (bypasses the switch) dinfo *dhtInfo // used to keep the DHT working out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes done (chan struct{}) // closed to exit the linkLoop @@ -263,8 +263,6 @@ func (p *peer) _sendPackets(packets [][]byte) { p.out(packets) } -var peerLinkOutHelper phony.Inbox - // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) _sendLinkPacket(packet []byte) { @@ -280,13 +278,7 @@ func (p *peer) _sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - // TODO replace this with a message send if/when the link becomes an actor - peerLinkOutHelper.RecvFrom(nil, func() { - select { - case p.linkOut <- packet: - case <-p.done: - } - }) + p.linkOut(packet) } // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6d882252..b30e59d5 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -250,7 +250,7 @@ func (t *switchTable) cleanRoot() { t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} - t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor + t.core.peers.sendSwitchMsgs(t) } } @@ -517,7 +517,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep } t.data.locator = sender.locator t.parent = sender.port - t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor + t.core.peers.sendSwitchMsgs(t) } if doUpdate { t.updater.Store(&sync.Once{})