From 40204caab6c9a29a2d56ba9c18f8c75ce4850bed Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 14:03:31 +0100 Subject: [PATCH 01/19] Try to fix race condition in sessions.reset --- src/yggdrasil/session.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 8a6d16fc..778fa2a9 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -414,11 +414,10 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Resets all sessions to an uninitialized state. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. +// Only call this from the router actor. func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.Act(ss.router, func() { - sinfo.reset = true - }) + sinfo.reset = true } } From c3016e680c77e1dfb018d22422c7d6621f04505a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 14:05:18 +0100 Subject: [PATCH 02/19] Fix panic where slice goes out of bounds because iface.Read returns less than zero (which might happen when the TUN/TAP interface is closed) --- src/tuntap/iface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 92ba36ab..753a8d02 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -111,7 +111,7 @@ func (r *tunReader) _read() { recvd := util.ResizeBytes(util.GetBytes(), 65535+tun_ETHER_HEADER_LENGTH) // Wait for a packet to be delivered to us through the TUN/TAP adapter n, err := r.tun.iface.Read(recvd) - if n == 0 { + if n <= 0 { util.PutBytes(recvd) } else { r.tun.handlePacketFrom(r, recvd[:n], err) From e9bacda0b328a67871b3b66c9328778348b1a268 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 13:37:01 +0100 Subject: [PATCH 03/19] Catch a nil pointer when sending a session packet to a conn, this shouldn't happen but it's caused multiple crashes in conn.recvMsg --- src/yggdrasil/session.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 778fa2a9..0b3947e2 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -469,7 +469,14 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { callback := func() { util.PutBytes(p.Payload) if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { - // Either we failed to decrypt, or the session was updated, or we received this packet in the mean time + // Either we failed to decrypt, or the session was updated, or we + // received this packet in the mean time + util.PutBytes(bs) + return + } + if sinfo.conn == nil { + // There's no connection associated with this session for some reason + // TODO: Figure out why this happens sometimes, it shouldn't util.PutBytes(bs) return } From 200b3623b21cbf05e1254a1ca122d301166e3e71 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 14:32:28 +0100 Subject: [PATCH 04/19] Fix #539 --- src/yggdrasil/tcp.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index cce352bd..5ac921c4 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -249,7 +249,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if err != nil { return } - t.handler(conn, false, dialerdst.String()) + t.handler(conn, false, saddr) } else { dst, err := net.ResolveTCPAddr("tcp", saddr) if err != nil { @@ -322,18 +322,19 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) { t.setExtraOptions(sock) stream := stream{} stream.init(sock) - local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) - remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() - var name string - var proto string + var name, proto, local, remote string if socksaddr, issocks := options.(string); issocks { - name = "socks://" + socksaddr + "/" + sock.RemoteAddr().String() + name = "socks://" + sock.RemoteAddr().String() + "/" + socksaddr proto = "socks" + local, _, _ = net.SplitHostPort(sock.LocalAddr().String()) + remote, _, _ = net.SplitHostPort(socksaddr) } else { name = "tcp://" + sock.RemoteAddr().String() proto = "tcp" + local, _, _ = net.SplitHostPort(sock.LocalAddr().String()) + remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) } + force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force) if err != nil { t.link.core.log.Println(err) From 27158d7b44993fea0ba2bff666c4ef8192aa99b1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 14:35:11 +0100 Subject: [PATCH 05/19] Fix #509 --- build | 2 -- 1 file changed, 2 deletions(-) diff --git a/build b/build index 7ca5f4fa..d11f991d 100755 --- a/build +++ b/build @@ -1,7 +1,5 @@ #!/bin/sh -set -ef - PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/version} PKGNAME=${PKGNAME:-$(sh contrib/semver/name.sh)} PKGVER=${PKGVER:-$(sh contrib/semver/version.sh --bare)} From a62e029e21eadf1b40fd0c0931727e97c6c5999b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 14:37:25 +0100 Subject: [PATCH 06/19] Update apt before trying to pull in RPM dependencies --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index f9cd0720..5778ffc9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -29,6 +29,7 @@ jobs: - run: name: Install RPM utilities command: | + sudo apt-get update sudo apt-get install -y rpm file mkdir -p ~/rpmbuild/BUILD ~/rpmbuild/RPMS ~/rpmbuild/SOURCES ~/rpmbuild/SPECS ~/rpmbuild/SRPMS From 846df4789a0f3e74c13a6f2c6cf302e761e46ad5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 15:01:19 +0100 Subject: [PATCH 07/19] Be more verbose when a peer or listener is badly formatted --- src/yggdrasil/core.go | 14 ++++++++++---- src/yggdrasil/link.go | 4 ++-- src/yggdrasil/tcp.go | 3 +++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 754d7d64..4dcd16fd 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -91,15 +91,21 @@ func (c *Core) _addPeerLoop() { // Add peers from the Peers section for _, peer := range current.Peers { - go c.AddPeer(peer, "") // TODO: this should be acted and not in a goroutine? - time.Sleep(time.Second) + go func() { + if err := c.AddPeer(peer, ""); err != nil { + c.log.Errorln("Failed to add peer:", err) + } + }() // TODO: this should be acted and not in a goroutine? } // Add peers from the InterfacePeers section for intf, intfpeers := range current.InterfacePeers { for _, peer := range intfpeers { - go c.AddPeer(peer, intf) // TODO: this should be acted and not in a goroutine? - time.Sleep(time.Second) + go func() { + if err := c.AddPeer(peer, intf); err != nil { + c.log.Errorln("Failed to add peer:", err) + } + }() // TODO: this should be acted and not in a goroutine? } } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 6e393514..bdf15547 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -86,7 +86,7 @@ func (l *link) reconfigure() { func (l *link) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { - return err + return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) } pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") switch u.Scheme { @@ -103,7 +103,7 @@ func (l *link) call(uri string, sintf string) error { func (l *link) listen(uri string) error { u, err := url.Parse(uri) if err != nil { - return err + return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err) } switch u.Scheme { case "tcp": diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 5ac921c4..93e39e40 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -85,6 +85,7 @@ func (t *tcp) init(l *link) error { defer t.link.core.config.Mutex.RUnlock() for _, listenaddr := range t.link.core.config.Current.Listen { if listenaddr[:6] != "tcp://" { + t.link.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") continue } if _, err := t.listen(listenaddr[6:]); err != nil { @@ -103,6 +104,7 @@ func (t *tcp) reconfigure() { if len(added) > 0 || len(deleted) > 0 { for _, a := range added { if a[:6] != "tcp://" { + t.link.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") continue } if _, err := t.listen(a[6:]); err != nil { @@ -113,6 +115,7 @@ func (t *tcp) reconfigure() { } for _, d := range deleted { if d[:6] != "tcp://" { + t.link.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") continue } t.mutex.Lock() From 00a972b74ed838c2d7b8f634ec316a39eabbac10 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 15:22:17 +0100 Subject: [PATCH 08/19] Disconnect peers when stopping, stop modules before core --- cmd/yggdrasil/main.go | 2 +- src/yggdrasil/core.go | 31 +++++++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index e9a21c13..3b7c9220 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -292,10 +292,10 @@ exit: } func (n *node) shutdown() { - n.core.Stop() n.admin.Stop() n.multicast.Stop() n.tuntap.Stop() + n.core.Stop() os.Exit(0) } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 4dcd16fd..2857de1d 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -21,16 +21,17 @@ type Core struct { // We're going to keep our own copy of the provided config - that way we can // guarantee that it will be covered by the mutex phony.Inbox - config config.NodeState // Config - boxPub crypto.BoxPubKey - boxPriv crypto.BoxPrivKey - sigPub crypto.SigPubKey - sigPriv crypto.SigPrivKey - switchTable switchTable - peers peers - router router - link link - log *log.Logger + config config.NodeState // Config + boxPub crypto.BoxPubKey + boxPriv crypto.BoxPrivKey + sigPub crypto.SigPubKey + sigPriv crypto.SigPrivKey + switchTable switchTable + peers peers + router router + link link + log *log.Logger + addPeerTimer *time.Timer } func (c *Core) _init() error { @@ -110,7 +111,7 @@ func (c *Core) _addPeerLoop() { } // Sit for a while - time.AfterFunc(time.Minute, func() { + c.addPeerTimer = time.AfterFunc(time.Minute, func() { c.Act(c, c._addPeerLoop) }) } @@ -177,7 +178,9 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState return nil, err } - c.Act(c, c._addPeerLoop) + c.addPeerTimer = time.AfterFunc(time.Second, func() { + c.Act(c, c._addPeerLoop) + }) c.log.Infoln("Startup complete") return &c.config, nil @@ -191,4 +194,8 @@ func (c *Core) Stop() { // This function is unsafe and should only be ran by the core actor. func (c *Core) _stop() { c.log.Infoln("Stopping...") + c.addPeerTimer.Stop() + for _, peer := range c.GetPeers() { + c.DisconnectPeer(peer.Port) + } } From 366fe7e772376a51b7937bc451f63cd7c6aa899a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 15:31:43 +0100 Subject: [PATCH 09/19] Allow multicast to be shut down more sanely --- src/multicast/multicast.go | 212 +++++++++++++++--------------- src/multicast/multicast_darwin.go | 19 +-- 2 files changed, 113 insertions(+), 118 deletions(-) diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 22ac9356..2102a4d9 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -19,14 +19,16 @@ import ( // configured multicast interface, Yggdrasil will attempt to peer with that node // automatically. type Multicast struct { - core *yggdrasil.Core - config *config.NodeState - log *log.Logger - sock *ipv6.PacketConn - groupAddr string - listeners map[string]*yggdrasil.TcpListener - listenPort uint16 - isOpen bool + core *yggdrasil.Core + config *config.NodeState + log *log.Logger + sock *ipv6.PacketConn + groupAddr string + listeners map[string]*yggdrasil.TcpListener + listenPort uint16 + isOpen bool + announcer *time.Timer + platformhandler *time.Timer } // Init prepares the multicast interface for use. @@ -63,9 +65,9 @@ func (m *Multicast) Start() error { } m.isOpen = true - go m.multicastStarted() go m.listen() - go m.announce() + m.multicastStarted() + m.announce() return nil } @@ -73,6 +75,8 @@ func (m *Multicast) Start() error { // Stop is not implemented for multicast yet. func (m *Multicast) Stop() error { m.isOpen = false + m.announcer.Stop() + m.platformhandler.Stop() m.sock.Close() return nil } @@ -136,108 +140,106 @@ func (m *Multicast) announce() { if err != nil { panic(err) } - for { - interfaces := m.Interfaces() - // There might be interfaces that we configured listeners for but are no - // longer up - if that's the case then we should stop the listeners - for name, listener := range m.listeners { - // Prepare our stop function! - stop := func() { - listener.Stop <- true - delete(m.listeners, name) - m.log.Debugln("No longer multicasting on", name) - } - // If the interface is no longer visible on the system then stop the - // listener, as another one will be started further down - if _, ok := interfaces[name]; !ok { - stop() - continue - } - // It's possible that the link-local listener address has changed so if - // that is the case then we should clean up the interface listener - found := false - listenaddr, err := net.ResolveTCPAddr("tcp6", listener.Listener.Addr().String()) - if err != nil { - stop() - continue - } - // Find the interface that matches the listener - if intf, err := net.InterfaceByName(name); err == nil { - if addrs, err := intf.Addrs(); err == nil { - // Loop through the addresses attached to that listener and see if any - // of them match the current address of the listener - for _, addr := range addrs { - if ip, _, err := net.ParseCIDR(addr.String()); err == nil { - // Does the interface address match our listener address? - if ip.Equal(listenaddr.IP) { - found = true - break - } + interfaces := m.Interfaces() + // There might be interfaces that we configured listeners for but are no + // longer up - if that's the case then we should stop the listeners + for name, listener := range m.listeners { + // Prepare our stop function! + stop := func() { + listener.Stop <- true + delete(m.listeners, name) + m.log.Debugln("No longer multicasting on", name) + } + // If the interface is no longer visible on the system then stop the + // listener, as another one will be started further down + if _, ok := interfaces[name]; !ok { + stop() + continue + } + // It's possible that the link-local listener address has changed so if + // that is the case then we should clean up the interface listener + found := false + listenaddr, err := net.ResolveTCPAddr("tcp6", listener.Listener.Addr().String()) + if err != nil { + stop() + continue + } + // Find the interface that matches the listener + if intf, err := net.InterfaceByName(name); err == nil { + if addrs, err := intf.Addrs(); err == nil { + // Loop through the addresses attached to that listener and see if any + // of them match the current address of the listener + for _, addr := range addrs { + if ip, _, err := net.ParseCIDR(addr.String()); err == nil { + // Does the interface address match our listener address? + if ip.Equal(listenaddr.IP) { + found = true + break } } } } - // If the address has not been found on the adapter then we should stop - // and clean up the TCP listener. A new one will be created below if a - // suitable link-local address is found - if !found { - stop() - } } - // Now that we have a list of valid interfaces from the operating system, - // we can start checking if we can send multicasts on them - for _, iface := range interfaces { - // Find interface addresses - addrs, err := iface.Addrs() - if err != nil { - panic(err) - } - for _, addr := range addrs { - addrIP, _, _ := net.ParseCIDR(addr.String()) - // Ignore IPv4 addresses - if addrIP.To4() != nil { - continue - } - // Ignore non-link-local addresses - if !addrIP.IsLinkLocalUnicast() { - continue - } - // Join the multicast group - m.sock.JoinGroup(&iface, groupAddr) - // Try and see if we already have a TCP listener for this interface - var listener *yggdrasil.TcpListener - if l, ok := m.listeners[iface.Name]; !ok || l.Listener == nil { - // No listener was found - let's create one - listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) - if li, err := m.core.ListenTCP(listenaddr); err == nil { - m.log.Debugln("Started multicasting on", iface.Name) - // Store the listener so that we can stop it later if needed - m.listeners[iface.Name] = li - listener = li - } else { - m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) - } - } else { - // An existing listener was found - listener = m.listeners[iface.Name] - } - // Make sure nothing above failed for some reason - if listener == nil { - continue - } - // Get the listener details and construct the multicast beacon - lladdr := listener.Listener.Addr().String() - if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { - a.Zone = "" - destAddr.Zone = iface.Name - msg := []byte(a.String()) - m.sock.WriteTo(msg, nil, destAddr) - } - break - } + // If the address has not been found on the adapter then we should stop + // and clean up the TCP listener. A new one will be created below if a + // suitable link-local address is found + if !found { + stop() } - time.Sleep(time.Second * 15) } + // Now that we have a list of valid interfaces from the operating system, + // we can start checking if we can send multicasts on them + for _, iface := range interfaces { + // Find interface addresses + addrs, err := iface.Addrs() + if err != nil { + panic(err) + } + for _, addr := range addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + // Ignore IPv4 addresses + if addrIP.To4() != nil { + continue + } + // Ignore non-link-local addresses + if !addrIP.IsLinkLocalUnicast() { + continue + } + // Join the multicast group + m.sock.JoinGroup(&iface, groupAddr) + // Try and see if we already have a TCP listener for this interface + var listener *yggdrasil.TcpListener + if l, ok := m.listeners[iface.Name]; !ok || l.Listener == nil { + // No listener was found - let's create one + listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) + if li, err := m.core.ListenTCP(listenaddr); err == nil { + m.log.Debugln("Started multicasting on", iface.Name) + // Store the listener so that we can stop it later if needed + m.listeners[iface.Name] = li + listener = li + } else { + m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) + } + } else { + // An existing listener was found + listener = m.listeners[iface.Name] + } + // Make sure nothing above failed for some reason + if listener == nil { + continue + } + // Get the listener details and construct the multicast beacon + lladdr := listener.Listener.Addr().String() + if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { + a.Zone = "" + destAddr.Zone = iface.Name + msg := []byte(a.String()) + m.sock.WriteTo(msg, nil, destAddr) + } + break + } + } + m.announcer = time.AfterFunc(time.Second*15, m.announce) } func (m *Multicast) listen() { diff --git a/src/multicast/multicast_darwin.go b/src/multicast/multicast_darwin.go index c88b4a81..6fdccb2b 100644 --- a/src/multicast/multicast_darwin.go +++ b/src/multicast/multicast_darwin.go @@ -32,21 +32,14 @@ import ( var awdlGoroutineStarted bool func (m *Multicast) multicastStarted() { - if awdlGoroutineStarted { - return - } - awdlGoroutineStarted = true - for { - C.StopAWDLBrowsing() - for intf := range m.Interfaces() { - if intf == "awdl0" { - m.log.Infoln("Multicast discovery is using AWDL discovery") - C.StartAWDLBrowsing() - break - } + C.StopAWDLBrowsing() + for intf := range m.Interfaces() { + if intf == "awdl0" { + C.StartAWDLBrowsing() + break } - time.Sleep(time.Minute) } + m.platformhandler = time.AfterFunc(time.Minute, m.multicastStarted) } func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { From c78a4cb28fa1fe572adb0770539633b02ee41674 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 15:34:26 +0100 Subject: [PATCH 10/19] Only stop timers if they are running --- src/multicast/multicast.go | 8 ++++++-- src/yggdrasil/core.go | 9 ++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 2102a4d9..08b7180e 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -75,8 +75,12 @@ func (m *Multicast) Start() error { // Stop is not implemented for multicast yet. func (m *Multicast) Stop() error { m.isOpen = false - m.announcer.Stop() - m.platformhandler.Stop() + if m.announcer != nil { + m.announcer.Stop() + } + if m.platformhandler != nil { + m.platformhandler.Stop() + } m.sock.Close() return nil } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 2857de1d..4cdcd8e4 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -110,7 +110,6 @@ func (c *Core) _addPeerLoop() { } } - // Sit for a while c.addPeerTimer = time.AfterFunc(time.Minute, func() { c.Act(c, c._addPeerLoop) }) @@ -178,9 +177,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState return nil, err } - c.addPeerTimer = time.AfterFunc(time.Second, func() { - c.Act(c, c._addPeerLoop) - }) + c.Act(c, c._addPeerLoop) c.log.Infoln("Startup complete") return &c.config, nil @@ -194,7 +191,9 @@ func (c *Core) Stop() { // This function is unsafe and should only be ran by the core actor. func (c *Core) _stop() { c.log.Infoln("Stopping...") - c.addPeerTimer.Stop() + if c.addPeerTimer != nil { + c.addPeerTimer.Stop() + } for _, peer := range c.GetPeers() { c.DisconnectPeer(peer.Port) } From b0df9e2f31020373b62d15600410672fe3a2a34c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 16:15:33 +0100 Subject: [PATCH 11/19] Fix race when adding peers --- src/yggdrasil/core.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 4cdcd8e4..598d8121 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -92,21 +92,21 @@ func (c *Core) _addPeerLoop() { // Add peers from the Peers section for _, peer := range current.Peers { - go func() { - if err := c.AddPeer(peer, ""); err != nil { + go func(peer, intf string) { + if err := c.AddPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } - }() // TODO: this should be acted and not in a goroutine? + }(peer, "") // TODO: this should be acted and not in a goroutine? } // Add peers from the InterfacePeers section for intf, intfpeers := range current.InterfacePeers { for _, peer := range intfpeers { - go func() { + go func(peer, intf string) { if err := c.AddPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } - }() // TODO: this should be acted and not in a goroutine? + }(peer, intf) // TODO: this should be acted and not in a goroutine? } } From b959f53fee55783a9b2c821efb08b67eca064fb3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 16:32:22 +0100 Subject: [PATCH 12/19] Shut down listeners when stopping --- src/yggdrasil/core.go | 2 ++ src/yggdrasil/link.go | 7 +++++++ src/yggdrasil/tcp.go | 7 +++++++ 3 files changed, 16 insertions(+) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 598d8121..cdc8ad44 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -194,7 +194,9 @@ func (c *Core) _stop() { if c.addPeerTimer != nil { c.addPeerTimer.Stop() } + c.link.stop() for _, peer := range c.GetPeers() { c.DisconnectPeer(peer.Port) } + c.log.Infoln("Stopped") } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index bdf15547..df73cc4c 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -134,6 +134,13 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st return &intf, nil } +func (l *link) stop() error { + if err := l.tcp.stop(); err != nil { + return err + } + return nil +} + func (intf *linkInterface) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later myLinkPub, myLinkPriv := crypto.NewBoxKeys() diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 93e39e40..a8c23623 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -96,6 +96,13 @@ func (t *tcp) init(l *link) error { return nil } +func (t *tcp) stop() error { + for _, listener := range t.listeners { + close(listener.Stop) + } + return nil +} + func (t *tcp) reconfigure() { t.link.core.config.Mutex.RLock() added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) From 2dc136f94a64fdce5a1c08c04c51ba66b24374f0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 18 Sep 2019 16:51:46 +0100 Subject: [PATCH 13/19] Multicast actor to prevent races --- src/multicast/multicast.go | 10 +++++++--- src/multicast/multicast_darwin.go | 4 +++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 08b7180e..63156d82 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -7,6 +7,7 @@ import ( "regexp" "time" + "github.com/Arceliar/phony" "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -19,6 +20,7 @@ import ( // configured multicast interface, Yggdrasil will attempt to peer with that node // automatically. type Multicast struct { + phony.Inbox core *yggdrasil.Core config *config.NodeState log *log.Logger @@ -66,8 +68,8 @@ func (m *Multicast) Start() error { m.isOpen = true go m.listen() - m.multicastStarted() - m.announce() + m.Act(m, m.multicastStarted) + m.Act(m, m.announce) return nil } @@ -243,7 +245,9 @@ func (m *Multicast) announce() { break } } - m.announcer = time.AfterFunc(time.Second*15, m.announce) + m.announcer = time.AfterFunc(time.Second*15, func() { + m.Act(m, m.announce) + }) } func (m *Multicast) listen() { diff --git a/src/multicast/multicast_darwin.go b/src/multicast/multicast_darwin.go index 6fdccb2b..4cfef9e9 100644 --- a/src/multicast/multicast_darwin.go +++ b/src/multicast/multicast_darwin.go @@ -39,7 +39,9 @@ func (m *Multicast) multicastStarted() { break } } - m.platformhandler = time.AfterFunc(time.Minute, m.multicastStarted) + m.platformhandler = time.AfterFunc(time.Minute, func() { + m.Act(m, m.multicastStarted) + }) } func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { From 2d64a6380ab26f91080267254d167ec2ba6074b5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 18 Sep 2019 18:33:51 -0500 Subject: [PATCH 14/19] misc other fixes --- src/yggdrasil/conn.go | 3 ++- src/yggdrasil/session.go | 13 +++++-------- src/yggdrasil/tcp.go | 10 ++++------ 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index cb28a6f2..9af5d240 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -102,11 +102,12 @@ func (c *Conn) search() error { if sinfo != nil { // Need to clean up to avoid a session leak sinfo.cancel.Cancel(nil) + sinfo.sessions.removeSession(sinfo) } default: if sinfo != nil { // Finish initializing the session - sinfo.conn = c + sinfo.setConn(nil, c) } c.session = sinfo err = e diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 0b3947e2..2f5e0af3 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -416,8 +416,11 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Only call this from the router actor. func (ss *sessions) reset() { - for _, sinfo := range ss.sinfos { - sinfo.reset = true + for _, _sinfo := range ss.sinfos { + sinfo := _sinfo // So we can safely put it in a closure + sinfo.Act(ss.router, func() { + sinfo.reset = true + }) } } @@ -474,12 +477,6 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { util.PutBytes(bs) return } - if sinfo.conn == nil { - // There's no connection associated with this session for some reason - // TODO: Figure out why this happens sometimes, it shouldn't - util.PutBytes(bs) - return - } sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a8c23623..ffda6b76 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -207,11 +207,12 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { } // Checks if we already are calling this address -func (t *tcp) isAlreadyCalling(saddr string) bool { +func (t *tcp) startCalling(saddr string) bool { t.mutex.Lock() defer t.mutex.Unlock() _, isIn := t.calls[saddr] - return isIn + t.calls[saddr] = struct{}{} + return !isIn } // Checks if a connection already exists. @@ -225,12 +226,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if t.isAlreadyCalling(callname) { + if !t.startCalling(callname) { return } - t.mutex.Lock() - t.calls[callname] = struct{}{} - t.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios time.Sleep(default_timeout) From 995d67cca8fda11457a28e22cd5380656935c16f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 18 Sep 2019 18:46:03 -0500 Subject: [PATCH 15/19] fix leak in _addPeerLoop --- src/yggdrasil/core.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index cdc8ad44..98a5c6e1 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -93,7 +93,7 @@ func (c *Core) _addPeerLoop() { // Add peers from the Peers section for _, peer := range current.Peers { go func(peer, intf string) { - if err := c.AddPeer(peer, intf); err != nil { + if err := c.CallPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } }(peer, "") // TODO: this should be acted and not in a goroutine? @@ -103,7 +103,7 @@ func (c *Core) _addPeerLoop() { for intf, intfpeers := range current.InterfacePeers { for _, peer := range intfpeers { go func(peer, intf string) { - if err := c.AddPeer(peer, intf); err != nil { + if err := c.CallPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } }(peer, intf) // TODO: this should be acted and not in a goroutine? @@ -111,7 +111,7 @@ func (c *Core) _addPeerLoop() { } c.addPeerTimer = time.AfterFunc(time.Minute, func() { - c.Act(c, c._addPeerLoop) + c.Act(nil, c._addPeerLoop) }) } From 39461cb60363d8d1c0b5139a4a54e2c9b3986439 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 19 Sep 2019 09:56:27 +0100 Subject: [PATCH 16/19] Don't os.Exit --- build | 2 ++ cmd/yggdrasil/main.go | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/build b/build index d11f991d..7ca5f4fa 100755 --- a/build +++ b/build @@ -1,5 +1,7 @@ #!/bin/sh +set -ef + PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/version} PKGNAME=${PKGNAME:-$(sh contrib/semver/name.sh)} PKGVER=${PKGVER:-$(sh contrib/semver/version.sh --bare)} diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 3b7c9220..33a8769d 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -296,7 +296,6 @@ func (n *node) shutdown() { n.multicast.Stop() n.tuntap.Stop() n.core.Stop() - os.Exit(0) } func (n *node) sessionFirewall(pubkey *crypto.BoxPubKey, initiator bool) bool { From 93e81867fdd986f2812d72923d28348bc7bf9e8a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 19 Sep 2019 19:15:59 -0500 Subject: [PATCH 17/19] have link.stop signal active links to close, have tcp.stop wait for all listeners and active connections to close --- src/yggdrasil/link.go | 14 ++++++++++++++ src/yggdrasil/tcp.go | 11 +++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index df73cc4c..a4a41e7f 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -25,6 +25,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface tcp tcp // TCP interface support + stopped chan struct{} // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -70,6 +71,7 @@ func (l *link) init(c *Core) error { l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() + l.stopped = make(chan struct{}) if err := l.tcp.init(l); err != nil { c.log.Errorln("Failed to start TCP interface") @@ -135,6 +137,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st } func (l *link) stop() error { + close(l.stopped) if err := l.tcp.stop(); err != nil { return err } @@ -231,7 +234,18 @@ func (intf *linkInterface) handler() error { go intf.peer.start() intf.reader.Act(nil, intf.reader._read) // Wait for the reader to finish + // TODO find a way to do this without keeping live goroutines around + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-intf.link.stopped: + intf.msgIO.close() + case <-done: + } + }() err = <-intf.reader.err + // TODO don't report an error if it's just a 'use of closed network connection' if err != nil { intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 8389ecc6..36d80589 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -34,6 +34,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { link *link + waitgroup sync.WaitGroup mutex sync.Mutex // Protecting the below listeners map[string]*TcpListener calls map[string]struct{} @@ -97,9 +98,12 @@ func (t *tcp) init(l *link) error { } func (t *tcp) stop() error { + t.mutex.Lock() for _, listener := range t.listeners { close(listener.Stop) } + t.mutex.Unlock() + t.waitgroup.Wait() return nil } @@ -150,6 +154,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { Listener: listener, Stop: make(chan bool), } + t.waitgroup.Add(1) go t.listener(&l, listenaddr) return &l, nil } @@ -159,6 +164,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { // Runs the listener, which spawns off goroutines for incoming connections. func (t *tcp) listener(l *TcpListener, listenaddr string) { + defer t.waitgroup.Done() if l == nil { return } @@ -199,8 +205,10 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { t.link.core.log.Errorln("Failed to accept connection:", err) return } + t.waitgroup.Add(1) go t.handler(sock, true, nil) case <-l.Stop: + // FIXME this races with the goroutine that Accepts a TCP connection, may leak connections when a listener is removed return } } @@ -257,6 +265,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if err != nil { return } + t.waitgroup.Add(1) t.handler(conn, false, saddr) } else { dst, err := net.ResolveTCPAddr("tcp", saddr) @@ -321,12 +330,14 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { t.link.core.log.Debugln("Failed to dial TCP:", err) return } + t.waitgroup.Add(1) t.handler(conn, false, nil) } }() } func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) { + defer t.waitgroup.Done() // Happens after sock.close defer sock.Close() t.setExtraOptions(sock) stream := stream{} From eeb34ce4e4f0cdab87b68d44d6caa524f30993d4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 19 Sep 2019 19:45:17 -0500 Subject: [PATCH 18/19] modify TcpListener --- src/multicast/multicast.go | 2 +- src/yggdrasil/tcp.go | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 63156d82..7f2f5915 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -152,7 +152,7 @@ func (m *Multicast) announce() { for name, listener := range m.listeners { // Prepare our stop function! stop := func() { - listener.Stop <- true + listener.Stop() delete(m.listeners, name) m.log.Debugln("No longer multicasting on", name) } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 36d80589..01185e54 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -47,7 +47,12 @@ type tcp struct { // multicast interfaces. type TcpListener struct { Listener net.Listener - Stop chan bool + stop chan struct{} +} + +func (l *TcpListener) Stop() { + defer func() { recover() }() + close(l.stop) } // Wrapper function to set additional options for specific connection types. @@ -100,7 +105,7 @@ func (t *tcp) init(l *link) error { func (t *tcp) stop() error { t.mutex.Lock() for _, listener := range t.listeners { - close(listener.Stop) + listener.Stop() } t.mutex.Unlock() t.waitgroup.Wait() @@ -132,7 +137,7 @@ func (t *tcp) reconfigure() { t.mutex.Lock() if listener, ok := t.listeners[d[6:]]; ok { t.mutex.Unlock() - listener.Stop <- true + listener.Stop() t.link.core.log.Infoln("Stopped TCP listener:", d[6:]) } else { t.mutex.Unlock() @@ -152,7 +157,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { if err == nil { l := TcpListener{ Listener: listener, - Stop: make(chan bool), + stop: make(chan struct{}), } t.waitgroup.Add(1) go t.listener(&l, listenaddr) @@ -207,7 +212,7 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { } t.waitgroup.Add(1) go t.handler(sock, true, nil) - case <-l.Stop: + case <-l.stop: // FIXME this races with the goroutine that Accepts a TCP connection, may leak connections when a listener is removed return } From f9163a56b64e90a7085afc20e9123027960b7042 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 19 Sep 2019 19:50:45 -0500 Subject: [PATCH 19/19] fix race between listener accepting and shutting down --- src/yggdrasil/tcp.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 01185e54..ed8f7b9b 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -184,7 +184,6 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { t.mutex.Unlock() } // And here we go! - accepted := make(chan bool) defer func() { t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) l.Listener.Close() @@ -193,29 +192,19 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { t.mutex.Unlock() }() t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) + go func() { + <-l.stop + l.Listener.Close() + }() + defer l.Stop() for { - var sock net.Conn - var err error - // Listen in a separate goroutine, as that way it does not block us from - // receiving "stop" events - go func() { - sock, err = l.Listener.Accept() - accepted <- true - }() - // Wait for either an accepted connection, or a message telling us to stop - // the TCP listener - select { - case <-accepted: - if err != nil { - t.link.core.log.Errorln("Failed to accept connection:", err) - return - } - t.waitgroup.Add(1) - go t.handler(sock, true, nil) - case <-l.stop: - // FIXME this races with the goroutine that Accepts a TCP connection, may leak connections when a listener is removed + sock, err := l.Listener.Accept() + if err != nil { + t.link.core.log.Errorln("Failed to accept connection:", err) return } + t.waitgroup.Add(1) + go t.handler(sock, true, nil) } }