yggdrasil-go/src/yggdrasil/dht.go

305 lines
8.6 KiB
Go
Raw Normal View History

2017-12-29 07:16:20 +03:00
package yggdrasil
2018-06-13 01:50:08 +03:00
import (
"fmt"
2018-06-13 01:50:08 +03:00
"sort"
"time"
)
2017-12-29 07:16:20 +03:00
// dht_lookup_size caps the number of entries a single call to lookup returns.
const dht_lookup_size = 16
// dhtInfo represents everything we know about a node in the DHT.
// This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance.
2017-12-29 07:16:20 +03:00
type dhtInfo struct {
2018-01-05 01:37:51 +03:00
nodeID_hidden *NodeID
key boxPubKey
coords []byte
send time.Time // When we last sent a message
recv time.Time // When we last received a message
//pings int // Decide when to drop
//throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute
//bootstrapSend time.Time // The time checked/updated as part of throttle checks
2017-12-29 07:16:20 +03:00
}
// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times.
2017-12-29 07:16:20 +03:00
func (info *dhtInfo) getNodeID() *NodeID {
2018-01-05 01:37:51 +03:00
if info.nodeID_hidden == nil {
info.nodeID_hidden = getNodeID(&info.key)
}
return info.nodeID_hidden
2017-12-29 07:16:20 +03:00
}
// Request for a node to do a lookup.
// Includes our key and coords so they can send a response back, and the destination NodeID we want to ask about.
2017-12-29 07:16:20 +03:00
type dhtReq struct {
Key boxPubKey // Key of whoever asked
Coords []byte // Coords of whoever asked
Dest NodeID // NodeID they're asking about
2017-12-29 07:16:20 +03:00
}
// Response to a DHT lookup.
// Includes the key and coords of the node that's responding, and the destination they were asked about.
// The main part is Infos []*dhtInfo, the lookup response.
2017-12-29 07:16:20 +03:00
type dhtRes struct {
Key boxPubKey // key of the sender
Coords []byte // coords of the sender
Dest NodeID
Infos []*dhtInfo // response
2017-12-29 07:16:20 +03:00
}
// The main DHT struct.
2017-12-29 07:16:20 +03:00
type dht struct {
core *Core
nodeID NodeID
table map[NodeID]*dhtInfo
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time
//rumorMill []dht_rumor
2017-12-29 07:16:20 +03:00
}
func (t *dht) init(c *Core) {
// TODO
2018-01-05 01:37:51 +03:00
t.core = c
t.nodeID = *t.core.GetNodeID()
2018-06-07 18:58:24 +03:00
t.peers = make(chan *dhtInfo, 1024)
t.reset()
}
// reset forgets every known node and every outstanding request by replacing
// both maps with fresh, empty ones.
func (t *dht) reset() {
	t.table = map[NodeID]*dhtInfo{}
	t.reqs = map[boxPubKey]map[NodeID]time.Time{}
}
// lookup returns at most dht_lookup_size table entries relevant to nodeID.
//
// An entry is a match when allowWorse is set, or when its ID lies between our
// own ID and nodeID according to dht_ordered. Among the entries that are not
// matches, the one that comes soonest after our own ID is remembered as our
// successor and, if there is one, placed first in the result. The matches
// follow, stably sorted so that an entry lying between another entry and
// nodeID comes ahead of that other entry.
func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo {
	var (
		matches []*dhtInfo
		succ    *dhtInfo
	)
	afterSelf := t.nodeID.next()
	for id, entry := range t.table {
		switch {
		case allowWorse || dht_ordered(&t.nodeID, &id, nodeID):
			matches = append(matches, entry)
		case succ == nil || dht_ordered(&afterSelf, &id, succ.getNodeID()):
			succ = entry
		}
	}
	closerFirst := func(a, b int) bool {
		return dht_ordered(matches[b].getNodeID(), matches[a].getNodeID(), nodeID)
	}
	sort.SliceStable(matches, closerFirst)
	if succ != nil {
		// The successor always leads the list, ahead of the sorted matches.
		withSucc := make([]*dhtInfo, 0, len(matches)+1)
		withSucc = append(withSucc, succ)
		matches = append(withSucc, matches...)
	}
	if len(matches) > dht_lookup_size {
		return matches[:dht_lookup_size]
	}
	return matches
}
// insert puts info into the table under its NodeID, replacing any entry that
// is already stored there. The receive time is set to now. The send time is
// preserved from the entry being replaced if there was one, otherwise it is
// also set to now. An info carrying our own NodeID is silently ignored.
func (t *dht) insert(info *dhtInfo) {
	id := *info.getNodeID()
	if id == t.nodeID {
		// This shouldn't happen, but don't crash or add it in case it does
		return
	}
	now := time.Now()
	info.recv = now
	info.send = now
	if oldInfo, isIn := t.table[id]; isIn {
		// Keep the real time of our last outgoing message to this node.
		info.send = oldInfo.send
	}
	t.table[id] = info
}
// Return true if first/second/third are (partially) ordered correctly
// FIXME? maybe total ordering makes more sense
func dht_ordered(first, second, third *NodeID) bool {
var ordered bool
for idx := 0; idx < NodeIDLen; idx++ {
f, s, t := first[idx], second[idx], third[idx]
switch {
case f == s && s == t:
continue
case f <= s && s <= t:
ordered = true // nothing wrapped around 0
case t <= f && f <= s:
ordered = true // 0 is between second and third
case s <= t && t <= f:
ordered = true // 0 is between first and second
}
break
}
return ordered
2017-12-29 07:16:20 +03:00
}
// Reads a request, performs a lookup, and responds.
// Update info about the node that sent the request.
2017-12-29 07:16:20 +03:00
func (t *dht) handleReq(req *dhtReq) {
2018-01-05 01:37:51 +03:00
// Send them what they asked for
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
res := dhtRes{
Key: t.core.boxPub,
Coords: coords,
Dest: req.Dest,
Infos: t.lookup(&req.Dest, false),
2018-01-05 01:37:51 +03:00
}
t.sendRes(&res, req)
// Also add them to our DHT
2018-01-05 01:37:51 +03:00
info := dhtInfo{
key: req.Key,
coords: req.Coords,
2018-01-05 01:37:51 +03:00
}
// For bootstrapping to work, we need to add these nodes to the table
t.insert(&info)
}
// sendRes sends a lookup response back to the node that issued req.
// The encoded response is sealed with the key we share with req.Key, wrapped
// in a protocol traffic packet addressed to the requester's coords and key,
// and handed to the router.
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
	encoded := res.encode()
	sharedKey := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
	sealed, nonce := boxSeal(sharedKey, encoded, nil)
	reply := wire_protoTrafficPacket{
		Coords:  req.Coords,
		ToKey:   req.Key,
		FromKey: t.core.boxPub,
		Nonce:   *nonce,
		Payload: sealed,
	}
	t.core.router.out(reply.encode())
}
// next returns nodeID + 1, treating the ID as one big-endian number and
// wrapping around to all zeros on overflow. The receiver is passed by value,
// so the caller's NodeID is left untouched.
func (nodeID NodeID) next() NodeID {
	for pos := len(nodeID); pos > 0; pos-- {
		nodeID[pos-1]++
		if nodeID[pos-1] != 0 {
			// No carry out of this byte, so we are done.
			return nodeID
		}
	}
	return nodeID
}
// Returns nodeID - 1
func (nodeID NodeID) prev() NodeID {
for idx := len(nodeID) - 1; idx >= 0; idx-- {
nodeID[idx] -= 1
if nodeID[idx] != 0xff {
break
}
}
return nodeID
2017-12-29 07:16:20 +03:00
}
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
2017-12-29 07:16:20 +03:00
func (t *dht) handleRes(res *dhtRes) {
2018-06-02 07:34:21 +03:00
t.core.searches.handleDHTRes(res)
reqs, isIn := t.reqs[res.Key]
2018-01-05 01:37:51 +03:00
if !isIn {
return
}
_, isIn = reqs[res.Dest]
2018-01-05 01:37:51 +03:00
if !isIn {
return
}
delete(reqs, res.Dest)
2018-01-05 01:37:51 +03:00
rinfo := dhtInfo{
key: res.Key,
coords: res.Coords,
}
t.insert(&rinfo) // Or at the end, after checking successor/predecessor?
var successor *dhtInfo
var predecessor *dhtInfo
for infoID, info := range t.table {
// Get current successor and predecessor
if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = info
}
if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) {
predecessor = info
2018-01-05 01:37:51 +03:00
}
}
for _, info := range res.Infos {
if *info.getNodeID() == t.nodeID {
continue
} // Skip self
if _, isIn := t.table[*info.getNodeID()]; isIn {
// TODO? don't skip if coords are different?
continue
}
// Send a request to all better successors or predecessors
// We could try sending to only the best, but then packet loss matters more
if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) {
t.ping(info, &t.nodeID)
fmt.Println("pinging new successor", t.nodeID[:4], info.getNodeID()[:4], successor)
}
if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) {
t.ping(info, &t.nodeID)
fmt.Println("pinging new predecessor", t.nodeID[:4], info.getNodeID()[:4], predecessor)
2018-01-05 01:37:51 +03:00
}
}
// TODO add everyting else to a rumor mill for later use? (when/how?)
2017-12-29 07:16:20 +03:00
}
// Sends a lookup request to the specified node.
2017-12-29 07:16:20 +03:00
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
2018-01-05 01:37:51 +03:00
// Send a dhtReq to the node in dhtInfo
bs := req.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
2018-06-02 23:21:05 +03:00
Coords: dest.coords,
ToKey: dest.key,
FromKey: t.core.boxPub,
Nonce: *nonce,
Payload: payload,
2018-01-05 01:37:51 +03:00
}
packet := p.encode()
t.core.router.out(packet)
reqsToDest, isIn := t.reqs[dest.key]
if !isIn {
t.reqs[dest.key] = make(map[NodeID]time.Time)
reqsToDest, isIn = t.reqs[dest.key]
if !isIn {
panic("This should never happen")
}
}
reqsToDest[req.Dest] = time.Now()
2017-12-29 07:16:20 +03:00
}
func (t *dht) ping(info *dhtInfo, target *NodeID) {
// Creates a req for the node at dhtInfo, asking them about the target (if one is given) or themself (if no target is given)
2018-01-05 01:37:51 +03:00
if target == nil {
target = info.getNodeID()
2018-01-05 01:37:51 +03:00
}
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
req := dhtReq{
Key: t.core.boxPub,
Coords: coords,
Dest: *target,
2018-01-05 01:37:51 +03:00
}
info.send = time.Now()
t.sendReq(&req, info)
2017-12-29 07:16:20 +03:00
}
func (t *dht) doMaintenance() {
// Ping successor, asking for their predecessor, and clean up old/expired info
var successor *dhtInfo
now := time.Now()
for infoID, info := range t.table {
if now.Sub(info.recv) > time.Minute {
delete(t.table, infoID)
} else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = info
2018-01-05 01:37:51 +03:00
}
}
if successor != nil &&
now.Sub(successor.send) > 6*time.Second {
t.ping(successor, nil)
}
}