diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d84b935b..6acd4735 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -66,7 +66,7 @@ func (r *router) init(core *Core) { r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) - in := make(chan []byte, 32) // TODO something better than this... + in := make(chan []byte, 1) // TODO something better than this... p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { in <- packet } r.in = in @@ -322,9 +322,6 @@ func (r *router) sendPacket(bs []byte) { // Don't continue - drop the packet return } - - sinfo.doSend(bs) - return sinfo.send <- bs } } @@ -404,8 +401,6 @@ func (r *router) handleTraffic(packet []byte) { if !isIn { return } - sinfo.doRecv(&p) - return sinfo.recv <- &p } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index bffc149b..012af579 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -304,7 +304,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) - //go sinfo.doWorker() + go sinfo.doWorker() ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -525,17 +525,36 @@ func (ss *sessions) resetInits() { // It handles calling the relatively expensive crypto operations. // It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK. func (sinfo *sessionInfo) doWorker() { + send := make(chan []byte, 32) + defer close(send) + go func() { + for bs := range send { + sinfo.doSend(bs) + } + }() + recv := make(chan *wire_trafficPacket, 32) + defer close(recv) + go func() { + for p := range recv { + sinfo.doRecv(p) + } + }() for { select { case p, ok := <-sinfo.recv: if ok { - sinfo.doRecv(p) + select { + case recv <- p: + default: + // We need something to not block, and it's best to drop it before we decrypt + util.PutBytes(p.Payload) + } } else { return } case bs, ok := <-sinfo.send: if ok { - sinfo.doSend(bs) + send <- bs } else { return } @@ -625,10 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { sinfo.updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) - sinfo.core.router.recvPacket(bs, sinfo) - return - select { - case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}: - default: // avoid deadlocks, maybe do this somewhere else?... - } + sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo} }