diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 8bada261..803ca7be 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -15,6 +15,7 @@ import "time" import "errors" import "sync" import "fmt" +import "bufio" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense @@ -148,7 +149,12 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() - defer close(blockChan) + defer func() { + iface.mutex.Lock() + delete(iface.conns, info) + iface.mutex.Unlock() + close(blockChan) + }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces linkIn := make(chan []byte, 1) @@ -158,23 +164,29 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) + buf := bufio.NewWriterSize(sock, 65535) send := func(msg []byte) { - buf := net.Buffers{tcp_msg[:], - wire_encode_uint64(uint64(len(msg))), - msg} - size := 0 - for _, bs := range buf { - size += len(bs) - } + msgLen := wire_encode_uint64(uint64(len(msg))) + before := buf.Buffered() start := time.Now() - buf.WriteTo(sock) + buf.Write(tcp_msg[:]) + buf.Write(msgLen) + buf.Write(msg) timed := time.Since(start) - pType, _ := wire_decode_uint64(msg) - if pType == wire_LinkProtocolTraffic { - p.updateBandwidth(size, timed) + after := buf.Buffered() + written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after + if written > 0 { + p.updateBandwidth(written, timed) } util_putBytes(msg) } + flush := func() { + size := buf.Buffered() + start := time.Now() + buf.Flush() + timed := time.Since(start) + p.updateBandwidth(size, timed) + } go func() { var stack [][]byte put := func(msg []byte) { @@ -191,6 +203,7 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { select { case msg, ok := <-out: if !ok { + flush() return } put(msg) @@ -200,6 +213,7 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { send(msg) } } + flush() } }() p.out = func(msg []byte) { diff --git a/yggdrasil.go b/yggdrasil.go index 959ba474..110cd37f 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -118,7 +118,7 @@ func generateConfig() *nodeConfig { cfg.Multicast = true cfg.LinkLocal = "" cfg.IfName = "auto" - cfg.IfMTU = 65535 + cfg.IfMTU = 1280 //65535 if runtime.GOOS == "windows" { cfg.IfTAPMode = true } else { @@ -177,7 +177,7 @@ func (n *node) listen() { saddr := addr.String() //if _, isIn := n.peers[saddr]; isIn { continue } //n.peers[saddr] = struct{}{} - n.core.DEBUG_maybeSendUDPKeys(saddr) // FIXME? can result in 2 connections per peer + n.core.DEBUG_addTCPConn(saddr) // FIXME? can result in 2 connections per peer //fmt.Println("DEBUG:", "added multicast peer:", saddr) } } @@ -188,7 +188,7 @@ func (n *node) announce() { panic(err) } var anAddr net.TCPAddr - myAddr := n.core.DEBUG_getGlobalUDPAddr() + myAddr := n.core.DEBUG_getGlobalTCPAddr() anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) if err != nil {