diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index ad4b1fac..3cd344d5 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -216,8 +216,8 @@ func (intf *linkInterface) handler() error { case signalReady <- struct{}{}: default: } - intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) + //intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", + // strings.ToUpper(intf.info.linkType), themString, intf.info.local) } } }() @@ -237,9 +237,9 @@ func (intf *linkInterface) handler() error { recvTimer := time.NewTimer(recvTime) defer util.TimerStop(recvTimer) for { - intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", - strings.ToUpper(intf.info.linkType), themString, intf.info.local, - isAlive, isReady, sendTimerRunning, recvTimerRunning) + //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", + // strings.ToUpper(intf.info.linkType), themString, intf.info.local, + // isAlive, isReady, sendTimerRunning, recvTimerRunning) select { case gotMsg, ok := <-signalAlive: if !ok { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 99e69828..d84b935b 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -68,17 +68,34 @@ func (r *router) init(core *Core) { r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) - p.out = func(packet []byte) { - // This is to make very sure it never blocks - select { - case in <- packet: - return - default: - util.PutBytes(packet) - } - } + p.out = func(packet []byte) { in <- packet } r.in = in - r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block + out := make(chan []byte, 32) + go func() { + for packet := range out { + p.handlePacket(packet) + } + }() + out2 := make(chan []byte, 32) + go func() { + // This worker makes sure r.out never blocks + // It will buffer traffic long enough for the switch worker to take it + // If (somehow) you can send faster than the switch can receive, then this would use unbounded memory + // But crypto slows sends enough that the switch should always be able to take the packets... + var buf [][]byte + for { + buf = append(buf, <-out2) + for len(buf) > 0 { + select { + case bs := <-out2: + buf = append(buf, bs) + case out <- buf[0]: + buf = buf[1:] + } + } + } + }() + r.out = func(packet []byte) { out2 <- packet } r.toRecv = make(chan router_recvPacket, 32) recv := make(chan []byte, 32) send := make(chan []byte, 32) @@ -306,6 +323,8 @@ func (r *router) sendPacket(bs []byte) { return } + sinfo.doSend(bs) + return sinfo.send <- bs } } @@ -385,6 +404,8 @@ func (r *router) handleTraffic(packet []byte) { if !isIn { return } + sinfo.doRecv(&p) + return sinfo.recv <- &p } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index cdabaf2b..bffc149b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -304,7 +304,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) - go sinfo.doWorker() + //go sinfo.doWorker() ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -625,6 +625,8 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { sinfo.updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) + sinfo.core.router.recvPacket(bs, sinfo) + return select { case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}: default: // avoid deadlocks, maybe do this somewhere else?... diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 9b8c8398..d45b8855 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -668,10 +668,12 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool //nothing case coordLen < bestCoordLen: update = true - case coordLen > bestCoordLen: - //nothing - case port < best.port: - update = true + /* + case coordLen > bestCoordLen: + //nothing + case port < best.port: + update = true + */ default: //nothing } @@ -800,7 +802,7 @@ func (t *switchTable) doWorker() { t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { - t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) + //t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) select { case bytes := <-t.packetIn: // Try to send it somewhere (or drop it if it's corrupt or at a dead end)