have link.stop signal active links to close, have tcp.stop wait for all listeners and active connections to close

This commit is contained in:
Arceliar 2019-09-19 19:15:59 -05:00
parent 39461cb603
commit 93e81867fd
2 changed files with 25 additions and 0 deletions

View File

@ -25,6 +25,7 @@ type link struct {
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects interfaces below
interfaces map[linkInfo]*linkInterface interfaces map[linkInfo]*linkInterface
tcp tcp // TCP interface support tcp tcp // TCP interface support
stopped chan struct{}
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
} }
@ -70,6 +71,7 @@ func (l *link) init(c *Core) error {
l.mutex.Lock() l.mutex.Lock()
l.interfaces = make(map[linkInfo]*linkInterface) l.interfaces = make(map[linkInfo]*linkInterface)
l.mutex.Unlock() l.mutex.Unlock()
l.stopped = make(chan struct{})
if err := l.tcp.init(l); err != nil { if err := l.tcp.init(l); err != nil {
c.log.Errorln("Failed to start TCP interface") c.log.Errorln("Failed to start TCP interface")
@ -135,6 +137,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
} }
func (l *link) stop() error { func (l *link) stop() error {
close(l.stopped)
if err := l.tcp.stop(); err != nil { if err := l.tcp.stop(); err != nil {
return err return err
} }
@ -231,7 +234,18 @@ func (intf *linkInterface) handler() error {
go intf.peer.start() go intf.peer.start()
intf.reader.Act(nil, intf.reader._read) intf.reader.Act(nil, intf.reader._read)
// Wait for the reader to finish // Wait for the reader to finish
// TODO find a way to do this without keeping live goroutines around
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-intf.link.stopped:
intf.msgIO.close()
case <-done:
}
}()
err = <-intf.reader.err err = <-intf.reader.err
// TODO don't report an error if it's just a 'use of closed network connection'
if err != nil { if err != nil {
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)

View File

@ -34,6 +34,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3)
// The TCP listener and information about active TCP connections, to avoid duplication. // The TCP listener and information about active TCP connections, to avoid duplication.
type tcp struct { type tcp struct {
link *link link *link
waitgroup sync.WaitGroup
mutex sync.Mutex // Protecting the below mutex sync.Mutex // Protecting the below
listeners map[string]*TcpListener listeners map[string]*TcpListener
calls map[string]struct{} calls map[string]struct{}
@ -97,9 +98,12 @@ func (t *tcp) init(l *link) error {
} }
func (t *tcp) stop() error { func (t *tcp) stop() error {
t.mutex.Lock()
for _, listener := range t.listeners { for _, listener := range t.listeners {
close(listener.Stop) close(listener.Stop)
} }
t.mutex.Unlock()
t.waitgroup.Wait()
return nil return nil
} }
@ -150,6 +154,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) {
Listener: listener, Listener: listener,
Stop: make(chan bool), Stop: make(chan bool),
} }
t.waitgroup.Add(1)
go t.listener(&l, listenaddr) go t.listener(&l, listenaddr)
return &l, nil return &l, nil
} }
@ -159,6 +164,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) {
// Runs the listener, which spawns off goroutines for incoming connections. // Runs the listener, which spawns off goroutines for incoming connections.
func (t *tcp) listener(l *TcpListener, listenaddr string) { func (t *tcp) listener(l *TcpListener, listenaddr string) {
defer t.waitgroup.Done()
if l == nil { if l == nil {
return return
} }
@ -199,8 +205,10 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
t.link.core.log.Errorln("Failed to accept connection:", err) t.link.core.log.Errorln("Failed to accept connection:", err)
return return
} }
t.waitgroup.Add(1)
go t.handler(sock, true, nil) go t.handler(sock, true, nil)
case <-l.Stop: case <-l.Stop:
// FIXME this races with the goroutine that Accepts a TCP connection, may leak connections when a listener is removed
return return
} }
} }
@ -257,6 +265,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) {
if err != nil { if err != nil {
return return
} }
t.waitgroup.Add(1)
t.handler(conn, false, saddr) t.handler(conn, false, saddr)
} else { } else {
dst, err := net.ResolveTCPAddr("tcp", saddr) dst, err := net.ResolveTCPAddr("tcp", saddr)
@ -321,12 +330,14 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) {
t.link.core.log.Debugln("Failed to dial TCP:", err) t.link.core.log.Debugln("Failed to dial TCP:", err)
return return
} }
t.waitgroup.Add(1)
t.handler(conn, false, nil) t.handler(conn, false, nil)
} }
}() }()
} }
func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) { func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) {
defer t.waitgroup.Done() // Happens after sock.close
defer sock.Close() defer sock.Close()
t.setExtraOptions(sock) t.setExtraOptions(sock)
stream := stream{} stream := stream{}