From cd86c338505003dc7779423e427c0e936885f727 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 21:11:16 +0000 Subject: [PATCH] Try to tidy up a bit, move checks for if we are already calling/connected Something I noticed when working on reconfigure support for the "Listen" option is that we have some rather huge weaknesses in our multicasting design. Right now if we change our Listen address, it's not really possible for remote nodes to know whether they are still connected to us, so they start connecting in response to our changed beacons. They can't know that they already know about us until *after* the handshake but this registers in the local client log as repeated Connect/Disconnects even though the existing peerings never actually drop. --- src/yggdrasil/tcp.go | 74 +++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 4fb3026d..ec8bca46 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -53,8 +53,8 @@ type tcpInterface struct { type tcpInfo struct { box crypto.BoxPubKey sig crypto.SigPubKey + localAddr string remoteAddr string - remotePort string } // Wrapper function to set additional options for specific connection types. @@ -150,6 +150,22 @@ func (iface *tcpInterface) listener() { } } +// Checks if we already have a connection to this node +func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.conns[info] + return isIn +} + +// Checks if we already are calling this address +func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.calls[saddr] + return isIn +} + // Checks if a connection already exists. // If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. // If the dial is successful, it launches the handler. @@ -161,25 +177,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - quit := false - iface.mutex.Lock() - if _, isIn := iface.calls[callname]; isIn { - quit = true - } else { - iface.calls[callname] = struct{}{} - defer func() { - // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() - }() - } - iface.mutex.Unlock() - if quit { + if iface.isAlreadyCalling(saddr) { return } + iface.calls[callname] = struct{}{} + defer func() { + // Block new calls for a little while, to mitigate livelock scenarios + time.Sleep(default_tcp_timeout) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + iface.mutex.Lock() + delete(iface.calls, callname) + iface.mutex.Unlock() + }() var conn net.Conn var err error if socksaddr != nil { @@ -284,9 +293,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? return } + remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) + localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) + if e1 != nil || e2 != nil { + return + } info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, + box: meta.box, + sig: meta.sig, + localAddr: localAddr, + remoteAddr: remoteAddr, + } + if iface.isAlreadyConnected(info) { + return } // Quit the parent call if this is a connection to ourself equiv := func(k1, k2 []byte) bool { @@ -297,14 +316,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } return true } - if equiv(info.box[:], iface.core.boxPub[:]) { + if equiv(meta.box[:], iface.core.boxPub[:]) { return } - if equiv(info.sig[:], iface.core.sigPub[:]) { + if equiv(meta.sig[:], iface.core.sigPub[:]) { return } // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) { + if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { // Allow unauthorized peers if they're link-local raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) raddr := net.ParseIP(raddrStr) @@ -313,14 +332,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } } // Check if we already have a connection to this node, close and block if yes - info.remoteAddr, info.remotePort, _ = net.SplitHostPort(sock.RemoteAddr().String()) iface.mutex.Lock() - if blockChan, isIn := iface.conns[info]; isIn { + /*if blockChan, isIn := iface.conns[info]; isIn { iface.mutex.Unlock() sock.Close() <-blockChan return - } + }*/ blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() @@ -332,7 +350,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) + p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) in := func(bs []byte) { p.handlePacket(bs) @@ -394,7 +412,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&info.box) + themNodeID := crypto.GetNodeID(&meta.box) themAddr := address.AddrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them)