Merge pull request #498 from Arceliar/search

Search fixes
This commit is contained in:
Neil Alexander 2019-08-11 21:13:49 +01:00 committed by GitHub
commit 16076b53b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 16 deletions

View File

@ -130,9 +130,9 @@ func (c *Conn) doSearch() {
searchCompleted := func(sinfo *sessionInfo, e error) {} searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Start the search
sinfo.continueSearch()
} }
// Continue the search
sinfo.continueSearch()
} }
go func() { c.core.router.admin <- routerWork }() go func() { c.core.router.admin <- routerWork }()
} }

View File

@ -65,17 +65,11 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. // Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now()
//for dest, sinfo := range s.searches {
// if now.Sub(sinfo.time) > time.Minute {
// delete(s.searches, dest)
// }
//}
info := searchInfo{ info := searchInfo{
core: s.core, core: s.core,
dest: *dest, dest: *dest,
mask: *mask, mask: *mask,
time: now.Add(-time.Second), time: time.Now(),
callback: callback, callback: callback,
} }
s.searches[*dest] = &info s.searches[*dest] = &info
@ -138,9 +132,11 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (sinfo *searchInfo) doSearchStep() { func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup if time.Since(sinfo.time) > search_RETRY_TIME {
delete(sinfo.core.searches.searches, sinfo.dest) // Dead end and no response in too long, do cleanup
sinfo.callback(nil, errors.New("search reached dead end")) delete(sinfo.core.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
}
return return
} }
// Send to the next search target // Send to the next search target
@ -149,15 +145,12 @@ func (sinfo *searchInfo) doSearchStep() {
rq := dhtReqKey{next.key, sinfo.dest} rq := dhtReqKey{next.key, sinfo.dest}
sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.dht.ping(next, &sinfo.dest) sinfo.core.dht.ping(next, &sinfo.dest)
sinfo.time = time.Now()
} }
// If we've recenty sent a ping for this search, do nothing. // If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (sinfo *searchInfo) continueSearch() { func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME {
return
}
sinfo.time = time.Now()
sinfo.doSearchStep() sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later // In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
@ -209,6 +202,8 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
if sess == nil { if sess == nil {
// nil if the DHT search finished but the session wasn't allowed // nil if the DHT search finished but the session wasn't allowed
sinfo.callback(nil, errors.New("session not allowed")) sinfo.callback(nil, errors.New("session not allowed"))
// Cleanup
delete(sinfo.core.searches.searches, res.Dest)
return true return true
} }
_, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)