From 502f2937a9419fabc7985da9a55d5913a40499db Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Aug 2019 17:00:02 -0500 Subject: [PATCH] a couple race fixes and use timer.AfterFunc instead of sleeping goroutines or ticker in a few places --- src/tuntap/conn.go | 31 ++++--------------------------- src/tuntap/tun.go | 11 +++++------ src/yggdrasil/api.go | 6 ++---- src/yggdrasil/debug.go | 4 ++-- src/yggdrasil/link.go | 2 +- src/yggdrasil/nodeinfo.go | 34 +++++++++++++++++----------------- src/yggdrasil/peer.go | 25 ++++++++++++++----------- src/yggdrasil/router.go | 22 +++++++++------------- src/yggdrasil/search.go | 22 ++++++++++------------ src/yggdrasil/session.go | 1 - src/yggdrasil/switch.go | 6 ++---- 11 files changed, 66 insertions(+), 98 deletions(-) diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index 0e0dd461..31490916 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -23,7 +23,7 @@ type tunConn struct { addr address.Address snet address.Subnet stop chan struct{} - alive chan struct{} + alive *time.Timer // From calling time.AfterFunc } func (s *tunConn) close() { @@ -40,10 +40,6 @@ func (s *tunConn) _close_nomutex() { defer func() { recover() }() close(s.stop) // Closes reader/writer goroutines }() - func() { - defer func() { recover() }() - close(s.alive) // Closes timeout goroutine - }() } func (s *tunConn) _read(bs []byte) (err error) { @@ -228,27 +224,8 @@ func (s *tunConn) _write(bs []byte) (err error) { } func (s *tunConn) stillAlive() { - defer func() { recover() }() - select { - case s.alive <- struct{}{}: - default: - } -} - -func (s *tunConn) checkForTimeouts() error { - timer := time.NewTimer(tunConnTimeout) - defer util.TimerStop(timer) - defer s.close() - for { - select { - case _, ok := <-s.alive: - if !ok { - return errors.New("connection closed") - } - util.TimerStop(timer) - timer.Reset(tunConnTimeout) - case <-timer.C: - return errors.New("timed out") - } + if s.alive != nil { + s.alive.Stop() } + s.alive = time.AfterFunc(tunConnTimeout, s.close) } diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index dfc343df..d73384d4 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -227,10 +227,9 @@ func (tun *TunAdapter) handler() error { func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { // Prepare a session wrapper for the given connection s := tunConn{ - tun: tun, - conn: conn, - stop: make(chan struct{}), - alive: make(chan struct{}, 1), + tun: tun, + conn: conn, + stop: make(chan struct{}), } c = &s // Get the remote address and subnet of the other side @@ -255,13 +254,13 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { // we receive a packet through the interface for this address tun.addrToConn[s.addr] = &s tun.subnetToConn[s.snet] = &s - // Set the read callback and start the timeout goroutine + // Set the read callback and start the timeout conn.SetReadCallback(func(bs []byte) { s.RecvFrom(conn, func() { s._read(bs) }) }) - go s.checkForTimeouts() + s.RecvFrom(nil, s.stillAlive) // Return return c, err } diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 2bc5c816..c6c15e24 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -344,10 +344,8 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) } c.router.doAdmin(sendNodeInfoRequest) - go func() { - time.Sleep(6 * time.Second) - close(response) - }() + timer := time.AfterFunc(6*time.Second, func() { close(response) }) + defer timer.Stop() for res := range response { return *res, nil } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f6ec8716..c6f98ca0 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -552,7 +552,7 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { //////////////////////////////////////////////////////////////////////////////// func DEBUG_simLinkPeers(p, q *peer) { - // Sets q.out() to point to p and starts p.linkLoop() + // Sets q.out() to point to p and starts p.start() goWorkers := func(source, dest *peer) { source.linkOut = make(chan []byte, 1) send := make(chan []byte, 1) @@ -561,7 +561,7 @@ func DEBUG_simLinkPeers(p, q *peer) { send <- bs } } - go source.linkLoop() + go source.start() go func() { var packets [][]byte for { diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index bfbcc99b..1b48f391 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -216,7 +216,7 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop - go intf.peer.linkLoop() + go intf.peer.start() // Start the writer signalReady := make(chan struct{}, 1) signalSent := make(chan bool, 1) diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 50f5bf9c..8a5d7872 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -47,25 +47,25 @@ func (m *nodeinfo) init(core *Core) { m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback) m.cache = make(map[crypto.BoxPubKey]nodeinfoCached) - go func() { - for { - m.callbacksMutex.Lock() - for boxPubKey, callback := range m.callbacks { - if time.Since(callback.created) > time.Minute { - delete(m.callbacks, boxPubKey) - } + var f func() + f = func() { + m.callbacksMutex.Lock() + for boxPubKey, callback := range m.callbacks { + if time.Since(callback.created) > time.Minute { + delete(m.callbacks, boxPubKey) } - m.callbacksMutex.Unlock() - m.cacheMutex.Lock() - for boxPubKey, cache := range m.cache { - if time.Since(cache.created) > time.Hour { - delete(m.cache, boxPubKey) - } - } - m.cacheMutex.Unlock() - time.Sleep(time.Second * 30) } - }() + m.callbacksMutex.Unlock() + m.cacheMutex.Lock() + for boxPubKey, cache := range m.cache { + if time.Since(cache.created) > time.Hour { + delete(m.cache, boxPubKey) + } + } + m.cacheMutex.Unlock() + time.AfterFunc(time.Second*30, f) + } + go f() } // Add a callback for a nodeinfo lookup diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 989d9ee1..50fb03f8 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -184,18 +184,21 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) { // This must be launched in a separate goroutine by whatever sets up the peer struct. // It handles link protocol traffic. -func (p *peer) linkLoop() { - tick := time.NewTicker(time.Second) - defer tick.Stop() - <-p.SyncExec(p._sendSwitchMsg) // Startup message - for { - select { - case <-p.done: - return - case _ = <-tick.C: - <-p.SyncExec(p._updateDHT) - } +func (p *peer) start() { + var updateDHT func() + updateDHT = func() { + <-p.SyncExec(func() { + select { + case <-p.done: + default: + p._updateDHT() + time.AfterFunc(time.Second, updateDHT) + } + }) } + updateDHT() + // Just for good measure, immediately send a switch message to this peer when we start + <-p.SyncExec(p._sendSwitchMsg) } func (p *peer) _updateDHT() { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 002905bc..fec55b7c 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -92,7 +92,7 @@ func (r *router) reconfigure(e chan error) { // Starts the tickerLoop goroutine. func (r *router) start() error { r.core.log.Infoln("Starting router") - go r.tickerLoop() + go r.doMaintenance() return nil } @@ -122,18 +122,14 @@ func (r *router) reset(from phony.Actor) { // TODO remove reconfigure so this is just a ticker loop // and then find something better than a ticker loop to schedule things... -func (r *router) tickerLoop() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - <-ticker.C - <-r.SyncExec(func() { - // Any periodic maintenance stuff goes here - r.core.switchTable.doMaintenance() - r.dht.doMaintenance() - r.sessions.cleanup() - }) - } +func (r *router) doMaintenance() { + <-r.SyncExec(func() { + // Any periodic maintenance stuff goes here + r.core.switchTable.doMaintenance() + r.dht.doMaintenance() + r.sessions.cleanup() + }) + time.AfterFunc(time.Second, r.doMaintenance) } // Checks incoming traffic type and passes it to the appropriate handler. diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 5fb36584..9cf5f06e 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -152,18 +152,16 @@ func (sinfo *searchInfo) continueSearch() { // In case the search dies, try to spawn another thread later // Note that this will spawn multiple parallel searches as time passes // Any that die aren't restarted, but a new one will start later - retryLater := func() { - // FIXME this keeps the search alive forever if not for the searches map, fix that - newSearchInfo := sinfo.searches.searches[sinfo.dest] - if newSearchInfo != sinfo { - return - } - sinfo.continueSearch() - } - go func() { - time.Sleep(search_RETRY_TIME) - sinfo.searches.router.doAdmin(retryLater) - }() + time.AfterFunc(search_RETRY_TIME, func() { + sinfo.searches.router.RecvFrom(nil, func() { + // FIXME this keeps the search alive forever if not for the searches map, fix that + newSearchInfo := sinfo.searches.searches[sinfo.dest] + if newSearchInfo != sinfo { + return + } + sinfo.continueSearch() + }) + }) } // Calls create search, and initializes the iterative search parts of the struct before returning it. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index e448cf25..94ee41e6 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -262,7 +262,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.sessions.removeSession(&sinfo) }) }() - //go sinfo.startWorkers() return &sinfo } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index cb5cf1eb..6d882252 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -200,10 +200,8 @@ func (t *switchTable) init(core *Core) { } func (t *switchTable) reconfigure(e chan error) { - go func() { - defer close(e) - // This is where reconfiguration would go, if we had anything useful to do. - }() + defer close(e) + // This is where reconfiguration would go, if we had anything useful to do. } // Safely gets a copy of this node's locator.