diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 40982cdd..ec530745 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -117,20 +117,21 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { errors := 0 - components := []chan chan error{ - c.router.searches.reconfigure, - c.router.dht.reconfigure, - c.router.sessions.reconfigure, + // Each reconfigure function should pass any errors to the channel, then close it + components := []func(chan error){ + c.link.reconfigure, c.peers.reconfigure, c.router.reconfigure, + c.router.dht.reconfigure, + c.router.searches.reconfigure, + c.router.sessions.reconfigure, c.switchTable.reconfigure, - c.link.reconfigure, } for _, component := range components { response := make(chan error) - component <- response - if err := <-response; err != nil { + go component(response) + for err := range response { c.log.Errorln(err) errors++ } diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index adfc40e7..4f380363 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -65,11 +65,10 @@ type dhtReqKey struct { // The main DHT struct. type dht struct { - router *router - reconfigure chan chan error - nodeID crypto.NodeID - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks + router *router + nodeID crypto.NodeID + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -78,18 +77,16 @@ type dht struct { // Initializes the DHT. func (t *dht) init(r *router) { t.router = r - t.reconfigure = make(chan chan error, 1) - go func() { - for { - e := <-t.reconfigure - e <- nil - } - }() t.nodeID = *t.router.core.NodeID() t.callbacks = make(map[dhtReqKey][]dht_callbackInfo) t.reset() } +func (t *dht) reconfigure(e chan error) { + defer close(e) + // This is where reconfiguration would go, if we had anything to do +} + // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index d4779ead..bfbcc99b 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -21,11 +21,10 @@ import ( ) type link struct { - core *Core - reconfigure chan chan error - mutex sync.RWMutex // protects interfaces below - interfaces map[linkInfo]*linkInterface - tcp tcp // TCP interface support + core *Core + mutex sync.RWMutex // protects interfaces below + interfaces map[linkInfo]*linkInterface + tcp tcp // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -61,7 +60,6 @@ func (l *link) init(c *Core) error { l.core = c l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) - l.reconfigure = make(chan chan error) l.mutex.Unlock() if err := l.tcp.init(l); err != nil { @@ -69,22 +67,18 @@ func (l *link) init(c *Core) error { return err } - go func() { - for { - e := <-l.reconfigure - tcpresponse := make(chan error) - l.tcp.reconfigure <- tcpresponse - if err := <-tcpresponse; err != nil { - e <- err - continue - } - e <- nil - } - }() - return nil } +func (l *link) reconfigure(e chan error) { + defer close(e) + tcpResponse := make(chan error) + go l.tcp.reconfigure(tcpResponse) + for err := range tcpResponse { + e <- err + } +} + func (l *link) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index aa31bb1b..989d9ee1 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -21,10 +21,9 @@ import ( // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { - core *Core - reconfigure chan chan error - mutex sync.Mutex // Synchronize writes to atomic - ports atomic.Value //map[switchPort]*peer, use CoW semantics + core *Core + mutex sync.Mutex // Synchronize writes to atomic + ports atomic.Value //map[switchPort]*peer, use CoW semantics } // Initializes the peers struct. @@ -33,13 +32,11 @@ func (ps *peers) init(c *Core) { defer ps.mutex.Unlock() ps.putPorts(make(map[switchPort]*peer)) ps.core = c - ps.reconfigure = make(chan chan error, 1) - go func() { - for { - e := <-ps.reconfigure - e <- nil - } - }() +} + +func (ps *peers) reconfigure(e chan error) { + defer close(e) + // This is where reconfiguration would go, if we had anything to do } // Returns true if an incoming peer connection to a key is allowed, either diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 7b6a9b80..002905bc 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -35,24 +35,22 @@ import ( ) // The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. -// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. +// The router's phony.Inbox goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { phony.Inbox - core *Core - reconfigure chan chan error - addr address.Address - subnet address.Subnet - out func([]byte) // packets we're sending to the network, link to peer's "in" - dht dht - nodeinfo nodeinfo - searches searches - sessions sessions + core *Core + addr address.Address + subnet address.Subnet + out func([]byte) // packets we're sending to the network, link to peer's "in" + dht dht + nodeinfo nodeinfo + searches searches + sessions sessions } // Initializes the router struct, which includes setting up channels to/from the adapter. func (r *router) init(core *Core) { r.core = core - r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) self := linkInterface{ @@ -75,10 +73,26 @@ func (r *router) init(core *Core) { r.sessions.init(r) } -// Starts the mainLoop goroutine. +func (r *router) reconfigure(e chan error) { + defer close(e) + var errs []error + // Reconfigure the router + <-r.SyncExec(func() { + current := r.core.config.GetCurrent() + err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) + if err != nil { + errs = append(errs, err) + } + }) + for _, err := range errs { + e <- err + } +} + +// Starts the tickerLoop goroutine. func (r *router) start() error { r.core.log.Infoln("Starting router") - go r._mainLoop() + go r.tickerLoop() return nil } @@ -108,24 +122,17 @@ func (r *router) reset(from phony.Actor) { // TODO remove reconfigure so this is just a ticker loop // and then find something better than a ticker loop to schedule things... -func (r *router) _mainLoop() { +func (r *router) tickerLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - select { - case <-ticker.C: - <-r.SyncExec(func() { - // Any periodic maintenance stuff goes here - r.core.switchTable.doMaintenance() - r.dht.doMaintenance() - r.sessions.cleanup() - }) - case e := <-r.reconfigure: - <-r.SyncExec(func() { - current := r.core.config.GetCurrent() - e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) - }) - } + <-ticker.C + <-r.SyncExec(func() { + // Any periodic maintenance stuff goes here + r.core.switchTable.doMaintenance() + r.dht.doMaintenance() + r.sessions.cleanup() + }) } } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 397c28a9..5fb36584 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -45,24 +45,21 @@ type searchInfo struct { // This stores a map of active searches. type searches struct { - router *router - reconfigure chan chan error - searches map[crypto.NodeID]*searchInfo + router *router + searches map[crypto.NodeID]*searchInfo } // Initializes the searches struct. func (s *searches) init(r *router) { s.router = r - s.reconfigure = make(chan chan error, 1) - go func() { - for { - e := <-s.reconfigure - e <- nil - } - }() s.searches = make(map[crypto.NodeID]*searchInfo) } +func (s *searches) reconfigure(e chan error) { + defer close(e) + // This is where reconfiguration would go, if we had anything to do +} + // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { info := searchInfo{ diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index f1263379..e448cf25 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -40,7 +40,6 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[0] } type sessionInfo struct { phony.Inbox // Protects all of the below, use it any time you read/change the contents of a session sessions *sessions // - reconfigure chan chan error // theirAddr address.Address // theirSubnet address.Subnet // theirPermPub crypto.BoxPubKey // @@ -74,6 +73,11 @@ type sessionInfo struct { callbacks []chan func() // Finished work from crypto workers } +func (sinfo *sessionInfo) reconfigure(e chan error) { + defer close(e) + // This is where reconfiguration would go, if we had anything to do +} + // TODO remove this, call SyncExec directly func (sinfo *sessionInfo) doFunc(f func()) { <-sinfo.SyncExec(f) @@ -140,7 +144,6 @@ type sessions struct { router *router listener *Listener listenerMutex sync.Mutex - reconfigure chan chan error lastCleanup time.Time isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed isAllowedMutex sync.RWMutex // Protects the above @@ -152,30 +155,28 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(r *router) { ss.router = r - ss.reconfigure = make(chan chan error, 1) - go func() { - for { - e := <-ss.reconfigure - responses := make(map[crypto.Handle]chan error) - for index, session := range ss.sinfos { - responses[index] = make(chan error) - session.reconfigure <- responses[index] - } - for _, response := range responses { - if err := <-response; err != nil { - e <- err - continue - } - } - e <- nil - } - }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) ss.lastCleanup = time.Now() } +func (ss *sessions) reconfigure(e chan error) { + defer close(e) + responses := make(map[crypto.Handle]chan error) + <-ss.router.SyncExec(func() { + for index, session := range ss.sinfos { + responses[index] = make(chan error) + go session.reconfigure(responses[index]) + } + }) + for _, response := range responses { + for err := range response { + e <- err + } + } +} + // Determines whether the session with a given publickey is allowed based on // session firewall rules. func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { @@ -215,7 +216,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { } sinfo := sessionInfo{} sinfo.sessions = ss - sinfo.reconfigure = make(chan chan error, 1) sinfo.theirPermPub = *theirPermKey sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub) pub, priv := crypto.NewBoxKeys() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index db63a845..cb5cf1eb 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -165,7 +165,6 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core - reconfigure chan chan error key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root @@ -186,7 +185,6 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024 func (t *switchTable) init(core *Core) { now := time.Now() t.core = core - t.reconfigure = make(chan chan error, 1) t.key = t.core.sigPub locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) @@ -201,6 +199,13 @@ func (t *switchTable) init(core *Core) { }) } +func (t *switchTable) reconfigure(e chan error) { + go func() { + defer close(e) + // This is where reconfiguration would go, if we had anything useful to do. + }() +} + // Safely gets a copy of this node's locator. func (t *switchTable) getLocator() switchLocator { t.mutex.RLock() @@ -566,12 +571,7 @@ func (t *switchTable) getTable() lookupTable { // Starts the switch worker func (t *switchTable) start() error { t.core.log.Infoln("Starting switch") - go func() { - // TODO find a better way to handle reconfiguration... and have the switch do something with the new configuration - for ch := range t.reconfigure { - ch <- nil - } - }() + // There's actually nothing to do to start it... return nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index dfb41510..ccb488f9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -33,12 +33,11 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { - link *link - reconfigure chan chan error - mutex sync.Mutex // Protecting the below - listeners map[string]*TcpListener - calls map[string]struct{} - conns map[linkInfo](chan struct{}) + link *link + mutex sync.Mutex // Protecting the below + listeners map[string]*TcpListener + calls map[string]struct{} + conns map[linkInfo](chan struct{}) } // TcpListener is a stoppable TCP listener interface. These are typically @@ -76,49 +75,12 @@ func (t *tcp) getAddr() *net.TCPAddr { // Initializes the struct. func (t *tcp) init(l *link) error { t.link = l - t.reconfigure = make(chan chan error, 1) t.mutex.Lock() t.calls = make(map[string]struct{}) t.conns = make(map[linkInfo](chan struct{})) t.listeners = make(map[string]*TcpListener) t.mutex.Unlock() - go func() { - for { - e := <-t.reconfigure - t.link.core.config.Mutex.RLock() - added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) - deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen) - t.link.core.config.Mutex.RUnlock() - if len(added) > 0 || len(deleted) > 0 { - for _, a := range added { - if a[:6] != "tcp://" { - continue - } - if _, err := t.listen(a[6:]); err != nil { - e <- err - continue - } - } - for _, d := range deleted { - if d[:6] != "tcp://" { - continue - } - t.mutex.Lock() - if listener, ok := t.listeners[d[6:]]; ok { - t.mutex.Unlock() - listener.Stop <- true - } else { - t.mutex.Unlock() - } - } - e <- nil - } else { - e <- nil - } - } - }() - t.link.core.config.Mutex.RLock() defer t.link.core.config.Mutex.RUnlock() for _, listenaddr := range t.link.core.config.Current.Listen { @@ -133,6 +95,36 @@ func (t *tcp) init(l *link) error { return nil } +func (t *tcp) reconfigure(e chan error) { + defer close(e) + t.link.core.config.Mutex.RLock() + added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) + deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen) + t.link.core.config.Mutex.RUnlock() + if len(added) > 0 || len(deleted) > 0 { + for _, a := range added { + if a[:6] != "tcp://" { + continue + } + if _, err := t.listen(a[6:]); err != nil { + e <- err + } + } + for _, d := range deleted { + if d[:6] != "tcp://" { + continue + } + t.mutex.Lock() + if listener, ok := t.listeners[d[6:]]; ok { + t.mutex.Unlock() + listener.Stop <- true + } else { + t.mutex.Unlock() + } + } + } +} + func (t *tcp) listen(listenaddr string) (*TcpListener, error) { var err error