From 7a28eb787ee860e1555af52673b046ccb4b0d56c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 11 Aug 2019 13:00:19 -0500 Subject: [PATCH 1/2] try to fix a few edge cases with searches that could lead them to ending without the callback being run or without cleaning up the old search info --- src/yggdrasil/conn.go | 4 ++-- src/yggdrasil/search.go | 14 ++------------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 20db931d..134f3cd2 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -130,9 +130,9 @@ func (c *Conn) doSearch() { searchCompleted := func(sinfo *sessionInfo, e error) {} sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) + // Start the search + sinfo.continueSearch() } - // Continue the search - sinfo.continueSearch() } go func() { c.core.router.admin <- routerWork }() } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 4c31fd6b..676ac4f9 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -36,7 +36,6 @@ type searchInfo struct { core *Core dest crypto.NodeID mask crypto.NodeID - time time.Time toVisit []*dhtInfo visited map[crypto.NodeID]bool callback func(*sessionInfo, error) @@ -65,17 +64,10 @@ func (s *searches) init(core *Core) { // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { - now := time.Now() - //for dest, sinfo := range s.searches { - // if now.Sub(sinfo.time) > time.Minute { - // delete(s.searches, dest) - // } - //} info := searchInfo{ core: s.core, dest: *dest, mask: *mask, - time: now.Add(-time.Second), callback: callback, } s.searches[*dest] = &info @@ -154,10 +146,6 @@ func (sinfo *searchInfo) doSearchStep() { // If we've recenty sent a ping for this search, do nothing. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. func (sinfo *searchInfo) continueSearch() { - if time.Since(sinfo.time) < search_RETRY_TIME { - return - } - sinfo.time = time.Now() sinfo.doSearchStep() // In case the search dies, try to spawn another thread later // Note that this will spawn multiple parallel searches as time passes @@ -209,6 +197,8 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { if sess == nil { // nil if the DHT search finished but the session wasn't allowed sinfo.callback(nil, errors.New("session not allowed")) + // Cleanup + delete(sinfo.core.searches.searches, res.Dest) return true } _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) From 277da1fe60fb6e32bfdbe572ff23f3fc585dbd99 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 11 Aug 2019 13:11:14 -0500 Subject: [PATCH 2/2] make sure searches don't end if try to continue (in parallel) with nowhere left to send, but we just sent a search and are still waiting for a response --- src/yggdrasil/search.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 676ac4f9..b970fe55 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -36,6 +36,7 @@ type searchInfo struct { core *Core dest crypto.NodeID mask crypto.NodeID + time time.Time toVisit []*dhtInfo visited map[crypto.NodeID]bool callback func(*sessionInfo, error) @@ -68,6 +69,7 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba core: s.core, dest: *dest, mask: *mask, + time: time.Now(), callback: callback, } s.searches[*dest] = &info @@ -130,9 +132,11 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) { // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { - // Dead end, do cleanup - delete(sinfo.core.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) + if time.Since(sinfo.time) > search_RETRY_TIME { + // Dead end and no response in too long, do cleanup + delete(sinfo.core.searches.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) + } return } // Send to the next search target @@ -141,6 +145,7 @@ func (sinfo *searchInfo) doSearchStep() { rq := dhtReqKey{next.key, sinfo.dest} sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) sinfo.core.dht.ping(next, &sinfo.dest) + sinfo.time = time.Now() } // If we've recenty sent a ping for this search, do nothing.