some minor refactoring to dht callbacks and searches, work in progress

This commit is contained in:
Arceliar 2019-06-25 19:31:29 -05:00
parent 2fd3ac6837
commit 29a0f8b572
4 changed files with 61 additions and 58 deletions

View File

@ -517,21 +517,14 @@ func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, err
rq := dhtReqKey{info.key, target}
sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }()
select {
case resCh <- res:
default:
}
resCh <- res
})
c.dht.ping(&info, &target)
}
c.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
// TODO: do something better than the below...
for res := range resCh {
res := <-resCh
if res != nil {
r := DHTRes{
Coords: append([]byte{}, res.Coords...),
}

View File

@ -128,7 +128,7 @@ func (c *Conn) startSearch() {
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
c.core.searches.continueSearch(sinfo)
sinfo.continueSearch()
}
// Take a copy of the session object, in case it changes later
c.mutex.RLock()

View File

@ -68,9 +68,9 @@ type dht struct {
core *Core
reconfigure chan chan error
nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar...
table map[crypto.NodeID]*dhtInfo
imp []*dhtInfo
@ -88,7 +88,7 @@ func (t *dht) init(c *Core) {
}()
t.nodeID = *t.core.NodeID()
t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey]dht_callbackInfo)
t.callbacks = make(map[dhtReqKey][]dht_callbackInfo)
t.reset()
}
@ -244,15 +244,17 @@ type dht_callbackInfo struct {
// Adds a callback and removes it after some timeout.
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)}
t.callbacks[*rq] = info
t.callbacks[*rq] = append(t.callbacks[*rq], info)
}
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) {
rq := dhtReqKey{res.Key, res.Dest}
if callback, isIn := t.callbacks[rq]; isIn {
callback.f(res)
if callbacks, isIn := t.callbacks[rq]; isIn {
for _, callback := range callbacks {
callback.f(res)
}
delete(t.callbacks, rq)
}
_, isIn := t.reqs[rq]
@ -326,10 +328,15 @@ func (t *dht) doMaintenance() {
}
}
t.reqs = newReqs
newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks))
for key, callback := range t.callbacks {
if now.Before(callback.time) {
newCallbacks[key] = callback
newCallbacks := make(map[dhtReqKey][]dht_callbackInfo, len(t.callbacks))
for key, cs := range t.callbacks {
for _, c := range cs {
if now.Before(c.time) {
newCallbacks[key] = append(newCallbacks[key], c)
} else {
// Signal failure
c.f(nil)
}
}
}
t.callbacks = newCallbacks

View File

@ -33,6 +33,7 @@ const search_RETRY_TIME = time.Second
// Information about an ongoing search.
// Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited.
type searchInfo struct {
core *Core
dest crypto.NodeID
mask crypto.NodeID
time time.Time
@ -40,6 +41,7 @@ type searchInfo struct {
toVisit []*dhtInfo
visited map[crypto.NodeID]bool
callback func(*sessionInfo, error)
// TODO context.Context for timeout and cancellation
}
// This stores a map of active searches.
@ -49,7 +51,7 @@ type searches struct {
searches map[crypto.NodeID]*searchInfo
}
// Intializes the searches struct.
// Initializes the searches struct.
func (s *searches) init(core *Core) {
s.core = core
s.reconfigure = make(chan chan error, 1)
@ -65,12 +67,13 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now()
for dest, sinfo := range s.searches {
if now.Sub(sinfo.time) > time.Minute {
delete(s.searches, dest)
}
}
//for dest, sinfo := range s.searches {
// if now.Sub(sinfo.time) > time.Minute {
// delete(s.searches, dest)
// }
//}
info := searchInfo{
core: s.core,
dest: *dest,
mask: *mask,
time: now.Add(-time.Second),
@ -82,30 +85,29 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba
////////////////////////////////////////////////////////////////////////////////
// Checks if there's an ongoing search relaed to a dhtRes.
// Checks if there's an ongoing search related to a dhtRes.
// If there is, it adds the response info to the search and triggers a new search step.
// If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more.
func (s *searches) handleDHTRes(res *dhtRes) {
sinfo, isIn := s.searches[res.Dest]
if !isIn || s.checkDHTRes(sinfo, res) {
func (sinfo *searchInfo) handleDHTRes(res *dhtRes) {
if res == nil || sinfo.checkDHTRes(res) {
// Either we don't recognize this search, or we just finished it
return
}
// Add to the search and continue
s.addToSearch(sinfo, res)
s.doSearchStep(sinfo)
sinfo.addToSearch(res)
sinfo.doSearchStep()
}
// Adds the information from a dhtRes to an ongoing search.
// Info about a node that has already been visited is not re-added to the search.
// Duplicate information about nodes toVisit is deduplicated (the newest information is kept).
// The toVisit list is sorted in ascending order of keyspace distance from the destination.
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Add responses to toVisit if closer to dest than the res node
from := dhtInfo{key: res.Key, coords: res.Coords}
sinfo.visited[*from.getNodeID()] = true
for _, info := range res.Infos {
if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
continue
}
if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) {
@ -135,10 +137,10 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
// If there are no nodes left toVisit, then this cleans up the search.
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (s *searches) doSearchStep(sinfo *searchInfo) {
func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup
delete(s.searches, sinfo.dest)
delete(sinfo.core.searches.searches, sinfo.dest)
go sinfo.callback(nil, errors.New("search reached dead end"))
return
}
@ -146,31 +148,32 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
rq := dhtReqKey{next.key, sinfo.dest}
s.core.dht.addCallback(&rq, s.handleDHTRes)
s.core.dht.ping(next, &sinfo.dest)
sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.dht.ping(next, &sinfo.dest)
}
// If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (s *searches) continueSearch(sinfo *searchInfo) {
func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME {
return
}
sinfo.time = time.Now()
s.doSearchStep(sinfo)
sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later
retryLater := func() {
newSearchInfo := s.searches[sinfo.dest]
// FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.core.searches.searches[sinfo.dest]
if newSearchInfo != sinfo {
return
}
s.continueSearch(sinfo)
sinfo.continueSearch()
}
go func() {
time.Sleep(search_RETRY_TIME)
s.core.router.admin <- retryLater
sinfo.core.router.admin <- retryLater
}()
}
@ -185,37 +188,37 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb
// Checks if a dhtRes is good (called by handleDHTRes).
// If the response is from the target, get/create a session, trigger a session ping, and return true.
// Otherwise return false.
func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool {
func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
them := crypto.GetNodeID(&res.Key)
var destMasked crypto.NodeID
var themMasked crypto.NodeID
for idx := 0; idx < crypto.NodeIDLen; idx++ {
destMasked[idx] = info.dest[idx] & info.mask[idx]
themMasked[idx] = them[idx] & info.mask[idx]
destMasked[idx] = sinfo.dest[idx] & sinfo.mask[idx]
themMasked[idx] = them[idx] & sinfo.mask[idx]
}
if themMasked != destMasked {
return false
}
// They match, so create a session and send a sessionRequest
sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key)
sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn {
sinfo = s.core.sessions.createSession(&res.Key)
if sinfo == nil {
sess = sinfo.core.sessions.createSession(&res.Key)
if sess == nil {
// nil if the DHT search finished but the session wasn't allowed
go info.callback(nil, errors.New("session not allowed"))
go sinfo.callback(nil, errors.New("session not allowed"))
return true
}
_, isIn := s.core.sessions.getByTheirPerm(&res.Key)
_, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn {
panic("This should never happen")
}
}
// FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)?
sinfo.coords = res.Coords
sinfo.packet = info.packet
s.core.sessions.ping(sinfo)
go info.callback(sinfo, nil)
sess.coords = res.Coords
sess.packet = sinfo.packet
sinfo.core.sessions.ping(sess)
go sinfo.callback(sess, nil)
// Cleanup
delete(s.searches, res.Dest)
delete(sinfo.core.searches.searches, res.Dest)
return true
}