diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index fa05fff0..7121fc15 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -61,6 +61,7 @@ type Conn struct { nodeID *crypto.NodeID nodeMask *crypto.NodeID session *sessionInfo + mtu uint16 } // TODO func NewConn() that initializes additional fields as needed @@ -80,6 +81,10 @@ func (c *Conn) String() string { return s } +func (c *Conn) setMTU(from phony.IActor, mtu uint16) { + c.EnqueueFrom(from, func() { c.mtu = mtu }) +} + // This should never be called from the router goroutine, used in the dial functions func (c *Conn) search() error { var sinfo *searchInfo @@ -112,6 +117,7 @@ func (c *Conn) search() error { for i := range c.nodeMask { c.nodeMask[i] = 0xFF } + c.session.conn = c } return err } else { @@ -120,7 +126,7 @@ func (c *Conn) search() error { return nil } -// Used in session keep-alive traffic in Conn.Write +// Used in session keep-alive traffic func (c *Conn) doSearch() { routerWork := func() { // Check to see if there is a search already matching the destination @@ -134,7 +140,7 @@ func (c *Conn) doSearch() { sinfo.continueSearch() } } - go c.core.router.doAdmin(routerWork) + c.core.router.EnqueueFrom(c.session, routerWork) } func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { @@ -187,16 +193,14 @@ func (c *Conn) Read(b []byte) (int, error) { return n, err } -// Used internally by Write, the caller must not reuse the argument bytes when no error occurs -func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { - var err error - sessionFunc := func() { - // Does the packet exceed the permitted size for the session? - if uint16(len(msg.Message)) > c.session._getMTU() { - err = ConnError{errors.New("packet too big"), true, false, false, int(c.session._getMTU())} - return - } - // The rest of this work is session keep-alive traffic +func (c *Conn) _write(msg FlowKeyMessage) error { + if len(msg.Message) > int(c.mtu) { + return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)} + } + c.session.EnqueueFrom(c, func() { + // Send the packet + c.session._send(msg) + // Session keep-alive, while we wait for the crypto workers from send switch { case time.Since(c.session.time) > 6*time.Second: if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second { @@ -209,24 +213,25 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { c.session.ping(c.session) // TODO send from self if this becomes an actor default: // Don't do anything, to keep traffic throttled } - } - c.session.doFunc(sessionFunc) - if err == nil { - var cancel util.Cancellation - var doCancel bool - <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) - if doCancel { - defer cancel.Cancel(nil) - } - select { - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - err = ConnError{errors.New("write timeout"), true, false, false, 0} - } else { - err = ConnError{errors.New("session closed"), false, false, true, 0} - } - case <-c.session.SyncExec(func() { c.session._send(msg) }): + }) + return nil +} + +// Used internally by Write, the caller must not reuse the argument bytes when no error occurs +func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { + var cancel util.Cancellation + var doCancel bool + <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) + var err error + select { + case <-cancel.Finished(): + if cancel.Error() == util.CancellationTimeoutError { + err = ConnError{errors.New("write timeout"), true, false, false, 0} + } else { + err = ConnError{errors.New("session closed"), false, false, true, 0} } + default: + <-c.SyncExec(func() { err = c._write(msg) }) } return err } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 6b24cfb4..04410855 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -65,6 +65,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er conn.Close() return nil, err } + conn.session.setConn(nil, conn) t := time.NewTimer(6 * time.Second) // TODO use a context instead defer t.Stop() select { diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 3f5e913e..854eb24d 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -71,6 +71,7 @@ type sessionInfo struct { init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use cancel util.Cancellation // Used to terminate workers toConn chan []byte // Decrypted packets go here, picked up by the associated Conn + conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers } @@ -112,6 +113,9 @@ func (s *sessionInfo) _update(p *sessionPing) bool { } if p.MTU >= 1280 || p.MTU == 0 { s.theirMTU = p.MTU + if s.conn != nil { + s.conn.setMTU(s, s._getMTU()) + } } if !bytes.Equal(s.coords, p.Coords) { // allocate enough space for additional coords @@ -368,6 +372,13 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { } } +func (sinfo *sessionInfo) setConn(from phony.IActor, conn *Conn) { + sinfo.EnqueueFrom(from, func() { + sinfo.conn = conn + sinfo.conn.setMTU(sinfo, sinfo._getMTU()) + }) +} + // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // If the session has a packet cached (common when first setting up a session), it will be sent. func (ss *sessions) handlePing(ping *sessionPing) { @@ -390,6 +401,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } + sinfo.setConn(ss.router, conn) c := ss.listener.conn go func() { c <- conn }() }