diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 0357ccad..53d2551e 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -89,11 +89,13 @@ func (c *Conn) search() error { <-done c.session = sess if c.session == nil && err == nil { - panic("search failed but returend no error") + panic("search failed but returned no error") } - c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) - for i := range c.nodeMask { - c.nodeMask[i] = 0xFF + if c.session != nil { + c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) + for i := range c.nodeMask { + c.nodeMask[i] = 0xFF + } } return err } else { @@ -218,8 +220,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { go func() { c.core.router.admin <- routerWork }() } switch { - case !sinfo.init: - sinfo.core.sessions.ping(sinfo) case time.Since(sinfo.time) > 6*time.Second: if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { // TODO double check that the above condition is correct @@ -227,6 +227,8 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } else { sinfo.core.sessions.ping(sinfo) } + case sinfo.reset && sinfo.pingTime.Before(sinfo.time): + sinfo.core.sessions.ping(sinfo) default: // Don't do anything, to keep traffic throttled } } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 1e3e0d6e..6b24cfb4 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -5,6 +5,7 @@ import ( "errors" "strconv" "strings" + "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" ) @@ -61,7 +62,16 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) { func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { conn := newConn(d.core, nodeID, nodeMask, nil) if err := conn.search(); err != nil { + conn.Close() return nil, err } - return conn, nil + t := time.NewTimer(6 * time.Second) // TODO use a context instead + defer t.Stop() + select { + case <-conn.session.init: + return conn, nil + case <-t.C: + conn.Close() + return nil, errors.New("session handshake timeout") + } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 2e32fb6b..514d14fa 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -119,7 +119,7 @@ func (r *router) mainLoop() { case info := <-r.core.dht.peers: r.core.dht.insertPeer(info) case <-r.reset: - r.core.sessions.resetInits() + r.core.sessions.reset() r.core.dht.reset() case <-ticker.C: { diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index dc3f01e8..98a12c7b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -39,12 +39,13 @@ type sessionInfo struct { pingTime time.Time // time the first ping was sent since the last received packet pingSend time.Time // time the last ping was sent coords []byte // coords of destination - init bool // Reset if coords change + reset bool // reset if coords change tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation bytesSent uint64 // Bytes of real traffic sent in this session bytesRecvd uint64 // Bytes of real traffic received in this session worker chan func() // Channel to send work to the session worker recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn + init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use } func (sinfo *sessionInfo) doWorker(f func()) { @@ -101,7 +102,14 @@ func (s *sessionInfo) update(p *sessionPing) bool { } s.time = time.Now() s.tstamp = p.Tstamp - s.init = true + s.reset = false + defer func() { recover() }() // Recover if the below panics + select { + case <-s.init: + default: + // Unblock anything waiting for the session to initialize + close(s.init) + } return true } @@ -203,6 +211,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.mtuTime = now sinfo.pingTime = now sinfo.pingSend = now + sinfo.init = make(chan struct{}) higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { @@ -410,10 +419,10 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { // Resets all sessions to an uninitialized state. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. -func (ss *sessions) resetInits() { +func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { sinfo.doWorker(func() { - sinfo.init = false + sinfo.reset = true }) } }