From 29a0f8b572f5db41e86af8d657a578c62890922c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 25 Jun 2019 19:31:29 -0500 Subject: [PATCH 1/5] some minor refactoring to dht callbacks and searches, work in progress --- src/yggdrasil/api.go | 13 ++----- src/yggdrasil/conn.go | 2 +- src/yggdrasil/dht.go | 29 ++++++++++------ src/yggdrasil/search.go | 75 +++++++++++++++++++++-------------------- 4 files changed, 61 insertions(+), 58 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 25f9869c..b98df3bf 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -517,21 +517,14 @@ func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, err rq := dhtReqKey{info.key, target} sendPing := func() { c.dht.addCallback(&rq, func(res *dhtRes) { - defer func() { recover() }() - select { - case resCh <- res: - default: - } + resCh <- res }) c.dht.ping(&info, &target) } c.router.doAdmin(sendPing) - go func() { - time.Sleep(6 * time.Second) - close(resCh) - }() // TODO: do something better than the below... - for res := range resCh { + res := <-resCh + if res != nil { r := DHTRes{ Coords: append([]byte{}, res.Coords...), } diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 9ce5563d..7216be91 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -128,7 +128,7 @@ func (c *Conn) startSearch() { c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) } // Continue the search - c.core.searches.continueSearch(sinfo) + sinfo.continueSearch() } // Take a copy of the session object, in case it changes later c.mutex.RLock() diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b081c92d..b53e29c9 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -68,9 +68,9 @@ type dht struct { core *Core reconfigure chan chan error nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -88,7 +88,7 @@ func (t *dht) init(c *Core) { }() t.nodeID = *t.core.NodeID() t.peers = make(chan *dhtInfo, 1024) - t.callbacks = make(map[dhtReqKey]dht_callbackInfo) + t.callbacks = make(map[dhtReqKey][]dht_callbackInfo) t.reset() } @@ -244,15 +244,17 @@ type dht_callbackInfo struct { // Adds a callback and removes it after some timeout. func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} - t.callbacks[*rq] = info + t.callbacks[*rq] = append(t.callbacks[*rq], info) } // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { rq := dhtReqKey{res.Key, res.Dest} - if callback, isIn := t.callbacks[rq]; isIn { - callback.f(res) + if callbacks, isIn := t.callbacks[rq]; isIn { + for _, callback := range callbacks { + callback.f(res) + } delete(t.callbacks, rq) } _, isIn := t.reqs[rq] @@ -326,10 +328,15 @@ func (t *dht) doMaintenance() { } } t.reqs = newReqs - newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) - for key, callback := range t.callbacks { - if now.Before(callback.time) { - newCallbacks[key] = callback + newCallbacks := make(map[dhtReqKey][]dht_callbackInfo, len(t.callbacks)) + for key, cs := range t.callbacks { + for _, c := range cs { + if now.Before(c.time) { + newCallbacks[key] = append(newCallbacks[key], c) + } else { + // Signal failure + c.f(nil) + } } } t.callbacks = newCallbacks diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 0a643363..576034bf 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -33,6 +33,7 @@ const search_RETRY_TIME = time.Second // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. type searchInfo struct { + core *Core dest crypto.NodeID mask crypto.NodeID time time.Time @@ -40,6 +41,7 @@ type searchInfo struct { toVisit []*dhtInfo visited map[crypto.NodeID]bool callback func(*sessionInfo, error) + // TODO context.Context for timeout and cancellation } // This stores a map of active searches. @@ -49,7 +51,7 @@ type searches struct { searches map[crypto.NodeID]*searchInfo } -// Intializes the searches struct. +// Initializes the searches struct. func (s *searches) init(core *Core) { s.core = core s.reconfigure = make(chan chan error, 1) @@ -65,12 +67,13 @@ func (s *searches) init(core *Core) { // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { now := time.Now() - for dest, sinfo := range s.searches { - if now.Sub(sinfo.time) > time.Minute { - delete(s.searches, dest) - } - } + //for dest, sinfo := range s.searches { + // if now.Sub(sinfo.time) > time.Minute { + // delete(s.searches, dest) + // } + //} info := searchInfo{ + core: s.core, dest: *dest, mask: *mask, time: now.Add(-time.Second), @@ -82,30 +85,29 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba //////////////////////////////////////////////////////////////////////////////// -// Checks if there's an ongoing search relaed to a dhtRes. +// Checks if there's an ongoing search related to a dhtRes. // If there is, it adds the response info to the search and triggers a new search step. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. -func (s *searches) handleDHTRes(res *dhtRes) { - sinfo, isIn := s.searches[res.Dest] - if !isIn || s.checkDHTRes(sinfo, res) { +func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { + if res == nil || sinfo.checkDHTRes(res) { // Either we don't recognize this search, or we just finished it return } // Add to the search and continue - s.addToSearch(sinfo, res) - s.doSearchStep(sinfo) + sinfo.addToSearch(res) + sinfo.doSearchStep() } // Adds the information from a dhtRes to an ongoing search. // Info about a node that has already been visited is not re-added to the search. // Duplicate information about nodes toVisit is deduplicated (the newest information is kept). // The toVisit list is sorted in ascending order of keyspace distance from the destination. -func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { +func (sinfo *searchInfo) addToSearch(res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} sinfo.visited[*from.getNodeID()] = true for _, info := range res.Infos { - if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { + if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { @@ -135,10 +137,10 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // If there are no nodes left toVisit, then this cleans up the search. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. -func (s *searches) doSearchStep(sinfo *searchInfo) { +func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup - delete(s.searches, sinfo.dest) + delete(sinfo.core.searches.searches, sinfo.dest) go sinfo.callback(nil, errors.New("search reached dead end")) return } @@ -146,31 +148,32 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] rq := dhtReqKey{next.key, sinfo.dest} - s.core.dht.addCallback(&rq, s.handleDHTRes) - s.core.dht.ping(next, &sinfo.dest) + sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) + sinfo.core.dht.ping(next, &sinfo.dest) } // If we've recenty sent a ping for this search, do nothing. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. -func (s *searches) continueSearch(sinfo *searchInfo) { +func (sinfo *searchInfo) continueSearch() { if time.Since(sinfo.time) < search_RETRY_TIME { return } sinfo.time = time.Now() - s.doSearchStep(sinfo) + sinfo.doSearchStep() // In case the search dies, try to spawn another thread later // Note that this will spawn multiple parallel searches as time passes // Any that die aren't restarted, but a new one will start later retryLater := func() { - newSearchInfo := s.searches[sinfo.dest] + // FIXME this keeps the search alive forever if not for the searches map, fix that + newSearchInfo := sinfo.core.searches.searches[sinfo.dest] if newSearchInfo != sinfo { return } - s.continueSearch(sinfo) + sinfo.continueSearch() } go func() { time.Sleep(search_RETRY_TIME) - s.core.router.admin <- retryLater + sinfo.core.router.admin <- retryLater }() } @@ -185,37 +188,37 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb // Checks if a dhtRes is good (called by handleDHTRes). // If the response is from the target, get/create a session, trigger a session ping, and return true. // Otherwise return false. -func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { +func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { them := crypto.GetNodeID(&res.Key) var destMasked crypto.NodeID var themMasked crypto.NodeID for idx := 0; idx < crypto.NodeIDLen; idx++ { - destMasked[idx] = info.dest[idx] & info.mask[idx] - themMasked[idx] = them[idx] & info.mask[idx] + destMasked[idx] = sinfo.dest[idx] & sinfo.mask[idx] + themMasked[idx] = them[idx] & sinfo.mask[idx] } if themMasked != destMasked { return false } // They match, so create a session and send a sessionRequest - sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key) + sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) if !isIn { - sinfo = s.core.sessions.createSession(&res.Key) - if sinfo == nil { + sess = sinfo.core.sessions.createSession(&res.Key) + if sess == nil { // nil if the DHT search finished but the session wasn't allowed - go info.callback(nil, errors.New("session not allowed")) + go sinfo.callback(nil, errors.New("session not allowed")) return true } - _, isIn := s.core.sessions.getByTheirPerm(&res.Key) + _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) if !isIn { panic("This should never happen") } } // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? - sinfo.coords = res.Coords - sinfo.packet = info.packet - s.core.sessions.ping(sinfo) - go info.callback(sinfo, nil) + sess.coords = res.Coords + sess.packet = sinfo.packet + sinfo.core.sessions.ping(sess) + go sinfo.callback(sess, nil) // Cleanup - delete(s.searches, res.Dest) + delete(sinfo.core.searches.searches, res.Dest) return true } From 5df110ac7985e87d1f11dc840cdb55dd6b69cbe4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 28 Jun 2019 18:42:31 -0500 Subject: [PATCH 2/5] make Dial block until the search finishes, and use it as such --- src/tuntap/iface.go | 55 ++++++++--- src/tuntap/tun.go | 2 + src/yggdrasil/conn.go | 199 ++++++++++++---------------------------- src/yggdrasil/dialer.go | 5 + src/yggdrasil/search.go | 6 +- 5 files changed, 111 insertions(+), 156 deletions(-) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index d70a1305..f6cfec9c 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -134,7 +134,7 @@ func (tun *TunAdapter) reader() error { } // Then offset the buffer so that we can now just treat it as an IP // packet from now on - bs = bs[offset:] + bs = bs[offset:] // FIXME this breaks bs for the next read and means n is the wrong value } // From the IP header, work out what our source and destination addresses // and node IDs are. We will need these in order to work out where to send @@ -225,21 +225,46 @@ func (tun *TunAdapter) reader() error { panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") } // Dial to the remote node - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - // We've been given a connection so prepare the session wrapper - if s, err := tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) - } else { - // Update our reference to the connection - session, isIn = s, true + go func() { + // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes + tun.mutex.Lock() + _, known := tun.dials[*dstNodeID] + packet := append(util.GetBytes(), bs[:n]...) + tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) + for len(tun.dials[*dstNodeID]) > 32 { + util.PutBytes(tun.dials[*dstNodeID][0]) + tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] } - } else { - // We weren't able to dial for some reason so there's no point in - // continuing this iteration - skip to the next one - continue - } + tun.mutex.Unlock() + if known { + return + } + var tc *tunConn + if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { + // We've been given a connection so prepare the session wrapper + if tc, err = tun.wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + } + } + tun.mutex.Lock() + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + tun.mutex.Unlock() + if tc != nil { + for _, packet := range packets { + select { + case tc.send <- packet: + default: + util.PutBytes(packet) + } + } + } + }() + // While the dial is going on we can't do much else + // continuing this iteration - skip to the next one + continue } // If we have a connection now, try writing to it if isIn && session != nil { diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 683b83ac..a21f8711 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -49,6 +49,7 @@ type TunAdapter struct { mutex sync.RWMutex // Protects the below addrToConn map[address.Address]*tunConn subnetToConn map[address.Subnet]*tunConn + dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes isOpen bool } @@ -113,6 +114,7 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener tun.dialer = dialer tun.addrToConn = make(map[address.Address]*tunConn) tun.subnetToConn = make(map[address.Subnet]*tunConn) + tun.dials = make(map[crypto.NodeID][][]byte) } // Start the setup process for the TUN/TAP adapter. If successful, starts the diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 7216be91..5c9a4136 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -45,23 +45,18 @@ type Conn struct { mutex sync.RWMutex closed bool session *sessionInfo - readDeadline atomic.Value // time.Time // TODO timer - writeDeadline atomic.Value // time.Time // TODO timer - searching atomic.Value // bool - searchwait chan struct{} // Never reset this, it's only used for the initial search - writebuf [][]byte // Packets to be sent if/when the search finishes + readDeadline atomic.Value // time.Time // TODO timer + writeDeadline atomic.Value // time.Time // TODO timer } // TODO func NewConn() that initializes additional fields as needed func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { conn := Conn{ - core: core, - nodeID: nodeID, - nodeMask: nodeMask, - session: session, - searchwait: make(chan struct{}), + core: core, + nodeID: nodeID, + nodeMask: nodeMask, + session: session, } - conn.searching.Store(false) return &conn } @@ -69,91 +64,38 @@ func (c *Conn) String() string { return fmt.Sprintf("conn=%p", c) } -// This method should only be called from the router goroutine -func (c *Conn) startSearch() { - // The searchCompleted callback is given to the search - searchCompleted := func(sinfo *sessionInfo, err error) { - defer c.searching.Store(false) - // If the search failed for some reason, e.g. it hit a dead end or timed - // out, then do nothing - if err != nil { - c.core.log.Debugln(c.String(), "DHT search failed:", err) - return - } - // Take the connection mutex - c.mutex.Lock() - defer c.mutex.Unlock() - // Were we successfully given a sessionInfo pointer? - if sinfo != nil { - // Store it, and update the nodeID and nodeMask (which may have been - // wildcarded before now) with their complete counterparts - c.core.log.Debugln(c.String(), "DHT search completed") - c.session = sinfo - c.nodeID = crypto.GetNodeID(&sinfo.theirPermPub) - for i := range c.nodeMask { - c.nodeMask[i] = 0xFF +// This should only be called from the router goroutine +func (c *Conn) search() error { + sinfo, isIn := c.core.searches.searches[*c.nodeID] + if !isIn { + done := make(chan struct{}, 1) + var sess *sessionInfo + var err error + searchCompleted := func(sinfo *sessionInfo, e error) { + sess = sinfo + err = e + // FIXME close can be called multiple times, do a non-blocking send instead + select { + case done <- struct{}{}: + default: } - // Make sure that any blocks on read/write operations are lifted - defer func() { recover() }() // So duplicate searches don't panic - close(c.searchwait) - } else { - // No session was returned - this shouldn't really happen because we - // should always return an error reason if we don't return a session - panic("DHT search didn't return an error or a sessionInfo") } - if c.closed { - // Things were closed before the search returned - // Go ahead and close it again to make sure the session is cleaned up - go c.Close() - } else { - // Send any messages we may have buffered - var msgs [][]byte - msgs, c.writebuf = c.writebuf, nil - go func() { - for _, msg := range msgs { - c.Write(msg) - util.PutBytes(msg) - } - }() - } - } - // doSearch will be called below in response to one or more conditions - doSearch := func() { - c.searching.Store(true) - // Check to see if there is a search already matching the destination - sinfo, isIn := c.core.searches.searches[*c.nodeID] - if !isIn { - // Nothing was found, so create a new search - sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) - } - // Continue the search + sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo.continueSearch() - } - // Take a copy of the session object, in case it changes later - c.mutex.RLock() - sinfo := c.session - c.mutex.RUnlock() - if c.session == nil { - // No session object is present so previous searches, if we ran any, have - // not yielded a useful result (dead end, remote host not found) - doSearch() - } else { - sinfo.worker <- func() { - switch { - case !sinfo.init: - doSearch() - case time.Since(sinfo.time) > 6*time.Second: - if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { - // TODO double check that the above condition is correct - doSearch() - } else { - c.core.sessions.ping(sinfo) - } - default: // Don't do anything, to keep traffic throttled - } + <-done + c.session = sess + if c.session == nil && err == nil { + panic("search failed but returend no error") } + c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) + for i := range c.nodeMask { + c.nodeMask[i] = 0xFF + } + return err + } else { + return errors.New("search already exists") } + return nil } func getDeadlineTimer(value *atomic.Value) *time.Timer { @@ -167,30 +109,9 @@ func getDeadlineTimer(value *atomic.Value) *time.Timer { func (c *Conn) Read(b []byte) (int, error) { // Take a copy of the session object - c.mutex.RLock() sinfo := c.session - c.mutex.RUnlock() timer := getDeadlineTimer(&c.readDeadline) defer util.TimerStop(timer) - // If there is a search in progress then wait for the result - if sinfo == nil { - // Wait for the search to complete - select { - case <-c.searchwait: - case <-timer.C: - return 0, ConnError{errors.New("timeout"), true, false, 0} - } - // Retrieve our session info again - c.mutex.RLock() - sinfo = c.session - c.mutex.RUnlock() - // If sinfo is still nil at this point then the search failed and the - // searchwait channel has been recreated, so might as well give up and - // return an error code - if sinfo == nil { - return 0, errors.New("search failed") - } - } for { // Wait for some traffic to come through from the session select { @@ -253,32 +174,7 @@ func (c *Conn) Read(b []byte) (int, error) { } func (c *Conn) Write(b []byte) (bytesWritten int, err error) { - c.mutex.RLock() sinfo := c.session - c.mutex.RUnlock() - // If the session doesn't exist, or isn't initialised (which probably means - // that the search didn't complete successfully) then we may need to wait for - // the search to complete or start the search again - if sinfo == nil || !sinfo.init { - // Is a search already taking place? - if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) { - // No search was already taking place so start a new one - c.core.router.doAdmin(c.startSearch) - } - // Buffer the packet to be sent if/when the search is finished - c.mutex.Lock() - defer c.mutex.Unlock() - c.writebuf = append(c.writebuf, append(util.GetBytes(), b...)) - for len(c.writebuf) > 32 { - util.PutBytes(c.writebuf[0]) - c.writebuf = c.writebuf[1:] - } - return len(b), nil - } else { - // This triggers some session keepalive traffic - // FIXME this desparately needs to be refactored, since the ping case needlessly goes through the router goroutine just to have it pass a function to the session worker when it determines that a session already exists. - c.core.router.doAdmin(c.startSearch) - } var packet []byte done := make(chan struct{}) written := len(b) @@ -301,6 +197,34 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } packet = p.encode() sinfo.bytesSent += uint64(len(b)) + // The rest of this work is session keep-alive traffic + doSearch := func() { + routerWork := func() { + // Check to see if there is a search already matching the destination + sinfo, isIn := c.core.searches.searches[*c.nodeID] + if !isIn { + // Nothing was found, so create a new search + searchCompleted := func(sinfo *sessionInfo, e error) {} + sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) + } + // Continue the search + sinfo.continueSearch() + } + go func() { c.core.router.admin <- routerWork }() + } + switch { + case !sinfo.init: + doSearch() + case time.Since(sinfo.time) > 6*time.Second: + if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { + // TODO double check that the above condition is correct + doSearch() + } else { + sinfo.core.sessions.ping(sinfo) + } + default: // Don't do anything, to keep traffic throttled + } } // Set up a timer so this doesn't block forever timer := getDeadlineTimer(&c.writeDeadline) @@ -327,7 +251,6 @@ func (c *Conn) Close() error { if c.session != nil { // Close the session, if it hasn't been closed already c.session.close() - c.session = nil } // This can't fail yet - TODO? c.closed = true diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 1943c859..1e3e0d6e 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -14,6 +14,8 @@ type Dialer struct { core *Core } +// TODO DialContext that allows timeouts/cancellation, Dial should just call this with no timeout set in the context + // Dial opens a session to the given node. The first paramter should be "nodeid" // and the second parameter should contain a hexadecimal representation of the // target node ID. @@ -58,5 +60,8 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) { // NodeID parameters. func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { conn := newConn(d.core, nodeID, nodeMask, nil) + if err := conn.search(); err != nil { + return nil, err + } return conn, nil } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 576034bf..b43f0e46 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -141,7 +141,7 @@ func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup delete(sinfo.core.searches.searches, sinfo.dest) - go sinfo.callback(nil, errors.New("search reached dead end")) + sinfo.callback(nil, errors.New("search reached dead end")) return } // Send to the next search target @@ -205,7 +205,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { sess = sinfo.core.sessions.createSession(&res.Key) if sess == nil { // nil if the DHT search finished but the session wasn't allowed - go sinfo.callback(nil, errors.New("session not allowed")) + sinfo.callback(nil, errors.New("session not allowed")) return true } _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) @@ -217,7 +217,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { sess.coords = res.Coords sess.packet = sinfo.packet sinfo.core.sessions.ping(sess) - go sinfo.callback(sess, nil) + sinfo.callback(sess, nil) // Cleanup delete(sinfo.core.searches.searches, res.Dest) return true From c808be514ffea600ec8cf3ef02b48dd1612e142a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 28 Jun 2019 19:11:28 -0500 Subject: [PATCH 3/5] make tunAdapter.wrap return the right thing --- src/tuntap/tun.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index a21f8711..ed5d2d45 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -237,6 +237,7 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { stop: make(chan struct{}), alive: make(chan struct{}, 1), } + c = &s // Get the remote address and subnet of the other side remoteNodeID := conn.RemoteAddr() s.addr = *address.AddrForNodeID(&remoteNodeID) From e7cb76cea3cad1ac735c712b1b32b2a7b1d34b53 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 28 Jun 2019 19:21:44 -0500 Subject: [PATCH 4/5] clean up unused old session maps --- src/yggdrasil/session.go | 60 +--------------------------------------- 1 file changed, 1 insertion(+), 59 deletions(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 22118476..55b0ed43 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -118,12 +118,8 @@ type sessions struct { isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed isAllowedMutex sync.RWMutex // Protects the above permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot - sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info - conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections - byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle + sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle - addrToPerm map[address.Address]*crypto.BoxPubKey - subnetToPerm map[address.Subnet]*crypto.BoxPubKey } // Initializes the session struct. @@ -149,10 +145,7 @@ func (ss *sessions) init(core *Core) { }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) - ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) - ss.addrToPerm = make(map[address.Address]*crypto.BoxPubKey) - ss.subnetToPerm = make(map[address.Subnet]*crypto.BoxPubKey) ss.lastCleanup = time.Now() } @@ -175,16 +168,6 @@ func (ss *sessions) getSessionForHandle(handle *crypto.Handle) (*sessionInfo, bo return sinfo, isIn } -// Gets a session corresponding to an ephemeral session key used by this node. -func (ss *sessions) getByMySes(key *crypto.BoxPubKey) (*sessionInfo, bool) { - h, isIn := ss.byMySes[*key] - if !isIn { - return nil, false - } - sinfo, isIn := ss.getSessionForHandle(h) - return sinfo, isIn -} - // Gets a session corresponding to a permanent key used by the remote node. func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) { h, isIn := ss.byTheirPerm[*key] @@ -195,26 +178,6 @@ func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) { return sinfo, isIn } -// Gets a session corresponding to an IPv6 address used by the remote node. -func (ss *sessions) getByTheirAddr(addr *address.Address) (*sessionInfo, bool) { - p, isIn := ss.addrToPerm[*addr] - if !isIn { - return nil, false - } - sinfo, isIn := ss.getByTheirPerm(p) - return sinfo, isIn -} - -// Gets a session corresponding to an IPv6 /64 subnet used by the remote node/network. -func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool) { - p, isIn := ss.subnetToPerm[*snet] - if !isIn { - return nil, false - } - sinfo, isIn := ss.getByTheirPerm(p) - return sinfo, isIn -} - // Creates a new session and lazily cleans up old existing sessions. This // includse initializing session info to sane defaults (e.g. lowest supported // MTU). @@ -263,10 +226,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.worker = make(chan func(), 1) sinfo.recv = make(chan *wire_trafficPacket, 32) ss.sinfos[sinfo.myHandle] = &sinfo - ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle - ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub - ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub go sinfo.workerMain() return &sinfo } @@ -291,36 +251,18 @@ func (ss *sessions) cleanup() { sinfos[k] = v } ss.sinfos = sinfos - byMySes := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byMySes)) - for k, v := range ss.byMySes { - byMySes[k] = v - } - ss.byMySes = byMySes byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm)) for k, v := range ss.byTheirPerm { byTheirPerm[k] = v } ss.byTheirPerm = byTheirPerm - addrToPerm := make(map[address.Address]*crypto.BoxPubKey, len(ss.addrToPerm)) - for k, v := range ss.addrToPerm { - addrToPerm[k] = v - } - ss.addrToPerm = addrToPerm - subnetToPerm := make(map[address.Subnet]*crypto.BoxPubKey, len(ss.subnetToPerm)) - for k, v := range ss.subnetToPerm { - subnetToPerm[k] = v - } - ss.subnetToPerm = subnetToPerm ss.lastCleanup = time.Now() } // Closes a session, removing it from sessions maps and killing the worker goroutine. func (sinfo *sessionInfo) close() { delete(sinfo.core.sessions.sinfos, sinfo.myHandle) - delete(sinfo.core.sessions.byMySes, sinfo.mySesPub) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) - delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr) - delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet) close(sinfo.worker) } From e88bef35c0be4b60577fb344f2c27fc6b54e4196 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 28 Jun 2019 20:02:58 -0500 Subject: [PATCH 5/5] get rid of old buffered session packets --- src/yggdrasil/conn.go | 2 +- src/yggdrasil/search.go | 2 -- src/yggdrasil/session.go | 14 ++------------ 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 5c9a4136..a4036c78 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -215,7 +215,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } switch { case !sinfo.init: - doSearch() + sinfo.core.sessions.ping(sinfo) case time.Since(sinfo.time) > 6*time.Second: if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { // TODO double check that the above condition is correct diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index b43f0e46..d8c9049a 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -37,7 +37,6 @@ type searchInfo struct { dest crypto.NodeID mask crypto.NodeID time time.Time - packet []byte toVisit []*dhtInfo visited map[crypto.NodeID]bool callback func(*sessionInfo, error) @@ -215,7 +214,6 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { } // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? sess.coords = res.Coords - sess.packet = sinfo.packet sinfo.core.sessions.ping(sess) sinfo.callback(sess, nil) // Cleanup diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 55b0ed43..dc3f01e8 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -39,7 +39,6 @@ type sessionInfo struct { pingTime time.Time // time the first ping was sent since the last received packet pingSend time.Time // time the last ping was sent coords []byte // coords of destination - packet []byte // a buffered packet, sent immediately on ping/pong init bool // Reset if coords change tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation bytesSent uint64 // Bytes of real traffic sent in this session @@ -325,8 +324,8 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { } packet := p.encode() ss.core.router.out(packet) - if !isPong { - sinfo.pingSend = time.Now() + if sinfo.pingTime.Before(sinfo.time) { + sinfo.pingTime = time.Now() } } @@ -367,15 +366,6 @@ func (ss *sessions) handlePing(ping *sessionPing) { if !ping.IsPong { ss.sendPingPong(sinfo, true) } - if sinfo.packet != nil { - /* FIXME this needs to live in the net.Conn or something, needs work in Write - // send - var bs []byte - bs, sinfo.packet = sinfo.packet, nil - ss.core.router.sendPacket(bs) // FIXME this needs to live in the net.Conn or something, needs work in Write - */ - sinfo.packet = nil - } }) }