diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ae1c391d..6c5fd21d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,17 +589,17 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string var bestSize int - for streamID, packets := range stacks { + for streamID, packets := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[len(packets)-1] + packet := packets[0] coords := switch_getPacketCoords(packet) if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID @@ -607,13 +607,13 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo } } if bestSize != 0 { - packets := stacks[best] + packets := buffs[best] var packet []byte - packet, packets = packets[len(packets)-1], packets[:len(packets)-1] + packet, packets = packets[0], packets[1:] if len(packets) == 0 { - delete(stacks, best) + delete(buffs, best) } else { - stacks[best] = packets + buffs[best] = packets } to.sendPacket(packet) return true @@ -624,7 +624,7 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) + buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -633,16 +633,16 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(stacks[streamID], packet) + packets := append(buffs[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } - stacks[streamID] = packets + buffs[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, stacks) { + if !t.handleIdle(port, buffs) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 12147d4e..80ae2847 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -246,19 +246,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer close(out) go func() { // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := make(chan []byte) - defer close(send) - go func() { - // This goroutine does the actual socket write operations - // The parent goroutine aggregates things for it and feeds them in - for msg := range send { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } - }() + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -266,7 +260,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: // Always send outgoing link traffic first, if needed - send <- msg + send(msg) continue default: } @@ -279,14 +273,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send <- nil // TCP keep-alive traffic + send(nil) // TCP keep-alive traffic case msg := <-p.linkOut: - send <- msg + send(msg) case msg, ok := <-out: if !ok { return } - send <- msg // Block until the socket writer has the packet + send(msg) // Block until the socket write has finished // Now inform the switch that we're ready for more traffic p.core.switchTable.idleIn <- p.port }