diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index cd42560c..599ee90c 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -453,16 +453,16 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.linkLoop() - plinkIn := make(chan []byte, 1) - qlinkIn := make(chan []byte, 1) + p.linkIn, q.linkIn = make(chan []byte, 32), make(chan []byte, 32) + p.linkOut, q.linkOut = q.linkIn, p.linkIn p.out = func(bs []byte) { - go q.handlePacket(bs, qlinkIn) + go q.handlePacket(bs) } q.out = func(bs []byte) { - go p.handlePacket(bs, plinkIn) + go p.handlePacket(bs) } - go p.linkLoop(plinkIn) - go q.linkLoop(qlinkIn) + go p.linkLoop() + go q.linkLoop() } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index fa1a2789..0113470e 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -111,6 +111,9 @@ type peer struct { close func() // To allow the peer to call close if idle for too long lastAnc time.Time + // used for protocol traffic (to bypass queues) + linkIn (chan []byte) // handlePacket sends, linkLoop recvs + linkOut (chan []byte) } const peer_Throttle = 1 @@ -123,8 +126,7 @@ func (p *peer) updateQueueSize(delta int64) { atomic.AddInt64(&p.queueSize, delta) } -func (ps *peers) newPeer(box *boxPubKey, - sig *sigPubKey) *peer { +func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -170,14 +172,14 @@ func (ps *peers) removePeer(port switchPort) { } } -func (p *peer) linkLoop(in <-chan []byte) { +func (p *peer) linkLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() var counter uint8 var lastRSeq uint64 for { select { - case packet, ok := <-in: + case packet, ok := <-p.linkIn: if !ok { return } @@ -214,7 +216,7 @@ func (p *peer) linkLoop(in <-chan []byte) { } } -func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { +func (p *peer) handlePacket(packet []byte) { // TODO See comment in sendPacket about atomics technically being done wrong atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) pType, pTypeLen := wire_decode_uint64(packet) @@ -227,12 +229,7 @@ func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen) case wire_LinkProtocolTraffic: - { - select { - case linkIn <- packet: - default: - } - } + p.linkIn <- packet default: /*panic(pType) ;*/ return } @@ -284,7 +281,7 @@ func (p *peer) sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - p.sendPacket(packet) + p.linkOut <- packet } func (p *peer) handleLinkTraffic(bs []byte) { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b848a792..a8797d5f 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -55,7 +55,7 @@ func (r *router) init(core *Core) { } } r.in = in - r.out = func(packet []byte) { p.handlePacket(packet, nil) } // The caller is responsible for go-ing if it needs to not block + r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block recv := make(chan []byte, 32) send := make(chan []byte, 32) r.recv = recv diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index acde0344..e21522b8 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -208,10 +208,11 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) + p.linkIn = make(chan []byte, 1) + p.linkOut = make(chan []byte, 1) in := func(bs []byte) { - p.handlePacket(bs, linkIn) + p.handlePacket(bs) } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) @@ -221,10 +222,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { buf.Write(tcp_msg[:]) buf.Write(msgLen) buf.Write(msg) - p.updateQueueSize(-1) util_putBytes(msg) } go func() { + defer buf.Flush() var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) @@ -234,14 +235,22 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { p.updateQueueSize(-1) } } - for msg := range out { - put(msg) + for { + select { + case msg := <-p.linkOut: + send(msg) + case msg, ok := <-out: + if !ok { + return + } + put(msg) + } for len(stack) > 0 { - // Keep trying to fill the stack (LIFO order) while sending select { + case msg := <-p.linkOut: + send(msg) case msg, ok := <-out: if !ok { - buf.Flush() return } put(msg) @@ -249,6 +258,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { msg := stack[len(stack)-1] stack = stack[:len(stack)-1] send(msg) + p.updateQueueSize(-1) } } buf.Flush() @@ -265,11 +275,11 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } p.close = func() { sock.Close() } setNoDelay(sock, true) - go p.linkLoop(linkIn) + go p.linkLoop() defer func() { // Put all of our cleanup here... p.core.peers.removePeer(p.port) - close(linkIn) + close(p.linkIn) }() them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) themNodeID := getNodeID(&info.box)