diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 43cf77a7..c3add0ce 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -9,10 +9,8 @@ import ( "log" "os" "os/signal" - "regexp" "strings" "syscall" - "time" "golang.org/x/text/encoding/unicode" @@ -31,6 +29,119 @@ type node struct { core Core } +func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig { + // Use a configuration file. If -useconf, the configuration will be read + // from stdin. If -useconffile, the configuration will be read from the + // filesystem. + var conf []byte + var err error + if *useconffile != "" { + // Read the file from the filesystem + conf, err = ioutil.ReadFile(*useconffile) + } else { + // Read the file from stdin. + conf, err = ioutil.ReadAll(os.Stdin) + } + if err != nil { + panic(err) + } + // If there's a byte order mark - which Windows 10 is now incredibly fond of + // throwing everywhere when it's converting things into UTF-16 for the hell + // of it - remove it and decode back down into UTF-8. This is necessary + // because hjson doesn't know what to do with UTF-16 and will panic + if bytes.Compare(conf[0:2], []byte{0xFF, 0xFE}) == 0 || + bytes.Compare(conf[0:2], []byte{0xFE, 0xFF}) == 0 { + utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM) + decoder := utf.NewDecoder() + conf, err = decoder.Bytes(conf) + if err != nil { + panic(err) + } + } + // Generate a new configuration - this gives us a set of sane defaults - + // then parse the configuration we loaded above on top of it. The effect + // of this is that any configuration item that is missing from the provided + // configuration will use a sane default. + cfg := config.GenerateConfig(false) + var dat map[string]interface{} + if err := hjson.Unmarshal(conf, &dat); err != nil { + panic(err) + } + confJson, err := json.Marshal(dat) + if err != nil { + panic(err) + } + json.Unmarshal(confJson, &cfg) + // For now we will do a little bit to help the user adjust their + // configuration to match the new configuration format, as some of the key + // names have changed recently. + changes := map[string]string{ + "Multicast": "", + "LinkLocal": "MulticastInterfaces", + "BoxPub": "EncryptionPublicKey", + "BoxPriv": "EncryptionPrivateKey", + "SigPub": "SigningPublicKey", + "SigPriv": "SigningPrivateKey", + "AllowedBoxPubs": "AllowedEncryptionPublicKeys", + } + // Loop over the mappings aove and see if we have anything to fix. + for from, to := range changes { + if _, ok := dat[from]; ok { + if to == "" { + if !*normaliseconf { + log.Println("Warning: Deprecated config option", from, "- please remove") + } + } else { + if !*normaliseconf { + log.Println("Warning: Deprecated config option", from, "- please rename to", to) + } + // If the configuration file doesn't already contain a line with the + // new name then set it to the old value. This makes sure that we + // don't overwrite something that was put there intentionally. + if _, ok := dat[to]; !ok { + dat[to] = dat[from] + } + } + } + } + // Check to see if the peers are in a parsable format, if not then default + // them to the TCP scheme + if peers, ok := dat["Peers"].([]interface{}); ok { + for index, peer := range peers { + uri := peer.(string) + if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { + continue + } + if strings.HasPrefix(uri, "tcp:") { + uri = uri[4:] + } + (dat["Peers"].([]interface{}))[index] = "tcp://" + uri + } + } + // Now do the same with the interface peers + if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { + for intf, peers := range interfacepeers { + for index, peer := range peers.([]interface{}) { + uri := peer.(string) + if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { + continue + } + if strings.HasPrefix(uri, "tcp:") { + uri = uri[4:] + } + ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri + } + } + } + // Overlay our newly mapped configuration onto the autoconf node config that + // we generated above. + if err = mapstructure.Decode(dat, &cfg); err != nil { + panic(err) + } + + return cfg +} + // Generates a new configuration and returns it in HJSON format. This is used // with -genconf. func doGenconf(isjson bool) string { @@ -61,6 +172,7 @@ func main() { flag.Parse() var cfg *nodeConfig + var err error switch { case *version: fmt.Println("Build name:", yggdrasil.GetBuildName()) @@ -71,114 +183,8 @@ func main() { // port numbers, and will use an automatically selected TUN/TAP interface. cfg = config.GenerateConfig(true) case *useconffile != "" || *useconf: - // Use a configuration file. If -useconf, the configuration will be read - // from stdin. If -useconffile, the configuration will be read from the - // filesystem. - var configjson []byte - var err error - if *useconffile != "" { - // Read the file from the filesystem - configjson, err = ioutil.ReadFile(*useconffile) - } else { - // Read the file from stdin. - configjson, err = ioutil.ReadAll(os.Stdin) - } - if err != nil { - panic(err) - } - // If there's a byte order mark - which Windows 10 is now incredibly fond of - // throwing everywhere when it's converting things into UTF-16 for the hell - // of it - remove it and decode back down into UTF-8. This is necessary - // because hjson doesn't know what to do with UTF-16 and will panic - if bytes.Compare(configjson[0:2], []byte{0xFF, 0xFE}) == 0 || - bytes.Compare(configjson[0:2], []byte{0xFE, 0xFF}) == 0 { - utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM) - decoder := utf.NewDecoder() - configjson, err = decoder.Bytes(configjson) - if err != nil { - panic(err) - } - } - // Generate a new configuration - this gives us a set of sane defaults - - // then parse the configuration we loaded above on top of it. The effect - // of this is that any configuration item that is missing from the provided - // configuration will use a sane default. - cfg = config.GenerateConfig(false) - var dat map[string]interface{} - if err := hjson.Unmarshal(configjson, &dat); err != nil { - panic(err) - } - confJson, err := json.Marshal(dat) - if err != nil { - panic(err) - } - json.Unmarshal(confJson, &cfg) - // For now we will do a little bit to help the user adjust their - // configuration to match the new configuration format, as some of the key - // names have changed recently. - changes := map[string]string{ - "Multicast": "", - "LinkLocal": "MulticastInterfaces", - "BoxPub": "EncryptionPublicKey", - "BoxPriv": "EncryptionPrivateKey", - "SigPub": "SigningPublicKey", - "SigPriv": "SigningPrivateKey", - "AllowedBoxPubs": "AllowedEncryptionPublicKeys", - } - // Loop over the mappings aove and see if we have anything to fix. - for from, to := range changes { - if _, ok := dat[from]; ok { - if to == "" { - if !*normaliseconf { - log.Println("Warning: Deprecated config option", from, "- please remove") - } - } else { - if !*normaliseconf { - log.Println("Warning: Deprecated config option", from, "- please rename to", to) - } - // If the configuration file doesn't already contain a line with the - // new name then set it to the old value. This makes sure that we - // don't overwrite something that was put there intentionally. - if _, ok := dat[to]; !ok { - dat[to] = dat[from] - } - } - } - } - // Check to see if the peers are in a parsable format, if not then default - // them to the TCP scheme - if peers, ok := dat["Peers"].([]interface{}); ok { - for index, peer := range peers { - uri := peer.(string) - if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { - continue - } - if strings.HasPrefix(uri, "tcp:") { - uri = uri[4:] - } - (dat["Peers"].([]interface{}))[index] = "tcp://" + uri - } - } - // Now do the same with the interface peers - if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { - for intf, peers := range interfacepeers { - for index, peer := range peers.([]interface{}) { - uri := peer.(string) - if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { - continue - } - if strings.HasPrefix(uri, "tcp:") { - uri = uri[4:] - } - ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri - } - } - } - // Overlay our newly mapped configuration onto the autoconf node config that - // we generated above. - if err = mapstructure.Decode(dat, &cfg); err != nil { - panic(err) - } + // Read the configuration from either stdin or from the filesystem + cfg = readConfig(useconf, useconffile, normaliseconf) // If the -normaliseconf option was specified then remarshal the above // configuration and print it back to stdout. This lets the user update // their configuration file with newly mapped names (like above) or to @@ -214,15 +220,6 @@ func main() { // Setup the Yggdrasil node itself. The node{} type includes a Core, so we // don't need to create this manually. n := node{} - // Check to see if any multicast interface expressions were provided in the - // config. If they were then set them now. - for _, ll := range cfg.MulticastInterfaces { - ifceExpr, err := regexp.Compile(ll) - if err != nil { - panic(err) - } - n.core.AddMulticastInterfaceExpr(ifceExpr) - } // Now that we have a working configuration, we can now actually start // Yggdrasil. This will start the router, switch, DHT node, TCP and UDP // sockets, TUN/TAP adapter and multicast discovery port. @@ -235,27 +232,6 @@ func main() { for _, pBoxStr := range cfg.AllowedEncryptionPublicKeys { n.core.AddAllowedEncryptionPublicKey(pBoxStr) } - // If any static peers were provided in the configuration above then we should - // configure them. The loop ensures that disconnected peers will eventually - // be reconnected with. - go func() { - if len(cfg.Peers) == 0 && len(cfg.InterfacePeers) == 0 { - return - } - for { - for _, peer := range cfg.Peers { - n.core.AddPeer(peer, "") - time.Sleep(time.Second) - } - for intf, intfpeers := range cfg.InterfacePeers { - for _, peer := range intfpeers { - n.core.AddPeer(peer, intf) - time.Sleep(time.Second) - } - } - time.Sleep(time.Minute) - } - }() // The Stop function ensures that the TUN/TAP adapter is correctly shut down // before the program exits. defer func() { @@ -269,7 +245,9 @@ func main() { logger.Printf("Your IPv6 subnet is %s", subnet.String()) // Catch interrupts from the operating system to exit gracefully. c := make(chan os.Signal, 1) + r := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(r, os.Interrupt, syscall.SIGHUP) // Create a function to capture the service being stopped on Windows. winTerminate := func() { c <- os.Interrupt @@ -277,5 +255,18 @@ func main() { minwinsvc.SetOnExit(winTerminate) // Wait for the terminate/interrupt signal. Once a signal is received, the // deferred Stop function above will run which will shut down TUN/TAP. - <-c + for { + select { + case _ = <-r: + if *useconffile != "" { + cfg = readConfig(useconf, useconffile, normaliseconf) + n.core.UpdateConfig(cfg) + } else { + logger.Println("Reloading config at runtime is only possible with -useconffile") + } + case _ = <-c: + goto exit + } + } +exit: } diff --git a/src/yggdrasil/adapter.go b/src/yggdrasil/adapter.go index 7fb6a19e..3ce80d2b 100644 --- a/src/yggdrasil/adapter.go +++ b/src/yggdrasil/adapter.go @@ -3,9 +3,10 @@ package yggdrasil // Defines the minimum required struct members for an adapter type (this is // now the base type for tunAdapter in tun.go) type Adapter struct { - core *Core - send chan<- []byte - recv <-chan []byte + core *Core + send chan<- []byte + recv <-chan []byte + reconfigure chan chan error } // Initialises the adapter. @@ -13,4 +14,5 @@ func (adapter *Adapter) init(core *Core, send chan<- []byte, recv <-chan []byte) adapter.core = core adapter.send = send adapter.recv = recv + adapter.reconfigure = make(chan chan error, 1) } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 2496b309..5524fe25 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -22,10 +22,11 @@ import ( // TODO: Add authentication type admin struct { - core *Core - listenaddr string - listener net.Listener - handlers []admin_handlerInfo + core *Core + reconfigure chan chan error + listenaddr string + listener net.Listener + handlers []admin_handlerInfo } type admin_info map[string]interface{} @@ -51,9 +52,25 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info) } // init runs the initial admin setup. -func (a *admin) init(c *Core, listenaddr string) { +func (a *admin) init(c *Core) { a.core = c - a.listenaddr = listenaddr + a.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-a.reconfigure + a.core.configMutex.RLock() + if a.core.config.AdminListen != a.core.configOld.AdminListen { + a.listenaddr = a.core.config.AdminListen + a.close() + a.start() + } + a.core.configMutex.RUnlock() + e <- nil + } + }() + a.core.configMutex.RLock() + a.listenaddr = a.core.config.AdminListen + a.core.configMutex.RUnlock() a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) { handlers := make(map[string]interface{}) for _, handler := range a.handlers { @@ -331,7 +348,10 @@ func (a *admin) init(c *Core, listenaddr string) { } var box_pub_key, coords string if in["box_pub_key"] == nil && in["coords"] == nil { - nodeinfo := []byte(a.core.nodeinfo.getNodeInfo()) + var nodeinfo []byte + a.core.router.doAdmin(func() { + nodeinfo = []byte(a.core.router.nodeinfo.getNodeInfo()) + }) var jsoninfo interface{} if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil { return admin_info{}, err @@ -842,7 +862,7 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) ( copy(key[:], keyBytes) } if !nocache { - if response, err := a.core.nodeinfo.getCachedNodeInfo(key); err == nil { + if response, err := a.core.router.nodeinfo.getCachedNodeInfo(key); err == nil { return response, nil } } @@ -860,14 +880,14 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) ( } response := make(chan *nodeinfoPayload, 1) sendNodeInfoRequest := func() { - a.core.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { + a.core.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { defer func() { recover() }() select { case response <- nodeinfo: default: } }) - a.core.nodeinfo.sendNodeInfo(key, coords, false) + a.core.router.nodeinfo.sendNodeInfo(key, coords, false) } a.core.router.doAdmin(sendNodeInfoRequest) go func() { diff --git a/src/yggdrasil/ckr.go b/src/yggdrasil/ckr.go index a3df891e..a5ed4552 100644 --- a/src/yggdrasil/ckr.go +++ b/src/yggdrasil/ckr.go @@ -18,6 +18,7 @@ import ( type cryptokey struct { core *Core enabled bool + reconfigure chan chan error ipv4routes []cryptokey_route ipv6routes []cryptokey_route ipv4cache map[address.Address]cryptokey_route @@ -34,12 +35,75 @@ type cryptokey_route struct { // Initialise crypto-key routing. This must be done before any other CKR calls. func (c *cryptokey) init(core *Core) { c.core = core - c.ipv4routes = make([]cryptokey_route, 0) + c.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-c.reconfigure + var err error + c.core.router.doAdmin(func() { + err = c.core.router.cryptokey.configure() + }) + e <- err + } + }() + + if err := c.configure(); err != nil { + c.core.log.Println("CKR configuration failed:", err) + } +} + +// Configure the CKR routes - this must only ever be called from the router +// goroutine, e.g. through router.doAdmin +func (c *cryptokey) configure() error { + c.core.configMutex.RLock() + defer c.core.configMutex.RUnlock() + + // Set enabled/disabled state + c.setEnabled(c.core.config.TunnelRouting.Enable) + + // Clear out existing routes c.ipv6routes = make([]cryptokey_route, 0) + c.ipv4routes = make([]cryptokey_route, 0) + + // Add IPv6 routes + for ipv6, pubkey := range c.core.config.TunnelRouting.IPv6Destinations { + if err := c.addRoute(ipv6, pubkey); err != nil { + return err + } + } + + // Add IPv4 routes + for ipv4, pubkey := range c.core.config.TunnelRouting.IPv4Destinations { + if err := c.addRoute(ipv4, pubkey); err != nil { + return err + } + } + + // Clear out existing sources + c.ipv6sources = make([]net.IPNet, 0) + c.ipv4sources = make([]net.IPNet, 0) + + // Add IPv6 sources + c.ipv6sources = make([]net.IPNet, 0) + for _, source := range c.core.config.TunnelRouting.IPv6Sources { + if err := c.addSourceSubnet(source); err != nil { + return err + } + } + + // Add IPv4 sources + c.ipv4sources = make([]net.IPNet, 0) + for _, source := range c.core.config.TunnelRouting.IPv4Sources { + if err := c.addSourceSubnet(source); err != nil { + return err + } + } + + // Wipe the caches c.ipv4cache = make(map[address.Address]cryptokey_route, 0) c.ipv6cache = make(map[address.Address]cryptokey_route, 0) - c.ipv4sources = make([]net.IPNet, 0) - c.ipv6sources = make([]net.IPNet, 0) + + return nil } // Enable or disable crypto-key routing. diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index a1c2127d..3382562e 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -2,11 +2,11 @@ package yggdrasil import ( "encoding/hex" - "fmt" "io/ioutil" "log" "net" - "regexp" + "sync" + "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -17,10 +17,20 @@ import ( var buildName string var buildVersion string +type module interface { + init(*Core, *config.NodeConfig) error + start() error +} + // The Core object represents the Yggdrasil node. You should create a Core // object for each Yggdrasil node you plan to run. type Core struct { // This is the main data structure that holds everything else for a node + // We're going to keep our own copy of the provided config - that way we can + // guarantee that it will be covered by the mutex + config config.NodeConfig // Active config + configOld config.NodeConfig // Previous config + configMutex sync.RWMutex // Protects both config and configOld boxPub crypto.BoxPubKey boxPriv crypto.BoxPrivKey sigPub crypto.SigPubKey @@ -33,17 +43,12 @@ type Core struct { admin admin searches searches multicast multicast - nodeinfo nodeinfo tcp tcpInterface awdl awdl log *log.Logger - ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } -func (c *Core) init(bpub *crypto.BoxPubKey, - bpriv *crypto.BoxPrivKey, - spub *crypto.SigPubKey, - spriv *crypto.SigPrivKey) { +func (c *Core) init() error { // TODO separate init and start functions // Init sets up structs // Start launches goroutines that depend on structs being set up @@ -51,20 +56,104 @@ func (c *Core) init(bpub *crypto.BoxPubKey, if c.log == nil { c.log = log.New(ioutil.Discard, "", 0) } - c.boxPub, c.boxPriv = *bpub, *bpriv - c.sigPub, c.sigPriv = *spub, *spriv - c.admin.core = c + + boxPubHex, err := hex.DecodeString(c.config.EncryptionPublicKey) + if err != nil { + return err + } + boxPrivHex, err := hex.DecodeString(c.config.EncryptionPrivateKey) + if err != nil { + return err + } + sigPubHex, err := hex.DecodeString(c.config.SigningPublicKey) + if err != nil { + return err + } + sigPrivHex, err := hex.DecodeString(c.config.SigningPrivateKey) + if err != nil { + return err + } + + copy(c.boxPub[:], boxPubHex) + copy(c.boxPriv[:], boxPrivHex) + copy(c.sigPub[:], sigPubHex) + copy(c.sigPriv[:], sigPrivHex) + + c.admin.init(c) c.searches.init(c) c.dht.init(c) c.sessions.init(c) c.multicast.init(c) c.peers.init(c) c.router.init(c) - c.switchTable.init(c, c.sigPub) // TODO move before peers? before router? + c.switchTable.init(c) // TODO move before peers? before router? + + return nil } -// Get the current build name. This is usually injected if built from git, -// or returns "unknown" otherwise. +// If any static peers were provided in the configuration above then we should +// configure them. The loop ensures that disconnected peers will eventually +// be reconnected with. +func (c *Core) addPeerLoop() { + for { + // Get the peers from the config - these could change! + c.configMutex.RLock() + peers := c.config.Peers + interfacepeers := c.config.InterfacePeers + c.configMutex.RUnlock() + + // Add peers from the Peers section + for _, peer := range peers { + c.AddPeer(peer, "") + time.Sleep(time.Second) + } + + // Add peers from the InterfacePeers section + for intf, intfpeers := range interfacepeers { + for _, peer := range intfpeers { + c.AddPeer(peer, intf) + time.Sleep(time.Second) + } + } + + // Sit for a while + time.Sleep(time.Minute) + } +} + +// UpdateConfig updates the configuration in Core and then signals the +// various module goroutines to reconfigure themselves if needed +func (c *Core) UpdateConfig(config *config.NodeConfig) { + c.configMutex.Lock() + c.configOld = c.config + c.config = *config + c.configMutex.Unlock() + + components := []chan chan error{ + c.admin.reconfigure, + c.searches.reconfigure, + c.dht.reconfigure, + c.sessions.reconfigure, + c.peers.reconfigure, + c.router.reconfigure, + c.router.tun.reconfigure, + c.router.cryptokey.reconfigure, + c.switchTable.reconfigure, + c.tcp.reconfigure, + c.multicast.reconfigure, + } + + for _, component := range components { + response := make(chan error) + component <- response + if err := <-response; err != nil { + c.log.Println(err) + } + } +} + +// GetBuildName gets the current build name. This is usually injected if built +// from git, or returns "unknown" otherwise. func GetBuildName() string { if buildName == "" { return "unknown" @@ -97,38 +186,14 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.log.Println("Starting up...") - var boxPub crypto.BoxPubKey - var boxPriv crypto.BoxPrivKey - var sigPub crypto.SigPubKey - var sigPriv crypto.SigPrivKey - boxPubHex, err := hex.DecodeString(nc.EncryptionPublicKey) - if err != nil { - return err - } - boxPrivHex, err := hex.DecodeString(nc.EncryptionPrivateKey) - if err != nil { - return err - } - sigPubHex, err := hex.DecodeString(nc.SigningPublicKey) - if err != nil { - return err - } - sigPrivHex, err := hex.DecodeString(nc.SigningPrivateKey) - if err != nil { - return err - } - copy(boxPub[:], boxPubHex) - copy(boxPriv[:], boxPrivHex) - copy(sigPub[:], sigPubHex) - copy(sigPriv[:], sigPrivHex) + c.configMutex.Lock() + c.config = *nc + c.configOld = c.config + c.configMutex.Unlock() - c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) - c.admin.init(c, nc.AdminListen) + c.init() - c.nodeinfo.init(c) - c.nodeinfo.setNodeInfo(nc.NodeInfo, nc.NodeInfoPrivacy) - - if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil { + if err := c.tcp.init(c); err != nil { c.log.Println("Failed to start TCP interface") return err } @@ -161,31 +226,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - c.router.cryptokey.setEnabled(nc.TunnelRouting.Enable) - if c.router.cryptokey.isEnabled() { - c.log.Println("Crypto-key routing enabled") - for ipv6, pubkey := range nc.TunnelRouting.IPv6Destinations { - if err := c.router.cryptokey.addRoute(ipv6, pubkey); err != nil { - panic(err) - } - } - for _, source := range nc.TunnelRouting.IPv6Sources { - if c.router.cryptokey.addSourceSubnet(source); err != nil { - panic(err) - } - } - for ipv4, pubkey := range nc.TunnelRouting.IPv4Destinations { - if err := c.router.cryptokey.addRoute(ipv4, pubkey); err != nil { - panic(err) - } - } - for _, source := range nc.TunnelRouting.IPv4Sources { - if c.router.cryptokey.addSourceSubnet(source); err != nil { - panic(err) - } - } - } - if err := c.admin.start(); err != nil { c.log.Println("Failed to start admin socket") return err @@ -196,12 +236,13 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - ip := net.IP(c.router.addr[:]).String() - if err := c.router.tun.start(nc.IfName, nc.IfTAPMode, fmt.Sprintf("%s/%d", ip, 8*len(address.GetPrefix())-1), nc.IfMTU); err != nil { + if err := c.router.tun.start(); err != nil { c.log.Println("Failed to start TUN/TAP") return err } + go c.addPeerLoop() + c.log.Println("Startup complete") return nil } @@ -250,12 +291,12 @@ func (c *Core) GetSubnet() *net.IPNet { // Gets the nodeinfo. func (c *Core) GetNodeInfo() nodeinfoPayload { - return c.nodeinfo.getNodeInfo() + return c.router.nodeinfo.getNodeInfo() } // Sets the nodeinfo. func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) { - c.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) + c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) } // Sets the output logger of the Yggdrasil node after startup. This may be @@ -270,13 +311,6 @@ func (c *Core) AddPeer(addr string, sintf string) error { return c.admin.addPeer(addr, sintf) } -// Adds an expression to select multicast interfaces for peer discovery. This -// should be done before calling Start. This function can be called multiple -// times to add multiple search expressions. -func (c *Core) AddMulticastInterfaceExpr(expr *regexp.Regexp) { - c.ifceExpr = append(c.ifceExpr, expr) -} - // Adds an allowed public key. This allow peerings to be restricted only to // keys that you have selected. func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 4a32eb64..7c6c7570 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -16,6 +16,7 @@ import "fmt" import "net" import "log" import "regexp" +import "encoding/hex" import _ "net/http/pprof" import "net/http" @@ -23,6 +24,7 @@ import "runtime" import "os" import "github.com/yggdrasil-network/yggdrasil-go/src/address" +import "github.com/yggdrasil-network/yggdrasil-go/src/config" import "github.com/yggdrasil-network/yggdrasil-go/src/crypto" import "github.com/yggdrasil-network/yggdrasil-go/src/defaults" @@ -52,7 +54,17 @@ func StartProfiler(log *log.Logger) error { func (c *Core) Init() { bpub, bpriv := crypto.NewBoxKeys() spub, spriv := crypto.NewSigKeys() - c.init(bpub, bpriv, spub, spriv) + hbpub := hex.EncodeToString(bpub[:]) + hbpriv := hex.EncodeToString(bpriv[:]) + hspub := hex.EncodeToString(spub[:]) + hspriv := hex.EncodeToString(spriv[:]) + c.config = config.NodeConfig{ + EncryptionPublicKey: hbpub, + EncryptionPrivateKey: hbpriv, + SigningPublicKey: hspub, + SigningPrivateKey: hspriv, + } + c.init( /*bpub, bpriv, spub, spriv*/ ) c.switchTable.start() c.router.start() } @@ -350,7 +362,7 @@ func (c *Core) DEBUG_init(bpub []byte, bpriv []byte, spub []byte, spriv []byte) { - var boxPub crypto.BoxPubKey + /*var boxPub crypto.BoxPubKey var boxPriv crypto.BoxPrivKey var sigPub crypto.SigPubKey var sigPriv crypto.SigPrivKey @@ -358,7 +370,18 @@ func (c *Core) DEBUG_init(bpub []byte, copy(boxPriv[:], bpriv) copy(sigPub[:], spub) copy(sigPriv[:], spriv) - c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) + c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)*/ + hbpub := hex.EncodeToString(bpub[:]) + hbpriv := hex.EncodeToString(bpriv[:]) + hspub := hex.EncodeToString(spub[:]) + hspriv := hex.EncodeToString(spriv[:]) + c.config = config.NodeConfig{ + EncryptionPublicKey: hbpub, + EncryptionPrivateKey: hbpriv, + SigningPublicKey: hspub, + SigningPrivateKey: hspriv, + } + c.init( /*bpub, bpriv, spub, spriv*/ ) if err := c.router.start(); err != nil { panic(err) @@ -427,7 +450,8 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) { //* func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) { - if err := c.tcp.init(c, addrport, 0); err != nil { + c.config.Listen = addrport + if err := c.tcp.init(c /*, addrport, 0*/); err != nil { c.log.Println("Failed to start TCP interface:", err) panic(err) } @@ -474,7 +498,8 @@ func (c *Core) DEBUG_addKCPConn(saddr string) { func (c *Core) DEBUG_setupAndStartAdminInterface(addrport string) { a := admin{} - a.init(c, addrport) + c.config.AdminListen = addrport + a.init(c /*, addrport*/) c.admin = a } @@ -492,7 +517,7 @@ func (c *Core) DEBUG_setLogger(log *log.Logger) { } func (c *Core) DEBUG_setIfceExpr(expr *regexp.Regexp) { - c.ifceExpr = append(c.ifceExpr, expr) + c.log.Println("DEBUG_setIfceExpr no longer implemented") } func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b52a820b..5427aca9 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -65,11 +65,12 @@ type dhtReqKey struct { // The main DHT struct. type dht struct { - core *Core - nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks + core *Core + reconfigure chan chan error + nodeID crypto.NodeID + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -78,6 +79,13 @@ type dht struct { // Initializes the DHT. func (t *dht) init(c *Core) { t.core = c + t.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-t.reconfigure + e <- nil + } + }() t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) t.callbacks = make(map[dhtReqKey]dht_callbackInfo) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index c0a676a8..08f0954e 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -4,30 +4,43 @@ import ( "context" "fmt" "net" + "regexp" + "sync" "time" "golang.org/x/net/ipv6" ) type multicast struct { - core *Core - sock *ipv6.PacketConn - groupAddr string + core *Core + reconfigure chan chan error + sock *ipv6.PacketConn + groupAddr string + myAddr *net.TCPAddr + myAddrMutex sync.RWMutex } func (m *multicast) init(core *Core) { m.core = core + m.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-m.reconfigure + m.myAddrMutex.Lock() + m.myAddr = m.core.tcp.getAddr() + m.myAddrMutex.Unlock() + e <- nil + } + }() m.groupAddr = "[ff02::114]:9001" // Check if we've been given any expressions - if len(m.core.ifceExpr) == 0 { - return + if count := len(m.interfaces()); count != 0 { + m.core.log.Println("Found", count, "multicast interface(s)") } - // Ask the system for network interfaces - m.core.log.Println("Found", len(m.interfaces()), "multicast interface(s)") } func (m *multicast) start() error { - if len(m.core.ifceExpr) == 0 { + if len(m.interfaces()) == 0 { m.core.log.Println("Multicast discovery is disabled") } else { m.core.log.Println("Multicast discovery is enabled") @@ -55,6 +68,10 @@ func (m *multicast) start() error { } func (m *multicast) interfaces() []net.Interface { + // Get interface expressions from config + m.core.configMutex.RLock() + exprs := m.core.config.MulticastInterfaces + m.core.configMutex.RUnlock() // Ask the system for network interfaces var interfaces []net.Interface allifaces, err := net.Interfaces() @@ -75,8 +92,12 @@ func (m *multicast) interfaces() []net.Interface { // Ignore point-to-point interfaces continue } - for _, expr := range m.core.ifceExpr { - if expr.MatchString(iface.Name) { + for _, expr := range exprs { + e, err := regexp.Compile(expr) + if err != nil { + panic(err) + } + if e.MatchString(iface.Name) { interfaces = append(interfaces, iface) } } @@ -85,13 +106,14 @@ func (m *multicast) interfaces() []net.Interface { } func (m *multicast) announce() { + var anAddr net.TCPAddr + m.myAddrMutex.Lock() + m.myAddr = m.core.tcp.getAddr() + m.myAddrMutex.Unlock() groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) } - var anAddr net.TCPAddr - myAddr := m.core.tcp.getAddr() - anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) @@ -103,6 +125,9 @@ func (m *multicast) announce() { if err != nil { panic(err) } + m.myAddrMutex.RLock() + anAddr.Port = m.myAddr.Port + m.myAddrMutex.RUnlock() for _, addr := range addrs { addrIP, _, _ := net.ParseCIDR(addr.String()) if addrIP.To4() != nil { diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index b9076328..963a2fc2 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -170,7 +170,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse nodeinfo := nodeinfoReqRes{ SendCoords: table.self.getCoords(), IsResponse: isResponse, - NodeInfo: m.core.nodeinfo.getNodeInfo(), + NodeInfo: m.getNodeInfo(), } bs := nodeinfo.encode() shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index fd045419..c83504fc 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -19,6 +19,7 @@ import ( // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { core *Core + reconfigure chan chan error mutex sync.Mutex // Synchronize writes to atomic ports atomic.Value //map[switchPort]*peer, use CoW semantics authMutex sync.RWMutex @@ -31,6 +32,13 @@ func (ps *peers) init(c *Core) { defer ps.mutex.Unlock() ps.putPorts(make(map[switchPort]*peer)) ps.core = c + ps.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-ps.reconfigure + e <- nil + } + }() ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{}) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 87da8829..11509d44 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -37,19 +37,21 @@ import ( // The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { - core *Core - addr address.Address - subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - toRecv chan router_recvPacket // packets to handle via recvPacket() - tun tunAdapter // TUN/TAP adapter - adapters []Adapter // Other adapters - recv chan<- []byte // place where the tun pulls received packets from - send <-chan []byte // place where the tun puts outgoing packets - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff - cryptokey cryptokey + core *Core + reconfigure chan chan error + addr address.Address + subnet address.Subnet + in <-chan []byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + toRecv chan router_recvPacket // packets to handle via recvPacket() + tun tunAdapter // TUN/TAP adapter + adapters []Adapter // Other adapters + recv chan<- []byte // place where the tun pulls received packets from + send <-chan []byte // place where the tun puts outgoing packets + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff + cryptokey cryptokey + nodeinfo nodeinfo } // Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun. @@ -61,6 +63,7 @@ type router_recvPacket struct { // Initializes the router struct, which includes setting up channels to/from the tun/tap. func (r *router) init(core *Core) { r.core = core + r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... @@ -83,6 +86,9 @@ func (r *router) init(core *Core) { r.send = send r.reset = make(chan struct{}, 1) r.admin = make(chan func(), 32) + r.core.configMutex.RLock() + r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) + r.core.configMutex.RUnlock() r.cryptokey.init(r.core) r.tun.init(r.core, send, recv) } @@ -124,6 +130,10 @@ func (r *router) mainLoop() { } case f := <-r.admin: f() + case e := <-r.reconfigure: + r.core.configMutex.RLock() + e <- r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) + r.core.configMutex.RUnlock() } } } @@ -463,7 +473,7 @@ func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { return } req.SendPermPub = *fromKey - r.core.nodeinfo.handleNodeInfo(&req) + r.nodeinfo.handleNodeInfo(&req) } // Passed a function to call. diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 756ad278..c391dda2 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -42,13 +42,21 @@ type searchInfo struct { // This stores a map of active searches. type searches struct { - core *Core - searches map[crypto.NodeID]*searchInfo + core *Core + reconfigure chan chan error + searches map[crypto.NodeID]*searchInfo } // Intializes the searches struct. func (s *searches) init(core *Core) { s.core = core + s.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-s.reconfigure + e <- nil + } + }() s.searches = make(map[crypto.NodeID]*searchInfo) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 4f395b07..e29cd4fa 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -7,6 +7,7 @@ package yggdrasil import ( "bytes" "encoding/hex" + "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -18,6 +19,7 @@ import ( // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { core *Core + reconfigure chan chan error theirAddr address.Address theirSubnet address.Subnet theirPermPub crypto.BoxPubKey @@ -101,6 +103,7 @@ func (s *sessionInfo) timedout() bool { // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { core *Core + reconfigure chan chan error lastCleanup time.Time // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey @@ -113,6 +116,7 @@ type sessions struct { addrToPerm map[address.Address]*crypto.BoxPubKey subnetToPerm map[address.Subnet]*crypto.BoxPubKey // Options from the session firewall + sessionFirewallMutex sync.RWMutex sessionFirewallEnabled bool sessionFirewallAllowsDirect bool sessionFirewallAllowsRemote bool @@ -124,6 +128,24 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(core *Core) { ss.core = core + ss.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-ss.reconfigure + responses := make(map[crypto.Handle]chan error) + for index, session := range ss.sinfos { + responses[index] = make(chan error) + session.reconfigure <- responses[index] + } + for _, response := range responses { + if err := <-response; err != nil { + e <- err + continue + } + } + e <- nil + } + }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle) @@ -135,12 +157,16 @@ func (ss *sessions) init(core *Core) { // Enable or disable the session firewall func (ss *sessions) setSessionFirewallState(enabled bool) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallEnabled = enabled } // Set the session firewall defaults (first parameter is whether to allow // sessions from direct peers, second is whether to allow from remote nodes). func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote bool, alwaysAllowsOutbound bool) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallAllowsDirect = allowsDirect ss.sessionFirewallAllowsRemote = allowsRemote ss.sessionFirewallAlwaysAllowsOutbound = alwaysAllowsOutbound @@ -148,17 +174,24 @@ func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote b // Set the session firewall whitelist - nodes always allowed to open sessions. func (ss *sessions) setSessionFirewallWhitelist(whitelist []string) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallWhitelist = whitelist } // Set the session firewall blacklist - nodes never allowed to open sessions. func (ss *sessions) setSessionFirewallBlacklist(blacklist []string) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallBlacklist = blacklist } // Determines whether the session with a given publickey is allowed based on // session firewall rules. func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { + ss.sessionFirewallMutex.RLock() + defer ss.sessionFirewallMutex.RUnlock() + // Allow by default if the session firewall is disabled if !ss.sessionFirewallEnabled { return true @@ -264,13 +297,12 @@ func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool) // Creates a new session and lazily cleans up old/timedout existing sessions. // This includse initializing session info to sane defaults (e.g. lowest supported MTU). func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { - if ss.sessionFirewallEnabled { - if !ss.isSessionAllowed(theirPermKey, true) { - return nil - } + if !ss.isSessionAllowed(theirPermKey, true) { + return nil } sinfo := sessionInfo{} sinfo.core = ss.core + sinfo.reconfigure = make(chan chan error, 1) sinfo.theirPermPub = *theirPermKey pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub @@ -442,11 +474,14 @@ func (ss *sessions) handlePing(ping *sessionPing) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) // Check the session firewall + ss.sessionFirewallMutex.RLock() if !isIn && ss.sessionFirewallEnabled { if !ss.isSessionAllowed(&ping.SendPermPub, false) { + ss.sessionFirewallMutex.RUnlock() return } } + ss.sessionFirewallMutex.RUnlock() if !isIn || sinfo.timedout() { if isIn { sinfo.close() @@ -539,6 +574,8 @@ func (sinfo *sessionInfo) doWorker() { } else { return } + case e := <-sinfo.reconfigure: + e <- nil } } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 2b0d5d37..2a4e7e46 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -162,6 +162,7 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core + reconfigure chan chan error key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root @@ -181,11 +182,14 @@ type switchTable struct { const SwitchQueueTotalMinSize = 4 * 1024 * 1024 // Initializes the switchTable struct. -func (t *switchTable) init(core *Core, key crypto.SigPubKey) { +func (t *switchTable) init(core *Core) { now := time.Now() t.core = core - t.key = key - locator := switchLocator{root: key, tstamp: now.Unix()} + t.reconfigure = make(chan chan error, 1) + t.core.configMutex.RLock() + t.key = t.core.sigPub + t.core.configMutex.RUnlock() + locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} t.updater.Store(&sync.Once{}) @@ -808,6 +812,8 @@ func (t *switchTable) doWorker() { } case f := <-t.admin: f() + case e := <-t.reconfigure: + e <- nil } } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 8d8fee34..c90c3ffb 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -39,8 +39,11 @@ const tcp_ping_interval = (default_tcp_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core + reconfigure chan chan error serv net.Listener + serv_stop chan bool tcp_timeout time.Duration + tcp_addr string mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) @@ -81,10 +84,37 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { } // Initializes the struct. -func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err error) { +func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core + iface.serv_stop = make(chan bool, 1) + iface.reconfigure = make(chan chan error, 1) + go func() { + for { + e := <-iface.reconfigure + iface.core.configMutex.RLock() + updated := iface.core.config.Listen != iface.core.configOld.Listen + iface.core.configMutex.RUnlock() + if updated { + iface.serv_stop <- true + iface.serv.Close() + e <- iface.listen() + } else { + e <- nil + } + } + }() + + return iface.listen() +} + +func (iface *tcpInterface) listen() error { + var err error + + iface.core.configMutex.RLock() + iface.tcp_addr = iface.core.config.Listen + iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond + iface.core.configMutex.RUnlock() - iface.tcp_timeout = time.Duration(readTimeout) * time.Millisecond if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { iface.tcp_timeout = default_tcp_timeout } @@ -93,11 +123,12 @@ func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err lc := net.ListenConfig{ Control: iface.tcpContext, } - iface.serv, err = lc.Listen(ctx, "tcp", addr) + iface.serv, err = lc.Listen(ctx, "tcp", iface.tcp_addr) if err == nil { iface.calls = make(map[string]struct{}) iface.conns = make(map[tcpInfo](chan struct{})) go iface.listener() + return nil } return err @@ -110,12 +141,38 @@ func (iface *tcpInterface) listener() { for { sock, err := iface.serv.Accept() if err != nil { - panic(err) + iface.core.log.Println("Failed to accept connection:", err) + return + } + select { + case <-iface.serv_stop: + iface.core.log.Println("Stopping listener") + return + default: + if err != nil { + panic(err) + } + go iface.handler(sock, true) } - go iface.handler(sock, true) } } +// Checks if we already have a connection to this node +func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.conns[info] + return isIn +} + +// Checks if we already are calling this address +func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.calls[saddr] + return isIn +} + // Checks if a connection already exists. // If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. // If the dial is successful, it launches the handler. @@ -127,25 +184,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - quit := false - iface.mutex.Lock() - if _, isIn := iface.calls[callname]; isIn { - quit = true - } else { - iface.calls[callname] = struct{}{} - defer func() { - // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() - }() - } - iface.mutex.Unlock() - if quit { + if iface.isAlreadyCalling(saddr) { return } + iface.calls[callname] = struct{}{} + defer func() { + // Block new calls for a little while, to mitigate livelock scenarios + time.Sleep(default_tcp_timeout) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + iface.mutex.Lock() + delete(iface.calls, callname) + iface.mutex.Unlock() + }() var conn net.Conn var err error if socksaddr != nil { @@ -252,9 +302,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? return } + remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) + localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) + if e1 != nil || e2 != nil { + return + } info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, + box: meta.box, + sig: meta.sig, + localAddr: localAddr, + remoteAddr: remoteAddr, + } + if iface.isAlreadyConnected(info) { + return } // Quit the parent call if this is a connection to ourself equiv := func(k1, k2 []byte) bool { @@ -265,14 +325,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } return true } - if equiv(info.box[:], iface.core.boxPub[:]) { + if equiv(meta.box[:], iface.core.boxPub[:]) { return } - if equiv(info.sig[:], iface.core.sigPub[:]) { + if equiv(meta.sig[:], iface.core.sigPub[:]) { return } // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) { + if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { // Allow unauthorized peers if they're link-local raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) raddr := net.ParseIP(raddrStr) @@ -281,15 +341,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } } // Check if we already have a connection to this node, close and block if yes - info.localAddr, _, _ = net.SplitHostPort(sock.LocalAddr().String()) - info.remoteAddr, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) iface.mutex.Lock() - if blockChan, isIn := iface.conns[info]; isIn { + /*if blockChan, isIn := iface.conns[info]; isIn { iface.mutex.Unlock() sock.Close() <-blockChan return - } + }*/ blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() @@ -301,7 +359,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) + p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) in := func(bs []byte) { p.handlePacket(bs) @@ -363,16 +421,16 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&info.box) + themNodeID := crypto.GetNodeID(&meta.box) themAddr := address.AddrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) - iface.core.log.Println("Connected:", themString, "source", us) + iface.core.log.Printf("Connected: %s, source: %s", themString, us) err = iface.reader(sock, in) // In this goroutine, because of defers if err == nil { - iface.core.log.Println("Disconnected:", themString, "source", us) + iface.core.log.Printf("Disconnected: %s, source: %s", themString, us) } else { - iface.core.log.Println("Disconnected:", themString, "source", us, "with error:", err) + iface.core.log.Printf("Disconnected: %s, source: %s, error: %s", themString, us, err) } return } diff --git a/src/yggdrasil/tun.go b/src/yggdrasil/tun.go index 8c0f91d5..c0a71396 100644 --- a/src/yggdrasil/tun.go +++ b/src/yggdrasil/tun.go @@ -5,6 +5,8 @@ package yggdrasil import ( "bytes" "errors" + "fmt" + "net" "sync" "time" @@ -42,11 +44,33 @@ func getSupportedMTU(mtu int) int { func (tun *tunAdapter) init(core *Core, send chan<- []byte, recv <-chan []byte) { tun.Adapter.init(core, send, recv) tun.icmpv6.init(tun) + go func() { + for { + e := <-tun.reconfigure + tun.core.configMutex.RLock() + updated := tun.core.config.IfName != tun.core.configOld.IfName || + tun.core.config.IfTAPMode != tun.core.configOld.IfTAPMode || + tun.core.config.IfMTU != tun.core.configOld.IfMTU + tun.core.configMutex.RUnlock() + if updated { + tun.core.log.Println("Reconfiguring TUN/TAP is not supported yet") + e <- nil + } else { + e <- nil + } + } + }() } // Starts the setup process for the TUN/TAP adapter, and if successful, starts // the read/write goroutines to handle packets on that interface. -func (tun *tunAdapter) start(ifname string, iftapmode bool, addr string, mtu int) error { +func (tun *tunAdapter) start() error { + tun.core.configMutex.RLock() + ifname := tun.core.config.IfName + iftapmode := tun.core.config.IfTAPMode + addr := fmt.Sprintf("%s/%d", net.IP(tun.core.router.addr[:]).String(), 8*len(address.GetPrefix())-1) + mtu := tun.core.config.IfMTU + tun.core.configMutex.RUnlock() if ifname != "none" { if err := tun.setup(ifname, iftapmode, addr, mtu); err != nil { return err