diff --git a/src/core/api.go b/src/core/api.go index c6e60381..ebc818f5 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -6,8 +6,11 @@ import ( "fmt" "net" "net/url" + "sync/atomic" "time" + "github.com/Arceliar/phony" + "github.com/Arceliar/ironwood/network" "github.com/yggdrasil-network/yggdrasil-go/src/address" ) @@ -70,32 +73,30 @@ func (c *Core) GetPeers() []PeerInfo { conns[p.Conn] = p } - c.links.RLock() - defer c.links.RUnlock() - for info, state := range c.links._links { - var peerinfo PeerInfo - var conn net.Conn - state.RLock() - peerinfo.URI = info.uri - peerinfo.LastError = state._err - peerinfo.LastErrorTime = state._errtime - if c := state._conn; c != nil { - conn = c - peerinfo.Up = true - peerinfo.Inbound = state.linkType == linkTypeIncoming - peerinfo.RXBytes = c.rx - peerinfo.TXBytes = c.tx - peerinfo.Uptime = time.Since(c.up) + phony.Block(&c.links, func() { + for info, state := range c.links._links { + var peerinfo PeerInfo + var conn net.Conn + peerinfo.URI = info.uri + peerinfo.LastError = state._err + peerinfo.LastErrorTime = state._errtime + if c := state._conn; c != nil { + conn = c + peerinfo.Up = true + peerinfo.Inbound = state.linkType == linkTypeIncoming + peerinfo.RXBytes = atomic.LoadUint64(&c.rx) + peerinfo.TXBytes = atomic.LoadUint64(&c.tx) + peerinfo.Uptime = time.Since(c.up) + } + if p, ok := conns[conn]; ok { + peerinfo.Key = p.Key + peerinfo.Root = p.Root + peerinfo.Port = p.Port + peerinfo.Priority = p.Priority + } + peers = append(peers, peerinfo) } - state.RUnlock() - if p, ok := conns[conn]; ok { - peerinfo.Key = p.Key - peerinfo.Root = p.Root - peerinfo.Port = p.Port - peerinfo.Priority = p.Priority - } - peers = append(peers, peerinfo) - } + }) return peers } diff --git a/src/core/link.go b/src/core/link.go index 713e4db8..c13d6af3 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -13,7 +13,6 @@ import ( "net/url" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -30,13 +29,14 @@ const ( ) type links struct { - core *Core - tcp *linkTCP // TCP interface support - tls *linkTLS // TLS interface support - unix *linkUNIX // UNIX interface support - socks *linkSOCKS // SOCKS interface support - sync.RWMutex // Protects the below - _links map[linkInfo]*link // *link is nil if connection in progress + phony.Inbox + core *Core + tcp *linkTCP // TCP interface support + tls *linkTLS // TLS interface support + unix *linkUNIX // UNIX interface support + socks *linkSOCKS // SOCKS interface support + // _links can only be modified safely from within the links actor + _links map[linkInfo]*link // *link is nil if connection in progress } type linkProtocol interface { @@ -52,13 +52,13 @@ type linkInfo struct { // link tracks the state of a connection, either persistent or non-persistent type link struct { - kick chan struct{} // Attempt to reconnect now, if backing off - linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral - linkProto string // Protocol carrier of link, e.g. TCP, AWDL - sync.RWMutex // Protects the below - _conn *linkConn // Connected link, if any, nil if not connected - _err error // Last error on the connection, if any - _errtime time.Time // Last time an error occured + kick chan struct{} // Attempt to reconnect now, if backing off + linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral + linkProto string // Protocol carrier of link, e.g. TCP, AWDL + // The remaining fields can only be modified safely from within the links actor + _conn *linkConn // Connected link, if any, nil if not connected + _err error // Last error on the connection, if any + _errtime time.Time // Last time an error occured } type linkOptions struct { @@ -131,183 +131,192 @@ const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid") const ErrLinkUnrecognisedSchema = linkError("link schema unknown") func (l *links) add(u *url.URL, sintf string, linkType linkType) error { - // Generate the link info and see whether we think we already - // have an open peering to this peer. - lu := urlForLinkInfo(*u) - info := linkInfo{ - uri: lu.String(), - sintf: sintf, - } - - // Collect together the link options, these are global options - // that are not specific to any given protocol. - var options linkOptions - for _, pubkey := range u.Query()["key"] { - sigPub, err := hex.DecodeString(pubkey) - if err != nil { - return ErrLinkPinnedKeyInvalid + var retErr error + phony.Block(l, func() { + // Generate the link info and see whether we think we already + // have an open peering to this peer. + lu := urlForLinkInfo(*u) + info := linkInfo{ + uri: lu.String(), + sintf: sintf, } - var sigPubKey keyArray - copy(sigPubKey[:], sigPub) - if options.pinnedEd25519Keys == nil { - options.pinnedEd25519Keys = map[keyArray]struct{}{} - } - options.pinnedEd25519Keys[sigPubKey] = struct{}{} - } - if p := u.Query().Get("priority"); p != "" { - pi, err := strconv.ParseUint(p, 10, 8) - if err != nil { - return ErrLinkPriorityInvalid - } - options.priority = uint8(pi) - } - // If we think we're already connected to this peer, load up - // the existing peer state. Try to kick the peer if possible, - // which will cause an immediate connection attempt if it is - // backing off for some reason. - l.Lock() - state, ok := l._links[info] - if ok && state != nil { - select { - case state.kick <- struct{}{}: - default: - } - l.Unlock() - return ErrLinkAlreadyConfigured - } - - // Create the link entry. This will contain the connection - // in progress (if any), any error details and a context that - // lets the link be cancelled later. - state = &link{ - linkType: linkType, - linkProto: strings.ToUpper(u.Scheme), - kick: make(chan struct{}), - } - - // Store the state of the link so that it can be queried later. - l._links[info] = state - l.Unlock() - - // Track how many consecutive connection failures we have had, - // as we will back off exponentially rather than hammering the - // remote node endlessly. - var backoff int - - // backoffNow is called when there's a connection error. It - // will wait for the specified amount of time and then return - // true, unless the peering context was cancelled (due to a - // peer removal most likely), in which case it returns false. - // The caller should check the return value to decide whether - // or not to give up trying. - backoffNow := func() bool { - backoff++ - duration := time.Second * time.Duration(math.Exp2(float64(backoff))) - select { - case <-time.After(duration): - return true - case <-state.kick: - return true - case <-l.core.ctx.Done(): - return false - } - } - - // The goroutine is responsible for attempting the connection - // and then running the handler. If the connection is persistent - // then the loop will run endlessly, using backoffs as needed. - // Otherwise the loop will end, cleaning up the link entry. - go func() { - defer func() { - l.Lock() - defer l.Unlock() - delete(l._links, info) - }() - - // This loop will run each and every time we want to attempt - // a connection to this peer. - for { - conn, err := l.connect(u, info, options) + // Collect together the link options, these are global options + // that are not specific to any given protocol. + var options linkOptions + for _, pubkey := range u.Query()["key"] { + sigPub, err := hex.DecodeString(pubkey) if err != nil { - if linkType == linkTypePersistent { - // If the link is a persistent configured peering, - // store information about the connection error so - // that we can report it through the admin socket. - state.Lock() - state._conn = nil - state._err = err - state._errtime = time.Now() - state.Unlock() + retErr = ErrLinkPinnedKeyInvalid + return + } + var sigPubKey keyArray + copy(sigPubKey[:], sigPub) + if options.pinnedEd25519Keys == nil { + options.pinnedEd25519Keys = map[keyArray]struct{}{} + } + options.pinnedEd25519Keys[sigPubKey] = struct{}{} + } + if p := u.Query().Get("priority"); p != "" { + pi, err := strconv.ParseUint(p, 10, 8) + if err != nil { + retErr = ErrLinkPriorityInvalid + return + } + options.priority = uint8(pi) + } - // Back off for a bit. If true is returned here, we - // can continue onto the next loop iteration to try - // the next connection. + // If we think we're already connected to this peer, load up + // the existing peer state. Try to kick the peer if possible, + // which will cause an immediate connection attempt if it is + // backing off for some reason. + state, ok := l._links[info] + if ok && state != nil { + select { + case state.kick <- struct{}{}: + default: + } + retErr = ErrLinkAlreadyConfigured + return + } + + // Create the link entry. This will contain the connection + // in progress (if any), any error details and a context that + // lets the link be cancelled later. + state = &link{ + linkType: linkType, + linkProto: strings.ToUpper(u.Scheme), + kick: make(chan struct{}), + } + + // Store the state of the link so that it can be queried later. + l._links[info] = state + + // Track how many consecutive connection failures we have had, + // as we will back off exponentially rather than hammering the + // remote node endlessly. + var backoff int + + // backoffNow is called when there's a connection error. It + // will wait for the specified amount of time and then return + // true, unless the peering context was cancelled (due to a + // peer removal most likely), in which case it returns false. + // The caller should check the return value to decide whether + // or not to give up trying. + backoffNow := func() bool { + backoff++ + duration := time.Second * time.Duration(math.Exp2(float64(backoff))) + select { + case <-time.After(duration): + return true + case <-state.kick: + return true + case <-l.core.ctx.Done(): + return false + } + } + + // The goroutine is responsible for attempting the connection + // and then running the handler. If the connection is persistent + // then the loop will run endlessly, using backoffs as needed. + // Otherwise the loop will end, cleaning up the link entry. + go func() { + defer func() { + phony.Block(l, func() { + if l._links[info] == state { + delete(l._links, info) + } + }) + }() + + // This loop will run each and every time we want to attempt + // a connection to this peer. + // TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed + for { + conn, err := l.connect(u, info, options) + if err != nil { + if linkType == linkTypePersistent { + // If the link is a persistent configured peering, + // store information about the connection error so + // that we can report it through the admin socket. + phony.Block(l, func() { + state._conn = nil + state._err = err + state._errtime = time.Now() + }) + + // Back off for a bit. If true is returned here, we + // can continue onto the next loop iteration to try + // the next connection. + if backoffNow() { + continue + } else { + return + } + } else { + // Ephemeral and incoming connections don't remain + // after a connection failure, so exit out of the + // loop and clean up the link entry. + break + } + } + + // The linkConn wrapper allows us to track the number of + // bytes written to and read from this connection without + // the help of ironwood. + lc := &linkConn{ + Conn: conn, + up: time.Now(), + } + + // Update the link state with our newly wrapped connection. + // Clear the error state. + var doRet bool + phony.Block(l, func() { + if state._conn != nil { + // If a peering has come up in this time, abort this one. + doRet = true + } + state._conn = lc + }) + if doRet { + return + } + + // Give the connection to the handler. The handler will block + // for the lifetime of the connection. + if err = l.handler(linkType, options, lc); err != nil && err != io.EOF { + l.core.log.Debugf("Link %s error: %s\n", info.uri, err) + } else { + backoff = 0 + } + + // The handler has stopped running so the connection is dead, + // try to close the underlying socket just in case and then + // update the link state. + _ = lc.Close() + phony.Block(l, func() { + state._conn = nil + if state._err = err; state._err != nil { + state._errtime = time.Now() + } + }) + + // If the link is persistently configured, back off if needed + // and then try reconnecting. Otherwise, exit out. + if linkType == linkTypePersistent { if backoffNow() { continue } else { return } } else { - // Ephemeral and incoming connections don't remain - // after a connection failure, so exit out of the - // loop and clean up the link entry. break } } - - // The linkConn wrapper allows us to track the number of - // bytes written to and read from this connection without - // the help of ironwood. - lc := &linkConn{ - Conn: conn, - up: time.Now(), - } - - // Update the link state with our newly wrapped connection. - // Clear the error state. - state.Lock() - if state._conn != nil { - // If a peering has come up in this time, abort this one. - state.Unlock() - return - } - state._conn = lc - state.Unlock() - - // Give the connection to the handler. The handler will block - // for the lifetime of the connection. - if err = l.handler(linkType, options, lc); err != nil && err != io.EOF { - l.core.log.Debugf("Link %s error: %s\n", info.uri, err) - } else { - backoff = 0 - } - - // The handler has stopped running so the connection is dead, - // try to close the underlying socket just in case and then - // update the link state. - _ = lc.Close() - state.Lock() - state._conn = nil - if state._err = err; state._err != nil { - state._errtime = time.Now() - } - state.Unlock() - - // If the link is persistently configured, back off if needed - // and then try reconnecting. Otherwise, exit out. - if linkType == linkTypePersistent { - if backoffNow() { - continue - } else { - return - } - } else { - break - } - } - }() - return nil + }() + }) + return retErr } func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { @@ -369,43 +378,45 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { // If there's an existing link state for this link, get it. // If this node is already connected to us, just drop the // connection. This prevents duplicate peerings. - l.Lock() - state, ok := l._links[info] - if !ok || state == nil { - state = &link{ - linkType: linkTypeIncoming, - linkProto: strings.ToUpper(u.Scheme), - kick: make(chan struct{}), + var lc *linkConn + var state *link + phony.Block(l, func() { + var ok bool + state, ok = l._links[info] + if !ok || state == nil { + state = &link{ + linkType: linkTypeIncoming, + linkProto: strings.ToUpper(u.Scheme), + kick: make(chan struct{}), + } } - } - state.Lock() - if state._conn != nil { - // If a connection has come up in this time, abort - // this one. - state.Unlock() - l.Unlock() + if state._conn != nil { + // If a connection has come up in this time, abort + // this one. + return + } + + // The linkConn wrapper allows us to track the number of + // bytes written to and read from this connection without + // the help of ironwood. + lc = &linkConn{ + Conn: conn, + up: time.Now(), + } + + // Update the link state with our newly wrapped connection. + // Clear the error state. + state._conn = lc + state._err = nil + state._errtime = time.Time{} + + // Store the state of the link so that it can be queried later. + l._links[info] = state + }) + if lc == nil { return } - // The linkConn wrapper allows us to track the number of - // bytes written to and read from this connection without - // the help of ironwood. - lc := &linkConn{ - Conn: conn, - up: time.Now(), - } - - // Update the link state with our newly wrapped connection. - // Clear the error state. - state._conn = lc - state._err = nil - state._errtime = time.Time{} - state.Unlock() - - // Store the state of the link so that it can be queried later. - l._links[info] = state - l.Unlock() - // Give the connection to the handler. The handler will block // for the lifetime of the connection. if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF { @@ -416,9 +427,11 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { // try to close the underlying socket just in case and then // drop the link state. _ = lc.Close() - l.Lock() - delete(l._links, info) - l.Unlock() + phony.Block(l, func() { + if l._links[info] == state { + delete(l._links, info) + } + }) }(conn) } }() diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index e6fdb80a..9cd67ff1 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "math/rand" "net" "net/url" "time" @@ -337,7 +338,8 @@ func (m *Multicast) _announce() { break } } - m._timer = time.AfterFunc(time.Second, func() { + annInterval := time.Second + time.Microsecond*(time.Duration(rand.Intn(1048576))) // Randomize delay + m._timer = time.AfterFunc(annInterval, func() { m.Act(nil, m._announce) }) }