Try using a simpler FIFO order for each backpressure buffer, since there are other mechanisms to penalize the flooding node. This leads to better TCP throughput without affecting traffic between other nodes (it does affect traffic within the same session, but there are hypothetically workarounds for that).

Arceliar 2018-06-24 20:20:07 -05:00
parent 4ad2446557
commit 7695a3fcbf
2 changed files with 22 additions and 28 deletions
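
The behavioural core of this commit is the pop order on each per-stream backpressure buffer: handleIdle used to pop the newest queued packet (treating the slice as a stack) and now pops the oldest (FIFO). A minimal standalone sketch of the two orders on a slice-backed buffer, using hypothetical helper names (popNewest, popOldest) rather than anything from the Yggdrasil source:

package main

import "fmt"

// popNewest removes and returns the most recently queued packet
// (the old stack-like behaviour).
func popNewest(packets [][]byte) (packet []byte, rest [][]byte) {
	packet, rest = packets[len(packets)-1], packets[:len(packets)-1]
	return
}

// popOldest removes and returns the earliest queued packet
// (the new FIFO behaviour).
func popOldest(packets [][]byte) (packet []byte, rest [][]byte) {
	packet, rest = packets[0], packets[1:]
	return
}

func main() {
	buff := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
	newest, _ := popNewest(buff)
	oldest, _ := popOldest(buff)
	fmt.Printf("stack pops %s, FIFO pops %s\n", newest, oldest)
}

FIFO keeps packets within a stream in arrival order, which is what reordering-sensitive TCP flows benefit from; penalizing a flooding node is left to the other mechanisms the commit message mentions.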


@@ -589,17 +589,17 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool
 // Handles incoming idle notifications
 // Loops over packets and sends the newest one that's OK for this peer to send
 // Returns true if the peer is no longer idle, false if it should be added to the idle list
-func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool {
+func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool {
 	to := t.core.peers.getPorts()[port]
 	if to == nil {
 		return true
 	}
 	var best string
 	var bestSize int
-	for streamID, packets := range stacks {
+	for streamID, packets := range buffs {
 		// Filter over the streams that this node is closer to
 		// Keep the one with the smallest queue
-		packet := packets[len(packets)-1]
+		packet := packets[0]
 		coords := switch_getPacketCoords(packet)
 		if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) {
 			best = streamID
@@ -607,13 +607,13 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo
 		}
 	}
 	if bestSize != 0 {
-		packets := stacks[best]
+		packets := buffs[best]
 		var packet []byte
-		packet, packets = packets[len(packets)-1], packets[:len(packets)-1]
+		packet, packets = packets[0], packets[1:]
 		if len(packets) == 0 {
-			delete(stacks, best)
+			delete(buffs, best)
 		} else {
-			stacks[best] = packets
+			buffs[best] = packets
 		}
 		to.sendPacket(packet)
 		return true
@@ -624,7 +624,7 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo
 // The switch worker does routing lookups and sends packets to where they need to be
 func (t *switchTable) doWorker() {
-	stacks := make(map[string][][]byte) // Packets per PacketStreamID (string)
+	buffs := make(map[string][][]byte) // Packets per PacketStreamID (string)
 	idle := make(map[switchPort]struct{}) // this is to deduplicate things
 	for {
 		select {
@@ -633,16 +633,16 @@ func (t *switchTable) doWorker() {
 			if !t.handleIn(packet, idle) {
 				// There's nobody free to take it right now, so queue it for later
 				streamID := switch_getPacketStreamID(packet)
-				packets := append(stacks[streamID], packet)
+				packets := append(buffs[streamID], packet)
 				for len(packets) > 32 {
 					util_putBytes(packets[0])
 					packets = packets[1:]
 				}
-				stacks[streamID] = packets
+				buffs[streamID] = packets
 			}
 		case port := <-t.idleIn:
 			// Try to find something to send to this peer
-			if !t.handleIdle(port, stacks) {
+			if !t.handleIdle(port, buffs) {
 				// Didn't find anything ready to send yet, so stay idle
 				idle[port] = struct{}{}
 			}
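
For context on the queueing side of doWorker above: each stream's buffer is capped at 32 packets, and the oldest packets are dropped (and their byte slices recycled) when the cap is exceeded. A rough standalone sketch of just that bounding step, where enqueue is a hypothetical helper and recycle stands in for util_putBytes:

package main

import "fmt"

const maxQueue = 32 // same cap as the loop in doWorker

// recycle stands in for util_putBytes, which returns dropped buffers to a pool.
func recycle(packet []byte) {}

// enqueue appends a packet to its stream's buffer and drops the oldest
// packets once the buffer grows past maxQueue.
func enqueue(buffs map[string][][]byte, streamID string, packet []byte) {
	packets := append(buffs[streamID], packet)
	for len(packets) > maxQueue {
		recycle(packets[0])
		packets = packets[1:]
	}
	buffs[streamID] = packets
}

func main() {
	buffs := make(map[string][][]byte)
	for i := 0; i < 40; i++ {
		enqueue(buffs, "stream-A", []byte{byte(i)})
	}
	// 40 packets in, 8 oldest dropped, 32 kept
	fmt.Println(len(buffs["stream-A"]))
}

Dropping from the head means a stream under sustained overload loses its oldest packets first, which matches the FIFO send order in handleIdle.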


@@ -246,19 +246,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 	defer close(out)
 	go func() {
 		// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
-		send := make(chan []byte)
-		defer close(send)
-		go func() {
-			// This goroutine does the actual socket write operations
-			// The parent goroutine aggregates things for it and feeds them in
-			for msg := range send {
-				msgLen := wire_encode_uint64(uint64(len(msg)))
-				buf := net.Buffers{tcp_msg[:], msgLen, msg}
-				buf.WriteTo(sock)
-				atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
-				util_putBytes(msg)
-			}
-		}()
+		send := func(msg []byte) {
+			msgLen := wire_encode_uint64(uint64(len(msg)))
+			buf := net.Buffers{tcp_msg[:], msgLen, msg}
+			buf.WriteTo(sock)
+			atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
+			util_putBytes(msg)
+		}
 		timerInterval := tcp_timeout * 2 / 3
 		timer := time.NewTimer(timerInterval)
 		defer timer.Stop()
@@ -266,7 +260,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 			select {
 			case msg := <-p.linkOut:
 				// Always send outgoing link traffic first, if needed
-				send <- msg
+				send(msg)
 				continue
 			default:
 			}
@@ -279,14 +273,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 			timer.Reset(timerInterval)
 			select {
 			case _ = <-timer.C:
-				send <- nil // TCP keep-alive traffic
+				send(nil) // TCP keep-alive traffic
 			case msg := <-p.linkOut:
-				send <- msg
+				send(msg)
 			case msg, ok := <-out:
 				if !ok {
 					return
 				}
-				send <- msg // Block until the socket writer has the packet
+				send(msg) // Block until the socket write has finished
 				// Now inform the switch that we're ready for more traffic
 				p.core.switchTable.idleIn <- p.port
 			}
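
The second file replaces the dedicated socket-writer goroutine (fed over a channel) with a closure that writes synchronously, so the comment on the last send call is now literally true: the switch only gets the idle notification once the write has finished. A simplified, self-contained sketch of that pattern, not the actual tcp.go code: writerLoop and idleIn are hypothetical stand-ins for the real handler and p.core.switchTable.idleIn, and a fixed 8-byte length prefix replaces wire_encode_uint64.

package main

import (
	"encoding/binary"
	"net"
)

// writerLoop drains out, writing each message synchronously, and only then
// tells the switch (via idleIn) that this peer can take more traffic.
func writerLoop(sock net.Conn, out <-chan []byte, idleIn chan<- int, port int) {
	send := func(msg []byte) {
		var msgLen [8]byte
		binary.BigEndian.PutUint64(msgLen[:], uint64(len(msg)))
		buf := net.Buffers{msgLen[:], msg}
		buf.WriteTo(sock) // blocks until the whole frame has been handed to the socket
	}
	for msg := range out {
		send(msg)
		idleIn <- port // backpressure: only ask for more once the write is done
	}
}

func main() {
	a, b := net.Pipe()
	defer a.Close()
	go func() { // discard everything the "remote" side receives
		buf := make([]byte, 65536)
		for {
			if _, err := b.Read(buf); err != nil {
				return
			}
		}
	}()
	out := make(chan []byte, 1)
	idleIn := make(chan int, 1)
	go writerLoop(a, out, idleIn, 0)
	out <- []byte("hello")
	<-idleIn // the switch would now hand this peer its next packet
	close(out)
}

Coupling the idle notification to the completed write is what lets the per-stream FIFO buffers in the switch act as the only queue, instead of hiding an extra unbounded one inside a writer goroutine's channel.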