diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index cb28a6f2..9af5d240 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -102,11 +102,12 @@ func (c *Conn) search() error { if sinfo != nil { // Need to clean up to avoid a session leak sinfo.cancel.Cancel(nil) + sinfo.sessions.removeSession(sinfo) } default: if sinfo != nil { // Finish initializing the session - sinfo.conn = c + sinfo.setConn(nil, c) } c.session = sinfo err = e diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 0b3947e2..2f5e0af3 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -416,8 +416,11 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Only call this from the router actor. func (ss *sessions) reset() { - for _, sinfo := range ss.sinfos { - sinfo.reset = true + for _, _sinfo := range ss.sinfos { + sinfo := _sinfo // So we can safely put it in a closure + sinfo.Act(ss.router, func() { + sinfo.reset = true + }) } } @@ -474,12 +477,6 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { util.PutBytes(bs) return } - if sinfo.conn == nil { - // There's no connection associated with this session for some reason - // TODO: Figure out why this happens sometimes, it shouldn't - util.PutBytes(bs) - return - } sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a8c23623..ffda6b76 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -207,11 +207,12 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { } // Checks if we already are calling this address -func (t *tcp) isAlreadyCalling(saddr string) bool { +func (t *tcp) startCalling(saddr string) bool { t.mutex.Lock() defer t.mutex.Unlock() _, isIn := t.calls[saddr] - return isIn + t.calls[saddr] = struct{}{} + return !isIn } // Checks if a connection already exists. @@ -225,12 +226,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if t.isAlreadyCalling(callname) { + if !t.startCalling(callname) { return } - t.mutex.Lock() - t.calls[callname] = struct{}{} - t.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios time.Sleep(default_timeout)