From ef1e506a0c05dc163b5db89b5bf4e30be4cbf761 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 10:23:55 -0500 Subject: [PATCH] work-in-progress on more cleanup --- src/yggdrasil/api.go | 6 ++-- src/yggdrasil/core.go | 6 ++-- src/yggdrasil/link.go | 70 +++++++++++++++++++------------------- src/yggdrasil/simlink.go | 2 +- src/yggdrasil/stream.go | 2 +- src/yggdrasil/switch.go | 2 +- src/yggdrasil/tcp.go | 50 +++++++++++++-------------- src/yggdrasil/tcp_linux.go | 6 ++-- src/yggdrasil/tls.go | 4 +-- 9 files changed, 74 insertions(+), 74 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 66ee9b81..b5b8d362 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -257,14 +257,14 @@ func (c *Core) ConnDialer() (*Dialer, error) { // "Listen" configuration item, e.g. // tcp://a.b.c.d:e func (c *Core) ListenTCP(uri string) (*TcpListener, error) { - return c.link.tcp.listen(uri, nil) + return c.links.tcp.listen(uri, nil) } // ListenTLS starts a new TLS listener. The input URI should match that of the // "Listen" configuration item, e.g. // tls://a.b.c.d:e func (c *Core) ListenTLS(uri string) (*TcpListener, error) { - return c.link.tcp.listen(uri, c.link.tcp.tls.forListener) + return c.links.tcp.listen(uri, c.links.tcp.tls.forListener) } // NodeID gets the node ID. This is derived from your router encryption keys. @@ -463,7 +463,7 @@ func (c *Core) RemovePeer(addr string, sintf string) error { // This does not add the peer to the peer list, so if the connection drops, the // peer will not be called again automatically. func (c *Core) CallPeer(addr string, sintf string) error { - return c.link.call(addr, sintf) + return c.links.call(addr, sintf) } // DisconnectPeer disconnects a peer once. This should be specified as a port diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index f7664942..4ac678df 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -29,7 +29,7 @@ type Core struct { switchTable switchTable peers peers router router - link link + links links log *log.Logger addPeerTimer *time.Timer } @@ -165,7 +165,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState return nil, err } - if err := c.link.init(c); err != nil { + if err := c.links.init(c); err != nil { c.log.Errorln("Failed to start link interfaces") return nil, err } @@ -197,7 +197,7 @@ func (c *Core) _stop() { if c.addPeerTimer != nil { c.addPeerTimer.Stop() } - c.link.stop() + c.links.stop() /* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown for _, peer := range c.GetPeers() { c.DisconnectPeer(peer.Port) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index dc61892f..9776ee50 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,7 +20,7 @@ import ( "github.com/Arceliar/phony" ) -type link struct { +type links struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface @@ -37,7 +37,7 @@ type linkInfo struct { remote string // Remote name or address } -type linkInterfaceMsgIO interface { +type linkMsgIO interface { readMsg() ([]byte, error) writeMsgs([][]byte) (int, error) close() error @@ -48,9 +48,9 @@ type linkInterfaceMsgIO interface { type linkInterface struct { lname string - link *link + links *links peer *peer - msgIO linkInterfaceMsgIO + msgIO linkMsgIO info linkInfo incoming bool force bool @@ -66,7 +66,7 @@ type linkInterface struct { blocked bool // True if we've blocked the peer in the switch } -func (l *link) init(c *Core) error { +func (l *links) init(c *Core) error { l.core = c l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) @@ -81,11 +81,11 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) reconfigure() { +func (l *links) reconfigure() { l.tcp.reconfigure() } -func (l *link) call(uri string, sintf string) error { +func (l *links) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) @@ -104,7 +104,7 @@ func (l *link) call(uri string, sintf string) error { return nil } -func (l *link) listen(uri string) error { +func (l *links) listen(uri string) error { u, err := url.Parse(uri) if err != nil { return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err) @@ -121,11 +121,11 @@ func (l *link) listen(uri string) error { } } -func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { +func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging intf := linkInterface{ lname: name, - link: l, + links: l, msgIO: msgIO, info: linkInfo{ linkType: linkType, @@ -142,7 +142,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st return &intf, nil } -func (l *link) stop() error { +func (l *links) stop() error { close(l.stopped) if err := l.tcp.stop(); err != nil { return err @@ -163,8 +163,8 @@ func (intf *linkInterface) handler() error { }) myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() - meta.box = intf.link.core.boxPub - meta.sig = intf.link.core.sigPub + meta.box = intf.links.core.boxPub + meta.sig = intf.links.core.sigPub meta.link = *myLinkPub metaBytes := meta.encode() // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) @@ -187,12 +187,12 @@ func (intf *linkInterface) handler() error { } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP - if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", + if intf.incoming && !intf.force && !intf.links.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() return nil @@ -200,12 +200,12 @@ func (intf *linkInterface) handler() error { // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig - intf.link.mutex.Lock() - if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { - intf.link.mutex.Unlock() + intf.links.mutex.Lock() + if oldIntf, isIn := intf.links.interfaces[intf.info]; isIn { + intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. - intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) + intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.msgIO.close() if !intf.incoming { // Block outgoing connection attempts until the existing connection closes @@ -214,21 +214,21 @@ func (intf *linkInterface) handler() error { return nil } else { intf.closed = make(chan struct{}) - intf.link.interfaces[intf.info] = intf + intf.links.interfaces[intf.info] = intf defer func() { - intf.link.mutex.Lock() - delete(intf.link.interfaces, intf.info) - intf.link.mutex.Unlock() + intf.links.mutex.Lock() + delete(intf.links.interfaces, intf.info) + intf.links.mutex.Unlock() close(intf.closed) }() - intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) + intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name) } - intf.link.mutex.Unlock() + intf.links.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - phony.Block(&intf.link.core.peers, func() { + phony.Block(&intf.links.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) + intf.peer = intf.links.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -240,7 +240,7 @@ func (intf *linkInterface) handler() error { themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.link.core.log.Infof("Connected %s: %s, source %s", + intf.links.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start things go intf.peer.start() @@ -252,7 +252,7 @@ func (intf *linkInterface) handler() error { defer close(done) go func() { select { - case <-intf.link.stopped: + case <-intf.links.stopped: intf.msgIO.close() case <-done: } @@ -260,10 +260,10 @@ func (intf *linkInterface) handler() error { err = <-intf.reader.err // TODO don't report an error if it's just a 'use of closed network connection' if err != nil { - intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", + intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) } else { - intf.link.core.log.Infof("Disconnected %s: %s, source %s", + intf.links.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } return err @@ -355,7 +355,7 @@ func (intf *linkInterface) notifyBlockedSend() { if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. intf.blocked = true - intf.link.core.switchTable.blockPeer(intf, intf.peer.port) + intf.links.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -392,7 +392,7 @@ func (intf *linkInterface) notifyStalled() { intf.stallTimer.Stop() intf.stallTimer = nil intf.blocked = true - intf.link.core.switchTable.blockPeer(intf, intf.peer.port) + intf.links.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -419,7 +419,7 @@ func (intf *linkInterface) notifyRead(size int) { } if intf.blocked { intf.blocked = false - intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + intf.links.core.switchTable.unblockPeer(intf, intf.peer.port) } }) } diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index f830c215..6c04a8c0 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -58,7 +58,7 @@ func (c *Core) NewSimlink() *Simlink { s := &Simlink{rch: make(chan []byte, 1)} n := "Simlink" var err error - s.link, err = c.link.create(s, n, n, n, n, false, true) + s.link, err = c.links.create(s, n, n, n, n, false, true) if err != nil { panic(err) } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index be1398fc..afa97c76 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -9,7 +9,7 @@ import ( ) // Test that this matches the interface we expect -var _ = linkInterfaceMsgIO(&stream{}) +var _ = linkMsgIO(&stream{}) type stream struct { rwc io.ReadWriteCloser diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6ab9a02b..a5c099ba 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -188,7 +188,7 @@ func (t *switchTable) init(core *Core) { func (t *switchTable) reconfigure() { // This is where reconfiguration would go, if we had anything useful to do. - t.core.link.reconfigure() + t.core.links.reconfigure() t.core.peers.reconfigure() } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 9cca4193..17b34e91 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -33,7 +33,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { - link *link + links *links waitgroup sync.WaitGroup mutex sync.Mutex // Protecting the below listeners map[string]*TcpListener @@ -86,8 +86,8 @@ func (t *tcp) getAddr() *net.TCPAddr { } // Initializes the struct. -func (t *tcp) init(l *link) error { - t.link = l +func (t *tcp) init(l *links) error { + t.links = l t.tls.init(t) t.mutex.Lock() t.calls = make(map[string]struct{}) @@ -95,9 +95,9 @@ func (t *tcp) init(l *link) error { t.listeners = make(map[string]*TcpListener) t.mutex.Unlock() - t.link.core.config.Mutex.RLock() - defer t.link.core.config.Mutex.RUnlock() - for _, listenaddr := range t.link.core.config.Current.Listen { + t.links.core.config.Mutex.RLock() + defer t.links.core.config.Mutex.RUnlock() + for _, listenaddr := range t.links.core.config.Current.Listen { switch listenaddr[:6] { case "tcp://": if _, err := t.listen(listenaddr[6:], nil); err != nil { @@ -108,7 +108,7 @@ func (t *tcp) init(l *link) error { return err } default: - t.link.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") } } @@ -126,35 +126,35 @@ func (t *tcp) stop() error { } func (t *tcp) reconfigure() { - t.link.core.config.Mutex.RLock() - added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) - deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen) - t.link.core.config.Mutex.RUnlock() + t.links.core.config.Mutex.RLock() + added := util.Difference(t.links.core.config.Current.Listen, t.links.core.config.Previous.Listen) + deleted := util.Difference(t.links.core.config.Previous.Listen, t.links.core.config.Current.Listen) + t.links.core.config.Mutex.RUnlock() if len(added) > 0 || len(deleted) > 0 { for _, a := range added { switch a[:6] { case "tcp://": if _, err := t.listen(a[6:], nil); err != nil { - t.link.core.log.Errorln("Error adding TCP", a[6:], "listener:", err) + t.links.core.log.Errorln("Error adding TCP", a[6:], "listener:", err) } case "tls://": if _, err := t.listen(a[6:], t.tls.forListener); err != nil { - t.link.core.log.Errorln("Error adding TLS", a[6:], "listener:", err) + t.links.core.log.Errorln("Error adding TLS", a[6:], "listener:", err) } default: - t.link.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") } } for _, d := range deleted { if d[:6] != "tcp://" && d[:6] != "tls://" { - t.link.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") continue } t.mutex.Lock() if listener, ok := t.listeners[d[6:]]; ok { t.mutex.Unlock() listener.Stop() - t.link.core.log.Infoln("Stopped TCP listener:", d[6:]) + t.links.core.log.Infoln("Stopped TCP listener:", d[6:]) } else { t.mutex.Unlock() } @@ -202,13 +202,13 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { } // And here we go! defer func() { - t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) + t.links.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) l.Listener.Close() t.mutex.Lock() delete(t.listeners, listenaddr) t.mutex.Unlock() }() - t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) + t.links.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) go func() { <-l.stop l.Listener.Close() @@ -217,7 +217,7 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { for { sock, err := l.Listener.Accept() if err != nil { - t.link.core.log.Errorln("Failed to accept connection:", err) + t.links.core.log.Errorln("Failed to accept connection:", err) return } t.waitgroup.Add(1) @@ -344,7 +344,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp } conn, err = dialer.Dial("tcp", dst.String()) if err != nil { - t.link.core.log.Debugf("Failed to dial %s: %s", callproto, err) + t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err) return } t.waitgroup.Add(1) @@ -361,7 +361,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade if upgrade != nil { var err error if sock, err = upgrade.upgrade(sock); err != nil { - t.link.core.log.Errorln("TCP handler upgrade failed:", err) + t.links.core.log.Errorln("TCP handler upgrade failed:", err) return } else { upgraded = true @@ -387,12 +387,12 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) } force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() - link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force) + link, err := t.links.create(&stream, name, proto, local, remote, incoming, force) if err != nil { - t.link.core.log.Println(err) + t.links.core.log.Println(err) panic(err) } - t.link.core.log.Debugln("DEBUG: starting handler for", name) + t.links.core.log.Debugln("DEBUG: starting handler for", name) err = link.handler() - t.link.core.log.Debugln("DEBUG: stopped handler for", name, err) + t.links.core.log.Debugln("DEBUG: stopped handler for", name, err) } diff --git a/src/yggdrasil/tcp_linux.go b/src/yggdrasil/tcp_linux.go index 9ec3c10f..e18f92b1 100644 --- a/src/yggdrasil/tcp_linux.go +++ b/src/yggdrasil/tcp_linux.go @@ -20,10 +20,10 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { // Log any errors if bbr != nil { - t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr) + t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr) } if control != nil { - t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control) + t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control) } // Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal @@ -38,7 +38,7 @@ func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) err } c.Control(btd) if err != nil { - t.link.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf) + t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf) } return t.tcpContext(network, address, c) } diff --git a/src/yggdrasil/tls.go b/src/yggdrasil/tls.go index 7212c4df..e2861aca 100644 --- a/src/yggdrasil/tls.go +++ b/src/yggdrasil/tls.go @@ -34,7 +34,7 @@ func (t *tcptls) init(tcp *tcp) { } edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize) - copy(edpriv[:], tcp.link.core.sigPriv[:]) + copy(edpriv[:], tcp.links.core.sigPriv[:]) certBuf := &bytes.Buffer{} @@ -42,7 +42,7 @@ func (t *tcptls) init(tcp *tcp) { pubtemp := x509.Certificate{ SerialNumber: big.NewInt(1), Subject: pkix.Name{ - CommonName: hex.EncodeToString(tcp.link.core.sigPub[:]), + CommonName: hex.EncodeToString(tcp.links.core.sigPub[:]), }, NotBefore: time.Now(), NotAfter: time.Now().Add(time.Hour * 24 * 365),