diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index c5654132..8bada261 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -158,6 +158,23 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) + send := func(msg []byte) { + buf := net.Buffers{tcp_msg[:], + wire_encode_uint64(uint64(len(msg))), + msg} + size := 0 + for _, bs := range buf { + size += len(bs) + } + start := time.Now() + buf.WriteTo(sock) + timed := time.Since(start) + pType, _ := wire_decode_uint64(msg) + if pType == wire_LinkProtocolTraffic { + p.updateBandwidth(size, timed) + } + util_putBytes(msg) + } go func() { var stack [][]byte put := func(msg []byte) { @@ -167,25 +184,6 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { stack = stack[1:] } } - send := func() { - msg := stack[len(stack)-1] - stack = stack[:len(stack)-1] - buf := net.Buffers{tcp_msg[:], - wire_encode_uint64(uint64(len(msg))), - msg} - size := 0 - for _, bs := range buf { - size += len(bs) - } - start := time.Now() - buf.WriteTo(sock) - timed := time.Since(start) - pType, _ := wire_decode_uint64(msg) - if pType == wire_LinkProtocolTraffic { - p.updateBandwidth(size, timed) - } - util_putBytes(msg) - } for msg := range out { put(msg) for len(stack) > 0 { @@ -197,7 +195,9 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } put(msg) default: - send() + msg := stack[len(stack)-1] + stack = stack[:len(stack)-1] + send(msg) } } } diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 990a0890..5c2a7ab1 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -46,16 +46,17 @@ func (c *connAddr) toUDPAddr() *net.UDPAddr { } type connInfo struct { - name string - addr connAddr - peer *peer - linkIn chan []byte - keysIn chan *udpKeys - timeout int // count of how many heartbeats have been missed - in func([]byte) - out chan []byte - countIn uint8 - countOut uint8 + name string + addr connAddr + peer *peer + linkIn chan []byte + keysIn chan *udpKeys + timeout int // count of how many heartbeats have been missed + in func([]byte) + out chan []byte + countIn uint8 + countOut uint8 + chunkSize uint16 } type udpKeys struct { @@ -73,6 +74,8 @@ func (iface *udpInterface) init(core *Core, addr string) { if err != nil { panic(err) } + //iface.sock.SetReadBuffer(1048576) + //iface.sock.SetWriteBuffer(1048576) iface.conns = make(map[connAddr]*connInfo) go iface.reader() } @@ -162,12 +165,16 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String()) conn = &connInfo{ - name: themString, - addr: connAddr(addr), - peer: iface.core.peers.newPeer(&ks.box, &ks.sig), - linkIn: make(chan []byte, 1), - keysIn: make(chan *udpKeys, 1), - out: make(chan []byte, 32), + name: themString, + addr: connAddr(addr), + peer: iface.core.peers.newPeer(&ks.box, &ks.sig), + linkIn: make(chan []byte, 1), + keysIn: make(chan *udpKeys, 1), + out: make(chan []byte, 32), + chunkSize: 576 - 60 - 8 - 3, // max save - max ip - udp header - chunk overhead + } + if udpAddr.IP.IsLinkLocalUnicast() { + conn.chunkSize = 65535 - 8 - 3 } /* conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } @@ -236,8 +243,8 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { for msg := range conn.out { chunks = chunks[:0] bs := msg - for len(bs) > udp_chunkSize { - chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:] + for len(bs) > int(conn.chunkSize) { + chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:] } chunks = append(chunks, bs) //iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg)) @@ -284,7 +291,7 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) { } func (iface *udpInterface) reader() { - bs := make([]byte, 2048) // This needs to be large enough for everything... + bs := make([]byte, 65536) // This needs to be large enough for everything... for { //iface.core.log.Println("Starting read") n, udpAddr, err := iface.sock.ReadFromUDP(bs) @@ -293,9 +300,7 @@ func (iface *udpInterface) reader() { panic(err) break } - if n > 1500 { - panic(n) - } + //iface.core.log.Println("DEBUG: recv len:", n) //msg := append(util_getBytes(), bs[:n]...) msg := bs[:n] var addr connAddr @@ -319,7 +324,8 @@ func (iface *udpInterface) reader() { //////////////////////////////////////////////////////////////////////////////// -const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size +//const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size +//const udp_chunkSize = 65535 - 3 - 8 func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) { if len(bs) >= 3 { diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index c4a6485b..379ef249 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -21,7 +21,7 @@ const ( wire_DHTLookupResponse // inside protocol traffic header wire_SearchRequest // inside protocol traffic header wire_SearchResponse // inside protocol traffic header - //wire_Keys // udp key packet (boxPub, sigPub) + wire_Keys // udp key packet (boxPub, sigPub) ) // Encode uint64 using a variable length scheme diff --git a/yggdrasil.go b/yggdrasil.go index 5abd6b09..959ba474 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -177,7 +177,7 @@ func (n *node) listen() { saddr := addr.String() //if _, isIn := n.peers[saddr]; isIn { continue } //n.peers[saddr] = struct{}{} - n.core.DEBUG_addTCPConn(saddr) // FIXME? can result in 2 connections per peer + n.core.DEBUG_maybeSendUDPKeys(saddr) // FIXME? can result in 2 connections per peer //fmt.Println("DEBUG:", "added multicast peer:", saddr) } } @@ -188,8 +188,8 @@ func (n *node) announce() { panic(err) } var anAddr net.TCPAddr - tcpAddr := n.core.DEBUG_getGlobalTCPAddr() - anAddr.Port = tcpAddr.Port + myAddr := n.core.DEBUG_getGlobalUDPAddr() + anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) if err != nil { panic(err)