switch to a separate queue per stream of traffic, FIXME for some reason this makes distance calculations more expensive in handleIdle?

This commit is contained in:
Arceliar 2018-06-24 17:39:43 -05:00
parent 189628b381
commit 9c028e1d0d
2 changed files with 79 additions and 16 deletions

View File

@ -51,12 +51,12 @@ ip netns exec node4 ip link set lo up
ip netns exec node5 ip link set lo up ip netns exec node5 ip link set lo up
ip netns exec node6 ip link set lo up ip netns exec node6 ip link set lo up
ip netns exec node1 ./run --autoconf --pprof &> /dev/null & ip netns exec node1 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node2 ./run --autoconf --pprof &> /dev/null & ip netns exec node2 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node3 ./run --autoconf --pprof &> /dev/null & ip netns exec node3 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node4 ./run --autoconf --pprof &> /dev/null & ip netns exec node4 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node5 ./run --autoconf --pprof &> /dev/null & ip netns exec node5 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node6 ./run --autoconf --pprof &> /dev/null & ip netns exec node6 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
echo "Started, to continue you should (possibly w/ sudo):" echo "Started, to continue you should (possibly w/ sudo):"
echo "kill" $(jobs -p) echo "kill" $(jobs -p)

View File

@ -520,17 +520,39 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
} }
} }
// Get the coords of a packet without decoding
func switch_getPacketCoords(packet []byte) []byte {
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
return coords
}
// Returns a unique string for each stream of traffic
// Equal to type+coords+handle for traffic packets
// Equal to type+coords+toKey+fromKey for protocol traffic packets
func switch_getPacketStreamID(packet []byte) string {
pType, pTypeLen := wire_decode_uint64(packet)
_, coordLen := wire_decode_coords(packet[pTypeLen:])
end := pTypeLen + coordLen
switch {
case pType == wire_Traffic:
end += handleLen // handle
case pType == wire_ProtocolTraffic:
end += 2 * boxPubKeyLen
default:
end = 0
}
if end > len(packet) {
end = len(packet)
}
return string(packet[:end])
}
// Handle an incoming packet // Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free // Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued // Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
// Get the coords, skipping the first byte (the pType) coords := switch_getPacketCoords(packet)
_, pTypeLen := wire_decode_uint64(packet)
coords, coordLen := wire_decode_coords(packet[pTypeLen:])
if coordLen >= len(packet) {
util_putBytes(packet)
return true
} // No payload
ports := t.core.peers.getPorts() ports := t.core.peers.getPorts()
if t.selfIsClosest(coords) { if t.selfIsClosest(coords) {
// TODO? call the router directly, and remove the whole concept of a self peer? // TODO? call the router directly, and remove the whole concept of a self peer?
@ -564,6 +586,10 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool
} }
} }
/*
FIXME for some reason the new version is a *lot* slower than this one was
It seems to be from the switchLocator.dist(coords) calls
// Handles incoming idle notifications // Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send // Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list // Returns true if the peer is no longer idle, false if it should be added to the idle list
@ -584,10 +610,45 @@ func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool {
} }
return false return false
} }
*/
// Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool {
to := t.core.peers.getPorts()[port]
if to == nil {
return true
}
var best string
var bestSize int
for streamID, packets := range stacks {
// Filter over the streams that this node is closer to
packet := packets[len(packets)-1]
if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(packet, port) {
best = streamID
bestSize = len(packets)
}
}
if bestSize != 0 {
packets := stacks[best]
var packet []byte
packet, packets = packets[len(packets)-1], packets[:len(packets)-1]
if len(packets) == 0 {
delete(stacks, best)
} else {
stacks[best] = packets
}
to.sendPacket(packet)
return true
} else {
return false
}
}
// The switch worker does routing lookups and sends packets to where they need to be // The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() { func (t *switchTable) doWorker() {
var packets [][]byte // Should really be a linked list stacks := make(map[string][][]byte) // Packets per PacketStreamID (string)
idle := make(map[switchPort]struct{}) // this is to deduplicate things idle := make(map[switchPort]struct{}) // this is to deduplicate things
for { for {
select { select {
@ -595,15 +656,17 @@ func (t *switchTable) doWorker() {
// Try to send it somewhere (or drop it if it's corrupt or at a dead end) // Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t.handleIn(packet, idle) { if !t.handleIn(packet, idle) {
// There's nobody free to take it right now, so queue it for later // There's nobody free to take it right now, so queue it for later
packets = append(packets, packet) streamID := switch_getPacketStreamID(packet)
packets := append(stacks[streamID], packet)
for len(packets) > 32 { for len(packets) > 32 {
util_putBytes(packets[0]) util_putBytes(packets[0])
packets = packets[1:] packets = packets[1:]
} }
stacks[streamID] = packets
} }
case port := <-t.idleIn: case port := <-t.idleIn:
// Try to find something to send to this peer // Try to find something to send to this peer
if !t.handleIdle(port, &packets) { if !t.handleIdle(port, stacks) {
// Didn't find anything ready to send yet, so stay idle // Didn't find anything ready to send yet, so stay idle
idle[port] = struct{}{} idle[port] = struct{}{}
} }