diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 4fb3026d..ec8bca46 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -53,8 +53,8 @@ type tcpInterface struct { type tcpInfo struct { box crypto.BoxPubKey sig crypto.SigPubKey + localAddr string remoteAddr string - remotePort string } // Wrapper function to set additional options for specific connection types. @@ -150,6 +150,22 @@ func (iface *tcpInterface) listener() { } } +// Checks if we already have a connection to this node +func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.conns[info] + return isIn +} + +// Checks if we already are calling this address +func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.calls[saddr] + return isIn +} + // Checks if a connection already exists. // If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. // If the dial is successful, it launches the handler. @@ -161,25 +177,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - quit := false - iface.mutex.Lock() - if _, isIn := iface.calls[callname]; isIn { - quit = true - } else { - iface.calls[callname] = struct{}{} - defer func() { - // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() - }() - } - iface.mutex.Unlock() - if quit { + if iface.isAlreadyCalling(saddr) { return } + iface.calls[callname] = struct{}{} + defer func() { + // Block new calls for a little while, to mitigate livelock scenarios + time.Sleep(default_tcp_timeout) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + iface.mutex.Lock() + delete(iface.calls, callname) + iface.mutex.Unlock() + }() var conn net.Conn var err error if socksaddr != nil { @@ -284,9 +293,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? return } + remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) + localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) + if e1 != nil || e2 != nil { + return + } info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, + box: meta.box, + sig: meta.sig, + localAddr: localAddr, + remoteAddr: remoteAddr, + } + if iface.isAlreadyConnected(info) { + return } // Quit the parent call if this is a connection to ourself equiv := func(k1, k2 []byte) bool { @@ -297,14 +316,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } return true } - if equiv(info.box[:], iface.core.boxPub[:]) { + if equiv(meta.box[:], iface.core.boxPub[:]) { return } - if equiv(info.sig[:], iface.core.sigPub[:]) { + if equiv(meta.sig[:], iface.core.sigPub[:]) { return } // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) { + if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { // Allow unauthorized peers if they're link-local raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) raddr := net.ParseIP(raddrStr) @@ -313,14 +332,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } } // Check if we already have a connection to this node, close and block if yes - info.remoteAddr, info.remotePort, _ = net.SplitHostPort(sock.RemoteAddr().String()) iface.mutex.Lock() - if blockChan, isIn := iface.conns[info]; isIn { + /*if blockChan, isIn := iface.conns[info]; isIn { iface.mutex.Unlock() sock.Close() <-blockChan return - } + }*/ blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() @@ -332,7 +350,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) + p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) in := func(bs []byte) { p.handlePacket(bs) @@ -394,7 +412,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&info.box) + themNodeID := crypto.GetNodeID(&meta.box) themAddr := address.AddrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them)