From 73c6c25bd9d518a5bdc33e40c2f38e44505b706b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 22 Oct 2023 10:27:41 +0100 Subject: [PATCH] Restore `removePeer` method --- src/admin/removepeer.go | 11 ++++++- src/core/api.go | 25 ++------------- src/core/link.go | 70 +++++++++++++++++++++++++++++++---------- 3 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/admin/removepeer.go b/src/admin/removepeer.go index 145b8524..6b2e162e 100644 --- a/src/admin/removepeer.go +++ b/src/admin/removepeer.go @@ -1,5 +1,10 @@ package admin +import ( + "fmt" + "net/url" +) + type RemovePeerRequest struct { Uri string `json:"uri"` Sintf string `json:"interface,omitempty"` @@ -8,5 +13,9 @@ type RemovePeerRequest struct { type RemovePeerResponse struct{} func (a *AdminSocket) removePeerHandler(req *RemovePeerRequest, res *RemovePeerResponse) error { - return a.core.RemovePeer(req.Uri, req.Sintf) + u, err := url.Parse(req.Uri) + if err != nil { + return fmt.Errorf("unable to parse peering URI: %w", err) + } + return a.core.RemovePeer(u, req.Sintf) } diff --git a/src/core/api.go b/src/core/api.go index ebc818f5..b5fa93cb 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -3,7 +3,6 @@ package core import ( "crypto/ed25519" "encoding/json" - "fmt" "net" "net/url" "sync/atomic" @@ -192,28 +191,8 @@ func (c *Core) AddPeer(u *url.URL, sintf string) error { // RemovePeer removes a peer. The peer should be specified in URI format, see AddPeer. // The peer is not disconnected immediately. -func (c *Core) RemovePeer(uri string, sourceInterface string) error { - return fmt.Errorf("not implemented yet") - /* - var err error - phony.Block(c, func() { - peer := Peer{uri, sourceInterface} - linkInfo, ok := c.config._peers[peer] - if !ok { - err = fmt.Errorf("peer not configured") - return - } - if ok && linkInfo != nil { - c.links.Act(nil, func() { - if link := c.links._links[*linkInfo]; link != nil { - _ = link.conn.Close() - } - }) - } - delete(c.config._peers, peer) - }) - return err - */ +func (c *Core) RemovePeer(u *url.URL, sintf string) error { + return c.links.remove(u, sintf, linkTypePersistent) } // CallPeer calls a peer once. This should be specified in the peer URI format, diff --git a/src/core/link.go b/src/core/link.go index 4d51c03a..c3808698 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -53,9 +53,11 @@ type linkInfo struct { // link tracks the state of a connection, either persistent or non-persistent type link struct { - kick chan struct{} // Attempt to reconnect now, if backing off - linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral - linkProto string // Protocol carrier of link, e.g. TCP, AWDL + ctx context.Context // Connection context + cancel context.CancelFunc // Stop future redial attempts (when peer removed) + kick chan struct{} // Attempt to reconnect now, if backing off + linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral + linkProto string // Protocol carrier of link, e.g. TCP, AWDL // The remaining fields can only be modified safely from within the links actor _conn *linkConn // Connected link, if any, nil if not connected _err error // Last error on the connection, if any @@ -129,6 +131,7 @@ type linkError string func (e linkError) Error() string { return string(e) } const ErrLinkAlreadyConfigured = linkError("peer is already configured") +const ErrLinkNotConfigured = linkError("peer is not configured") const ErrLinkPriorityInvalid = linkError("priority value is invalid") const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid") const ErrLinkPasswordInvalid = linkError("password is invalid") @@ -199,6 +202,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { linkProto: strings.ToUpper(u.Scheme), kick: make(chan struct{}), } + state.ctx, state.cancel = context.WithCancel(l.core.ctx) // Store the state of the link so that it can be queried later. l._links[info] = state @@ -218,12 +222,14 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { backoff++ duration := time.Second * time.Duration(math.Exp2(float64(backoff))) select { - case <-time.After(duration): - return true case <-state.kick: return true + case <-state.ctx.Done(): + return false case <-l.core.ctx.Done(): return false + case <-time.After(duration): + return true } } @@ -232,19 +238,25 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { // then the loop will run endlessly, using backoffs as needed. // Otherwise the loop will end, cleaning up the link entry. go func() { - defer func() { - phony.Block(l, func() { - if l._links[info] == state { - delete(l._links, info) - } - }) - }() + defer phony.Block(l, func() { + if l._links[info] == state { + delete(l._links, info) + } + }) // This loop will run each and every time we want to attempt // a connection to this peer. // TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed for { - conn, err := l.connect(u, info, options) + select { + case <-state.ctx.Done(): + // The peering context has been cancelled, so don't try + // to dial again. + return + default: + } + + conn, err := l.connect(state.ctx, u, info, options) if err != nil { if linkType == linkTypePersistent { // If the link is a persistent configured peering, @@ -319,13 +331,39 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { } return } - break } }() }) return retErr } +func (l *links) remove(u *url.URL, sintf string, linkType linkType) error { + var retErr error + phony.Block(l, func() { + // Generate the link info and see whether we think we already + // have an open peering to this peer. + lu := urlForLinkInfo(*u) + info := linkInfo{ + uri: lu.String(), + sintf: sintf, + } + + // If this peer is already configured then we will close the + // connection and stop it from retrying. + state, ok := l._links[info] + if ok && state != nil { + state.cancel() + if conn := state._conn; conn != nil { + retErr = conn.Close() + } + return + } + + retErr = ErrLinkNotConfigured + }) + return retErr +} + func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { ctx, cancel := context.WithCancel(l.core.ctx) var protocol linkProtocol @@ -453,7 +491,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { return li, nil } -func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) { +func (l *links) connect(ctx context.Context, u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) { var dialer linkProtocol switch strings.ToLower(u.Scheme) { case "tcp": @@ -485,7 +523,7 @@ func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Con default: return nil, ErrLinkUnrecognisedSchema } - return dialer.dial(l.core.ctx, u, info, options) + return dialer.dial(ctx, u, info, options) } func (l *links) handler(linkType linkType, options linkOptions, conn net.Conn) error {