Use sync.Map instead of link actor

This commit is contained in:
Neil Alexander 2023-05-20 18:31:01 +01:00
parent 6e338b6f89
commit 5ba9dadc49
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
2 changed files with 49 additions and 62 deletions

View File

@ -71,31 +71,32 @@ func (c *Core) GetPeers() []PeerInfo {
conns[p.Conn] = p conns[p.Conn] = p
} }
phony.Block(&c.links, func() { c.links.links.Range(func(key, value any) bool {
for info, state := range c.links._links { info := key.(linkInfo)
var peerinfo PeerInfo state := value.(*link)
var conn net.Conn var peerinfo PeerInfo
phony.Block(state, func() { var conn net.Conn
peerinfo.URI = info.uri phony.Block(state, func() {
peerinfo.LastError = state._err peerinfo.URI = info.uri
peerinfo.LastErrorTime = state._errtime peerinfo.LastError = state._err
if c := state._conn; c != nil { peerinfo.LastErrorTime = state._errtime
conn = c if c := state._conn; c != nil {
peerinfo.Up = true conn = c
peerinfo.Inbound = info.linkType == linkTypeIncoming peerinfo.Up = true
peerinfo.RXBytes = c.rx peerinfo.Inbound = info.linkType == linkTypeIncoming
peerinfo.TXBytes = c.tx peerinfo.RXBytes = c.rx
peerinfo.Uptime = time.Since(c.up) peerinfo.TXBytes = c.tx
} peerinfo.Uptime = time.Since(c.up)
})
if p, ok := conns[conn]; ok {
peerinfo.Key = p.Key
peerinfo.Root = p.Root
peerinfo.Port = p.Port
peerinfo.Priority = p.Priority
} }
peers = append(peers, peerinfo) })
if p, ok := conns[conn]; ok {
peerinfo.Key = p.Key
peerinfo.Root = p.Root
peerinfo.Port = p.Port
peerinfo.Priority = p.Priority
} }
peers = append(peers, peerinfo)
return true
}) })
return peers return peers

View File

@ -12,6 +12,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -28,13 +29,12 @@ const (
) )
type links struct { type links struct {
phony.Inbox core *Core
core *Core tcp *linkTCP // TCP interface support
tcp *linkTCP // TCP interface support tls *linkTLS // TLS interface support
tls *linkTLS // TLS interface support unix *linkUNIX // UNIX interface support
unix *linkUNIX // UNIX interface support socks *linkSOCKS // SOCKS interface support
socks *linkSOCKS // SOCKS interface support links sync.Map // map[linkInfo]*link // *link is nil if connection in progress
_links map[linkInfo]*link // *link is nil if connection in progress
} }
type linkProtocol interface { type linkProtocol interface {
@ -92,7 +92,6 @@ func (l *links) init(c *Core) error {
l.tls = l.newLinkTLS(l.tcp) l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX() l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS() l.socks = l.newLinkSOCKS()
l._links = make(map[linkInfo]*link)
var listeners []ListenAddress var listeners []ListenAddress
phony.Block(c, func() { phony.Block(c, func() {
@ -124,15 +123,11 @@ func (l *links) shutdown() {
} }
func (l *links) isConnectedTo(info linkInfo) bool { func (l *links) isConnectedTo(info linkInfo) bool {
var isConnected bool li, ok := l.links.Load(info)
phony.Block(l, func() { if !ok || li == nil {
link, ok := l._links[info] return false
if !ok { }
return return li.(*link)._conn != nil
}
isConnected = link._conn != nil
})
return isConnected
} }
type linkError string type linkError string
@ -152,12 +147,12 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
sintf: sintf, sintf: sintf,
linkType: linkType, linkType: linkType,
} }
var state *link var state *link
var ok bool if s, ok := l.links.Load(info); ok {
phony.Block(l, func() { state = s.(*link)
state, ok = l._links[info] }
}) if state != nil {
if ok && state != nil {
select { select {
case state.kick <- struct{}{}: case state.kick <- struct{}{}:
default: default:
@ -201,9 +196,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// Store the state of the link, try to connect and then run // Store the state of the link, try to connect and then run
// the handler. // the handler.
phony.Block(l, func() { l.links.Store(info, state)
l._links[info] = state
})
// Track how many consecutive connection failures we have had, // Track how many consecutive connection failures we have had,
// as we will back off exponentially rather than hammering the // as we will back off exponentially rather than hammering the
@ -234,9 +227,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// then the loop will run endlessly, using backoffs as needed. // then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry. // Otherwise the loop will end, cleaning up the link entry.
go func() { go func() {
defer phony.Block(l, func() { l.links.Delete(info)
delete(l._links, info)
})
for { for {
conn, err := l.connect(u, info, options) conn, err := l.connect(u, info, options)
if err != nil { if err != nil {
@ -333,11 +324,10 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
continue continue
} }
var state *link var state *link
var ok bool if s, ok := l.links.Load(info); ok {
phony.Block(l, func() { state = s.(*link)
state = l._links[info] }
}) if state == nil {
if !ok || state == nil {
state = &link{ state = &link{
info: info, info: info,
} }
@ -352,9 +342,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
state._err = nil state._err = nil
state.linkProto = strings.ToUpper(u.Scheme) state.linkProto = strings.ToUpper(u.Scheme)
}) })
phony.Block(l, func() { l.links.Store(info, state)
l._links[info] = state
})
if err = l.handler(&info, options, lc); err != nil && err != io.EOF { if err = l.handler(&info, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", u.Host, err) l.core.log.Debugf("Link %s error: %s\n", u.Host, err)
} }
@ -364,9 +352,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
state._errtime = time.Now() state._errtime = time.Now()
} }
}) })
phony.Block(l, func() { l.links.Delete(info)
delete(l._links, info)
})
} }
}() }()
return li, nil return li, nil