From 38e77041612c99253fb5a6e91958cab90f109d7d Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sun, 27 May 2018 13:37:35 -0500
Subject: [PATCH 1/2] use backpressure instead of estimated bandwidth, sorted
 by uptime to break ties

---
 src/yggdrasil/peer.go   | 21 +++++----------------
 src/yggdrasil/switch.go | 30 +++++++++++++++---------------
 src/yggdrasil/tcp.go    | 22 +++++-----------------
 src/yggdrasil/udp.go    |  5 ++---
 4 files changed, 27 insertions(+), 51 deletions(-)

diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go
index 4ce1a780..ed73ee44 100644
--- a/src/yggdrasil/peer.go
+++ b/src/yggdrasil/peer.go
@@ -25,7 +25,6 @@ package yggdrasil
 import "time"
 import "sync"
 import "sync/atomic"
-import "math"
 
 //import "fmt"
 
@@ -86,7 +85,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
 type peer struct {
 	// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
 	// use get/update methods only! (atomic accessors as float64)
-	bandwidth  uint64
+	queueSize  int64
 	bytesSent  uint64 // To track bandwidth usage for getPeers
 	bytesRecvd uint64 // To track bandwidth usage for getPeers
 	// BUG: sync/atomic, 32 bit platforms need the above to be the first element
@@ -116,22 +115,12 @@ type peer struct {
 
 const peer_Throttle = 1
 
-func (p *peer) getBandwidth() float64 {
-	bits := atomic.LoadUint64(&p.bandwidth)
-	return math.Float64frombits(bits)
+func (p *peer) getQueueSize() int64 {
+	return atomic.LoadInt64(&p.queueSize)
 }
 
-func (p *peer) updateBandwidth(bytes int, duration time.Duration) {
-	if p == nil {
-		return
-	}
-	for ok := false; !ok; {
-		oldBits := atomic.LoadUint64(&p.bandwidth)
-		oldBandwidth := math.Float64frombits(oldBits)
-		bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds()
-		bits := math.Float64bits(bandwidth)
-		ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
-	}
+func (p *peer) updateQueueSize(delta int64) {
+	atomic.AddInt64(&p.queueSize, delta)
 }
 
 func (ps *peers) newPeer(box *boxPubKey,
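The peer.go hunk above replaces the float64-encoded bandwidth estimate (and its compare-and-swap update loop) with a plain signed counter of queued packets. For readers unfamiliar with the pattern, here is a minimal standalone sketch of the same load/add accounting; queueCounter is an illustrative stand-in and not code from the repository:

package main

import (
	"fmt"
	"sync/atomic"
)

// queueCounter is a hypothetical stand-in for the per-peer queueSize field.
type queueCounter struct {
	size int64 // must stay 64-bit aligned for sync/atomic on 32-bit platforms
}

func (q *queueCounter) get() int64     { return atomic.LoadInt64(&q.size) }
func (q *queueCounter) update(d int64) { atomic.AddInt64(&q.size, d) }

func main() {
	var q queueCounter
	q.update(1)          // a packet was queued
	q.update(-1)         // a packet was sent (or dropped)
	fmt.Println(q.get()) // 0
}

Keeping the counter at the front of the struct matters for the same reason as the BUG note in the hunk: sync/atomic requires 64-bit alignment of these fields on 32-bit platforms.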
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index a005b106..bbc28392 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -12,6 +12,7 @@ package yggdrasil
 // A little annoying to do with constant changes from bandwidth estimates
 
 import "time"
+import "sort"
 import "sync"
 import "sync/atomic"
 
@@ -397,37 +398,36 @@ func (t *switchTable) updateTable() {
 			port:    pinfo.port,
 		})
 	}
+	sort.SliceStable(newTable.elems, func(i, j int) bool {
+		return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
+	})
 	t.table.Store(newTable)
 }
 
 func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
 	t.updater.Load().(*sync.Once).Do(t.updateTable)
 	table := t.table.Load().(lookupTable)
-	ports := t.core.peers.getPorts()
-	getBandwidth := func(port switchPort) float64 {
-		var bandwidth float64
-		if p, isIn := ports[port]; isIn {
-			bandwidth = p.getBandwidth()
-		}
-		return bandwidth
-	}
-	var best switchPort
 	myDist := table.self.dist(dest) //getDist(table.self.coords)
 	if !(uint64(myDist) < ttl) {
 		return 0, 0
 	}
-	// score is in units of bandwidth / distance
-	bestScore := float64(-1)
+	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
+	ports := t.core.peers.getPorts()
+	var best switchPort
+	bestCost := int64(^uint64(0) >> 1)
 	for _, info := range table.elems {
 		dist := info.locator.dist(dest) //getDist(info.locator.coords)
 		if !(dist < myDist) {
 			continue
 		}
-		score := getBandwidth(info.port)
-		score /= float64(1 + dist)
-		if score > bestScore {
+		p, isIn := ports[info.port]
+		if !isIn {
+			continue
+		}
+		cost := int64(dist) + p.getQueueSize()
+		if cost < bestCost {
 			best = info.port
-			bestScore = score
+			bestCost = cost
 		}
 	}
 	//t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best))
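With the switch.go change above, lookup() no longer scores candidates by estimated bandwidth divided by distance; among peers strictly closer to the destination, it now picks the one with the lowest (tree distance) + (queued packets). A minimal sketch of that selection rule, using simplified stand-in types rather than the real switchTable structures:

package main

import "fmt"

type candidate struct {
	port      uint64 // switch port of the peer
	dist      uint64 // tree distance from this peer to the destination
	queueSize int64  // packets currently queued toward this peer
}

// bestPort returns the port with the lowest dist+queueSize cost among
// candidates closer to the destination than we are (dist < myDist), or 0.
func bestPort(myDist uint64, cands []candidate) uint64 {
	var best uint64
	bestCost := int64(^uint64(0) >> 1) // max int64
	for _, c := range cands {
		if !(c.dist < myDist) {
			continue
		}
		cost := int64(c.dist) + c.queueSize
		if cost < bestCost {
			best, bestCost = c.port, cost
		}
	}
	return best
}

func main() {
	cands := []candidate{
		{port: 1, dist: 2, queueSize: 10},
		{port: 2, dist: 3, queueSize: 1},
	}
	fmt.Println(bestPort(5, cands)) // 2: longer path, but a much shorter queue
}

The distance term is the "minimum backpressure gradient" from the added comment: a longer but idle path is only chosen once the shorter path has queued at least as many extra packets as the difference in distance.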
diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go
index 869b6afd..f02aff5b 100644
--- a/src/yggdrasil/tcp.go
+++ b/src/yggdrasil/tcp.go
@@ -190,26 +190,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 	buf := bufio.NewWriterSize(sock, tcp_msgSize)
 	send := func(msg []byte) {
 		msgLen := wire_encode_uint64(uint64(len(msg)))
-		before := buf.Buffered()
-		start := time.Now()
 		buf.Write(tcp_msg[:])
 		buf.Write(msgLen)
 		buf.Write(msg)
-		timed := time.Since(start)
-		after := buf.Buffered()
-		written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after
-		if written > 0 {
-			p.updateBandwidth(written, timed)
-		}
+		p.updateQueueSize(-1)
 		util_putBytes(msg)
 	}
-	flush := func() {
-		size := buf.Buffered()
-		start := time.Now()
-		buf.Flush()
-		timed := time.Since(start)
-		p.updateBandwidth(size, timed)
-	}
 	go func() {
 		var stack [][]byte
 		put := func(msg []byte) {
@@ -217,6 +203,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 			for len(stack) > 32 {
 				util_putBytes(stack[0])
 				stack = stack[1:]
+				p.updateQueueSize(-1)
 			}
 		}
 		for msg := range out {
@@ -226,7 +213,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 				select {
 				case msg, ok := <-out:
 					if !ok {
-						flush()
+						buf.Flush()
 						return
 					}
 					put(msg)
@@ -236,13 +223,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 				send(msg)
 			}
 		}
-		flush()
+		buf.Flush()
 	}()
 	p.out = func(msg []byte) {
 		defer func() { recover() }()
 		select {
 		case out <- msg:
+			p.updateQueueSize(1)
 		default:
 			util_putBytes(msg)
 		}
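The tcp.go change above adjusts the counter at three points: +1 when a message is accepted into the send queue, -1 when it is written into the bufio.Writer, and -1 when the overflow stack drops an old message. A simplified sketch of that accounting, where a plain slice stands in for the real channel-plus-stack machinery and all names are illustrative:

package main

import (
	"fmt"
	"sync/atomic"
)

type fakePeer struct{ queueSize int64 }

func (p *fakePeer) updateQueueSize(d int64) { atomic.AddInt64(&p.queueSize, d) }

func main() {
	p := &fakePeer{}
	queue := make([][]byte, 0, 4)
	const maxQueued = 2 // stand-in for the real code's 32-message stack limit

	enqueue := func(msg []byte) {
		queue = append(queue, msg)
		p.updateQueueSize(1) // counted as soon as it is accepted for sending
		for len(queue) > maxQueued {
			queue = queue[1:]     // drop the oldest message...
			p.updateQueueSize(-1) // ...and stop counting it
		}
	}
	sendOne := func() {
		if len(queue) == 0 {
			return
		}
		queue = queue[1:]     // pretend it was written to the socket
		p.updateQueueSize(-1) // no longer queued
	}

	for i := 0; i < 4; i++ {
		enqueue([]byte{byte(i)})
	}
	sendOne()
	fmt.Println(atomic.LoadInt64(&p.queueSize)) // 1
}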
diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go
index 02fb9d6d..68f53a8e 100644
--- a/src/yggdrasil/udp.go
+++ b/src/yggdrasil/udp.go
@@ -265,6 +265,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
 			defer func() { recover() }()
 			select {
 			case conn.out <- msg:
+				conn.peer.updateQueueSize(1)
 			default:
 				util_putBytes(msg)
 			}
@@ -282,16 +283,14 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
 				if len(chunks) > 255 {
 					continue
 				}
-				start := time.Now()
 				for idx, bs := range chunks {
 					nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut
 					out = udp_encode(out[:0], nChunks, nChunk, count, bs)
 					//iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs))
 					iface.sock.WriteToUDP(out, udpAddr)
 				}
-				timed := time.Since(start)
 				conn.countOut += 1
-				conn.peer.updateBandwidth(len(msg), timed)
+				conn.peer.updateQueueSize(-1)
 				util_putBytes(msg)
 			}
 		}()
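That completes the first patch. The "sorted by uptime to break ties" part of its subject line is the sort.SliceStable call added to updateTable in switch.go: table entries are ordered by the peer's firstSeen time, and since lookup() keeps the first entry with the lowest cost, equal-cost choices fall to the longest-lived peer. A small sketch of that ordering with simplified types (elem here is illustrative, not the real table element type):

package main

import (
	"fmt"
	"sort"
	"time"
)

type elem struct {
	port      uint64
	firstSeen time.Time
}

func main() {
	now := time.Now()
	elems := []elem{
		{port: 3, firstSeen: now.Add(-1 * time.Minute)},
		{port: 1, firstSeen: now.Add(-1 * time.Hour)}, // longest-lived peer
		{port: 2, firstSeen: now.Add(-10 * time.Minute)},
	}
	// Stable sort by uptime: earliest firstSeen comes first.
	sort.SliceStable(elems, func(i, j int) bool {
		return elems[i].firstSeen.Before(elems[j].firstSeen)
	})
	fmt.Println(elems[0].port) // 1: with equal costs, the oldest peer is tried first
}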
From fad6f6b50e4d1b63a8695a82855ea3c646e7e44d Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Wed, 6 Jun 2018 16:57:36 -0500
Subject: [PATCH 2/2] remove udp.go

---
 src/yggdrasil/udp.go | 374 -------------------------------------------
 1 file changed, 374 deletions(-)
 delete mode 100644 src/yggdrasil/udp.go

diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go
deleted file mode 100644
index 68f53a8e..00000000
--- a/src/yggdrasil/udp.go
+++ /dev/null
@@ -1,374 +0,0 @@
-package yggdrasil
-
-// This communicates with peers via UDP
-// It's not as well tested or debugged as the TCP transport
-// It's intended to use UDP, so debugging/optimzing this is a high priority
-// TODO? use golang.org/x/net/ipv6.PacketConn's ReadBatch and WriteBatch?
-// To send all chunks of a message / recv all available chunks in one syscall
-// That might be faster on supported platforms, but it needs investigation
-// Chunks are currently murged, but outgoing messages aren't chunked
-// This is just to support chunking in the future, if it's needed and debugged
-// Basically, right now we might send UDP packets that are too large
-
-// TODO remove old/unused code and better document live code
-
-import "net"
-import "time"
-import "sync"
-import "fmt"
-
-type udpInterface struct {
-	core  *Core
-	sock  *net.UDPConn // Or more general PacketConn?
-	mutex sync.RWMutex // each conn has an owner goroutine
-	conns map[connAddr]*connInfo
-}
-
-type connAddr struct {
-	ip   [16]byte
-	port int
-	zone string
-}
-
-func (c *connAddr) fromUDPAddr(u *net.UDPAddr) {
-	copy(c.ip[:], u.IP.To16())
-	c.port = u.Port
-	c.zone = u.Zone
-}
-
-func (c *connAddr) toUDPAddr() *net.UDPAddr {
-	var u net.UDPAddr
-	u.IP = make([]byte, 16)
-	copy(u.IP, c.ip[:])
-	u.Port = c.port
-	u.Zone = c.zone
-	return &u
-}
-
-type connInfo struct {
-	name      string
-	addr      connAddr
-	peer      *peer
-	linkIn    chan []byte
-	keysIn    chan *udpKeys
-	closeIn   chan *udpKeys
-	timeout   int // count of how many heartbeats have been missed
-	in        func([]byte)
-	out       chan []byte
-	countIn   uint8
-	countOut  uint8
-	chunkSize uint16
-}
-
-type udpKeys struct {
-	box boxPubKey
-	sig sigPubKey
-}
-
-func (iface *udpInterface) init(core *Core, addr string) (err error) {
-	iface.core = core
-	udpAddr, err := net.ResolveUDPAddr("udp", addr)
-	if err != nil {
-		return
-	}
-	iface.sock, err = net.ListenUDP("udp", udpAddr)
-	if err != nil {
-		return
-	}
-	iface.conns = make(map[connAddr]*connInfo)
-	go iface.reader()
-	return
-}
-
-func (iface *udpInterface) sendKeys(addr connAddr) {
-	udpAddr := addr.toUDPAddr()
-	msg := []byte{}
-	msg = udp_encode(msg, 0, 0, 0, nil)
-	msg = append(msg, iface.core.boxPub[:]...)
-	msg = append(msg, iface.core.sigPub[:]...)
-	iface.sock.WriteToUDP(msg, udpAddr)
-}
-
-func (iface *udpInterface) sendClose(addr connAddr) {
-	udpAddr := addr.toUDPAddr()
-	msg := []byte{}
-	msg = udp_encode(msg, 0, 1, 0, nil)
-	msg = append(msg, iface.core.boxPub[:]...)
-	msg = append(msg, iface.core.sigPub[:]...)
-	iface.sock.WriteToUDP(msg, udpAddr)
-}
-
-func udp_isKeys(msg []byte) bool {
-	keyLen := 3 + boxPubKeyLen + sigPubKeyLen
-	return len(msg) == keyLen && msg[0] == 0x00 && msg[1] == 0x00
-}
-
-func udp_isClose(msg []byte) bool {
-	keyLen := 3 + boxPubKeyLen + sigPubKeyLen
-	return len(msg) == keyLen && msg[0] == 0x00 && msg[1] == 0x01
-}
-
-func (iface *udpInterface) startConn(info *connInfo) {
-	ticker := time.NewTicker(6 * time.Second)
-	defer ticker.Stop()
-	defer func() {
-		// Cleanup
-		iface.mutex.Lock()
-		delete(iface.conns, info.addr)
-		iface.mutex.Unlock()
-		iface.core.peers.removePeer(info.peer.port)
-		close(info.linkIn)
-		close(info.keysIn)
-		close(info.closeIn)
-		close(info.out)
-		iface.core.log.Println("Removing peer:", info.name)
-	}()
-	for {
-		select {
-		case ks := <-info.closeIn:
-			{
-				if ks.box == info.peer.box && ks.sig == info.peer.sig {
-					// TODO? secure this somehow
-					// Maybe add a signature and sequence number (timestamp) to close and keys?
-					return
-				}
-			}
-		case ks := <-info.keysIn:
-			{
-				// FIXME? need signatures/sequence-numbers or something
-				// Spoofers could lock out a peer with fake/bad keys
-				if ks.box == info.peer.box && ks.sig == info.peer.sig {
-					info.timeout = 0
-				}
-			}
-		case <-ticker.C:
-			{
-				if info.timeout > 10 {
-					return
-				}
-				info.timeout++
-				iface.sendKeys(info.addr)
-			}
-		}
-	}
-}
-
-func (iface *udpInterface) handleClose(msg []byte, addr connAddr) {
-	//defer util_putBytes(msg)
-	var ks udpKeys
-	_, _, _, bs := udp_decode(msg)
-	switch {
-	case !wire_chop_slice(ks.box[:], &bs):
-		return
-	case !wire_chop_slice(ks.sig[:], &bs):
-		return
-	}
-	if ks.box == iface.core.boxPub {
-		return
-	}
-	if ks.sig == iface.core.sigPub {
-		return
-	}
-	iface.mutex.RLock()
-	conn, isIn := iface.conns[addr]
-	iface.mutex.RUnlock()
-	if !isIn {
-		return
-	}
-	func() {
-		defer func() { recover() }()
-		select {
-		case conn.closeIn <- &ks:
-		default:
-		}
-	}()
-}
-
-func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
-	//defer util_putBytes(msg)
-	var ks udpKeys
-	_, _, _, bs := udp_decode(msg)
-	switch {
-	case !wire_chop_slice(ks.box[:], &bs):
-		return
-	case !wire_chop_slice(ks.sig[:], &bs):
-		return
-	}
-	if ks.box == iface.core.boxPub {
-		return
-	}
-	if ks.sig == iface.core.sigPub {
-		return
-	}
-	iface.mutex.RLock()
-	conn, isIn := iface.conns[addr]
-	iface.mutex.RUnlock()
-	if !isIn {
-		udpAddr := addr.toUDPAddr()
-		// Check if we're authorized to connect to this key / IP
-		// TODO monitor and always allow outgoing connections
-		if !iface.core.peers.isAllowedEncryptionPublicKey(&ks.box) {
-			// Allow unauthorized peers if they're link-local
-			if !udpAddr.IP.IsLinkLocalUnicast() {
-				return
-			}
-		}
-		themNodeID := getNodeID(&ks.box)
-		themAddr := address_addrForNodeID(themNodeID)
-		themAddrString := net.IP(themAddr[:]).String()
-		themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String())
-		conn = &connInfo{
-			name:      themString,
-			addr:      connAddr(addr),
-			peer:      iface.core.peers.newPeer(&ks.box, &ks.sig),
-			linkIn:    make(chan []byte, 1),
-			keysIn:    make(chan *udpKeys, 1),
-			closeIn:   make(chan *udpKeys, 1),
-			out:       make(chan []byte, 32),
-			chunkSize: 576 - 60 - 8 - 3, // max safe - max ip - udp header - chunk overhead
-		}
-		if udpAddr.IP.IsLinkLocalUnicast() {
-			ifce, err := net.InterfaceByName(udpAddr.Zone)
-			if ifce != nil && err == nil {
-				conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3
-			}
-		}
-		var inChunks uint8
-		var inBuf []byte
-		conn.in = func(bs []byte) {
-			//defer util_putBytes(bs)
-			chunks, chunk, count, payload := udp_decode(bs)
-			if count != conn.countIn {
-				if len(inBuf) > 0 {
-					// Something went wrong
-					// Forward whatever we have
-					// Maybe the destination can do something about it
-					msg := append(util_getBytes(), inBuf...)
-					conn.peer.handlePacket(msg, conn.linkIn)
-				}
-				inChunks = 0
-				inBuf = inBuf[:0]
-				conn.countIn = count
-			}
-			if chunk <= chunks && chunk == inChunks+1 {
-				inChunks += 1
-				inBuf = append(inBuf, payload...)
-				if chunks != chunk {
-					return
-				}
-				msg := append(util_getBytes(), inBuf...)
-				conn.peer.handlePacket(msg, conn.linkIn)
-				inBuf = inBuf[:0]
-			}
-		}
-		conn.peer.out = func(msg []byte) {
-			defer func() { recover() }()
-			select {
-			case conn.out <- msg:
-				conn.peer.updateQueueSize(1)
-			default:
-				util_putBytes(msg)
-			}
-		}
-		go func() {
-			var out []byte
-			var chunks [][]byte
-			for msg := range conn.out {
-				chunks = chunks[:0]
-				bs := msg
-				for len(bs) > int(conn.chunkSize) {
-					chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:]
-				}
-				chunks = append(chunks, bs)
-				if len(chunks) > 255 {
-					continue
-				}
-				for idx, bs := range chunks {
-					nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut
-					out = udp_encode(out[:0], nChunks, nChunk, count, bs)
-					//iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs))
-					iface.sock.WriteToUDP(out, udpAddr)
-				}
-				conn.countOut += 1
-				conn.peer.updateQueueSize(-1)
-				util_putBytes(msg)
-			}
-		}()
-		//*/
-		conn.peer.close = func() { iface.sendClose(conn.addr) }
-		iface.mutex.Lock()
-		iface.conns[addr] = conn
-		iface.mutex.Unlock()
-		iface.core.log.Println("Adding peer:", conn.name)
-		go iface.startConn(conn)
-		go conn.peer.linkLoop(conn.linkIn)
-		iface.sendKeys(conn.addr)
-	}
-	func() {
-		defer func() { recover() }()
-		select {
-		case conn.keysIn <- &ks:
-		default:
-		}
-	}()
-}
-
-func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) {
-	iface.mutex.RLock()
-	if conn, isIn := iface.conns[addr]; isIn {
-		conn.in(msg)
-	}
-	iface.mutex.RUnlock()
-}
-
-func (iface *udpInterface) reader() {
-	iface.core.log.Println("Listening for UDP on:", iface.sock.LocalAddr().String())
-	bs := make([]byte, 65536) // This needs to be large enough for everything...
-	for {
-		n, udpAddr, err := iface.sock.ReadFromUDP(bs)
-		//iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n)
-		if err != nil {
-			panic(err)
-			break
-		}
-		msg := bs[:n]
-		var addr connAddr
-		addr.fromUDPAddr(udpAddr)
-		switch {
-		case udp_isKeys(msg):
-			var them address
-			copy(them[:], udpAddr.IP.To16())
-			if them.isValid() {
-				continue
-			}
-			if udpAddr.IP.IsLinkLocalUnicast() {
-				if len(iface.core.ifceExpr) == 0 {
-					break
-				}
-				for _, expr := range iface.core.ifceExpr {
-					if expr.MatchString(udpAddr.Zone) {
-						iface.handleKeys(msg, addr)
-						break
-					}
-				}
-			}
-		case udp_isClose(msg):
-			iface.handleClose(msg, addr)
-		default:
-			iface.handlePacket(msg, addr)
-		}
-	}
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
-	if len(bs) >= 3 {
-		chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:]
-	}
-	return
-}
-
-func udp_encode(out []byte, chunks, chunk, count uint8, payload []byte) []byte {
-	return append(append(out, chunks, chunk, count), payload...)
-}
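For reference, the framing used by the transport removed above is a 3-byte header, [total chunks][chunk index][message counter], in front of each payload. A standalone round-trip sketch that mirrors the deleted udp_encode/udp_decode helpers (illustrative only, not part of either patch):

package main

import "fmt"

// encode prepends the 3-byte chunk header to a payload.
func encode(out []byte, chunks, chunk, count uint8, payload []byte) []byte {
	return append(append(out, chunks, chunk, count), payload...)
}

// decode splits the header back off; it returns zero values for short inputs.
func decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
	if len(bs) >= 3 {
		chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:]
	}
	return
}

func main() {
	wire := encode(nil, 2, 1, 7, []byte("hello"))
	chunks, chunk, count, payload := decode(wire)
	fmt.Println(chunks, chunk, count, string(payload)) // 2 1 7 hello
}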