diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b5df107e..464a4778 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -161,12 +161,6 @@ func (r *router) _handleTraffic(packet []byte) { return } sinfo.recv(r, &p) - return - select { - case sinfo.fromRouter <- p: - case <-sinfo.cancel.Finished(): - util.PutBytes(p.Payload) - } } // Handles protocol traffic by decrypting it, checking its type, and passing it to the appropriate handler for that traffic type. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 84ea92af..d17b119f 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -7,7 +7,6 @@ package yggdrasil import ( "bytes" "container/heap" - "errors" "sync" "time" @@ -71,9 +70,7 @@ type sessionInfo struct { bytesRecvd uint64 // Bytes of real traffic received in this session init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use cancel util.Cancellation // Used to terminate workers - fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session toConn chan []byte // Decrypted packets go here, picked up by the associated Conn - fromConn chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent callbacks []chan func() // Finished work from crypto workers } @@ -253,9 +250,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.fromRouter = make(chan wire_trafficPacket, 1) sinfo.toConn = make(chan []byte, 32) - sinfo.fromConn = make(chan FlowKeyMessage, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle go func() { @@ -479,207 +474,11 @@ func (ss *sessions) reset() { //////////////////////////// Worker Functions Below //////////////////////////// //////////////////////////////////////////////////////////////////////////////// -func (sinfo *sessionInfo) startWorkers() { - go sinfo.recvWorker() - go sinfo.sendWorker() -} - type FlowKeyMessage struct { FlowKey uint64 Message []byte } -func (sinfo *sessionInfo) recvWorker() { - // TODO move theirNonce etc into a struct that gets stored here, passed in over a channel - // Since there's no reason for anywhere else in the session code to need to *read* it... - // Only needs to be updated from the outside if a ping resets it... - // That would get rid of the need to take a mutex for the sessionFunc - var callbacks []chan func() - doRecv := func(p wire_trafficPacket) { - var bs []byte - var err error - var k crypto.BoxSharedKey - sessionFunc := func() { - if !sinfo._nonceIsOK(&p.Nonce) { - err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} - return - } - k = sinfo.sharedSesKey - } - sinfo.doFunc(sessionFunc) - if err != nil { - util.PutBytes(p.Payload) - return - } - var isOK bool - ch := make(chan func(), 1) - poolFunc := func() { - bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) - callback := func() { - util.PutBytes(p.Payload) - if !isOK { - util.PutBytes(bs) - return - } - sessionFunc = func() { - if k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { - // The session updated in the mean time, so return an error - err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0} - return - } - sinfo._updateNonce(&p.Nonce) - sinfo.time = time.Now() - sinfo.bytesRecvd += uint64(len(bs)) - } - sinfo.doFunc(sessionFunc) - if err != nil { - // Not sure what else to do with this packet, I guess just drop it - util.PutBytes(bs) - } else { - // Pass the packet to the buffer for Conn.Read - select { - case <-sinfo.cancel.Finished(): - util.PutBytes(bs) - case sinfo.toConn <- bs: - } - } - } - ch <- callback - } - // Send to the worker and wait for it to finish - util.WorkerGo(poolFunc) - callbacks = append(callbacks, ch) - } - fromHelper := make(chan wire_trafficPacket, 1) - go func() { - var buf []wire_trafficPacket - for { - for len(buf) > 0 { - select { - case <-sinfo.cancel.Finished(): - return - case p := <-sinfo.fromRouter: - buf = append(buf, p) - for len(buf) > 64 { // Based on nonce window size - util.PutBytes(buf[0].Payload) - buf = buf[1:] - } - case fromHelper <- buf[0]: - buf = buf[1:] - } - } - select { - case <-sinfo.cancel.Finished(): - return - case p := <-sinfo.fromRouter: - buf = append(buf, p) - } - } - }() - select { - case <-sinfo.cancel.Finished(): - return - case <-sinfo.init: - // Wait until the session has finished initializing before processing any packets - } - for { - for len(callbacks) > 0 { - select { - case f := <-callbacks[0]: - callbacks = callbacks[1:] - f() - case <-sinfo.cancel.Finished(): - return - case p := <-fromHelper: - doRecv(p) - } - } - select { - case <-sinfo.cancel.Finished(): - return - case p := <-fromHelper: - doRecv(p) - } - } -} - -func (sinfo *sessionInfo) sendWorker() { - // TODO move info that this worker needs here, send updates via a channel - // Otherwise we need to take a mutex to avoid races with update() - var callbacks []chan func() - doSend := func(msg FlowKeyMessage) { - var p wire_trafficPacket - var k crypto.BoxSharedKey - sessionFunc := func() { - sinfo.bytesSent += uint64(len(msg.Message)) - p = wire_trafficPacket{ - Coords: append([]byte(nil), sinfo.coords...), - Handle: sinfo.theirHandle, - Nonce: sinfo.myNonce, - } - if msg.FlowKey != 0 { - // Helps ensure that traffic from this flow ends up in a separate queue from other flows - // The zero padding relies on the fact that the self-peer is always on port 0 - p.Coords = append(p.Coords, 0) - p.Coords = wire_put_uint64(msg.FlowKey, p.Coords) - } - sinfo.myNonce.Increment() - k = sinfo.sharedSesKey - } - // Get the mutex-protected info needed to encrypt the packet - sinfo.doFunc(sessionFunc) - ch := make(chan func(), 1) - poolFunc := func() { - // Encrypt the packet - p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) - // The callback will send the packet - callback := func() { - // Encoding may block on a util.GetBytes(), so kept out of the worker pool - packet := p.encode() - // Cleanup - util.PutBytes(msg.Message) - util.PutBytes(p.Payload) - // Send the packet - // TODO replace this with a send to the peer struct if that becomes an actor - sinfo.sessions.router.EnqueueFrom(sinfo, func() { - sinfo.sessions.router.out(packet) - }) - } - ch <- callback - } - // Send to the worker and wait for it to finish - util.WorkerGo(poolFunc) - callbacks = append(callbacks, ch) - } - select { - case <-sinfo.cancel.Finished(): - return - case <-sinfo.init: - // Wait until the session has finished initializing before processing any packets - } - for { - for len(callbacks) > 0 { - select { - case f := <-callbacks[0]: - callbacks = callbacks[1:] - f() - case <-sinfo.cancel.Finished(): - return - case msg := <-sinfo.fromConn: - doSend(msg) - } - } - select { - case <-sinfo.cancel.Finished(): - return - case bs := <-sinfo.fromConn: - doSend(bs) - } - } -} - -//////////////////////////////////////////////////////////////////////////////// - func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) { sinfo.EnqueueFrom(from, func() { sinfo._recvPacket(packet)