Properly handle multicast interfaces going up and down

This commit is contained in:
Neil Alexander 2019-03-06 12:07:33 +00:00
parent cc0c725e63
commit f4e17b9a9f
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 57 additions and 37 deletions

View File

@ -261,11 +261,11 @@ func (intf *linkInterface) handler() error {
// Now block until something is ready or the timer triggers keepalive traffic // Now block until something is ready or the timer triggers keepalive traffic
select { select {
case <-tcpTimer.C: case <-tcpTimer.C:
intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send(nil)
case <-sendAck: case <-sendAck:
intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send(nil)
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
@ -280,7 +280,7 @@ func (intf *linkInterface) handler() error {
case signalReady <- struct{}{}: case signalReady <- struct{}{}:
default: default:
} }
//intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local) // strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
} }
@ -331,7 +331,7 @@ func (intf *linkInterface) handler() error {
sendTimerRunning = true sendTimerRunning = true
} }
if !gotMsg { if !gotMsg {
intf.link.core.log.Debugf("Received ack from %s: %s, source %s", intf.link.core.log.Tracef("Received ack from %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
case sentMsg, ok := <-signalSent: case sentMsg, ok := <-signalSent:

View File

@ -64,13 +64,13 @@ func (m *multicast) start() error {
return nil return nil
} }
func (m *multicast) interfaces() []net.Interface { func (m *multicast) interfaces() map[string]net.Interface {
// Get interface expressions from config // Get interface expressions from config
m.core.configMutex.RLock() m.core.configMutex.RLock()
exprs := m.core.config.MulticastInterfaces exprs := m.core.config.MulticastInterfaces
m.core.configMutex.RUnlock() m.core.configMutex.RUnlock()
// Ask the system for network interfaces // Ask the system for network interfaces
var interfaces []net.Interface interfaces := make(map[string]net.Interface)
allifaces, err := net.Interfaces() allifaces, err := net.Interfaces()
if err != nil { if err != nil {
panic(err) panic(err)
@ -97,7 +97,7 @@ func (m *multicast) interfaces() []net.Interface {
} }
// Does the interface match the regular expression? Store it if so // Does the interface match the regular expression? Store it if so
if e.MatchString(iface.Name) { if e.MatchString(iface.Name) {
interfaces = append(interfaces, iface) interfaces[iface.Name] = iface
} }
} }
} }
@ -114,7 +114,10 @@ func (m *multicast) announce() {
panic(err) panic(err)
} }
for { for {
for _, iface := range m.interfaces() { interfaces := m.interfaces()
// Now that we have a list of valid interfaces from the operating system,
// we can start checking if we can send multicasts on them
for _, iface := range interfaces {
// Find interface addresses // Find interface addresses
addrs, err := iface.Addrs() addrs, err := iface.Addrs()
if err != nil { if err != nil {
@ -134,23 +137,24 @@ func (m *multicast) announce() {
m.sock.JoinGroup(&iface, groupAddr) m.sock.JoinGroup(&iface, groupAddr)
// Try and see if we already have a TCP listener for this interface // Try and see if we already have a TCP listener for this interface
var listener *tcpListener var listener *tcpListener
if _, ok := m.listeners[iface.Name]; !ok { if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil {
// No listener was found - let's create one // No listener was found - let's create one
listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name) listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name)
if l, err := m.core.link.tcp.listen(listenaddr); err == nil { if l, err := m.core.link.tcp.listen(listenaddr); err == nil {
m.core.log.Debugln("Started multicasting on", iface.Name)
// Store the listener so that we can stop it later if needed // Store the listener so that we can stop it later if needed
listener = &tcpListener{ m.listeners[iface.Name] = l
listener: l,
stop: make(chan bool),
}
m.listeners[iface.Name] = listener
} }
} else { } else {
// An existing listener was found // An existing listener was found
listener = m.listeners[iface.Name] listener = m.listeners[iface.Name]
} }
// Make sure nothing above failed for some reason
if listener == nil {
continue
}
// Get the listener details and construct the multicast beacon // Get the listener details and construct the multicast beacon
lladdr := (*listener.listener).Addr().String() lladdr := listener.listener.Addr().String()
if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil {
destAddr.Zone = iface.Name destAddr.Zone = iface.Name
msg := []byte(a.String()) msg := []byte(a.String())
@ -160,7 +164,16 @@ func (m *multicast) announce() {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
time.Sleep(time.Second) // There might be interfaces that we configured listeners for but are no
// longer up - if that's the case then we should stop the listeners
for name, listener := range m.listeners {
if _, ok := interfaces[name]; !ok {
listener.stop <- true
delete(m.listeners, name)
m.core.log.Debugln("No longer multicasting on", name)
}
}
time.Sleep(time.Second * 5)
} }
} }

View File

@ -42,7 +42,7 @@ type tcp struct {
} }
type tcpListener struct { type tcpListener struct {
listener *net.Listener listener net.Listener
stop chan bool stop chan bool
} }
@ -63,8 +63,8 @@ func (t *tcp) getAddr() *net.TCPAddr {
// doesn't have the ability to send more than one address in a packet either // doesn't have the ability to send more than one address in a packet either
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
for _, listener := range t.listeners { for _, l := range t.listeners {
return (*listener.listener).Addr().(*net.TCPAddr) return l.listener.Addr().(*net.TCPAddr)
} }
return nil return nil
} }
@ -121,7 +121,7 @@ func (t *tcp) init(l *link) error {
return nil return nil
} }
func (t *tcp) listen(listenaddr string) (*net.Listener, error) { func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
var err error var err error
ctx := context.Background() ctx := context.Background()
@ -131,37 +131,40 @@ func (t *tcp) listen(listenaddr string) (*net.Listener, error) {
listener, err := lc.Listen(ctx, "tcp", listenaddr) listener, err := lc.Listen(ctx, "tcp", listenaddr)
if err == nil { if err == nil {
l := tcpListener{ l := tcpListener{
listener: &listener, listener: listener,
stop: make(chan bool, 1), stop: make(chan bool),
} }
t.mutex.Lock() go t.listener(&l, listenaddr[6:])
t.listeners[listenaddr[6:]] = &l return &l, nil
t.mutex.Unlock()
go t.listener(&l)
return &listener, nil
} }
return nil, err return nil, err
} }
// Runs the listener, which spawns off goroutines for incoming connections. // Runs the listener, which spawns off goroutines for incoming connections.
func (t *tcp) listener(listener *tcpListener) { func (t *tcp) listener(l *tcpListener, listenaddr string) {
if listener == nil { if l == nil {
return return
} }
reallistener := *listener.listener // Track the listener so that we can find it again in future
reallistenaddr := reallistener.Addr().String() t.mutex.Lock()
stop := listener.stop t.listeners[listenaddr] = l
defer reallistener.Close() t.mutex.Unlock()
t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) // And here we go!
accepted := make(chan bool) accepted := make(chan bool)
defer l.listener.Close()
t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String())
for { for {
var sock net.Conn var sock net.Conn
var err error var err error
// Listen in a separate goroutine, as that way it does not block us from
// receiving "stop" events
go func() { go func() {
sock, err = reallistener.Accept() sock, err = l.listener.Accept()
accepted <- true accepted <- true
}() }()
// Wait for either an accepted connection, or a message telling us to stop
// the TCP listener
select { select {
case <-accepted: case <-accepted:
if err != nil { if err != nil {
@ -169,8 +172,12 @@ func (t *tcp) listener(listener *tcpListener) {
return return
} }
go t.handler(sock, true) go t.handler(sock, true)
case <-stop: case <-l.stop:
t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String())
l.listener.Close()
t.mutex.Lock()
delete(t.listeners, listenaddr)
t.mutex.Unlock()
return return
} }
} }