From f4e17b9a9f10bab9292cddc4bd20f146d2582fa2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 12:07:33 +0000 Subject: [PATCH] Properly handle multicast interfaces going up and down --- src/yggdrasil/link.go | 8 +++---- src/yggdrasil/multicast.go | 37 ++++++++++++++++++---------- src/yggdrasil/tcp.go | 49 ++++++++++++++++++++++---------------- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 07adbe84..a81b50d0 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -261,11 +261,11 @@ func (intf *linkInterface) handler() error { // Now block until something is ready or the timer triggers keepalive traffic select { case <-tcpTimer.C: - intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) send(nil) case <-sendAck: - intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) send(nil) case msg := <-intf.peer.linkOut: @@ -280,7 +280,7 @@ func (intf *linkInterface) handler() error { case signalReady <- struct{}{}: default: } - //intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", + //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s", // strings.ToUpper(intf.info.linkType), themString, intf.info.local) } } @@ -331,7 +331,7 @@ func (intf *linkInterface) handler() error { sendTimerRunning = true } if !gotMsg { - intf.link.core.log.Debugf("Received ack from %s: %s, source %s", + intf.link.core.log.Tracef("Received ack from %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } case sentMsg, ok := <-signalSent: diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 401f6783..d4a03ff3 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -64,13 +64,13 @@ func (m *multicast) start() error { return nil } -func (m *multicast) interfaces() []net.Interface { +func (m *multicast) interfaces() map[string]net.Interface { // Get interface expressions from config m.core.configMutex.RLock() exprs := m.core.config.MulticastInterfaces m.core.configMutex.RUnlock() // Ask the system for network interfaces - var interfaces []net.Interface + interfaces := make(map[string]net.Interface) allifaces, err := net.Interfaces() if err != nil { panic(err) @@ -97,7 +97,7 @@ func (m *multicast) interfaces() []net.Interface { } // Does the interface match the regular expression? Store it if so if e.MatchString(iface.Name) { - interfaces = append(interfaces, iface) + interfaces[iface.Name] = iface } } } @@ -114,7 +114,10 @@ func (m *multicast) announce() { panic(err) } for { - for _, iface := range m.interfaces() { + interfaces := m.interfaces() + // Now that we have a list of valid interfaces from the operating system, + // we can start checking if we can send multicasts on them + for _, iface := range interfaces { // Find interface addresses addrs, err := iface.Addrs() if err != nil { @@ -134,23 +137,24 @@ func (m *multicast) announce() { m.sock.JoinGroup(&iface, groupAddr) // Try and see if we already have a TCP listener for this interface var listener *tcpListener - if _, ok := m.listeners[iface.Name]; !ok { + if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil { // No listener was found - let's create one listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name) if l, err := m.core.link.tcp.listen(listenaddr); err == nil { + m.core.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed - listener = &tcpListener{ - listener: l, - stop: make(chan bool), - } - m.listeners[iface.Name] = listener + m.listeners[iface.Name] = l } } else { // An existing listener was found listener = m.listeners[iface.Name] } + // Make sure nothing above failed for some reason + if listener == nil { + continue + } // Get the listener details and construct the multicast beacon - lladdr := (*listener.listener).Addr().String() + lladdr := listener.listener.Addr().String() if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { destAddr.Zone = iface.Name msg := []byte(a.String()) @@ -160,7 +164,16 @@ func (m *multicast) announce() { } time.Sleep(time.Second) } - time.Sleep(time.Second) + // There might be interfaces that we configured listeners for but are no + // longer up - if that's the case then we should stop the listeners + for name, listener := range m.listeners { + if _, ok := interfaces[name]; !ok { + listener.stop <- true + delete(m.listeners, name) + m.core.log.Debugln("No longer multicasting on", name) + } + } + time.Sleep(time.Second * 5) } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 652f5abe..f46dc56c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -42,7 +42,7 @@ type tcp struct { } type tcpListener struct { - listener *net.Listener + listener net.Listener stop chan bool } @@ -63,8 +63,8 @@ func (t *tcp) getAddr() *net.TCPAddr { // doesn't have the ability to send more than one address in a packet either t.mutex.Lock() defer t.mutex.Unlock() - for _, listener := range t.listeners { - return (*listener.listener).Addr().(*net.TCPAddr) + for _, l := range t.listeners { + return l.listener.Addr().(*net.TCPAddr) } return nil } @@ -121,7 +121,7 @@ func (t *tcp) init(l *link) error { return nil } -func (t *tcp) listen(listenaddr string) (*net.Listener, error) { +func (t *tcp) listen(listenaddr string) (*tcpListener, error) { var err error ctx := context.Background() @@ -131,37 +131,40 @@ func (t *tcp) listen(listenaddr string) (*net.Listener, error) { listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { l := tcpListener{ - listener: &listener, - stop: make(chan bool, 1), + listener: listener, + stop: make(chan bool), } - t.mutex.Lock() - t.listeners[listenaddr[6:]] = &l - t.mutex.Unlock() - go t.listener(&l) - return &listener, nil + go t.listener(&l, listenaddr[6:]) + return &l, nil } return nil, err } // Runs the listener, which spawns off goroutines for incoming connections. -func (t *tcp) listener(listener *tcpListener) { - if listener == nil { +func (t *tcp) listener(l *tcpListener, listenaddr string) { + if l == nil { return } - reallistener := *listener.listener - reallistenaddr := reallistener.Addr().String() - stop := listener.stop - defer reallistener.Close() - t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) + // Track the listener so that we can find it again in future + t.mutex.Lock() + t.listeners[listenaddr] = l + t.mutex.Unlock() + // And here we go! accepted := make(chan bool) + defer l.listener.Close() + t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String()) for { var sock net.Conn var err error + // Listen in a separate goroutine, as that way it does not block us from + // receiving "stop" events go func() { - sock, err = reallistener.Accept() + sock, err = l.listener.Accept() accepted <- true }() + // Wait for either an accepted connection, or a message telling us to stop + // the TCP listener select { case <-accepted: if err != nil { @@ -169,8 +172,12 @@ func (t *tcp) listener(listener *tcpListener) { return } go t.handler(sock, true) - case <-stop: - t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) + case <-l.stop: + t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String()) + l.listener.Close() + t.mutex.Lock() + delete(t.listeners, listenaddr) + t.mutex.Unlock() return } }