From 9c818c6278473a923e400832b94daf93ab977c12 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 26 Apr 2020 07:33:03 -0500 Subject: [PATCH 1/6] work-in-progress on a new sim --- build | 2 +- cmd/yggdrasilsim/main.go | 15 +++++++ cmd/yggdrasilsim/node.go | 23 ++++++++++ cmd/yggdrasilsim/store.go | 41 ++++++++++++++++++ src/yggdrasil/simlink.go | 88 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 cmd/yggdrasilsim/main.go create mode 100644 cmd/yggdrasilsim/node.go create mode 100644 cmd/yggdrasilsim/store.go create mode 100644 src/yggdrasil/simlink.go diff --git a/build b/build index 66f94403..6b93ca77 100755 --- a/build +++ b/build @@ -45,7 +45,7 @@ elif [ $ANDROID ]; then github.com/yggdrasil-network/yggdrasil-extras/src/mobile \ github.com/yggdrasil-network/yggdrasil-extras/src/dummy else - for CMD in yggdrasil yggdrasilctl ; do + for CMD in yggdrasil yggdrasilctl yggdrasilsim; do echo "Building: $CMD" go build $ARGS -ldflags="$LDFLAGS" -gcflags="$GCFLAGS" ./cmd/$CMD diff --git a/cmd/yggdrasilsim/main.go b/cmd/yggdrasilsim/main.go new file mode 100644 index 00000000..40fd9ce4 --- /dev/null +++ b/cmd/yggdrasilsim/main.go @@ -0,0 +1,15 @@ +package main + +import ( +//"github.com/yggdrasil-network/yggdrasil-go/src/address" +//"github.com/yggdrasil-network/yggdrasil-go/src/config" +//"github.com/yggdrasil-network/yggdrasil-go/src/crypto" +//"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" +) + +func main() { + store := makeStoreSquareGrid(4) + var block chan struct{} + <-block + panic(store) +} diff --git a/cmd/yggdrasilsim/node.go b/cmd/yggdrasilsim/node.go new file mode 100644 index 00000000..e23b5109 --- /dev/null +++ b/cmd/yggdrasilsim/node.go @@ -0,0 +1,23 @@ +package main + +import ( + "io/ioutil" + + "github.com/gologme/log" + + //"github.com/yggdrasil-network/yggdrasil-go/src/address" + "github.com/yggdrasil-network/yggdrasil-go/src/config" + //"github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" +) + +type simNode struct { + core yggdrasil.Core + id int +} + +func newNode(id int) *simNode { + n := simNode{id: id} + n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0)) + return &n +} diff --git a/cmd/yggdrasilsim/store.go b/cmd/yggdrasilsim/store.go new file mode 100644 index 00000000..6fce81a4 --- /dev/null +++ b/cmd/yggdrasilsim/store.go @@ -0,0 +1,41 @@ +package main + +type nodeStore map[int]*simNode + +func makeStoreSingle() nodeStore { + s := make(nodeStore) + s[0] = newNode(0) + return s +} + +func linkNodes(a *simNode, b *simNode) { + la := a.core.NewSimlink() + lb := b.core.NewSimlink() + la.SetDestination(lb) + lb.SetDestination(la) + la.Start() + lb.Start() +} + +func makeStoreSquareGrid(sideLength int) nodeStore { + store := make(nodeStore) + nNodes := sideLength * sideLength + idxs := make([]int, 0, nNodes) + // TODO shuffle nodeIDs + for idx := 1; idx <= nNodes; idx++ { + idxs = append(idxs, idx) + } + for _, idx := range idxs { + n := newNode(idx) + store[idx] = n + } + for idx := 0; idx < nNodes; idx++ { + if (idx % sideLength) != 0 { + linkNodes(store[idxs[idx]], store[idxs[idx-1]]) + } + if idx >= sideLength { + linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]]) + } + } + return store +} diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go new file mode 100644 index 00000000..33332645 --- /dev/null +++ b/src/yggdrasil/simlink.go @@ -0,0 +1,88 @@ +package yggdrasil + +import ( + "errors" + "github.com/Arceliar/phony" + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) + +type Simlink struct { + phony.Inbox + rch chan []byte + dest *Simlink + link *linkInterface + started bool +} + +func (s *Simlink) readMsg() ([]byte, error) { + bs := <-s.rch + if bs != nil { + return bs, nil + } else { + return nil, errors.New("read from closed Simlink") + } +} + +func (s *Simlink) _recvMetaBytes() ([]byte, error) { + return s.readMsg() +} + +func (s *Simlink) _sendMetaBytes(bs []byte) error { + _, err := s.writeMsgs([][]byte{bs}) + return err +} + +func (s *Simlink) close() error { + close(s.rch) + return nil +} + +func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) { + if s.dest == nil { + return 0, errors.New("write to unpaired Simlink") + } + var size int + for _, msg := range msgs { + size += len(msg) + bs := append(util.GetBytes(), msg...) + phony.Block(s, func() { + s.dest.Act(s, func() { + defer func() { recover() }() + s.dest.rch <- bs + }) + }) + } + return size, nil +} + +func (c *Core) NewSimlink() *Simlink { + s := &Simlink{rch: make(chan []byte, 1)} + n := "Simlink" + s.link, _ = c.link.create(s, n, n, n, n, false, true) + return s +} + +func (s *Simlink) SetDestination(dest *Simlink) error { + var err error + phony.Block(s, func() { + if s.dest != nil { + err = errors.New("destination already set") + } else { + s.dest = dest + } + }) + return err +} + +func (s *Simlink) Start() error { + var err error + phony.Block(s, func() { + if s.started { + err = errors.New("already started") + } else { + s.started = true + go s.link.handler() + } + }) + return err +} From 5db93be4df4ae6eabe749bdeedffd6e3f4acf63c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 26 Apr 2020 09:59:30 -0500 Subject: [PATCH 2/6] more sim work --- cmd/yggdrasilsim/node.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cmd/yggdrasilsim/node.go b/cmd/yggdrasilsim/node.go index e23b5109..65e6a805 100644 --- a/cmd/yggdrasilsim/node.go +++ b/cmd/yggdrasilsim/node.go @@ -5,19 +5,24 @@ import ( "github.com/gologme/log" - //"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/config" - //"github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" ) type simNode struct { - core yggdrasil.Core - id int + core yggdrasil.Core + id int + nodeID crypto.NodeID + dialer *yggdrasil.Dialer + listener *yggdrasil.Listener } func newNode(id int) *simNode { n := simNode{id: id} n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0)) + n.nodeID = *n.core.NodeID() + n.dialer, _ = n.core.ConnDialer() + n.listener, _ = n.core.ConnListen() return &n } From 72afa0502990b12e6a172090c3621cf35fc9f3de Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 May 2020 10:01:09 -0500 Subject: [PATCH 3/6] test dial/listen in the sim --- build | 2 +- cmd/yggdrasilsim/dial.go | 60 ++++++++++++++++++++++++++++++++++++++++ cmd/yggdrasilsim/main.go | 4 +-- src/yggdrasil/simlink.go | 3 +- 4 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 cmd/yggdrasilsim/dial.go diff --git a/build b/build index 6b93ca77..0d0da7e2 100755 --- a/build +++ b/build @@ -45,7 +45,7 @@ elif [ $ANDROID ]; then github.com/yggdrasil-network/yggdrasil-extras/src/mobile \ github.com/yggdrasil-network/yggdrasil-extras/src/dummy else - for CMD in yggdrasil yggdrasilctl yggdrasilsim; do + for CMD in yggdrasil yggdrasilctl; do echo "Building: $CMD" go build $ARGS -ldflags="$LDFLAGS" -gcflags="$GCFLAGS" ./cmd/$CMD diff --git a/cmd/yggdrasilsim/dial.go b/cmd/yggdrasilsim/dial.go new file mode 100644 index 00000000..5713fdd7 --- /dev/null +++ b/cmd/yggdrasilsim/dial.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "sort" + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" +) + +func doListen(recvNode *simNode) { + for { + c, err := recvNode.listener.Accept() + if err != nil { + panic(err) + } + c.Close() + } +} + +func dialTest(sendNode, recvNode *simNode) { + if sendNode.id == recvNode.id { + fmt.Println("Skipping dial to self") + return + } + var mask crypto.NodeID + for idx := range mask { + mask[idx] = 0xff + } + for { + c, err := sendNode.dialer.DialByNodeIDandMask(nil, &recvNode.nodeID, &mask) + if c != nil { + c.Close() + return + } + if err != nil { + fmt.Println("Dial failed:", err) + } + time.Sleep(time.Second) + } +} + +func dialStore(store nodeStore) { + var nodeIdxs []int + for idx, n := range store { + nodeIdxs = append(nodeIdxs, idx) + go doListen(n) + } + sort.Slice(nodeIdxs, func(i, j int) bool { + return nodeIdxs[i] < nodeIdxs[j] + }) + for _, idx := range nodeIdxs { + sendNode := store[idx] + for _, jdx := range nodeIdxs { + recvNode := store[jdx] + fmt.Printf("Dialing from node %d to node %d / %d...\n", idx, jdx, len(store)) + dialTest(sendNode, recvNode) + } + } +} diff --git a/cmd/yggdrasilsim/main.go b/cmd/yggdrasilsim/main.go index 40fd9ce4..fcbcfc97 100644 --- a/cmd/yggdrasilsim/main.go +++ b/cmd/yggdrasilsim/main.go @@ -9,7 +9,5 @@ import ( func main() { store := makeStoreSquareGrid(4) - var block chan struct{} - <-block - panic(store) + dialStore(store) } diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index 33332645..e846f3ba 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -3,7 +3,6 @@ package yggdrasil import ( "errors" "github.com/Arceliar/phony" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) type Simlink struct { @@ -44,7 +43,7 @@ func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) { var size int for _, msg := range msgs { size += len(msg) - bs := append(util.GetBytes(), msg...) + bs := append([]byte(nil), msg...) phony.Block(s, func() { s.dest.Act(s, func() { defer func() { recover() }() From 15162ee952c3789cd5aed7ae74e36f9a31a11866 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 May 2020 10:51:26 -0500 Subject: [PATCH 4/6] fix a panic from a doubly closed channel in the simlink --- src/yggdrasil/simlink.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index e846f3ba..736ee633 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -14,12 +14,11 @@ type Simlink struct { } func (s *Simlink) readMsg() ([]byte, error) { - bs := <-s.rch - if bs != nil { - return bs, nil - } else { + bs, ok := <-s.rch + if !ok { return nil, errors.New("read from closed Simlink") } + return bs, nil } func (s *Simlink) _recvMetaBytes() ([]byte, error) { @@ -32,6 +31,7 @@ func (s *Simlink) _sendMetaBytes(bs []byte) error { } func (s *Simlink) close() error { + defer func() { recover() }() close(s.rch) return nil } From 402cfc0f005219580e3bb8da48e36c77ad65a377 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 May 2020 10:56:17 -0500 Subject: [PATCH 5/6] undo remaining trivial change to build --- build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build b/build index 0d0da7e2..66f94403 100755 --- a/build +++ b/build @@ -45,7 +45,7 @@ elif [ $ANDROID ]; then github.com/yggdrasil-network/yggdrasil-extras/src/mobile \ github.com/yggdrasil-network/yggdrasil-extras/src/dummy else - for CMD in yggdrasil yggdrasilctl; do + for CMD in yggdrasil yggdrasilctl ; do echo "Building: $CMD" go build $ARGS -ldflags="$LDFLAGS" -gcflags="$GCFLAGS" ./cmd/$CMD From 20ef5910136c1aa214dce826fee0ce5d85b76150 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 May 2020 11:16:11 -0500 Subject: [PATCH 6/6] fix some crashes with races during peer setup --- cmd/yggdrasilsim/dial.go | 1 + cmd/yggdrasilsim/main.go | 7 ------- src/yggdrasil/link.go | 30 +++++++++++++++--------------- src/yggdrasil/peer.go | 4 +++- src/yggdrasil/router.go | 10 +++++----- src/yggdrasil/simlink.go | 6 +++++- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/cmd/yggdrasilsim/dial.go b/cmd/yggdrasilsim/dial.go index 5713fdd7..c7892d40 100644 --- a/cmd/yggdrasilsim/dial.go +++ b/cmd/yggdrasilsim/dial.go @@ -9,6 +9,7 @@ import ( ) func doListen(recvNode *simNode) { + // TODO be able to stop the listeners somehow so they don't leak across different tests for { c, err := recvNode.listener.Accept() if err != nil { diff --git a/cmd/yggdrasilsim/main.go b/cmd/yggdrasilsim/main.go index fcbcfc97..25504c92 100644 --- a/cmd/yggdrasilsim/main.go +++ b/cmd/yggdrasilsim/main.go @@ -1,12 +1,5 @@ package main -import ( -//"github.com/yggdrasil-network/yggdrasil-go/src/address" -//"github.com/yggdrasil-network/yggdrasil-go/src/config" -//"github.com/yggdrasil-network/yggdrasil-go/src/crypto" -//"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" -) - func main() { store := makeStoreSquareGrid(4) dialStore(store) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 733b9ac1..7f6b9b56 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -217,9 +217,23 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + out := func(msgs [][]byte) { + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, msgs, false) + } + linkOut := func(bs []byte) { + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) + } phony.Block(&intf.link.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -228,20 +242,6 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.peer.Act(nil, intf.peer._removeSelf) }() - intf.peer.out = func(msgs [][]byte) { - // nil to prevent it from blocking if the link is somehow frozen - // this is safe because another packet won't be sent until the link notifies - // the peer that it's ready for one - intf.writer.sendFrom(nil, msgs, false) - } - intf.peer.linkOut = func(bs []byte) { - // nil to prevent it from blocking if the link is somehow frozen - // FIXME this is hypothetically not safe, the peer shouldn't be sending - // additional packets until this one finishes, otherwise this could leak - // memory if writing happens slower than link packets are generated... - // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) - } themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7eef9a11..801691a0 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -123,7 +123,7 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -134,6 +134,8 @@ func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShar close: closer, core: ps.core, intf: intf, + out: out, + linkOut: linkOut, } oldPorts := ps.ports newPorts := make(map[switchPort]*peer) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 71d92609..1bb14c4c 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -62,17 +62,17 @@ func (r *router) init(core *Core) { }, } var p *peer - phony.Block(&r.core.peers, func() { - // FIXME don't block here! - p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - }) - p.out = func(packets [][]byte) { + peerOut := func(packets [][]byte) { r.handlePackets(p, packets) r.Act(p, func() { // after the router handle the packets, notify the peer that it's ready for more p.Act(r, p._handleIdle) }) } + phony.Block(&r.core.peers, func() { + // FIXME don't block here! + p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil) + }) p.Act(r, p._handleIdle) r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index 736ee633..f830c215 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -57,7 +57,11 @@ func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) { func (c *Core) NewSimlink() *Simlink { s := &Simlink{rch: make(chan []byte, 1)} n := "Simlink" - s.link, _ = c.link.create(s, n, n, n, n, false, true) + var err error + s.link, err = c.link.create(s, n, n, n, n, false, true) + if err != nil { + panic(err) + } return s }