more conn migration

This commit is contained in:
Arceliar 2019-08-24 00:17:37 -05:00
parent 6ecbc439f0
commit da9f7151e3
3 changed files with 47 additions and 29 deletions

View File

@ -61,6 +61,7 @@ type Conn struct {
nodeID *crypto.NodeID nodeID *crypto.NodeID
nodeMask *crypto.NodeID nodeMask *crypto.NodeID
session *sessionInfo session *sessionInfo
mtu uint16
} }
// TODO func NewConn() that initializes additional fields as needed // TODO func NewConn() that initializes additional fields as needed
@ -80,6 +81,10 @@ func (c *Conn) String() string {
return s return s
} }
func (c *Conn) setMTU(from phony.IActor, mtu uint16) {
c.EnqueueFrom(from, func() { c.mtu = mtu })
}
// This should never be called from the router goroutine, used in the dial functions // This should never be called from the router goroutine, used in the dial functions
func (c *Conn) search() error { func (c *Conn) search() error {
var sinfo *searchInfo var sinfo *searchInfo
@ -112,6 +117,7 @@ func (c *Conn) search() error {
for i := range c.nodeMask { for i := range c.nodeMask {
c.nodeMask[i] = 0xFF c.nodeMask[i] = 0xFF
} }
c.session.conn = c
} }
return err return err
} else { } else {
@ -120,7 +126,7 @@ func (c *Conn) search() error {
return nil return nil
} }
// Used in session keep-alive traffic in Conn.Write // Used in session keep-alive traffic
func (c *Conn) doSearch() { func (c *Conn) doSearch() {
routerWork := func() { routerWork := func() {
// Check to see if there is a search already matching the destination // Check to see if there is a search already matching the destination
@ -134,7 +140,7 @@ func (c *Conn) doSearch() {
sinfo.continueSearch() sinfo.continueSearch()
} }
} }
go c.core.router.doAdmin(routerWork) c.core.router.EnqueueFrom(c.session, routerWork)
} }
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
@ -187,16 +193,14 @@ func (c *Conn) Read(b []byte) (int, error) {
return n, err return n, err
} }
// Used internally by Write, the caller must not reuse the argument bytes when no error occurs func (c *Conn) _write(msg FlowKeyMessage) error {
func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { if len(msg.Message) > int(c.mtu) {
var err error return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)}
sessionFunc := func() {
// Does the packet exceed the permitted size for the session?
if uint16(len(msg.Message)) > c.session._getMTU() {
err = ConnError{errors.New("packet too big"), true, false, false, int(c.session._getMTU())}
return
} }
// The rest of this work is session keep-alive traffic c.session.EnqueueFrom(c, func() {
// Send the packet
c.session._send(msg)
// Session keep-alive, while we wait for the crypto workers from send
switch { switch {
case time.Since(c.session.time) > 6*time.Second: case time.Since(c.session.time) > 6*time.Second:
if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second { if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
@ -209,15 +213,16 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
c.session.ping(c.session) // TODO send from self if this becomes an actor c.session.ping(c.session) // TODO send from self if this becomes an actor
default: // Don't do anything, to keep traffic throttled default: // Don't do anything, to keep traffic throttled
} }
})
return nil
} }
c.session.doFunc(sessionFunc)
if err == nil { // Used internally by Write, the caller must not reuse the argument bytes when no error occurs
func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
var cancel util.Cancellation var cancel util.Cancellation
var doCancel bool var doCancel bool
<-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) })
if doCancel { var err error
defer cancel.Cancel(nil)
}
select { select {
case <-cancel.Finished(): case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError { if cancel.Error() == util.CancellationTimeoutError {
@ -225,8 +230,8 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
} else { } else {
err = ConnError{errors.New("session closed"), false, false, true, 0} err = ConnError{errors.New("session closed"), false, false, true, 0}
} }
case <-c.session.SyncExec(func() { c.session._send(msg) }): default:
} <-c.SyncExec(func() { err = c._write(msg) })
} }
return err return err
} }

View File

@ -65,6 +65,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er
conn.Close() conn.Close()
return nil, err return nil, err
} }
conn.session.setConn(nil, conn)
t := time.NewTimer(6 * time.Second) // TODO use a context instead t := time.NewTimer(6 * time.Second) // TODO use a context instead
defer t.Stop() defer t.Stop()
select { select {

View File

@ -71,6 +71,7 @@ type sessionInfo struct {
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers cancel util.Cancellation // Used to terminate workers
toConn chan []byte // Decrypted packets go here, picked up by the associated Conn toConn chan []byte // Decrypted packets go here, picked up by the associated Conn
conn *Conn // The associated Conn object
callbacks []chan func() // Finished work from crypto workers callbacks []chan func() // Finished work from crypto workers
} }
@ -112,6 +113,9 @@ func (s *sessionInfo) _update(p *sessionPing) bool {
} }
if p.MTU >= 1280 || p.MTU == 0 { if p.MTU >= 1280 || p.MTU == 0 {
s.theirMTU = p.MTU s.theirMTU = p.MTU
if s.conn != nil {
s.conn.setMTU(s, s._getMTU())
}
} }
if !bytes.Equal(s.coords, p.Coords) { if !bytes.Equal(s.coords, p.Coords) {
// allocate enough space for additional coords // allocate enough space for additional coords
@ -368,6 +372,13 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
} }
} }
func (sinfo *sessionInfo) setConn(from phony.IActor, conn *Conn) {
sinfo.EnqueueFrom(from, func() {
sinfo.conn = conn
sinfo.conn.setMTU(sinfo, sinfo._getMTU())
})
}
// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful.
// If the session has a packet cached (common when first setting up a session), it will be sent. // If the session has a packet cached (common when first setting up a session), it will be sent.
func (ss *sessions) handlePing(ping *sessionPing) { func (ss *sessions) handlePing(ping *sessionPing) {
@ -390,6 +401,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
for i := range conn.nodeMask { for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF conn.nodeMask[i] = 0xFF
} }
sinfo.setConn(ss.router, conn)
c := ss.listener.conn c := ss.listener.conn
go func() { c <- conn }() go func() { c <- conn }()
} }