diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 16e8d25d..da419369 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -247,24 +247,26 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { } if !known { go func() { - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - tun.RecvFrom(nil, func() { - // We've been given a connection so prepare the session wrapper - packets := tun.dials[*dstNodeID] - delete(tun.dials, *dstNodeID) - var tc *tunConn - var err error - if tc, err = tun._wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) - return - } - for _, packet := range packets { - tc.writeFrom(nil, packet) - } - }) - } + conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask) + tun.RecvFrom(nil, func() { + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + if err != nil { + return + } + // We've been given a connection so prepare the session wrapper + var tc *tunConn + if tc, err = tun._wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + return + } + for _, packet := range packets { + tc.writeFrom(nil, packet) + } + }) + return }() } } diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index b0c26867..efc0c81e 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -304,6 +304,8 @@ func (c *Conn) Close() (err error) { // Close the session, if it hasn't been closed already if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + } else { + c.session.doRemove() } } }) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c6f98ca0..1cc7eae4 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -554,7 +554,8 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.start() goWorkers := func(source, dest *peer) { - source.linkOut = make(chan []byte, 1) + linkOut := make(chan []byte, 1) + source.linkOut = func(bs []byte) { linkOut <- bs } send := make(chan []byte, 1) source.out = func(bss [][]byte) { for _, bs := range bss { @@ -566,7 +567,7 @@ func DEBUG_simLinkPeers(p, q *peer) { var packets [][]byte for { select { - case packet := <-source.linkOut: + case packet := <-linkOut: packets = append(packets, packet) continue case packet := <-send: @@ -583,7 +584,7 @@ func DEBUG_simLinkPeers(p, q *peer) { continue } select { - case packet := <-source.linkOut: + case packet := <-linkOut: packets = append(packets, packet) case packet := <-send: packets = append(packets, packet) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 1b48f391..8fbd0317 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -46,14 +46,23 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO - info linkInfo - incoming bool - force bool - closed chan struct{} + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + incoming bool + force bool + closed chan struct{} + reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch + writer linkWriter // Writes packets, notifies this linkInterface + phony.Inbox // Protects the below + sendTimer *time.Timer // Fires to signal that sending is blocked + stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen + recvTimer *time.Timer // Fires to send keep-alive traffic + closeTimer *time.Timer // Fires when the link has been idle so long we need to close it + inSwitch bool // True if the switch is tracking this link + stalled bool // True if we haven't been receiving any response traffic } func (l *link) init(c *Core) error { @@ -124,6 +133,9 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } + intf.writer.intf = &intf + intf.reader.intf = &intf + intf.reader.err = make(chan error) return &intf, nil } @@ -202,230 +214,188 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.link.core.peers.removePeer(intf.peer.port) }() - // Finish setting up the peer struct - out := make(chan [][]byte, 1) - defer close(out) intf.peer.out = func(msgs [][]byte) { - defer func() { recover() }() - out <- msgs + intf.writer.sendFrom(intf.peer, msgs, false) + } + intf.peer.linkOut = func(bs []byte) { + intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) } - intf.peer.linkOut = make(chan []byte, 1) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - // Start the link loop + // Start things go intf.peer.start() - // Start the writer - signalReady := make(chan struct{}, 1) - signalSent := make(chan bool, 1) - sendAck := make(chan struct{}, 1) - sendBlocked := time.NewTimer(time.Second) - defer util.TimerStop(sendBlocked) - util.TimerStop(sendBlocked) - go func() { - defer close(signalReady) - defer close(signalSent) - interval := 4 * time.Second - tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp - defer util.TimerStop(tcpTimer) - send := func(bss [][]byte) { - sendBlocked.Reset(time.Second) - size, _ := intf.msgIO.writeMsgs(bss) - util.TimerStop(sendBlocked) - select { - case signalSent <- size > 0: - default: - } - } - for { - // First try to send any link protocol traffic - select { - case msg := <-intf.peer.linkOut: - send([][]byte{msg}) - continue - default: - } - // No protocol traffic to send, so reset the timer - util.TimerStop(tcpTimer) - tcpTimer.Reset(interval) - // Now block until something is ready or the timer triggers keepalive traffic - select { - case <-tcpTimer.C: - intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send([][]byte{nil}) - case <-sendAck: - intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send([][]byte{nil}) - case msg := <-intf.peer.linkOut: - send([][]byte{msg}) - case msgs, ok := <-out: - if !ok { - return - } - send(msgs) - for _, msg := range msgs { - util.PutBytes(msg) - } - select { - case signalReady <- struct{}{}: - default: - } - //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s", - // strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } - } - }() - //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - // Used to enable/disable activity in the switch - signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive - defer close(signalAlive) - ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved - go func() { - var isAlive bool - var isReady bool - var sendTimerRunning bool - var recvTimerRunning bool - recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?... - sendTime := time.Second - sendTimer := time.NewTimer(sendTime) - defer util.TimerStop(sendTimer) - recvTimer := time.NewTimer(recvTime) - defer util.TimerStop(recvTimer) - closeTimer := time.NewTimer(closeTime) - defer util.TimerStop(closeTimer) - for { - //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", - // strings.ToUpper(intf.info.linkType), themString, intf.info.local, - // isAlive, isReady, sendTimerRunning, recvTimerRunning) - select { - case gotMsg, ok := <-signalAlive: - if !ok { - return - } - util.TimerStop(closeTimer) - closeTimer.Reset(closeTime) - util.TimerStop(recvTimer) - recvTimerRunning = false - isAlive = true - if !isReady { - // (Re-)enable in the switch - intf.link.core.switchTable.RecvFrom(nil, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) - isReady = true - } - if gotMsg && !sendTimerRunning { - // We got a message - // Start a timer, if it expires then send a 0-sized ack to let them know we're alive - util.TimerStop(sendTimer) - sendTimer.Reset(sendTime) - sendTimerRunning = true - } - if !gotMsg { - intf.link.core.log.Tracef("Received ack from %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } - case sentMsg, ok := <-signalSent: - // Stop any running ack timer - if !ok { - return - } - util.TimerStop(sendTimer) - sendTimerRunning = false - if sentMsg && !recvTimerRunning { - // We sent a message - // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem - util.TimerStop(recvTimer) - recvTimer.Reset(recvTime) - recvTimerRunning = true - } - case _, ok := <-signalReady: - if !ok { - return - } - if !isAlive { - // Disable in the switch - isReady = false - } else { - // Keep enabled in the switch - intf.link.core.switchTable.RecvFrom(nil, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) - isReady = true - } - case <-sendBlocked.C: - // We blocked while trying to send something - isReady = false - intf.link.core.switchTable.blockPeer(intf.peer.port) - case <-sendTimer.C: - // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive - select { - case sendAck <- struct{}{}: - default: - } - case <-recvTimer.C: - // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding - isAlive = false - intf.link.core.switchTable.blockPeer(intf.peer.port) - case <-closeTimer.C: - // We haven't received anything in a really long time, so things have died at the switch level and then some... - // Just close the connection at this point... - select { - case ret <- errors.New("timeout"): - default: - } - intf.msgIO.close() - } - } - }() - // Run reader loop - var helper phony.Inbox - done := make(chan struct{}) - var helperFunc func() - helperFunc = func() { - // The helper reads in a loop and sends to the peer - // It loops by sending itself a message, which can be delayed by backpressure - // So if the peer is busy, backpressure will pause reading until the peer catches up - msg, err := intf.msgIO.readMsg() - if len(msg) > 0 { - // TODO rewrite this if the link becomes an actor - intf.peer.handlePacketFrom(&helper, msg) - } - if err != nil { - if err != io.EOF { - select { - case ret <- err: - default: - } - } - close(done) - return - } - select { - case signalAlive <- len(msg) > 0: - default: - } - // Now try to read again - helper.RecvFrom(nil, helperFunc) - } - // Start the read loop - helper.RecvFrom(nil, helperFunc) - <-done // Wait for the helper to exit - //////////////////////////////////////////////////////////////////////////////// - // Remember to set `err` to something useful before returning - select { - case err = <-ret: + intf.reader.RecvFrom(nil, intf.reader._read) + // Wait for the reader to finish + err = <-intf.reader.err + if err != nil { intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) - default: - err = nil + } else { intf.link.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } return err } + +//////////////////////////////////////////////////////////////////////////////// + +const ( + sendBlockedTime = time.Second // How long to wait before deciding a send is blocked + keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send + stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled + closeTime = 2 * switch_timeout // How long to wait before closing the link +) + +// notify the intf that we're currently sending +func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { + intf.RecvFrom(nil, func() { + if !isLinkTraffic { + intf.inSwitch = false + } + intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) + intf._cancelRecvTimer() + }) +} + +// we just sent something, so cancel any pending timer to send keep-alive traffic +func (intf *linkInterface) _cancelRecvTimer() { + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + } + }) +} + +// called by an AfterFunc if we appear to have timed out +func (intf *linkInterface) notifyBlockedSend() { + intf.RecvFrom(nil, func() { + if intf.sendTimer != nil { + //As far as we know, we're still trying to send, and the timer fired. + intf.link.core.switchTable.blockPeer(intf.peer.port) + } + }) +} + +// notify the intf that we've finished sending, returning the peer to the switch +func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { + intf.RecvFrom(nil, func() { + intf.sendTimer.Stop() + intf.sendTimer = nil + if !isLinkTraffic { + intf._notifySwitch() + } + if size > 0 && intf.stallTimer == nil { + intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) + } + }) +} + +// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state +func (intf *linkInterface) _notifySwitch() { + if !intf.inSwitch && !intf.stalled { + intf.inSwitch = true + intf.link.core.switchTable.RecvFrom(intf, func() { + intf.link.core.switchTable._idleIn(intf.peer.port) + }) + } +} + +// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds +func (intf *linkInterface) notifyStalled() { + intf.RecvFrom(nil, func() { + if intf.stallTimer != nil { + intf.stallTimer = nil + intf.stalled = true + intf.link.core.switchTable.blockPeer(intf.peer.port) + } + }) +} + +// reset the close timer +func (intf *linkInterface) notifyReading(from phony.Actor) { + intf.RecvFrom(from, func() { + if intf.closeTimer != nil { + intf.closeTimer.Stop() + } + intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() }) + }) +} + +// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic +func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) { + intf.RecvFrom(from, func() { + if intf.stallTimer != nil { + intf.stallTimer.Stop() + intf.stallTimer = nil + } + intf.stalled = false + intf._notifySwitch() + if size > 0 && intf.recvTimer == nil { + intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + } + }) +} + +// We need to send keep-alive traffic now +func (intf *linkInterface) notifyDoKeepAlive() { + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + } + }) +} + +//////////////////////////////////////////////////////////////////////////////// + +type linkWriter struct { + phony.Inbox + intf *linkInterface +} + +func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { + w.RecvFrom(from, func() { + var size int + for _, bs := range bss { + size += len(bs) + } + w.intf.notifySending(size, isLinkTraffic) + w.intf.msgIO.writeMsgs(bss) + w.intf.notifySent(size, isLinkTraffic) + // Cleanup + for _, bs := range bss { + util.PutBytes(bs) + } + }) +} + +//////////////////////////////////////////////////////////////////////////////// + +type linkReader struct { + phony.Inbox + intf *linkInterface + err chan error +} + +func (r *linkReader) _read() { + r.intf.notifyReading(r) + msg, err := r.intf.msgIO.readMsg() + r.intf.notifyReadFrom(r, len(msg)) + if len(msg) > 0 { + r.intf.peer.handlePacketFrom(r, msg) + } + if err != nil { + if err != io.EOF { + r.err <- err + } + close(r.err) + return + } + // Now try to read again + r.RecvFrom(nil, r._read) +} diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 50fb03f8..4cf0068b 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -103,7 +103,7 @@ type peer struct { linkShared crypto.BoxSharedKey endpoint string firstSeen time.Time // To track uptime for getPeers - linkOut (chan []byte) // used for protocol traffic (to bypass queues) + linkOut func([]byte) // used for protocol traffic (bypasses the switch) dinfo *dhtInfo // used to keep the DHT working out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes done (chan struct{}) // closed to exit the linkLoop @@ -263,8 +263,6 @@ func (p *peer) _sendPackets(packets [][]byte) { p.out(packets) } -var peerLinkOutHelper phony.Inbox - // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) _sendLinkPacket(packet []byte) { @@ -280,13 +278,7 @@ func (p *peer) _sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - // TODO replace this with a message send if/when the link becomes an actor - peerLinkOutHelper.RecvFrom(nil, func() { - select { - case p.linkOut <- packet: - case <-p.done: - } - }) + p.linkOut(packet) } // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 94ee41e6..0fc7ec80 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -255,13 +255,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle - go func() { - // Run cleanup when the session is canceled - <-sinfo.cancel.Finished() - sinfo.sessions.router.doAdmin(func() { - sinfo.sessions.removeSession(&sinfo) - }) - }() return &sinfo } @@ -293,6 +286,12 @@ func (ss *sessions) cleanup() { ss.lastCleanup = time.Now() } +func (sinfo *sessionInfo) doRemove() { + sinfo.sessions.router.RecvFrom(nil, func() { + sinfo.sessions.removeSession(sinfo) + }) +} + // Closes a session, removing it from sessions maps. func (ss *sessions) removeSession(sinfo *sessionInfo) { if s := sinfo.sessions.sinfos[sinfo.myHandle]; s == sinfo {