From 625b97c511c155b0bfc0088638125953056e9ce5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Jan 2018 12:55:45 -0600 Subject: [PATCH] add and use a thread-safe way of reading router internal state, and include active sessions in the admin query --- src/yggdrasil/address.go | 2 +- src/yggdrasil/admin.go | 53 +++++++++++++++++++++++++++------------- src/yggdrasil/peer.go | 2 +- src/yggdrasil/router.go | 16 ++++++++++++ yggdrasil.go | 2 +- 5 files changed, 55 insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/address.go b/src/yggdrasil/address.go index 4718a5d6..2a492c7a 100644 --- a/src/yggdrasil/address.go +++ b/src/yggdrasil/address.go @@ -96,7 +96,7 @@ func (a *address) getNodeIDandMask() (*NodeID, *NodeID) { } func (s *subnet) getNodeIDandMask() (*NodeID, *NodeID) { - // As witht he address version, but visible parts of the subnet prefix instead + // As with the address version, but visible parts of the subnet prefix instead var nid NodeID var mask NodeID ones := int(s[len(address_prefix)] & 0x7f) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 5576a365..eec1d71e 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -66,31 +66,50 @@ func (a *admin) handleRequest(conn net.Conn) { peerID := address_addrForNodeID(getNodeID(&peerentry.box)) addr := net.IP(peerID[:]).String() var index [mDepth]switchPort - copy(index[:mDepth], tableentry.locator.coords[:]) + copy(index[:], tableentry.locator.coords) m[index] = addr } } } - // Look up everything we know from DHT - for i := 0; i < a.core.dht.nBuckets(); i++ { - b := a.core.dht.getBucket(i) - for _, v := range b.infos { - var destPorts []switchPort - for offset := 0; ; { - coord, length := wire_decode_uint64(v.coords[offset:]) - if length == 0 { - break - } - destPorts = append(destPorts, switchPort(coord)) - offset += length + getPorts := func(coords []byte) []switchPort { + var ports []switchPort + for offset := 0; ; { + coord, length := wire_decode_uint64(coords[offset:]) + if length == 0 { + break + } + ports = append(ports, switchPort(coord)) + offset += length + } + return ports + } + + // Look up everything we know from DHT + getDHT := func() { + for i := 0; i < a.core.dht.nBuckets(); i++ { + b := a.core.dht.getBucket(i) + for _, v := range b.infos { + destPorts := getPorts(v.coords) + addr := net.IP(address_addrForNodeID(v.nodeID_hidden)[:]).String() + var index [mDepth]switchPort + copy(index[:], destPorts) + m[index] = addr } - addr := net.IP(address_addrForNodeID(v.nodeID_hidden)[:]).String() - var index [mDepth]switchPort - copy(index[:mDepth], destPorts[:]) - m[index] = addr } } + a.core.router.doAdmin(getDHT) + + // Look up everything we know from active sessions + getSessions := func() { + for _, sinfo := range a.core.sessions.sinfos { + destPorts := getPorts(sinfo.coords) + var index [mDepth]switchPort + copy(index[:], destPorts) + m[index] = net.IP(sinfo.theirAddr[:]).String() + } + } + a.core.router.doAdmin(getSessions) // Now print it all out conn.Write([]byte(fmt.Sprintf("graph {\n"))) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 2b757bbe..4c24e233 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -37,7 +37,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { } type peer struct { - // Rolling approximation of bandwidth, in bps, used by switch, updated by tcp + // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends // use get/update methods only! (atomic accessors as float64) bandwidth uint64 // BUG: sync/atomic, 32 bit platforms need the above to be the first element diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 668181de..b96815fe 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -35,6 +35,7 @@ type router struct { recv chan<- []byte // place where the tun pulls received packets from send <-chan []byte // place where the tun puts outgoing packets reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff } func (r *router) init(core *Core) { @@ -57,6 +58,7 @@ func (r *router) init(core *Core) { r.core.tun.recv = recv r.core.tun.send = send r.reset = make(chan struct{}, 1) + r.admin = make(chan func()) go r.mainLoop() } @@ -79,6 +81,8 @@ func (r *router) mainLoop() { r.core.dht.doMaintenance() util_getBytes() // To slowly drain things } + case f := <-r.admin: + f() } } } @@ -302,3 +306,15 @@ func (r *router) handleSearchRes(bs []byte) { } r.core.searches.handleSearchRes(&res) } + +func (r *router) doAdmin(f func()) { + // Pass this a function that needs to be run by the router's main goroutine + // It will pass the function to the router and wait for the router to finish + done := make(chan struct{}) + newF := func() { + f() + close(done) + } + r.admin <- newF + <-done +} diff --git a/yggdrasil.go b/yggdrasil.go index cde55c55..2b7e85b7 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -96,7 +96,7 @@ func generateConfig() *nodeConfig { spub, spriv := core.DEBUG_newSigKeys() cfg := nodeConfig{} cfg.Listen = "[::]:0" - cfg.AdminListen = "[::1]:9001" + cfg.AdminListen = "localhost:9001" cfg.BoxPub = hex.EncodeToString(bpub[:]) cfg.BoxPriv = hex.EncodeToString(bpriv[:]) cfg.SigPub = hex.EncodeToString(spub[:])