From b1283e15f63dc7a553e0381efed6abdfa6006819 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 11 Aug 2024 10:42:25 +0100 Subject: [PATCH] Link state tracking tweaks and improved shutdown --- src/core/link.go | 66 +++++++++++++++++++------------------------ src/core/link_tcp.go | 2 -- src/core/link_tls.go | 10 +++---- src/core/link_unix.go | 6 ++-- src/core/link_ws.go | 6 +++- 5 files changed, 40 insertions(+), 50 deletions(-) diff --git a/src/core/link.go b/src/core/link.go index ee2ad06f..1ead4e32 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "errors" "fmt" "io" "net" @@ -40,7 +41,8 @@ type links struct { ws *linkWS // WS interface support wss *linkWSS // WSS interface support // _links can only be modified safely from within the links actor - _links map[linkInfo]*link // *link is nil if connection in progress + _links map[linkInfo]*link // *link is nil if connection in progress + _listeners map[*Listener]context.CancelFunc } type linkProtocol interface { @@ -85,13 +87,6 @@ func (l *Listener) Addr() net.Addr { return l.listener.Addr() } -func (l *Listener) Close() error { - l.Cancel() - err := l.listener.Close() - <-l.ctx.Done() - return err -} - func (l *links) init(c *Core) error { l.core = c l.tcp = l.newLinkTCP() @@ -102,32 +97,18 @@ func (l *links) init(c *Core) error { l.ws = l.newLinkWS() l.wss = l.newLinkWSS() l._links = make(map[linkInfo]*link) - - var listeners []ListenAddress - phony.Block(c, func() { - listeners = make([]ListenAddress, 0, len(c.config._listeners)) - for listener := range c.config._listeners { - listeners = append(listeners, listener) - } - }) + l._listeners = make(map[*Listener]context.CancelFunc) return nil } func (l *links) shutdown() { - phony.Block(l.tcp, func() { - for l := range l.tcp._listeners { - _ = l.Close() + phony.Block(l, func() { + for listener := range l._listeners { + _ = listener.listener.Close() } - }) - phony.Block(l.tls, func() { - for l := range l.tls._listeners { - _ = l.Close() - } - }) - phony.Block(l.unix, func() { - for l := range l.unix._listeners { - _ = l.Close() + for _, link := range l._links { + _ = link._conn.Close() } }) } @@ -457,11 +438,18 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { options.password = []byte(p) } + phony.Block(l, func() { + l._listeners[li] = cancel + }) + go func() { - l.core.log.Infof("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr()) - defer l.core.log.Infof("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr()) + l.core.log.Infof("%s listener started on %s", strings.ToUpper(u.Scheme), li.listener.Addr()) + defer l.core.log.Infof("%s listener stopped on %s", strings.ToUpper(u.Scheme), li.listener.Addr()) + defer phony.Block(l, func() { + delete(l._listeners, li) + }) for { - conn, err := listener.Accept() + conn, err := li.listener.Accept() if err != nil { return } @@ -517,13 +505,22 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { // Store the state of the link so that it can be queried later. l._links[info] = state }) + defer phony.Block(l, func() { + if l._links[info] == state { + delete(l._links, info) + } + }) if lc == nil { return } // Give the connection to the handler. The handler will block // for the lifetime of the connection. - if err = l.handler(linkTypeIncoming, options, lc, nil); err != nil && err != io.EOF { + switch err = l.handler(linkTypeIncoming, options, lc, nil); { + case err == nil: + case errors.Is(err, io.EOF): + case errors.Is(err, net.ErrClosed): + default: l.core.log.Debugf("Link %s error: %s\n", u.Host, err) } @@ -531,11 +528,6 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { // try to close the underlying socket just in case and then // drop the link state. _ = lc.Close() - phony.Block(l, func() { - if l._links[info] == state { - delete(l._links, info) - } - }) }(conn) } }() diff --git a/src/core/link_tcp.go b/src/core/link_tcp.go index f595aeb9..4ee50941 100644 --- a/src/core/link_tcp.go +++ b/src/core/link_tcp.go @@ -15,7 +15,6 @@ type linkTCP struct { phony.Inbox *links listenconfig *net.ListenConfig - _listeners map[*Listener]context.CancelFunc } func (l *links) newLinkTCP() *linkTCP { @@ -24,7 +23,6 @@ func (l *links) newLinkTCP() *linkTCP { listenconfig: &net.ListenConfig{ KeepAlive: -1, }, - _listeners: map[*Listener]context.CancelFunc{}, } lt.listenconfig.Control = lt.tcpContext return lt diff --git a/src/core/link_tls.go b/src/core/link_tls.go index a93227f6..d51d0ce5 100644 --- a/src/core/link_tls.go +++ b/src/core/link_tls.go @@ -13,10 +13,9 @@ import ( type linkTLS struct { phony.Inbox *links - tcp *linkTCP - listener *net.ListenConfig - config *tls.Config - _listeners map[*Listener]context.CancelFunc + tcp *linkTCP + listener *net.ListenConfig + config *tls.Config } func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS { @@ -27,8 +26,7 @@ func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS { Control: tcp.tcpContext, KeepAlive: -1, }, - config: l.core.config.tls.Clone(), - _listeners: map[*Listener]context.CancelFunc{}, + config: l.core.config.tls.Clone(), } return lt } diff --git a/src/core/link_unix.go b/src/core/link_unix.go index 8dde8946..ddbfa0ad 100644 --- a/src/core/link_unix.go +++ b/src/core/link_unix.go @@ -12,9 +12,8 @@ import ( type linkUNIX struct { phony.Inbox *links - dialer *net.Dialer - listener *net.ListenConfig - _listeners map[*Listener]context.CancelFunc + dialer *net.Dialer + listener *net.ListenConfig } func (l *links) newLinkUNIX() *linkUNIX { @@ -27,7 +26,6 @@ func (l *links) newLinkUNIX() *linkUNIX { listener: &net.ListenConfig{ KeepAlive: -1, }, - _listeners: map[*Listener]context.CancelFunc{}, } return lt } diff --git a/src/core/link_ws.go b/src/core/link_ws.go index f323b025..7a7d66f7 100644 --- a/src/core/link_ws.go +++ b/src/core/link_ws.go @@ -14,6 +14,7 @@ import ( type linkWS struct { phony.Inbox *links + listenconfig *net.ListenConfig } type linkWSConn struct { @@ -78,6 +79,9 @@ func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (l *links) newLinkWS() *linkWS { lt := &linkWS{ links: l, + listenconfig: &net.ListenConfig{ + KeepAlive: -1, + }, } return lt } @@ -95,7 +99,7 @@ func (l *linkWS) dial(ctx context.Context, url *url.URL, info linkInfo, options } func (l *linkWS) listen(ctx context.Context, url *url.URL, _ string) (net.Listener, error) { - nl, err := net.Listen("tcp", url.Host) + nl, err := l.listenconfig.Listen(ctx, "tcp", url.Host) if err != nil { return nil, err }