diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 1a05bd83..2452a3d6 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -57,7 +57,6 @@ type Conn struct { core *Core readDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer - cancel util.Cancellation mutex sync.RWMutex // protects the below nodeID *crypto.NodeID nodeMask *crypto.NodeID @@ -71,7 +70,6 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session nodeID: nodeID, nodeMask: nodeMask, session: session, - cancel: util.NewCancellation(), } return &conn } @@ -142,10 +140,10 @@ func (c *Conn) doSearch() { func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation { if deadline, ok := value.Load().(time.Time); ok { // A deadline is set, so return a Cancellation that uses it - return util.CancellationWithDeadline(c.cancel, deadline) + return util.CancellationWithDeadline(c.session.cancel, deadline) } else { // No cancellation was set, so return a child cancellation with no timeout - return util.CancellationChild(c.cancel) + return util.CancellationChild(c.session.cancel) } } @@ -241,10 +239,9 @@ func (c *Conn) Close() (err error) { defer c.mutex.Unlock() if c.session != nil { // Close the session, if it hasn't been closed already - c.core.router.doAdmin(c.session.close) - } - if e := c.cancel.Cancel(errors.New("connection closed")); e != nil { - err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { + err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + } } return } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 6ce2e8ac..db5d5a4d 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -69,7 +69,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er defer t.Stop() select { case <-conn.session.init: - conn.session.startWorkers(conn.cancel) + conn.session.startWorkers() return conn, nil case <-t.C: conn.Close() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 161ae60a..a11f6ae5 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -161,8 +161,7 @@ func (r *router) handleTraffic(packet []byte) { return } sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle) - if !isIn || sinfo.cancel == nil { - // FIXME make sure sinfo.cancel can never be nil + if !isIn { util.PutBytes(p.Payload) return } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index c39f60de..f9c38faa 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -208,6 +208,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.pingTime = now sinfo.pingSend = now sinfo.init = make(chan struct{}) + sinfo.cancel = util.NewCancellation() higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { @@ -232,6 +233,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.send = make(chan []byte, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle + go func() { + // Run cleanup when the session is canceled + <-sinfo.cancel.Finished() + sinfo.core.router.doAdmin(sinfo.close) + }() return &sinfo } @@ -362,7 +368,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } - conn.session.startWorkers(conn.cancel) + conn.session.startWorkers() ss.listener.conn <- conn } ss.listenerMutex.Unlock() @@ -431,8 +437,7 @@ func (ss *sessions) reset() { //////////////////////////// Worker Functions Below //////////////////////////// //////////////////////////////////////////////////////////////////////////////// -func (sinfo *sessionInfo) startWorkers(cancel util.Cancellation) { - sinfo.cancel = cancel +func (sinfo *sessionInfo) startWorkers() { go sinfo.recvWorker() go sinfo.sendWorker() }