From 68eb684f6df3aa158ff1a6f40864df93bcaabcda Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 16 Jan 2019 19:27:44 +0000 Subject: [PATCH 01/27] Fix mobile.go now that multicast interfaces are handled internally --- src/yggdrasil/mobile.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/yggdrasil/mobile.go b/src/yggdrasil/mobile.go index 2ffeffb9..52215f06 100644 --- a/src/yggdrasil/mobile.go +++ b/src/yggdrasil/mobile.go @@ -7,7 +7,6 @@ import ( "encoding/json" "log" "os" - "regexp" "time" hjson "github.com/hjson/hjson-go" @@ -52,10 +51,6 @@ func (c *Core) StartAutoconfigure() error { if hostname, err := os.Hostname(); err == nil { nc.NodeInfo = map[string]interface{}{"name": hostname} } - ifceExpr, err := regexp.Compile(".*") - if err == nil { - c.ifceExpr = append(c.ifceExpr, ifceExpr) - } if err := c.Start(nc, logger); err != nil { return err } @@ -77,15 +72,6 @@ func (c *Core) StartJSON(configjson []byte) error { return err } nc.IfName = "dummy" - //c.log.Println(nc.MulticastInterfaces) - for _, ll := range nc.MulticastInterfaces { - //c.log.Println("Processing MC", ll) - ifceExpr, err := regexp.Compile(ll) - if err != nil { - panic(err) - } - c.AddMulticastInterfaceExpr(ifceExpr) - } if err := c.Start(nc, logger); err != nil { return err } From 6fe3b01e905b6f8ce482faa83f2e90315fcf72c0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 19 Jan 2019 00:14:10 +0000 Subject: [PATCH 02/27] Rename awdl.go to link.go, add stream.go, update tcp.go --- src/yggdrasil/core.go | 12 +-- src/yggdrasil/{awdl.go => link.go} | 55 ++++++------- src/yggdrasil/stream.go | 111 +++++++++++++++++++++++++ src/yggdrasil/tcp.go | 128 ++++++++--------------------- 4 files changed, 177 insertions(+), 129 deletions(-) rename src/yggdrasil/{awdl.go => link.go} (72%) create mode 100644 src/yggdrasil/stream.go diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index ed075813..fa5d02ad 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -44,7 +44,7 @@ type Core struct { searches searches multicast multicast tcp tcpInterface - awdl awdl + link link log *log.Logger } @@ -198,10 +198,12 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.awdl.init(c); err != nil { - c.log.Println("Failed to start AWDL interface") - return err - } + /* + if err := c.awdl.init(c); err != nil { + c.log.Println("Failed to start AWDL interface") + return err + } + */ if nc.SwitchOptions.MaxTotalQueueSize >= SwitchQueueTotalMinSize { c.switchTable.queueTotalMaxSize = nc.SwitchOptions.MaxTotalQueueSize diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/link.go similarity index 72% rename from src/yggdrasil/awdl.go rename to src/yggdrasil/link.go index 633d5f9c..1bbd0c34 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/link.go @@ -11,34 +11,35 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/util" ) -type awdl struct { +type link struct { core *Core mutex sync.RWMutex // protects interfaces below - interfaces map[string]*awdlInterface + interfaces map[string]*linkInterface } -type awdlInterface struct { - awdl *awdl - fromAWDL chan []byte - toAWDL chan []byte +type linkInterface struct { + link *link + fromlink chan []byte + tolink chan []byte shutdown chan bool peer *peer + stream stream } -func (l *awdl) init(c *Core) error { +func (l *link) init(c *Core) error { l.core = c l.mutex.Lock() - l.interfaces = make(map[string]*awdlInterface) + l.interfaces = make(map[string]*linkInterface) l.mutex.Unlock() return nil } -func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte /*boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey*/, name string) (*awdlInterface, error) { - intf := awdlInterface{ - awdl: l, - fromAWDL: fromAWDL, - toAWDL: toAWDL, +func (l *link) create(fromlink chan []byte, tolink chan []byte /*boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey*/, name string) (*linkInterface, error) { + intf := linkInterface{ + link: l, + fromlink: fromlink, + tolink: tolink, shutdown: make(chan bool), } l.mutex.Lock() @@ -50,35 +51,29 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte /*boxPubKey *cryp meta.sig = l.core.sigPub meta.link = *myLinkPub metaBytes := meta.encode() - l.core.log.Println("toAWDL <- metaBytes") - toAWDL <- metaBytes - l.core.log.Println("metaBytes = <-fromAWDL") - metaBytes = <-fromAWDL - l.core.log.Println("version_metadata{}") + tolink <- metaBytes + metaBytes = <-fromlink meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { return nil, errors.New("Metadata decode failure") } - l.core.log.Println("version_getBaseMetadata{}") base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { return nil, errors.New("Failed to connect to node: " + name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) } - l.core.log.Println("crypto.GetSharedKey") shared := crypto.GetSharedKey(myLinkPriv, &meta.link) //shared := crypto.GetSharedKey(&l.core.boxPriv, boxPubKey) - l.core.log.Println("l.core.peers.newPeer") intf.peer = l.core.peers.newPeer(&meta.box, &meta.sig, shared, name) if intf.peer != nil { intf.peer.linkOut = make(chan []byte, 1) // protocol traffic intf.peer.out = func(msg []byte) { defer func() { recover() }() - intf.toAWDL <- msg + intf.tolink <- msg } // called by peer.sendPacket() l.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle intf.peer.close = func() { - close(intf.fromAWDL) - close(intf.toAWDL) + close(intf.fromlink) + close(intf.tolink) } go intf.handler() go intf.peer.linkLoop() @@ -88,7 +83,7 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte /*boxPubKey *cryp return nil, errors.New("l.core.peers.newPeer failed") } -func (l *awdl) getInterface(identity string) *awdlInterface { +func (l *link) getInterface(identity string) *linkInterface { l.mutex.RLock() defer l.mutex.RUnlock() if intf, ok := l.interfaces[identity]; ok { @@ -97,7 +92,7 @@ func (l *awdl) getInterface(identity string) *awdlInterface { return nil } -func (l *awdl) shutdown(identity string) error { +func (l *link) shutdown(identity string) error { if intf, ok := l.interfaces[identity]; ok { intf.shutdown <- true l.core.peers.removePeer(intf.peer.port) @@ -110,9 +105,9 @@ func (l *awdl) shutdown(identity string) error { } } -func (ai *awdlInterface) handler() { +func (ai *linkInterface) handler() { send := func(msg []byte) { - ai.toAWDL <- msg + ai.tolink <- msg atomic.AddUint64(&ai.peer.bytesSent, uint64(len(msg))) util.PutBytes(msg) } @@ -138,9 +133,9 @@ func (ai *awdlInterface) handler() { case p := <-ai.peer.linkOut: send(p) continue - case r := <-ai.fromAWDL: + case r := <-ai.fromlink: ai.peer.handlePacket(r) - ai.awdl.core.switchTable.idleIn <- ai.peer.port + ai.link.core.switchTable.idleIn <- ai.peer.port case <-ai.shutdown: return } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go new file mode 100644 index 00000000..5d67aceb --- /dev/null +++ b/src/yggdrasil/stream.go @@ -0,0 +1,111 @@ +package yggdrasil + +import ( + "errors" + "fmt" + + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) + +type stream struct { + buffer []byte + cursor int +} + +const streamMsgSize = 2048 + 65535 + +var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" + +func (s *stream) init() { + s.buffer = make([]byte, 2*streamMsgSize) + s.cursor = 0 +} + +// This reads from the channel into a []byte buffer for incoming messages. It +// copies completed messages out of the cache into a new slice, and passes them +// to the peer struct via the provided `in func([]byte)` argument. Then it +// shifts the incomplete fragments of data forward so future reads won't +// overwrite it. +func (s *stream) write(bs []byte, in func([]byte)) error { + frag := s.buffer[:0] + if n := len(bs); n > 0 { + frag = append(frag, bs[:n]...) + msg, ok, err2 := stream_chopMsg(&frag) + if err2 != nil { + return fmt.Errorf("message error: %v", err2) + } + if !ok { + // We didn't get the whole message yet + return nil + } + newMsg := append(util.GetBytes(), msg...) + in(newMsg) + util.Yield() + } + return nil +} + +// This takes a pointer to a slice as an argument. It checks if there's a +// complete message and, if so, slices out those parts and returns the message, +// true, and nil. If there's no error, but also no complete message, it returns +// nil, false, and nil. If there's an error, it returns nil, false, and the +// error, which the reader then handles (currently, by returning from the +// reader, which causes the connection to close). +func stream_chopMsg(bs *[]byte) ([]byte, bool, error) { + // Returns msg, ok, err + if len(*bs) < len(streamMsg) { + return nil, false, nil + } + for idx := range streamMsg { + if (*bs)[idx] != streamMsg[idx] { + return nil, false, errors.New("bad message") + } + } + msgLen, msgLenLen := wire_decode_uint64((*bs)[len(streamMsg):]) + if msgLen > streamMsgSize { + return nil, false, errors.New("oversized message") + } + msgBegin := len(streamMsg) + msgLenLen + msgEnd := msgBegin + int(msgLen) + if msgLenLen == 0 || len(*bs) < msgEnd { + // We don't have the full message + // Need to buffer this and wait for the rest to come in + return nil, false, nil + } + msg := (*bs)[msgBegin:msgEnd] + (*bs) = (*bs)[msgEnd:] + return msg, true, nil +} + +/* +func (s *stream) chopMsg() ([]byte, bool, error) { + // Returns msg, ok, err + if len(s.buffer) < len(streamMsg) { + fmt.Println("*** too short") + return nil, false, nil + } + for idx := range streamMsg { + if s.buffer[idx] != streamMsg[idx] { + fmt.Println("*** bad message") + return nil, false, errors.New("bad message") + } + } + msgLen, msgLenLen := wire_decode_uint64((s.buffer)[len(streamMsg):]) + if msgLen > streamMsgSize { + fmt.Println("*** oversized message") + return nil, false, errors.New("oversized message") + } + msgBegin := len(streamMsg) + msgLenLen + msgEnd := msgBegin + int(msgLen) + if msgLenLen == 0 || len(s.buffer) < msgEnd { + // We don't have the full message + // Need to buffer this and wait for the rest to come in + fmt.Println("*** still waiting") + return nil, false, nil + } + msg := s.buffer[msgBegin:msgEnd] + s.buffer = s.buffer[msgEnd:] + fmt.Println("*** done") + return msg, true, nil +} +*/ diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 78e39efc..682796ec 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -16,9 +16,7 @@ package yggdrasil import ( "context" - "errors" "fmt" - "io" "math/rand" "net" "sync" @@ -32,21 +30,21 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/util" ) -const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense -const default_tcp_timeout = 6 * time.Second -const tcp_ping_interval = (default_tcp_timeout * 2 / 3) +const default_timeout = 6 * time.Second +const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core reconfigure chan chan error serv net.Listener - serv_stop chan bool - tcp_timeout time.Duration - tcp_addr string + stop chan bool + timeout time.Duration + addr string mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) + stream stream } // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. @@ -86,7 +84,7 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { // Initializes the struct. func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core - iface.serv_stop = make(chan bool, 1) + iface.stop = make(chan bool, 1) iface.reconfigure = make(chan chan error, 1) go func() { for { @@ -95,7 +93,7 @@ func (iface *tcpInterface) init(core *Core) (err error) { updated := iface.core.config.Listen != iface.core.configOld.Listen iface.core.configMutex.RUnlock() if updated { - iface.serv_stop <- true + iface.stop <- true iface.serv.Close() e <- iface.listen() } else { @@ -111,19 +109,19 @@ func (iface *tcpInterface) listen() error { var err error iface.core.configMutex.RLock() - iface.tcp_addr = iface.core.config.Listen - iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond + iface.addr = iface.core.config.Listen + iface.timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond iface.core.configMutex.RUnlock() - if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { - iface.tcp_timeout = default_tcp_timeout + if iface.timeout >= 0 && iface.timeout < default_timeout { + iface.timeout = default_timeout } ctx := context.Background() lc := net.ListenConfig{ Control: iface.tcpContext, } - iface.serv, err = lc.Listen(ctx, "tcp", iface.tcp_addr) + iface.serv, err = lc.Listen(ctx, "tcp", iface.addr) if err == nil { iface.mutex.Lock() iface.calls = make(map[string]struct{}) @@ -147,7 +145,7 @@ func (iface *tcpInterface) listener() { return } select { - case <-iface.serv_stop: + case <-iface.stop: iface.core.log.Println("Stopping listener") return default: @@ -194,7 +192,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { iface.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) + time.Sleep(default_timeout) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) iface.mutex.Lock() delete(iface.calls, callname) @@ -299,8 +297,8 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { if err != nil { return } - if iface.tcp_timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout)) + if iface.timeout > 0 { + sock.SetReadDeadline(time.Now().Add(iface.timeout)) } _, err = sock.Read(metaBytes) if err != nil { @@ -389,9 +387,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic send := func(msg []byte) { msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf := net.Buffers{streamMsg[:], msgLen, msg} buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + atomic.AddUint64(&p.bytesSent, uint64(len(streamMsg)+len(msgLen)+len(msg))) util.PutBytes(msg) } timerInterval := tcp_ping_interval @@ -445,7 +443,21 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - err = iface.reader(sock, in) // In this goroutine, because of defers + iface.stream.init() + bs := make([]byte, 2*streamMsgSize) + var n int + for { + if iface.timeout > 0 { + sock.SetReadDeadline(time.Now().Add(iface.timeout)) + } + n, err = sock.Read(bs) + if err != nil { + break + } + if n > 0 { + iface.stream.write(bs[:n], in) + } + } if err == nil { iface.core.log.Printf("Disconnected: %s, source: %s", themString, us) } else { @@ -453,75 +465,3 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } return } - -// This reads from the socket into a []byte buffer for incomping messages. -// It copies completed messages out of the cache into a new slice, and passes them to the peer struct via the provided `in func([]byte)` argument. -// Then it shifts the incomplete fragments of data forward so future reads won't overwrite it. -func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) error { - bs := make([]byte, 2*tcp_msgSize) - frag := bs[:0] - for { - if iface.tcp_timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout)) - } - n, err := sock.Read(bs[len(frag):]) - if n > 0 { - frag = bs[:len(frag)+n] - for { - msg, ok, err2 := tcp_chop_msg(&frag) - if err2 != nil { - return fmt.Errorf("Message error: %v", err2) - } - if !ok { - // We didn't get the whole message yet - break - } - newMsg := append(util.GetBytes(), msg...) - in(newMsg) - util.Yield() - } - frag = append(bs[:0], frag...) - } - if err != nil || n == 0 { - if err != io.EOF { - return err - } - return nil - } - } -} - -//////////////////////////////////////////////////////////////////////////////// - -// These are 4 bytes of padding used to catch if something went horribly wrong with the tcp connection. -var tcp_msg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" - -// This takes a pointer to a slice as an argument. -// It checks if there's a complete message and, if so, slices out those parts and returns the message, true, and nil. -// If there's no error, but also no complete message, it returns nil, false, and nil. -// If there's an error, it returns nil, false, and the error, which the reader then handles (currently, by returning from the reader, which causes the connection to close). -func tcp_chop_msg(bs *[]byte) ([]byte, bool, error) { - // Returns msg, ok, err - if len(*bs) < len(tcp_msg) { - return nil, false, nil - } - for idx := range tcp_msg { - if (*bs)[idx] != tcp_msg[idx] { - return nil, false, errors.New("Bad message!") - } - } - msgLen, msgLenLen := wire_decode_uint64((*bs)[len(tcp_msg):]) - if msgLen > tcp_msgSize { - return nil, false, errors.New("Oversized message!") - } - msgBegin := len(tcp_msg) + msgLenLen - msgEnd := msgBegin + int(msgLen) - if msgLenLen == 0 || len(*bs) < msgEnd { - // We don't have the full message - // Need to buffer this and wait for the rest to come in - return nil, false, nil - } - msg := (*bs)[msgBegin:msgEnd] - (*bs) = (*bs)[msgEnd:] - return msg, true, nil -} From c51a3340b14402d496082bcd29fef5ee91a3acbf Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 19 Jan 2019 00:42:53 +0000 Subject: [PATCH 03/27] Update awdl.go to use new link stuff (untested) --- src/yggdrasil/awdl.go | 86 +++++++++++++++++++++++++++++++++++++++++++ src/yggdrasil/core.go | 13 +++---- src/yggdrasil/link.go | 4 +- 3 files changed, 94 insertions(+), 9 deletions(-) create mode 100644 src/yggdrasil/awdl.go diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go new file mode 100644 index 00000000..43db934a --- /dev/null +++ b/src/yggdrasil/awdl.go @@ -0,0 +1,86 @@ +package yggdrasil + +import ( + "fmt" + "sync" +) + +type awdl struct { + core *Core + mutex sync.RWMutex // protects interfaces below + interfaces map[string]*awdlInterface +} + +type awdlInterface struct { + awdl *awdl + fromAWDL chan []byte + toAWDL chan []byte + shutdown chan bool + peer *peer + link *linkInterface +} + +func (l *awdl) init(c *Core) error { + l.core = c + l.mutex.Lock() + l.interfaces = make(map[string]*awdlInterface) + l.mutex.Unlock() + + return nil +} + +func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*awdlInterface, error) { + link, err := l.core.link.create(fromAWDL, toAWDL, name) + if err != nil { + return nil, err + } + intf := awdlInterface{ + awdl: l, + link: link, + fromAWDL: fromAWDL, + toAWDL: toAWDL, + shutdown: make(chan bool), + } + l.mutex.Lock() + l.interfaces[name] = &intf + l.mutex.Unlock() + return &intf, nil +} + +func (l *awdl) getInterface(identity string) *awdlInterface { + l.mutex.RLock() + defer l.mutex.RUnlock() + if intf, ok := l.interfaces[identity]; ok { + return intf + } + return nil +} + +func (l *awdl) shutdown(identity string) error { + if err := l.core.link.shutdown(identity); err != nil { + return err + } + if intf, ok := l.interfaces[identity]; ok { + intf.shutdown <- true + l.mutex.Lock() + delete(l.interfaces, identity) + l.mutex.Unlock() + return nil + } + return fmt.Errorf("interface '%s' doesn't exist or already shutdown", identity) +} + +func (ai *awdlInterface) handler() { + for { + select { + case <-ai.shutdown: + return + case <-ai.link.shutdown: + return + case in := <-ai.fromAWDL: + ai.link.fromlink <- in + case out := <-ai.link.tolink: + ai.toAWDL <- out + } + } +} diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index fa5d02ad..1f26b69a 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -44,7 +44,8 @@ type Core struct { searches searches multicast multicast tcp tcpInterface - link link + link link // TODO: not sure if this wants to be here? + awdl awdl log *log.Logger } @@ -198,12 +199,10 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - /* - if err := c.awdl.init(c); err != nil { - c.log.Println("Failed to start AWDL interface") - return err - } - */ + if err := c.awdl.init(c); err != nil { + c.log.Println("Failed to start AWDL interface") + return err + } if nc.SwitchOptions.MaxTotalQueueSize >= SwitchQueueTotalMinSize { c.switchTable.queueTotalMaxSize = nc.SwitchOptions.MaxTotalQueueSize diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 1bbd0c34..386fef88 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -35,7 +35,7 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(fromlink chan []byte, tolink chan []byte /*boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey*/, name string) (*linkInterface, error) { +func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) { intf := linkInterface{ link: l, fromlink: fromlink, @@ -101,7 +101,7 @@ func (l *link) shutdown(identity string) error { l.mutex.Unlock() return nil } else { - return errors.New(fmt.Sprintf("Interface '%s' doesn't exist or already shutdown", identity)) + return fmt.Errorf("interface '%s' doesn't exist or already shutdown", identity) } } From 41a410f2a1d019fc061efa3b265e201844bae967 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 19 Jan 2019 12:19:24 +0000 Subject: [PATCH 04/27] Initialise awdl.go from link.go, remove deadlock between awdl.create and link.create, other bits and pieces --- src/yggdrasil/awdl.go | 8 ++++- src/yggdrasil/core.go | 4 +-- src/yggdrasil/link.go | 75 ++++++++++++++++++++++++++--------------- src/yggdrasil/stream.go | 35 ------------------- 4 files changed, 57 insertions(+), 65 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 43db934a..4c4d83ca 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -18,6 +18,7 @@ type awdlInterface struct { shutdown chan bool peer *peer link *linkInterface + stream stream } func (l *awdl) init(c *Core) error { @@ -41,6 +42,8 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a toAWDL: toAWDL, shutdown: make(chan bool), } + intf.stream.init() + go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() @@ -71,6 +74,9 @@ func (l *awdl) shutdown(identity string) error { } func (ai *awdlInterface) handler() { + inPacket := func(packet []byte) { + ai.link.fromlink <- packet + } for { select { case <-ai.shutdown: @@ -78,7 +84,7 @@ func (ai *awdlInterface) handler() { case <-ai.link.shutdown: return case in := <-ai.fromAWDL: - ai.link.fromlink <- in + ai.stream.write(in, inPacket) case out := <-ai.link.tolink: ai.toAWDL <- out } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 1f26b69a..70905619 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -199,8 +199,8 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.awdl.init(c); err != nil { - c.log.Println("Failed to start AWDL interface") + if err := c.link.init(c); err != nil { + c.log.Println("Failed to start link interfaces") return err } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 386fef88..32b5ea70 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -18,6 +18,7 @@ type link struct { } type linkInterface struct { + name string link *link fromlink chan []byte tolink chan []byte @@ -32,55 +33,75 @@ func (l *link) init(c *Core) error { l.interfaces = make(map[string]*linkInterface) l.mutex.Unlock() + if err := l.core.awdl.init(c); err != nil { + l.core.log.Println("Failed to start AWDL interface") + return err + } + return nil } func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + if _, ok := l.interfaces[name]; ok { + return nil, errors.New("Interface with this name already exists") + } intf := linkInterface{ + name: name, link: l, fromlink: fromlink, tolink: tolink, shutdown: make(chan bool), } - l.mutex.Lock() - l.interfaces[name] = &intf - l.mutex.Unlock() + l.interfaces[intf.name] = &intf + go intf.start() + return &intf, nil +} + +func (intf *linkInterface) start() { myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() - meta.box = l.core.boxPub - meta.sig = l.core.sigPub + meta.box = intf.link.core.boxPub + meta.sig = intf.link.core.sigPub meta.link = *myLinkPub metaBytes := meta.encode() - tolink <- metaBytes - metaBytes = <-fromlink + //intf.link.core.log.Println("start: intf.tolink <- metaBytes") + intf.tolink <- metaBytes + //intf.link.core.log.Println("finish: intf.tolink <- metaBytes") + //intf.link.core.log.Println("start: metaBytes = <-intf.fromlink") + metaBytes = <-intf.fromlink + //intf.link.core.log.Println("finish: metaBytes = <-intf.fromlink") meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { - return nil, errors.New("Metadata decode failure") + intf.link.core.log.Println("Metadata decode failure") + return } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - return nil, errors.New("Failed to connect to node: " + name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + return } shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - //shared := crypto.GetSharedKey(&l.core.boxPriv, boxPubKey) - intf.peer = l.core.peers.newPeer(&meta.box, &meta.sig, shared, name) - if intf.peer != nil { - intf.peer.linkOut = make(chan []byte, 1) // protocol traffic - intf.peer.out = func(msg []byte) { - defer func() { recover() }() - intf.tolink <- msg - } // called by peer.sendPacket() - l.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - intf.peer.close = func() { - close(intf.fromlink) - close(intf.tolink) - } - go intf.handler() - go intf.peer.linkLoop() - return &intf, nil + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + if intf.peer == nil { + intf.link.mutex.Lock() + delete(intf.link.interfaces, intf.name) + intf.link.mutex.Unlock() + return } - delete(l.interfaces, name) - return nil, errors.New("l.core.peers.newPeer failed") + intf.peer.linkOut = make(chan []byte, 1) // protocol traffic + intf.peer.out = func(msg []byte) { + defer func() { recover() }() + intf.tolink <- msg + } // called by peer.sendPacket() + intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + intf.peer.close = func() { + close(intf.fromlink) + close(intf.tolink) + } + go intf.handler() + go intf.peer.linkLoop() } func (l *link) getInterface(identity string) *linkInterface { diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 5d67aceb..a4e84c9e 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -9,7 +9,6 @@ import ( type stream struct { buffer []byte - cursor int } const streamMsgSize = 2048 + 65535 @@ -18,7 +17,6 @@ var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" func (s *stream) init() { s.buffer = make([]byte, 2*streamMsgSize) - s.cursor = 0 } // This reads from the channel into a []byte buffer for incoming messages. It @@ -76,36 +74,3 @@ func stream_chopMsg(bs *[]byte) ([]byte, bool, error) { (*bs) = (*bs)[msgEnd:] return msg, true, nil } - -/* -func (s *stream) chopMsg() ([]byte, bool, error) { - // Returns msg, ok, err - if len(s.buffer) < len(streamMsg) { - fmt.Println("*** too short") - return nil, false, nil - } - for idx := range streamMsg { - if s.buffer[idx] != streamMsg[idx] { - fmt.Println("*** bad message") - return nil, false, errors.New("bad message") - } - } - msgLen, msgLenLen := wire_decode_uint64((s.buffer)[len(streamMsg):]) - if msgLen > streamMsgSize { - fmt.Println("*** oversized message") - return nil, false, errors.New("oversized message") - } - msgBegin := len(streamMsg) + msgLenLen - msgEnd := msgBegin + int(msgLen) - if msgLenLen == 0 || len(s.buffer) < msgEnd { - // We don't have the full message - // Need to buffer this and wait for the rest to come in - fmt.Println("*** still waiting") - return nil, false, nil - } - msg := s.buffer[msgBegin:msgEnd] - s.buffer = s.buffer[msgEnd:] - fmt.Println("*** done") - return msg, true, nil -} -*/ From c8e1be0f73255dddbc5ac5845c0ba2c1edc6666a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 19 Jan 2019 16:37:45 -0600 Subject: [PATCH 05/27] link/stream refactoring bugfixes and gofmt --- contrib/ansible/genkeys.go | 20 ++++++++++---------- src/yggdrasil/awdl.go | 10 +++++----- src/yggdrasil/stream.go | 21 +++++++++++---------- src/yggdrasil/tcp.go | 7 ++----- 4 files changed, 28 insertions(+), 30 deletions(-) diff --git a/contrib/ansible/genkeys.go b/contrib/ansible/genkeys.go index 22418a0c..81397386 100644 --- a/contrib/ansible/genkeys.go +++ b/contrib/ansible/genkeys.go @@ -35,22 +35,22 @@ func main() { } var encryptionKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { encryptionKeys = append(encryptionKeys, newBoxKey()) } encryptionKeys = sortKeySetArray(encryptionKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - encryptionKeys[0] = newBoxKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + encryptionKeys[0] = newBoxKey() encryptionKeys = bubbleUpTo(encryptionKeys, 0) } var signatureKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { signatureKeys = append(signatureKeys, newSigKey()) } signatureKeys = sortKeySetArray(signatureKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - signatureKeys[0] = newSigKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + signatureKeys[0] = newSigKey() signatureKeys = bubbleUpTo(signatureKeys, 0) } @@ -112,11 +112,11 @@ func sortKeySetArray(sets []keySet) []keySet { } func bubbleUpTo(sets []keySet, num int) []keySet { - for i := 0; i < len(sets) - num - 1; i++ { - if isBetter(sets[i + 1].id, sets[i].id) { + for i := 0; i < len(sets)-num-1; i++ { + if isBetter(sets[i+1].id, sets[i].id) { var tmp = sets[i] - sets[i] = sets[i + 1] - sets[i + 1] = tmp + sets[i] = sets[i+1] + sets[i+1] = tmp } } return sets diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 4c4d83ca..573b6e72 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -42,7 +42,10 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a toAWDL: toAWDL, shutdown: make(chan bool), } - intf.stream.init() + inPacket := func(packet []byte) { + intf.link.fromlink <- packet + } + intf.stream.init(inPacket) go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf @@ -74,9 +77,6 @@ func (l *awdl) shutdown(identity string) error { } func (ai *awdlInterface) handler() { - inPacket := func(packet []byte) { - ai.link.fromlink <- packet - } for { select { case <-ai.shutdown: @@ -84,7 +84,7 @@ func (ai *awdlInterface) handler() { case <-ai.link.shutdown: return case in := <-ai.fromAWDL: - ai.stream.write(in, inPacket) + ai.stream.handleInput(in) case out := <-ai.link.tolink: ai.toAWDL <- out } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index a4e84c9e..43eff3ff 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -8,15 +8,16 @@ import ( ) type stream struct { - buffer []byte + inputBuffer []byte + handlePacket func([]byte) } const streamMsgSize = 2048 + 65535 var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" -func (s *stream) init() { - s.buffer = make([]byte, 2*streamMsgSize) +func (s *stream) init(in func([]byte)) { + s.handlePacket = in } // This reads from the channel into a []byte buffer for incoming messages. It @@ -24,11 +25,10 @@ func (s *stream) init() { // to the peer struct via the provided `in func([]byte)` argument. Then it // shifts the incomplete fragments of data forward so future reads won't // overwrite it. -func (s *stream) write(bs []byte, in func([]byte)) error { - frag := s.buffer[:0] - if n := len(bs); n > 0 { - frag = append(frag, bs[:n]...) - msg, ok, err2 := stream_chopMsg(&frag) +func (s *stream) handleInput(bs []byte) error { + if len(bs) > 0 { + s.inputBuffer = append(s.inputBuffer, bs...) + msg, ok, err2 := stream_chopMsg(&s.inputBuffer) if err2 != nil { return fmt.Errorf("message error: %v", err2) } @@ -37,8 +37,9 @@ func (s *stream) write(bs []byte, in func([]byte)) error { return nil } newMsg := append(util.GetBytes(), msg...) - in(newMsg) - util.Yield() + s.inputBuffer = append(s.inputBuffer[:0], s.inputBuffer...) + s.handlePacket(newMsg) + util.Yield() // Make sure we give up control to the scheduler } return nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 682796ec..1d4ec994 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -378,9 +378,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // E.g. over different interfaces p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) - in := func(bs []byte) { - p.handlePacket(bs) - } out := make(chan []byte, 1) defer close(out) go func() { @@ -443,7 +440,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init() + iface.stream.init(p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for { @@ -455,7 +452,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { break } if n > 0 { - iface.stream.write(bs[:n], in) + iface.stream.handleInput(bs[:n]) } } if err == nil { From 5a4d6481ddad8161061d396e2f4fa3d1dc0f14ff Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 21 Jan 2019 21:27:52 -0600 Subject: [PATCH 06/27] Work in progress, add a linkInterfaceMsgIO interface type and make stream implement it, this will be used by link --- src/yggdrasil/awdl.go | 2 +- src/yggdrasil/link.go | 9 ++++ src/yggdrasil/stream.go | 103 ++++++++++++++++++++++++++++++++++++++-- src/yggdrasil/tcp.go | 2 +- 4 files changed, 110 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 573b6e72..7207b22e 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -45,7 +45,7 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a inPacket := func(packet []byte) { intf.link.fromlink <- packet } - intf.stream.init(inPacket) + intf.stream.init(nil, inPacket) // FIXME nil = ReadWriteCloser go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 32b5ea70..423a968f 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -17,6 +17,15 @@ type link struct { interfaces map[string]*linkInterface } +type linkInterfaceMsgIO interface { + readMsg() ([]byte, error) + writeMsg([]byte) (int, error) + close() error + // These are temporary workarounds to stream semantics + _sendMetaBytes([]byte) error + _recvMetaBytes() ([]byte, error) +} + type linkInterface struct { name string link *link diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 43eff3ff..ecfa245f 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -3,21 +3,115 @@ package yggdrasil import ( "errors" "fmt" + "io" "github.com/yggdrasil-network/yggdrasil-go/src/util" ) +// Test that this matches the interface we expect +var _ = linkInterfaceMsgIO(&stream{}) + type stream struct { - inputBuffer []byte + rwc io.ReadWriteCloser + inputBuffer []byte // Incoming packet stream + didFirstSend bool // Used for metadata exchange + didFirstRecv bool // Used for metadata exchange + // TODO remove the rest, it shouldn't matter in the long run handlePacket func([]byte) } +func (s *stream) close() error { + return s.rwc.Close() +} + const streamMsgSize = 2048 + 65535 var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" -func (s *stream) init(in func([]byte)) { +func (s *stream) init(rwc io.ReadWriteCloser, in func([]byte)) { + // TODO have this also do the metadata handshake and create the peer struct + s.rwc = rwc s.handlePacket = in + + // TODO call something to do the metadata exchange +} + +// writeMsg writes a message with stream padding, and is *not* thread safe. +func (s *stream) writeMsg(bs []byte) (int, error) { + buf := util.GetBytes() + defer util.PutBytes(buf) + buf = append(buf, streamMsg[:]...) + buf = append(buf, wire_encode_uint64(uint64(len(bs)))...) + padLen := len(buf) + buf = append(buf, bs...) + var bn int + for bn < len(buf) { + n, err := s.rwc.Write(buf[bn:]) + bn += n + if err != nil { + l := bn - padLen + if l < 0 { + l = 0 + } + return l, err + } + } + return len(bs), nil +} + +// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. +func (s *stream) readMsg() ([]byte, error) { + for { + buf := s.inputBuffer + msg, ok, err := stream_chopMsg(&buf) + switch { + case err != nil: + // Something in the stream format is corrupt + return nil, fmt.Errorf("message error: %v", err) + case ok: + // Copy the packet into bs, shift the buffer, and return + msg = append(util.GetBytes(), msg...) + s.inputBuffer = append(s.inputBuffer[:0], buf...) + return msg, nil + default: + // Wait for the underlying reader to return enough info for us to proceed + frag := make([]byte, 2*streamMsgSize) + n, err := s.rwc.Read(frag) + if n > 0 { + s.inputBuffer = append(s.inputBuffer, frag[:n]...) + } else if err != nil { + return nil, err + } + } + } +} + +// Writes metadata bytes without stream padding, meant to be temporary +func (s *stream) _sendMetaBytes(metaBytes []byte) error { + var written int + for written < len(metaBytes) { + n, err := s.rwc.Write(metaBytes) + written += n + if err != nil { + return err + } + } + return nil +} + +// Reads metadata bytes without stream padding, meant to be temporary +func (s *stream) _recvMetaBytes() ([]byte, error) { + var meta version_metadata + frag := meta.encode() + metaBytes := make([]byte, 0, len(frag)) + for len(metaBytes) < len(frag) { + n, err := s.rwc.Read(frag) + if err != nil { + return nil, err + } + metaBytes = append(metaBytes, frag[:n]...) + } + return metaBytes, nil } // This reads from the channel into a []byte buffer for incoming messages. It @@ -28,7 +122,8 @@ func (s *stream) init(in func([]byte)) { func (s *stream) handleInput(bs []byte) error { if len(bs) > 0 { s.inputBuffer = append(s.inputBuffer, bs...) - msg, ok, err2 := stream_chopMsg(&s.inputBuffer) + buf := s.inputBuffer + msg, ok, err2 := stream_chopMsg(&buf) if err2 != nil { return fmt.Errorf("message error: %v", err2) } @@ -37,7 +132,7 @@ func (s *stream) handleInput(bs []byte) error { return nil } newMsg := append(util.GetBytes(), msg...) - s.inputBuffer = append(s.inputBuffer[:0], s.inputBuffer...) + s.inputBuffer = append(s.inputBuffer[:0], buf...) s.handlePacket(newMsg) util.Yield() // Make sure we give up control to the scheduler } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 1d4ec994..975da464 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -440,7 +440,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init(p.handlePacket) + iface.stream.init(sock, p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for { From 137212d8cf1e811c2cf696d37a4ff5104997a9d7 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 21 Jan 2019 23:08:50 -0600 Subject: [PATCH 07/27] work in progress, establishes TCP connections and gets through metadata handshake using the link code, but doesn't seem to send traffic yet (no switch peers are created) --- src/yggdrasil/awdl.go | 4 +- src/yggdrasil/link.go | 129 +++++++++++++++++++++++++++++++++++----- src/yggdrasil/stream.go | 6 +- src/yggdrasil/tcp.go | 23 +++++-- 4 files changed, 137 insertions(+), 25 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 7207b22e..ce8e1d70 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -1,7 +1,7 @@ package yggdrasil import ( - "fmt" + //"fmt" "sync" ) @@ -30,6 +30,7 @@ func (l *awdl) init(c *Core) error { return nil } +/* temporarily disabled while getting the TCP side to work func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*awdlInterface, error) { link, err := l.core.link.create(fromAWDL, toAWDL, name) if err != nil { @@ -90,3 +91,4 @@ func (ai *awdlInterface) handler() { } } } +*/ diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 423a968f..b53242a1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -4,11 +4,11 @@ import ( "errors" "fmt" "sync" - "sync/atomic" + //"sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" + //"github.com/yggdrasil-network/yggdrasil-go/src/util" ) type link struct { @@ -27,13 +27,10 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - fromlink chan []byte - tolink chan []byte - shutdown chan bool - peer *peer - stream stream + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO } func (l *link) init(c *Core) error { @@ -50,24 +47,123 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) { +func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) { l.mutex.Lock() defer l.mutex.Unlock() if _, ok := l.interfaces[name]; ok { return nil, errors.New("Interface with this name already exists") } intf := linkInterface{ - name: name, - link: l, - fromlink: fromlink, - tolink: tolink, - shutdown: make(chan bool), + name: name, + link: l, + msgIO: msgIO, } l.interfaces[intf.name] = &intf - go intf.start() + //go intf.start() return &intf, nil } +func (intf *linkInterface) handler() error { + // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later + myLinkPub, myLinkPriv := crypto.NewBoxKeys() + meta := version_getBaseMetadata() + meta.box = intf.link.core.boxPub + meta.sig = intf.link.core.sigPub + meta.link = *myLinkPub + metaBytes := meta.encode() + // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) + err := intf.msgIO._sendMetaBytes(metaBytes) + if err != nil { + return err + } + metaBytes, err = intf.msgIO._recvMetaBytes() + if err != nil { + return err + } + meta = version_metadata{} + if !meta.decode(metaBytes) || !meta.check() { + return errors.New("failed to decode metadata") + } + base := version_getBaseMetadata() + if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { + intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + return errors.New("failed to connect: wrong version") + } + // FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers. + shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + if intf.peer == nil { + return errors.New("failed to create peer") + } + defer func() { + // More cleanup can go here + intf.link.core.peers.removePeer(intf.peer.port) + }() + // Finish setting up the peer struct + out := make(chan []byte, 1) + defer close(out) + intf.peer.out = func(msg []byte) { + defer func() { recover() }() + out <- msg + } + intf.peer.close = func() { intf.msgIO.close() } + go intf.peer.linkLoop() + // Start the writer + go func() { + interval := 4 * time.Second + timer := time.NewTimer(interval) + clearTimer := func() { + if !timer.Stop() { + <-timer.C + } + } + defer clearTimer() + for { + // First try to send any link protocol traffic + select { + case msg := <-intf.peer.linkOut: + intf.msgIO.writeMsg(msg) + continue + default: + } + // No protocol traffic to send, so reset the timer + clearTimer() + timer.Reset(interval) + // Now block until something is ready or the timer triggers keepalive traffic + select { + case <-timer.C: + intf.msgIO.writeMsg(nil) + case msg := <-intf.peer.linkOut: + intf.msgIO.writeMsg(msg) + case msg, ok := <-out: + if !ok { + return + } + intf.msgIO.writeMsg(msg) + if true { + // TODO *don't* do this if we're not reading any traffic + // In such a case, the reader is responsible for resetting it the next time we read something + intf.link.core.switchTable.idleIn <- intf.peer.port + } + } + } + }() + intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + // Run reader loop + for { + msg, err := intf.msgIO.readMsg() + if len(msg) > 0 { + intf.peer.handlePacket(msg) + } + if err != nil { + return err + } + } + //////////////////////////////////////////////////////////////////////////////// + return nil +} + +/* func (intf *linkInterface) start() { myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() @@ -171,3 +267,4 @@ func (ai *linkInterface) handler() { } } } +*/ diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index ecfa245f..966319aa 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -12,10 +12,8 @@ import ( var _ = linkInterfaceMsgIO(&stream{}) type stream struct { - rwc io.ReadWriteCloser - inputBuffer []byte // Incoming packet stream - didFirstSend bool // Used for metadata exchange - didFirstRecv bool // Used for metadata exchange + rwc io.ReadWriteCloser + inputBuffer []byte // Incoming packet stream // TODO remove the rest, it shouldn't matter in the long run handlePacket func([]byte) } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 975da464..d2d20a71 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -44,7 +44,6 @@ type tcpInterface struct { mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) - stream stream } // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. @@ -281,9 +280,25 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { }() } +func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { + defer sock.Close() + iface.setExtraOptions(sock) + stream := stream{} + stream.init(sock, nil) + name := sock.LocalAddr().String() + sock.RemoteAddr().String() + link, err := iface.core.link.create(&stream, name) + if err != nil { + iface.core.log.Println(err) + panic(err) + } + iface.core.log.Println("DEBUG: starting handler") + link.handler() + iface.core.log.Println("DEBUG: stopped handler") +} + // This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. // It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). -func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { +func (iface *tcpInterface) handler_old(sock net.Conn, incoming bool) { defer sock.Close() iface.setExtraOptions(sock) // Get our keys @@ -440,7 +455,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init(sock, p.handlePacket) + //iface.stream.init(sock, p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for { @@ -452,7 +467,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { break } if n > 0 { - iface.stream.handleInput(bs[:n]) + //iface.stream.handleInput(bs[:n]) } } if err == nil { From f95663e9230d906d30de5b53a5f80cceb150b426 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 22 Jan 2019 18:24:15 -0600 Subject: [PATCH 08/27] actually finish initializing peers --- src/yggdrasil/link.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index b53242a1..ddd7db95 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -106,10 +106,12 @@ func (intf *linkInterface) handler() error { defer func() { recover() }() out <- msg } + intf.peer.linkOut = make(chan []byte, 1) intf.peer.close = func() { intf.msgIO.close() } go intf.peer.linkLoop() // Start the writer go func() { + // TODO util.PutBytes etc. interval := 4 * time.Second timer := time.NewTimer(interval) clearTimer := func() { From 12c0e019dc6114626fe4ed76f6f17e379a576484 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 22 Jan 2019 21:16:41 -0600 Subject: [PATCH 09/27] only create one interface, but still opens duplicate connections before it catches this, so more work is needed --- src/yggdrasil/link.go | 61 +++++++++++++++++++++++++++++++------------ src/yggdrasil/tcp.go | 10 ++++--- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index ddd7db95..0953e4ff 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -8,13 +8,21 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - //"github.com/yggdrasil-network/yggdrasil-go/src/util" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) type link struct { core *Core mutex sync.RWMutex // protects interfaces below - interfaces map[string]*linkInterface + interfaces map[linkInfo]*linkInterface +} + +type linkInfo struct { + box crypto.BoxPubKey // Their encryption key + sig crypto.SigPubKey // Their signing key + linkType string // Type of link, e.g. TCP, AWDL + local string // Local name or address + remote string // Remote name or address } type linkInterfaceMsgIO interface { @@ -27,16 +35,18 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + closed chan struct{} } func (l *link) init(c *Core) error { l.core = c l.mutex.Lock() - l.interfaces = make(map[string]*linkInterface) + l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() if err := l.core.awdl.init(c); err != nil { @@ -47,18 +57,19 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) { - l.mutex.Lock() - defer l.mutex.Unlock() - if _, ok := l.interfaces[name]; ok { - return nil, errors.New("Interface with this name already exists") - } +func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string) (*linkInterface, error) { + // Technically anything unique would work for names, but lets pick something human readable, just for debugging intf := linkInterface{ name: name, link: l, msgIO: msgIO, + info: linkInfo{ + linkType: linkType, + local: local, + remote: remote, + }, } - l.interfaces[intf.name] = &intf + //l.interfaces[intf.name] = &intf //go intf.start() return &intf, nil } @@ -89,7 +100,25 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } - // FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers. + // Check if we already have a link to this node + intf.info.box = meta.box + intf.info.sig = meta.sig + intf.link.mutex.Lock() + if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { + intf.link.mutex.Unlock() + // FIXME we should really return an error and let the caller block instead + // That lets them do things like close connections before blocking + intf.link.core.log.Println("DEBUG: found existing interface for", intf.name) + <-oldIntf.closed + return nil + } else { + intf.closed = make(chan struct{}) + intf.link.interfaces[intf.info] = intf + defer close(intf.closed) + intf.link.core.log.Println("DEBUG: registered interface for", intf.name) + } + intf.link.mutex.Unlock() + // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) if intf.peer == nil { @@ -111,7 +140,6 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer go func() { - // TODO util.PutBytes etc. interval := 4 * time.Second timer := time.NewTimer(interval) clearTimer := func() { @@ -142,6 +170,7 @@ func (intf *linkInterface) handler() error { return } intf.msgIO.writeMsg(msg) + util.PutBytes(msg) if true { // TODO *don't* do this if we're not reading any traffic // In such a case, the reader is responsible for resetting it the next time we read something diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index d2d20a71..b2efbbe4 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -285,15 +285,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { iface.setExtraOptions(sock) stream := stream{} stream.init(sock, nil) - name := sock.LocalAddr().String() + sock.RemoteAddr().String() - link, err := iface.core.link.create(&stream, name) + local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) + remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) + name := "tcp://" + sock.RemoteAddr().String() + link, err := iface.core.link.create(&stream, name, "tcp", local, remote) if err != nil { iface.core.log.Println(err) panic(err) } - iface.core.log.Println("DEBUG: starting handler") + iface.core.log.Println("DEBUG: starting handler for", name) link.handler() - iface.core.log.Println("DEBUG: stopped handler") + iface.core.log.Println("DEBUG: stopped handler for", name) } // This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. From eb8951081d167059457e9bfaa57b7cfdc63e6a09 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 22 Jan 2019 21:23:57 -0600 Subject: [PATCH 10/27] fix duplicate connection bug, I think this is also in develop --- src/yggdrasil/tcp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index b2efbbe4..09ed7d0c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -183,7 +183,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if iface.isAlreadyCalling(saddr) { + if iface.isAlreadyCalling(callname) { return } iface.mutex.Lock() From f936151f2dc0de0caed3950c3c3d7f43959db21c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 22 Jan 2019 21:48:43 -0600 Subject: [PATCH 11/27] correctly clean up various things --- src/yggdrasil/link.go | 12 ++++++++++-- src/yggdrasil/tcp.go | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 0953e4ff..660c854a 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -114,7 +114,12 @@ func (intf *linkInterface) handler() error { } else { intf.closed = make(chan struct{}) intf.link.interfaces[intf.info] = intf - defer close(intf.closed) + defer func() { + intf.link.mutex.Lock() + delete(intf.link.interfaces, intf.info) + intf.link.mutex.Unlock() + close(intf.closed) + }() intf.link.core.log.Println("DEBUG: registered interface for", intf.name) } intf.link.mutex.Unlock() @@ -144,7 +149,10 @@ func (intf *linkInterface) handler() error { timer := time.NewTimer(interval) clearTimer := func() { if !timer.Stop() { - <-timer.C + select { + case <-timer.C: + default: + } } } defer clearTimer() diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 09ed7d0c..e1f490e7 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -294,8 +294,8 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { panic(err) } iface.core.log.Println("DEBUG: starting handler for", name) - link.handler() - iface.core.log.Println("DEBUG: stopped handler for", name) + err = link.handler() + iface.core.log.Println("DEBUG: stopped handler for", name, err) } // This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. From 7b2460662d6c4dc4ebf12940e29bff43b8ef4106 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 22 Jan 2019 21:53:39 -0600 Subject: [PATCH 12/27] close the connection before blocking a duplicate link --- src/yggdrasil/link.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 660c854a..1899582a 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -107,8 +107,9 @@ func (intf *linkInterface) handler() error { if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { intf.link.mutex.Unlock() // FIXME we should really return an error and let the caller block instead - // That lets them do things like close connections before blocking + // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. intf.link.core.log.Println("DEBUG: found existing interface for", intf.name) + intf.msgIO.close() <-oldIntf.closed return nil } else { From 9c6cf5068424ede379dd53880648a3b002333fb1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Jan 2019 15:08:19 +0000 Subject: [PATCH 13/27] Adapt AWDL to link --- src/yggdrasil/awdl.go | 79 ++++++++++++++++++------------------- src/yggdrasil/link.go | 6 +++ src/yggdrasil/mobile_ios.go | 20 +++------- 3 files changed, 51 insertions(+), 54 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index ce8e1d70..8df98cc3 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -1,7 +1,7 @@ package yggdrasil import ( - //"fmt" + "errors" "sync" ) @@ -12,13 +12,31 @@ type awdl struct { } type awdlInterface struct { - awdl *awdl + link *linkInterface + rwc awdlReadWriteCloser + peer *peer + stream stream +} + +type awdlReadWriteCloser struct { fromAWDL chan []byte toAWDL chan []byte - shutdown chan bool - peer *peer - link *linkInterface - stream stream +} + +func (c awdlReadWriteCloser) Read(p []byte) (n int, err error) { + p = <-c.fromAWDL + return len(p), nil +} + +func (c awdlReadWriteCloser) Write(p []byte) (n int, err error) { + c.toAWDL <- p + return len(p), nil +} + +func (c awdlReadWriteCloser) Close() error { + close(c.fromAWDL) + close(c.toAWDL) + return nil } func (l *awdl) init(c *Core) error { @@ -30,27 +48,26 @@ func (l *awdl) init(c *Core) error { return nil } -/* temporarily disabled while getting the TCP side to work -func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*awdlInterface, error) { - link, err := l.core.link.create(fromAWDL, toAWDL, name) +func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name, local, remote string) (*awdlInterface, error) { + rwc := awdlReadWriteCloser{ + fromAWDL: fromAWDL, + toAWDL: toAWDL, + } + s := stream{} + s.init(rwc, nil) + link, err := l.core.link.create(&s, name, "awdl", local, remote) if err != nil { return nil, err } intf := awdlInterface{ - awdl: l, - link: link, - fromAWDL: fromAWDL, - toAWDL: toAWDL, - shutdown: make(chan bool), + link: link, + rwc: rwc, } - inPacket := func(packet []byte) { - intf.link.fromlink <- packet - } - intf.stream.init(nil, inPacket) // FIXME nil = ReadWriteCloser - go intf.handler() + intf.stream.init(intf.rwc, nil) l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() + go link.handler() return &intf, nil } @@ -64,31 +81,13 @@ func (l *awdl) getInterface(identity string) *awdlInterface { } func (l *awdl) shutdown(identity string) error { - if err := l.core.link.shutdown(identity); err != nil { - return err - } if intf, ok := l.interfaces[identity]; ok { - intf.shutdown <- true + close(intf.link.closed) + intf.rwc.Close() l.mutex.Lock() delete(l.interfaces, identity) l.mutex.Unlock() return nil } - return fmt.Errorf("interface '%s' doesn't exist or already shutdown", identity) + return errors.New("Interface not found or already closed") } - -func (ai *awdlInterface) handler() { - for { - select { - case <-ai.shutdown: - return - case <-ai.link.shutdown: - return - case in := <-ai.fromAWDL: - ai.stream.handleInput(in) - case out := <-ai.link.tolink: - ai.toAWDL <- out - } - } -} -*/ diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 1899582a..c056e6bc 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -87,10 +87,12 @@ func (intf *linkInterface) handler() error { if err != nil { return err } + intf.link.core.log.Println("Sent my metadata") metaBytes, err = intf.msgIO._recvMetaBytes() if err != nil { return err } + intf.link.core.log.Println("Received their metadata") meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { return errors.New("failed to decode metadata") @@ -100,6 +102,7 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } + intf.link.core.log.Println("Do we have a link already?") // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig @@ -124,6 +127,7 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("DEBUG: registered interface for", intf.name) } intf.link.mutex.Unlock() + intf.link.core.log.Println("Create peer") // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) @@ -145,6 +149,7 @@ func (intf *linkInterface) handler() error { intf.peer.close = func() { intf.msgIO.close() } go intf.peer.linkLoop() // Start the writer + intf.link.core.log.Println("Start writer") go func() { interval := 4 * time.Second timer := time.NewTimer(interval) @@ -190,6 +195,7 @@ func (intf *linkInterface) handler() error { }() intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Run reader loop + intf.link.core.log.Println("Start reader") for { msg, err := intf.msgIO.readMsg() if len(msg) > 0 { diff --git a/src/yggdrasil/mobile_ios.go b/src/yggdrasil/mobile_ios.go index 72920fe0..e864c889 100644 --- a/src/yggdrasil/mobile_ios.go +++ b/src/yggdrasil/mobile_ios.go @@ -29,22 +29,14 @@ func (nsl MobileLogger) Write(p []byte) (n int, err error) { return len(p), nil } -func (c *Core) AWDLCreateInterface(name string) error { +func (c *Core) AWDLCreateInterface(name, local, remote string) error { fromAWDL := make(chan []byte, 32) toAWDL := make(chan []byte, 32) - - if intf, err := c.awdl.create(fromAWDL, toAWDL, name); err == nil { - if intf != nil { - c.log.Println(err) - return err - } else { - c.log.Println("c.awdl.create didn't return an interface") - return errors.New("c.awdl.create didn't return an interface") - } - } else { - c.log.Println(err) + if intf, err := c.awdl.create(fromAWDL, toAWDL, name, local, remote); err != nil || intf == nil { + c.log.Println("c.awdl.create:", err) return err } + return nil } func (c *Core) AWDLShutdownInterface(name string) error { @@ -53,7 +45,7 @@ func (c *Core) AWDLShutdownInterface(name string) error { func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { if intf := c.awdl.getInterface(identity); intf != nil { - return <-intf.toAWDL, nil + return <-intf.rwc.toAWDL, nil } return nil, errors.New("AWDLRecvPacket identity not known: " + identity) } @@ -61,7 +53,7 @@ func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { func (c *Core) AWDLSendPacket(identity string, buf []byte) error { packet := append(util.GetBytes(), buf[:]...) if intf := c.awdl.getInterface(identity); intf != nil { - intf.fromAWDL <- packet + intf.rwc.fromAWDL <- packet return nil } return errors.New("AWDLSendPacket identity not known: " + identity) From 81545fd9bfc2d05cd7144ede8e0d231c62c1668f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Jan 2019 15:16:22 +0000 Subject: [PATCH 14/27] Clean up --- src/yggdrasil/awdl.go | 3 +- src/yggdrasil/link.go | 106 ----------------------- src/yggdrasil/stream.go | 31 +------ src/yggdrasil/tcp.go | 187 +--------------------------------------- 4 files changed, 3 insertions(+), 324 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 8df98cc3..ba3131ff 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -54,7 +54,7 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name, local, rem toAWDL: toAWDL, } s := stream{} - s.init(rwc, nil) + s.init(rwc) link, err := l.core.link.create(&s, name, "awdl", local, remote) if err != nil { return nil, err @@ -63,7 +63,6 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name, local, rem link: link, rwc: rwc, } - intf.stream.init(intf.rwc, nil) l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index c056e6bc..e763fb1e 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -208,109 +208,3 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// return nil } - -/* -func (intf *linkInterface) start() { - myLinkPub, myLinkPriv := crypto.NewBoxKeys() - meta := version_getBaseMetadata() - meta.box = intf.link.core.boxPub - meta.sig = intf.link.core.sigPub - meta.link = *myLinkPub - metaBytes := meta.encode() - //intf.link.core.log.Println("start: intf.tolink <- metaBytes") - intf.tolink <- metaBytes - //intf.link.core.log.Println("finish: intf.tolink <- metaBytes") - //intf.link.core.log.Println("start: metaBytes = <-intf.fromlink") - metaBytes = <-intf.fromlink - //intf.link.core.log.Println("finish: metaBytes = <-intf.fromlink") - meta = version_metadata{} - if !meta.decode(metaBytes) || !meta.check() { - intf.link.core.log.Println("Metadata decode failure") - return - } - base := version_getBaseMetadata() - if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) - return - } - shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) - if intf.peer == nil { - intf.link.mutex.Lock() - delete(intf.link.interfaces, intf.name) - intf.link.mutex.Unlock() - return - } - intf.peer.linkOut = make(chan []byte, 1) // protocol traffic - intf.peer.out = func(msg []byte) { - defer func() { recover() }() - intf.tolink <- msg - } // called by peer.sendPacket() - intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - intf.peer.close = func() { - close(intf.fromlink) - close(intf.tolink) - } - go intf.handler() - go intf.peer.linkLoop() -} - -func (l *link) getInterface(identity string) *linkInterface { - l.mutex.RLock() - defer l.mutex.RUnlock() - if intf, ok := l.interfaces[identity]; ok { - return intf - } - return nil -} - -func (l *link) shutdown(identity string) error { - if intf, ok := l.interfaces[identity]; ok { - intf.shutdown <- true - l.core.peers.removePeer(intf.peer.port) - l.mutex.Lock() - delete(l.interfaces, identity) - l.mutex.Unlock() - return nil - } else { - return fmt.Errorf("interface '%s' doesn't exist or already shutdown", identity) - } -} - -func (ai *linkInterface) handler() { - send := func(msg []byte) { - ai.tolink <- msg - atomic.AddUint64(&ai.peer.bytesSent, uint64(len(msg))) - util.PutBytes(msg) - } - for { - timerInterval := tcp_ping_interval - timer := time.NewTimer(timerInterval) - defer timer.Stop() - select { - case p := <-ai.peer.linkOut: - send(p) - continue - default: - } - timer.Stop() - select { - case <-timer.C: - default: - } - timer.Reset(timerInterval) - select { - case _ = <-timer.C: - send([]byte{}) - case p := <-ai.peer.linkOut: - send(p) - continue - case r := <-ai.fromlink: - ai.peer.handlePacket(r) - ai.link.core.switchTable.idleIn <- ai.peer.port - case <-ai.shutdown: - return - } - } -} -*/ diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 966319aa..db2cdf7e 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -14,8 +14,6 @@ var _ = linkInterfaceMsgIO(&stream{}) type stream struct { rwc io.ReadWriteCloser inputBuffer []byte // Incoming packet stream - // TODO remove the rest, it shouldn't matter in the long run - handlePacket func([]byte) } func (s *stream) close() error { @@ -26,11 +24,9 @@ const streamMsgSize = 2048 + 65535 var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" -func (s *stream) init(rwc io.ReadWriteCloser, in func([]byte)) { +func (s *stream) init(rwc io.ReadWriteCloser) { // TODO have this also do the metadata handshake and create the peer struct s.rwc = rwc - s.handlePacket = in - // TODO call something to do the metadata exchange } @@ -112,31 +108,6 @@ func (s *stream) _recvMetaBytes() ([]byte, error) { return metaBytes, nil } -// This reads from the channel into a []byte buffer for incoming messages. It -// copies completed messages out of the cache into a new slice, and passes them -// to the peer struct via the provided `in func([]byte)` argument. Then it -// shifts the incomplete fragments of data forward so future reads won't -// overwrite it. -func (s *stream) handleInput(bs []byte) error { - if len(bs) > 0 { - s.inputBuffer = append(s.inputBuffer, bs...) - buf := s.inputBuffer - msg, ok, err2 := stream_chopMsg(&buf) - if err2 != nil { - return fmt.Errorf("message error: %v", err2) - } - if !ok { - // We didn't get the whole message yet - return nil - } - newMsg := append(util.GetBytes(), msg...) - s.inputBuffer = append(s.inputBuffer[:0], buf...) - s.handlePacket(newMsg) - util.Yield() // Make sure we give up control to the scheduler - } - return nil -} - // This takes a pointer to a slice as an argument. It checks if there's a // complete message and, if so, slices out those parts and returns the message, // true, and nil. If there's no error, but also no complete message, it returns diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index e1f490e7..74c5ffe3 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -20,14 +20,11 @@ import ( "math/rand" "net" "sync" - "sync/atomic" "time" "golang.org/x/net/proxy" - "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) const default_timeout = 6 * time.Second @@ -284,7 +281,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() iface.setExtraOptions(sock) stream := stream{} - stream.init(sock, nil) + stream.init(sock) local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) name := "tcp://" + sock.RemoteAddr().String() @@ -297,185 +294,3 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { err = link.handler() iface.core.log.Println("DEBUG: stopped handler for", name, err) } - -// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. -// It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). -func (iface *tcpInterface) handler_old(sock net.Conn, incoming bool) { - defer sock.Close() - iface.setExtraOptions(sock) - // Get our keys - myLinkPub, myLinkPriv := crypto.NewBoxKeys() // ephemeral link keys - meta := version_getBaseMetadata() - meta.box = iface.core.boxPub - meta.sig = iface.core.sigPub - meta.link = *myLinkPub - metaBytes := meta.encode() - _, err := sock.Write(metaBytes) - if err != nil { - return - } - if iface.timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.timeout)) - } - _, err = sock.Read(metaBytes) - if err != nil { - return - } - meta = version_metadata{} // Reset to zero value - if !meta.decode(metaBytes) || !meta.check() { - // Failed to decode and check the metadata - // If it's a version mismatch issue, then print an error message - base := version_getBaseMetadata() - if meta.meta == base.meta { - if meta.ver > base.ver { - iface.core.log.Println("Failed to connect to node:", sock.RemoteAddr().String(), "version:", meta.ver) - } else if meta.ver == base.ver && meta.minorVer > base.minorVer { - iface.core.log.Println("Failed to connect to node:", sock.RemoteAddr().String(), "version:", fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) - } - } - // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? - return - } - remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) - localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) - if e1 != nil || e2 != nil { - return - } - info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, - localAddr: localAddr, - remoteAddr: remoteAddr, - } - if iface.isAlreadyConnected(info) { - return - } - // Quit the parent call if this is a connection to ourself - equiv := func(k1, k2 []byte) bool { - for idx := range k1 { - if k1[idx] != k2[idx] { - return false - } - } - return true - } - if equiv(meta.box[:], iface.core.boxPub[:]) { - return - } - if equiv(meta.sig[:], iface.core.sigPub[:]) { - return - } - // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - // Allow unauthorized peers if they're link-local - raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - raddr := net.ParseIP(raddrStr) - if !raddr.IsLinkLocalUnicast() { - return - } - } - // Check if we already have a connection to this node, close and block if yes - iface.mutex.Lock() - /*if blockChan, isIn := iface.conns[info]; isIn { - iface.mutex.Unlock() - sock.Close() - <-blockChan - return - }*/ - blockChan := make(chan struct{}) - iface.conns[info] = blockChan - iface.mutex.Unlock() - defer func() { - iface.mutex.Lock() - delete(iface.conns, info) - iface.mutex.Unlock() - close(blockChan) - }() - // Note that multiple connections to the same node are allowed - // E.g. over different interfaces - p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) - p.linkOut = make(chan []byte, 1) - out := make(chan []byte, 1) - defer close(out) - go func() { - // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{streamMsg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(streamMsg)+len(msgLen)+len(msg))) - util.PutBytes(msg) - } - timerInterval := tcp_ping_interval - timer := time.NewTimer(timerInterval) - defer timer.Stop() - for { - select { - case msg := <-p.linkOut: - // Always send outgoing link traffic first, if needed - send(msg) - continue - default: - } - // Otherwise wait reset the timer and wait for something to do - timer.Stop() - select { - case <-timer.C: - default: - } - timer.Reset(timerInterval) - select { - case _ = <-timer.C: - send(nil) // TCP keep-alive traffic - case msg := <-p.linkOut: - send(msg) - case msg, ok := <-out: - if !ok { - return - } - send(msg) // Block until the socket write has finished - // Now inform the switch that we're ready for more traffic - p.core.switchTable.idleIn <- p.port - } - } - }() - p.core.switchTable.idleIn <- p.port // Start in the idle state - p.out = func(msg []byte) { - defer func() { recover() }() - out <- msg - } - p.close = func() { sock.Close() } - go p.linkLoop() - defer func() { - // Put all of our cleanup here... - p.core.peers.removePeer(p.port) - }() - us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) - them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&meta.box) - themAddr := address.AddrForNodeID(themNodeID) - themAddrString := net.IP(themAddr[:]).String() - themString := fmt.Sprintf("%s@%s", themAddrString, them) - iface.core.log.Printf("Connected: %s, source: %s", themString, us) - //iface.stream.init(sock, p.handlePacket) - bs := make([]byte, 2*streamMsgSize) - var n int - for { - if iface.timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.timeout)) - } - n, err = sock.Read(bs) - if err != nil { - break - } - if n > 0 { - //iface.stream.handleInput(bs[:n]) - } - } - if err == nil { - iface.core.log.Printf("Disconnected: %s, source: %s", themString, us) - } else { - iface.core.log.Printf("Disconnected: %s, source: %s, error: %s", themString, us, err) - } - return -} From 2944be4faff0419cc4e4c7d4dbe68672d56473df Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Jan 2019 17:05:16 +0000 Subject: [PATCH 15/27] Further tweaks --- src/yggdrasil/awdl.go | 18 ++++++++++++------ src/yggdrasil/link.go | 6 ------ src/yggdrasil/mobile_ios.go | 10 ++++++---- src/yggdrasil/peer.go | 1 + 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index ba3131ff..3e26a251 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -2,6 +2,7 @@ package yggdrasil import ( "errors" + "io" "sync" ) @@ -24,8 +25,13 @@ type awdlReadWriteCloser struct { } func (c awdlReadWriteCloser) Read(p []byte) (n int, err error) { - p = <-c.fromAWDL - return len(p), nil + select { + case packet := <-c.fromAWDL: + n = copy(p, packet) + return n, nil + default: + return 0, io.EOF + } } func (c awdlReadWriteCloser) Write(p []byte) (n int, err error) { @@ -48,10 +54,10 @@ func (l *awdl) init(c *Core) error { return nil } -func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name, local, remote string) (*awdlInterface, error) { +func (l *awdl) create(name, local, remote string) (*awdlInterface, error) { rwc := awdlReadWriteCloser{ - fromAWDL: fromAWDL, - toAWDL: toAWDL, + fromAWDL: make(chan []byte, 1), + toAWDL: make(chan []byte, 1), } s := stream{} s.init(rwc) @@ -66,7 +72,7 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name, local, rem l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() - go link.handler() + go intf.link.handler() return &intf, nil } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index e763fb1e..8b833f53 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -87,12 +87,10 @@ func (intf *linkInterface) handler() error { if err != nil { return err } - intf.link.core.log.Println("Sent my metadata") metaBytes, err = intf.msgIO._recvMetaBytes() if err != nil { return err } - intf.link.core.log.Println("Received their metadata") meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { return errors.New("failed to decode metadata") @@ -102,7 +100,6 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } - intf.link.core.log.Println("Do we have a link already?") // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig @@ -127,7 +124,6 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("DEBUG: registered interface for", intf.name) } intf.link.mutex.Unlock() - intf.link.core.log.Println("Create peer") // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) @@ -149,7 +145,6 @@ func (intf *linkInterface) handler() error { intf.peer.close = func() { intf.msgIO.close() } go intf.peer.linkLoop() // Start the writer - intf.link.core.log.Println("Start writer") go func() { interval := 4 * time.Second timer := time.NewTimer(interval) @@ -195,7 +190,6 @@ func (intf *linkInterface) handler() error { }() intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Run reader loop - intf.link.core.log.Println("Start reader") for { msg, err := intf.msgIO.readMsg() if len(msg) > 0 { diff --git a/src/yggdrasil/mobile_ios.go b/src/yggdrasil/mobile_ios.go index e864c889..5e0d4aa4 100644 --- a/src/yggdrasil/mobile_ios.go +++ b/src/yggdrasil/mobile_ios.go @@ -30,9 +30,7 @@ func (nsl MobileLogger) Write(p []byte) (n int, err error) { } func (c *Core) AWDLCreateInterface(name, local, remote string) error { - fromAWDL := make(chan []byte, 32) - toAWDL := make(chan []byte, 32) - if intf, err := c.awdl.create(fromAWDL, toAWDL, name, local, remote); err != nil || intf == nil { + if intf, err := c.awdl.create(name, local, remote); err != nil || intf == nil { c.log.Println("c.awdl.create:", err) return err } @@ -45,7 +43,11 @@ func (c *Core) AWDLShutdownInterface(name string) error { func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { if intf := c.awdl.getInterface(identity); intf != nil { - return <-intf.rwc.toAWDL, nil + read, ok := <-intf.rwc.toAWDL + if !ok { + return nil, errors.New("AWDLRecvPacket: channel closed") + } + return read, nil } return nil, errors.New("AWDLRecvPacket identity not known: " + identity) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 2cd1afe8..ad54fbc3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -250,6 +250,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { func (p *peer) sendPacket(packet []byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? + atomic.AddUint64(&p.bytesSent, uint64(len(packet))) p.out(packet) } From 188a9e439dbac4a4e9a99526eecc541ba6d7bd74 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Jan 2019 18:16:31 +0000 Subject: [PATCH 16/27] Bug fixes for AWDL --- src/yggdrasil/awdl.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 3e26a251..56104ee7 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -25,18 +25,18 @@ type awdlReadWriteCloser struct { } func (c awdlReadWriteCloser) Read(p []byte) (n int, err error) { - select { - case packet := <-c.fromAWDL: + if packet, ok := <-c.fromAWDL; ok { n = copy(p, packet) return n, nil - default: - return 0, io.EOF } + return 0, io.EOF } func (c awdlReadWriteCloser) Write(p []byte) (n int, err error) { - c.toAWDL <- p - return len(p), nil + var pc []byte + pc = append(pc, p...) + c.toAWDL <- pc + return len(pc), nil } func (c awdlReadWriteCloser) Close() error { From 705b914d008f4917a4fee8ea5bdd0d875d930367 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Jan 2019 19:42:33 +0000 Subject: [PATCH 17/27] Move awdl into link --- src/yggdrasil/awdl.go | 50 ++++++++++++++++++------------------- src/yggdrasil/core.go | 3 +-- src/yggdrasil/link.go | 3 ++- src/yggdrasil/mobile_ios.go | 10 ++++---- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 56104ee7..e9e57a61 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -7,13 +7,13 @@ import ( ) type awdl struct { - core *Core + link *link mutex sync.RWMutex // protects interfaces below interfaces map[string]*awdlInterface } type awdlInterface struct { - link *linkInterface + linkif *linkInterface rwc awdlReadWriteCloser peer *peer stream stream @@ -45,53 +45,53 @@ func (c awdlReadWriteCloser) Close() error { return nil } -func (l *awdl) init(c *Core) error { - l.core = c - l.mutex.Lock() - l.interfaces = make(map[string]*awdlInterface) - l.mutex.Unlock() +func (a *awdl) init(l *link) error { + a.link = l + a.mutex.Lock() + a.interfaces = make(map[string]*awdlInterface) + a.mutex.Unlock() return nil } -func (l *awdl) create(name, local, remote string) (*awdlInterface, error) { +func (a *awdl) create(name, local, remote string) (*awdlInterface, error) { rwc := awdlReadWriteCloser{ fromAWDL: make(chan []byte, 1), toAWDL: make(chan []byte, 1), } s := stream{} s.init(rwc) - link, err := l.core.link.create(&s, name, "awdl", local, remote) + linkif, err := a.link.create(&s, name, "awdl", local, remote) if err != nil { return nil, err } intf := awdlInterface{ - link: link, - rwc: rwc, + linkif: linkif, + rwc: rwc, } - l.mutex.Lock() - l.interfaces[name] = &intf - l.mutex.Unlock() - go intf.link.handler() + a.mutex.Lock() + a.interfaces[name] = &intf + a.mutex.Unlock() + go intf.linkif.handler() return &intf, nil } -func (l *awdl) getInterface(identity string) *awdlInterface { - l.mutex.RLock() - defer l.mutex.RUnlock() - if intf, ok := l.interfaces[identity]; ok { +func (a *awdl) getInterface(identity string) *awdlInterface { + a.mutex.RLock() + defer a.mutex.RUnlock() + if intf, ok := a.interfaces[identity]; ok { return intf } return nil } -func (l *awdl) shutdown(identity string) error { - if intf, ok := l.interfaces[identity]; ok { - close(intf.link.closed) +func (a *awdl) shutdown(identity string) error { + if intf, ok := a.interfaces[identity]; ok { + close(intf.linkif.closed) intf.rwc.Close() - l.mutex.Lock() - delete(l.interfaces, identity) - l.mutex.Unlock() + a.mutex.Lock() + delete(a.interfaces, identity) + a.mutex.Unlock() return nil } return errors.New("Interface not found or already closed") diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 70905619..4a1aba31 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -44,8 +44,7 @@ type Core struct { searches searches multicast multicast tcp tcpInterface - link link // TODO: not sure if this wants to be here? - awdl awdl + link link log *log.Logger } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 8b833f53..99044ce5 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -15,6 +15,7 @@ type link struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface + awdl awdl // AWDL interface support } type linkInfo struct { @@ -49,7 +50,7 @@ func (l *link) init(c *Core) error { l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() - if err := l.core.awdl.init(c); err != nil { + if err := l.awdl.init(l); err != nil { l.core.log.Println("Failed to start AWDL interface") return err } diff --git a/src/yggdrasil/mobile_ios.go b/src/yggdrasil/mobile_ios.go index 5e0d4aa4..c2ec63be 100644 --- a/src/yggdrasil/mobile_ios.go +++ b/src/yggdrasil/mobile_ios.go @@ -30,19 +30,19 @@ func (nsl MobileLogger) Write(p []byte) (n int, err error) { } func (c *Core) AWDLCreateInterface(name, local, remote string) error { - if intf, err := c.awdl.create(name, local, remote); err != nil || intf == nil { - c.log.Println("c.awdl.create:", err) + if intf, err := c.link.awdl.create(name, local, remote); err != nil || intf == nil { + c.log.Println("c.link.awdl.create:", err) return err } return nil } func (c *Core) AWDLShutdownInterface(name string) error { - return c.awdl.shutdown(name) + return c.link.awdl.shutdown(name) } func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { - if intf := c.awdl.getInterface(identity); intf != nil { + if intf := c.link.awdl.getInterface(identity); intf != nil { read, ok := <-intf.rwc.toAWDL if !ok { return nil, errors.New("AWDLRecvPacket: channel closed") @@ -54,7 +54,7 @@ func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { func (c *Core) AWDLSendPacket(identity string, buf []byte) error { packet := append(util.GetBytes(), buf[:]...) - if intf := c.awdl.getInterface(identity); intf != nil { + if intf := c.link.awdl.getInterface(identity); intf != nil { intf.rwc.fromAWDL <- packet return nil } From 2466c54a71a231795e283ad525268381ef066c55 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 27 Jan 2019 20:56:10 +0000 Subject: [PATCH 18/27] Update debug lines in link.go --- src/yggdrasil/link.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 99044ce5..1e658479 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -51,7 +51,7 @@ func (l *link) init(c *Core) error { l.mutex.Unlock() if err := l.awdl.init(l); err != nil { - l.core.log.Println("Failed to start AWDL interface") + l.core.log.Errorln("Failed to start AWDL interface") return err } @@ -98,7 +98,7 @@ func (intf *linkInterface) handler() error { } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } // Check if we already have a link to this node @@ -109,7 +109,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. - intf.link.core.log.Println("DEBUG: found existing interface for", intf.name) + intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.msgIO.close() <-oldIntf.closed return nil @@ -122,7 +122,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() close(intf.closed) }() - intf.link.core.log.Println("DEBUG: registered interface for", intf.name) + intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) } intf.link.mutex.Unlock() // Create peer From 05962b2cbd3d7f184f8903ffafc943bef001e2ff Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 30 Jan 2019 20:58:23 -0600 Subject: [PATCH 19/27] disable idle nodes in the switch instead of killing the connection entirely. this implementation is ugly, but i think it maybe works --- go.mod | 2 +- go.sum | 4 +-- src/yggdrasil/link.go | 64 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 3e8db512..995e54c2 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,10 @@ module github.com/yggdrasil-network/yggdrasil-go require ( github.com/docker/libcontainer v2.2.1+incompatible + github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 github.com/mitchellh/mapstructure v1.1.2 - github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 diff --git a/go.sum b/go.sum index 17b1017a..92dfe88c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/docker/libcontainer v2.2.1+incompatible h1:++SbbkCw+X8vAd4j2gOCzZ2Nn7s2xFALTf7LZKmM1/0= github.com/docker/libcontainer v2.2.1+incompatible/go.mod h1:osvj61pYsqhNCMLGX31xr7klUBhHb/ZBuXS0o1Fvwbw= +github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= +github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 h1:xmvkbxXDeN1ffWq8kvrhyqVYAO2aXuRBsbpxVTR+JyU= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio= github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 h1:YnZmFjg0Nvk8851WTVWlqMC1ecJH07Ctz+Ezxx4u54g= @@ -18,5 +20,3 @@ golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dz golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= -github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 1e658479..320b3e61 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -146,7 +146,9 @@ func (intf *linkInterface) handler() error { intf.peer.close = func() { intf.msgIO.close() } go intf.peer.linkLoop() // Start the writer + signalReady := make(chan struct{}, 1) go func() { + defer close(signalReady) interval := 4 * time.Second timer := time.NewTimer(interval) clearTimer := func() { @@ -181,15 +183,63 @@ func (intf *linkInterface) handler() error { } intf.msgIO.writeMsg(msg) util.PutBytes(msg) - if true { - // TODO *don't* do this if we're not reading any traffic - // In such a case, the reader is responsible for resetting it the next time we read something - intf.link.core.switchTable.idleIn <- intf.peer.port + select { + case signalReady <- struct{}{}: + default: } } } }() - intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + // Used to enable/disable activity in the switch + signalAlive := make(chan struct{}, 1) + defer close(signalAlive) + go func() { + var isAlive bool + var isReady bool + interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + timer := time.NewTimer(interval) + clearTimer := func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + defer clearTimer() + for { + clearTimer() + timer.Reset(interval) + select { + case _, ok := <-signalAlive: + if !ok { + return + } + if !isAlive { + isAlive = true + if !isReady { + // (Re-)enable in the switch + isReady = true + intf.link.core.switchTable.idleIn <- intf.peer.port + } + } + case _, ok := <-signalReady: + if !ok { + return + } + if !isAlive || !isReady { + // Disable in the switch + isReady = false + } else { + // Keep enabled in the switch + intf.link.core.switchTable.idleIn <- intf.peer.port + } + case <-timer.C: + isAlive = false + } + } + }() // Run reader loop for { msg, err := intf.msgIO.readMsg() @@ -199,6 +249,10 @@ func (intf *linkInterface) handler() error { if err != nil { return err } + select { + case signalAlive <- struct{}{}: + default: + } } //////////////////////////////////////////////////////////////////////////////// return nil From e36f88c75f8702442478b1e15544c602ccdbe72d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 31 Jan 2019 23:18:02 +0000 Subject: [PATCH 20/27] Info logging when link connects/disconnects --- src/yggdrasil/link.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 320b3e61..681cf814 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -3,10 +3,13 @@ package yggdrasil import ( "errors" "fmt" + "net" + "strings" "sync" //"sync/atomic" "time" + "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" ) @@ -143,7 +146,22 @@ func (intf *linkInterface) handler() error { out <- msg } intf.peer.linkOut = make(chan []byte, 1) - intf.peer.close = func() { intf.msgIO.close() } + intf.peer.close = func() { + intf.msgIO.close() + // Make output + themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) + themAddrString := net.IP(themAddr[:]).String() + themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) + intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + } + // Make output + themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) + themAddrString := net.IP(themAddr[:]).String() + themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) + intf.link.core.log.Infof("Connected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + // Start the link loop go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) From 432f93de894fcc21a3fb30a32fa0e053405796c7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 31 Jan 2019 23:29:18 +0000 Subject: [PATCH 21/27] Check AllowedEncryptionPublicKeys --- src/yggdrasil/link.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 681cf814..3b540d04 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -1,6 +1,7 @@ package yggdrasil import ( + "encoding/hex" "errors" "fmt" "net" @@ -104,6 +105,18 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } + // Check if we're authorized to connect to this key / IP + if !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + // Allow unauthorized peers if they're link-local + raddrStr, _, _ := net.SplitHostPort(intf.info.remote) + raddr := net.ParseIP(raddrStr) + if !raddr.IsLinkLocalUnicast() { + intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKey does not contain key %s", + strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) + intf.msgIO.close() + return nil + } + } // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig From ec5f7d98790b246c3e5f0c5b143f5789ad895166 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 31 Jan 2019 23:47:20 +0000 Subject: [PATCH 22/27] Enforce AllowedEncryptionPublicKeys for all peers inc. link-local --- src/yggdrasil/link.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 3b540d04..e52ff955 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -107,15 +107,10 @@ func (intf *linkInterface) handler() error { } // Check if we're authorized to connect to this key / IP if !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - // Allow unauthorized peers if they're link-local - raddrStr, _, _ := net.SplitHostPort(intf.info.remote) - raddr := net.ParseIP(raddrStr) - if !raddr.IsLinkLocalUnicast() { - intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKey does not contain key %s", - strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) - intf.msgIO.close() - return nil - } + intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", + strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) + intf.msgIO.close() + return nil } // Check if we already have a link to this node intf.info.box = meta.box From 43f798e82ea1cce63e23c1c2be46426fc25a1c46 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Feb 2019 00:02:17 +0000 Subject: [PATCH 23/27] Check link-local in tcp.go, track direction in link.go, fix compile error for mobile.go --- src/yggdrasil/awdl.go | 4 ++-- src/yggdrasil/link.go | 35 +++++++++++++++++------------------ src/yggdrasil/mobile.go | 3 ++- src/yggdrasil/mobile_ios.go | 4 ++-- src/yggdrasil/tcp.go | 3 ++- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index e9e57a61..42366888 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -54,14 +54,14 @@ func (a *awdl) init(l *link) error { return nil } -func (a *awdl) create(name, local, remote string) (*awdlInterface, error) { +func (a *awdl) create(name, local, remote string, incoming bool) (*awdlInterface, error) { rwc := awdlReadWriteCloser{ fromAWDL: make(chan []byte, 1), toAWDL: make(chan []byte, 1), } s := stream{} s.init(rwc) - linkif, err := a.link.create(&s, name, "awdl", local, remote) + linkif, err := a.link.create(&s, name, "awdl", local, remote, incoming, true) if err != nil { return nil, err } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index e52ff955..2d2155c9 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -40,12 +40,14 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO - info linkInfo - closed chan struct{} + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + incoming bool + force bool + closed chan struct{} } func (l *link) init(c *Core) error { @@ -62,7 +64,7 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string) (*linkInterface, error) { +func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but lets pick something human readable, just for debugging intf := linkInterface{ name: name, @@ -73,6 +75,8 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st local: local, remote: remote, }, + incoming: incoming, + force: force, } //l.interfaces[intf.name] = &intf //go intf.start() @@ -106,7 +110,7 @@ func (intf *linkInterface) handler() error { return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP - if !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() @@ -154,19 +158,14 @@ func (intf *linkInterface) handler() error { out <- msg } intf.peer.linkOut = make(chan []byte, 1) - intf.peer.close = func() { - intf.msgIO.close() - // Make output - themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) - themAddrString := net.IP(themAddr[:]).String() - themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } - // Make output themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) + intf.peer.close = func() { + intf.msgIO.close() + intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + } intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop diff --git a/src/yggdrasil/mobile.go b/src/yggdrasil/mobile.go index 52215f06..76fbe54d 100644 --- a/src/yggdrasil/mobile.go +++ b/src/yggdrasil/mobile.go @@ -5,10 +5,11 @@ package yggdrasil import ( "encoding/hex" "encoding/json" - "log" "os" "time" + "github.com/gologme/log" + hjson "github.com/hjson/hjson-go" "github.com/mitchellh/mapstructure" "github.com/yggdrasil-network/yggdrasil-go/src/config" diff --git a/src/yggdrasil/mobile_ios.go b/src/yggdrasil/mobile_ios.go index c2ec63be..7b089992 100644 --- a/src/yggdrasil/mobile_ios.go +++ b/src/yggdrasil/mobile_ios.go @@ -29,8 +29,8 @@ func (nsl MobileLogger) Write(p []byte) (n int, err error) { return len(p), nil } -func (c *Core) AWDLCreateInterface(name, local, remote string) error { - if intf, err := c.link.awdl.create(name, local, remote); err != nil || intf == nil { +func (c *Core) AWDLCreateInterface(name, local, remote string, incoming bool) error { + if intf, err := c.link.awdl.create(name, local, remote, incoming); err != nil || intf == nil { c.log.Println("c.link.awdl.create:", err) return err } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 617aa225..979bc81b 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -284,8 +284,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { stream.init(sock) local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) + remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() name := "tcp://" + sock.RemoteAddr().String() - link, err := iface.core.link.create(&stream, name, "tcp", local, remote) + link, err := iface.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) if err != nil { iface.core.log.Println(err) panic(err) From b44a0f29f3ed3ed31323027eef864acf20240259 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 Feb 2019 22:18:55 -0600 Subject: [PATCH 24/27] send an ack if we receive a packet and don't have any return traffic, keeping a legacy 4-second keep-alive in case there's no traffic at all to send (to be removed later, after nodes have upgraded), ideally we should either remove ReadTimeout or use it for the switch idle timeout instead --- src/util/util.go | 12 ++++++ src/yggdrasil/link.go | 85 +++++++++++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 32 deletions(-) diff --git a/src/util/util.go b/src/util/util.go index 65e6d463..45be3b19 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func Yield() { @@ -44,3 +45,14 @@ func PutBytes(bs []byte) { default: } } + +// This is a workaround to go's broken timer implementation +func TimerStop(t *time.Timer) bool { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return true +} diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 2d2155c9..a6c0ccc3 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,6 +20,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface awdl awdl // AWDL interface support + // TODO timeout (to remove from switch), read from config.ReadTimeout } type linkInfo struct { @@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } - //l.interfaces[intf.name] = &intf - //go intf.start() return &intf, nil } @@ -172,41 +171,49 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) + signalSent := make(chan struct{}, 1) + sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) + defer close(signalSent) interval := 4 * time.Second - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } + tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp + defer util.TimerStop(tcpTimer) + send := func(bs []byte) { + intf.msgIO.writeMsg(bs) + select { + case signalSent <- struct{}{}: + default: } } - defer clearTimer() for { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - intf.msgIO.writeMsg(msg) + send(msg) continue default: } // No protocol traffic to send, so reset the timer - clearTimer() - timer.Reset(interval) + util.TimerStop(tcpTimer) + tcpTimer.Reset(interval) // Now block until something is ready or the timer triggers keepalive traffic select { - case <-timer.C: - intf.msgIO.writeMsg(nil) + case <-tcpTimer.C: + intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case <-sendAck: + intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) case msg := <-intf.peer.linkOut: intf.msgIO.writeMsg(msg) case msg, ok := <-out: if !ok { return } - intf.msgIO.writeMsg(msg) + send(msg) util.PutBytes(msg) select { case signalReady <- struct{}{}: @@ -217,27 +224,23 @@ func (intf *linkInterface) handler() error { }() //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Used to enable/disable activity in the switch - signalAlive := make(chan struct{}, 1) + signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive defer close(signalAlive) go func() { var isAlive bool var isReady bool - interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - } - defer clearTimer() + var ackTimerRunning bool + timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + ackTime := time.Second + timer := time.NewTimer(timeout) + defer util.TimerStop(timer) + ackTimer := time.NewTimer(ackTime) + defer util.TimerStop(ackTimer) for { - clearTimer() - timer.Reset(interval) + util.TimerStop(timer) + timer.Reset(timeout) select { - case _, ok := <-signalAlive: + case gotMsg, ok := <-signalAlive: if !ok { return } @@ -249,6 +252,24 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } + if gotMsg && !ackTimerRunning { + util.TimerStop(ackTimer) + ackTimer.Reset(ackTime) + ackTimerRunning = true + } + case _, ok := <-signalSent: + // Stop any running ack timer + if !ok { + return + } + util.TimerStop(ackTimer) + ackTimerRunning = false + case <-ackTimer.C: + // We haven't sent anything in the past ackTimeout, so signal a send of a nil packet + select { + case sendAck <- struct{}{}: + default: + } case _, ok := <-signalReady: if !ok { return @@ -275,7 +296,7 @@ func (intf *linkInterface) handler() error { return err } select { - case signalAlive <- struct{}{}: + case signalAlive <- len(msg) > 0: default: } } From ebbe5f67ad5a988e435aaf3d7df8390a7eab8da5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 Feb 2019 22:41:51 -0600 Subject: [PATCH 25/27] don't time out a link unless we were expecting an ack and didn't get one --- src/yggdrasil/link.go | 57 +++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a6c0ccc3..6704c918 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -171,7 +171,7 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) - signalSent := make(chan struct{}, 1) + signalSent := make(chan bool, 1) sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) @@ -182,7 +182,7 @@ func (intf *linkInterface) handler() error { send := func(bs []byte) { intf.msgIO.writeMsg(bs) select { - case signalSent <- struct{}{}: + case signalSent <- len(bs) > 0: default: } } @@ -229,16 +229,15 @@ func (intf *linkInterface) handler() error { go func() { var isAlive bool var isReady bool - var ackTimerRunning bool - timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - ackTime := time.Second - timer := time.NewTimer(timeout) - defer util.TimerStop(timer) - ackTimer := time.NewTimer(ackTime) - defer util.TimerStop(ackTimer) + var sendTimerRunning bool + var recvTimerRunning bool + recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + sendTime := time.Second + sendTimer := time.NewTimer(sendTime) + defer util.TimerStop(sendTimer) + recvTimer := time.NewTimer(recvTime) + defer util.TimerStop(recvTimer) for { - util.TimerStop(timer) - timer.Reset(timeout) select { case gotMsg, ok := <-signalAlive: if !ok { @@ -252,23 +251,26 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } - if gotMsg && !ackTimerRunning { - util.TimerStop(ackTimer) - ackTimer.Reset(ackTime) - ackTimerRunning = true + if gotMsg && !sendTimerRunning { + // We got a message + // Start a timer, if it expires then send a 0-sized ack to let them know we're alive + util.TimerStop(sendTimer) + sendTimer.Reset(sendTime) + sendTimerRunning = true } - case _, ok := <-signalSent: + case sentMsg, ok := <-signalSent: // Stop any running ack timer if !ok { return } - util.TimerStop(ackTimer) - ackTimerRunning = false - case <-ackTimer.C: - // We haven't sent anything in the past ackTimeout, so signal a send of a nil packet - select { - case sendAck <- struct{}{}: - default: + util.TimerStop(sendTimer) + sendTimerRunning = false + if sentMsg && !recvTimerRunning { + // We sent a message + // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem + util.TimerStop(recvTimer) + recvTimer.Reset(recvTime) + recvTimerRunning = true } case _, ok := <-signalReady: if !ok { @@ -281,7 +283,14 @@ func (intf *linkInterface) handler() error { // Keep enabled in the switch intf.link.core.switchTable.idleIn <- intf.peer.port } - case <-timer.C: + case <-sendTimer.C: + // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive + select { + case sendAck <- struct{}{}: + default: + } + case <-recvTimer.C: + // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding isAlive = false } } From 5ddf84f32991ae8bda3361cbbbea7c46294e97d3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 3 Feb 2019 15:22:14 -0600 Subject: [PATCH 26/27] remove peers completely after a long switch timeout, this could use some improvement later --- src/yggdrasil/switch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f2adf3fb..db39d010 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -281,6 +281,7 @@ func (t *switchTable) cleanPeers() { if now.Sub(peer.time) > switch_timeout+switch_throttle { // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. delete(t.data.peers, port) + go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe } } if _, isIn := t.data.peers[t.parent]; !isIn { From 2f8dd5dde01c75ff76e01157fb77ddeec1e03053 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 3 Feb 2019 15:50:25 -0600 Subject: [PATCH 27/27] remove race condition in setting peer.close by requiring it as an argument to newPeer --- src/yggdrasil/debug.go | 4 +--- src/yggdrasil/link.go | 9 +++------ src/yggdrasil/peer.go | 3 ++- src/yggdrasil/router.go | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f7319e46..94faba40 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -97,9 +97,7 @@ func (c *Core) DEBUG_getPeers() *peers { } func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { - //in <-chan []byte, - //out chan<- []byte) *peer { - return ps.newPeer(&box, &sig, &link, "(simulator)") //, in, out) + return ps.newPeer(&box, &sig, &link, "(simulator)", nil) } /* diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 6704c918..be98dd92 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -141,7 +141,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) if intf.peer == nil { return errors.New("failed to create peer") } @@ -160,13 +160,10 @@ func (intf *linkInterface) handler() error { themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.peer.close = func() { - intf.msgIO.close() - intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) + defer intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop go intf.peer.linkLoop() // Start the writer diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad54fbc3..237d6f61 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -113,7 +113,7 @@ type peer struct { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string) *peer { +func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -123,6 +123,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare firstSeen: now, doSend: make(chan struct{}, 1), dinfo: make(chan *dhtInfo, 1), + close: closer, core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d5059369..99e69828 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,7 @@ func (r *router) init(core *Core) { r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)") + p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { // This is to make very sure it never blocks select {