Merge pull request #297 from neilalexander/nodeconfig

Use central NodeConfig for components
This commit is contained in:
Neil Alexander 2019-01-15 23:51:58 +00:00 committed by GitHub
commit 8fa9b84108
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 642 additions and 322 deletions

View File

@ -9,10 +9,8 @@ import (
"log"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
"golang.org/x/text/encoding/unicode"
@ -31,6 +29,119 @@ type node struct {
core Core
}
func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig {
// Use a configuration file. If -useconf, the configuration will be read
// from stdin. If -useconffile, the configuration will be read from the
// filesystem.
var conf []byte
var err error
if *useconffile != "" {
// Read the file from the filesystem
conf, err = ioutil.ReadFile(*useconffile)
} else {
// Read the file from stdin.
conf, err = ioutil.ReadAll(os.Stdin)
}
if err != nil {
panic(err)
}
// If there's a byte order mark - which Windows 10 is now incredibly fond of
// throwing everywhere when it's converting things into UTF-16 for the hell
// of it - remove it and decode back down into UTF-8. This is necessary
// because hjson doesn't know what to do with UTF-16 and will panic
if bytes.Compare(conf[0:2], []byte{0xFF, 0xFE}) == 0 ||
bytes.Compare(conf[0:2], []byte{0xFE, 0xFF}) == 0 {
utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM)
decoder := utf.NewDecoder()
conf, err = decoder.Bytes(conf)
if err != nil {
panic(err)
}
}
// Generate a new configuration - this gives us a set of sane defaults -
// then parse the configuration we loaded above on top of it. The effect
// of this is that any configuration item that is missing from the provided
// configuration will use a sane default.
cfg := config.GenerateConfig(false)
var dat map[string]interface{}
if err := hjson.Unmarshal(conf, &dat); err != nil {
panic(err)
}
confJson, err := json.Marshal(dat)
if err != nil {
panic(err)
}
json.Unmarshal(confJson, &cfg)
// For now we will do a little bit to help the user adjust their
// configuration to match the new configuration format, as some of the key
// names have changed recently.
changes := map[string]string{
"Multicast": "",
"LinkLocal": "MulticastInterfaces",
"BoxPub": "EncryptionPublicKey",
"BoxPriv": "EncryptionPrivateKey",
"SigPub": "SigningPublicKey",
"SigPriv": "SigningPrivateKey",
"AllowedBoxPubs": "AllowedEncryptionPublicKeys",
}
// Loop over the mappings aove and see if we have anything to fix.
for from, to := range changes {
if _, ok := dat[from]; ok {
if to == "" {
if !*normaliseconf {
log.Println("Warning: Deprecated config option", from, "- please remove")
}
} else {
if !*normaliseconf {
log.Println("Warning: Deprecated config option", from, "- please rename to", to)
}
// If the configuration file doesn't already contain a line with the
// new name then set it to the old value. This makes sure that we
// don't overwrite something that was put there intentionally.
if _, ok := dat[to]; !ok {
dat[to] = dat[from]
}
}
}
}
// Check to see if the peers are in a parsable format, if not then default
// them to the TCP scheme
if peers, ok := dat["Peers"].([]interface{}); ok {
for index, peer := range peers {
uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") {
continue
}
if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:]
}
(dat["Peers"].([]interface{}))[index] = "tcp://" + uri
}
}
// Now do the same with the interface peers
if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok {
for intf, peers := range interfacepeers {
for index, peer := range peers.([]interface{}) {
uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") {
continue
}
if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:]
}
((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri
}
}
}
// Overlay our newly mapped configuration onto the autoconf node config that
// we generated above.
if err = mapstructure.Decode(dat, &cfg); err != nil {
panic(err)
}
return cfg
}
// Generates a new configuration and returns it in HJSON format. This is used
// with -genconf.
func doGenconf(isjson bool) string {
@ -61,6 +172,7 @@ func main() {
flag.Parse()
var cfg *nodeConfig
var err error
switch {
case *version:
fmt.Println("Build name:", yggdrasil.GetBuildName())
@ -71,114 +183,8 @@ func main() {
// port numbers, and will use an automatically selected TUN/TAP interface.
cfg = config.GenerateConfig(true)
case *useconffile != "" || *useconf:
// Use a configuration file. If -useconf, the configuration will be read
// from stdin. If -useconffile, the configuration will be read from the
// filesystem.
var configjson []byte
var err error
if *useconffile != "" {
// Read the file from the filesystem
configjson, err = ioutil.ReadFile(*useconffile)
} else {
// Read the file from stdin.
configjson, err = ioutil.ReadAll(os.Stdin)
}
if err != nil {
panic(err)
}
// If there's a byte order mark - which Windows 10 is now incredibly fond of
// throwing everywhere when it's converting things into UTF-16 for the hell
// of it - remove it and decode back down into UTF-8. This is necessary
// because hjson doesn't know what to do with UTF-16 and will panic
if bytes.Compare(configjson[0:2], []byte{0xFF, 0xFE}) == 0 ||
bytes.Compare(configjson[0:2], []byte{0xFE, 0xFF}) == 0 {
utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM)
decoder := utf.NewDecoder()
configjson, err = decoder.Bytes(configjson)
if err != nil {
panic(err)
}
}
// Generate a new configuration - this gives us a set of sane defaults -
// then parse the configuration we loaded above on top of it. The effect
// of this is that any configuration item that is missing from the provided
// configuration will use a sane default.
cfg = config.GenerateConfig(false)
var dat map[string]interface{}
if err := hjson.Unmarshal(configjson, &dat); err != nil {
panic(err)
}
confJson, err := json.Marshal(dat)
if err != nil {
panic(err)
}
json.Unmarshal(confJson, &cfg)
// For now we will do a little bit to help the user adjust their
// configuration to match the new configuration format, as some of the key
// names have changed recently.
changes := map[string]string{
"Multicast": "",
"LinkLocal": "MulticastInterfaces",
"BoxPub": "EncryptionPublicKey",
"BoxPriv": "EncryptionPrivateKey",
"SigPub": "SigningPublicKey",
"SigPriv": "SigningPrivateKey",
"AllowedBoxPubs": "AllowedEncryptionPublicKeys",
}
// Loop over the mappings aove and see if we have anything to fix.
for from, to := range changes {
if _, ok := dat[from]; ok {
if to == "" {
if !*normaliseconf {
log.Println("Warning: Deprecated config option", from, "- please remove")
}
} else {
if !*normaliseconf {
log.Println("Warning: Deprecated config option", from, "- please rename to", to)
}
// If the configuration file doesn't already contain a line with the
// new name then set it to the old value. This makes sure that we
// don't overwrite something that was put there intentionally.
if _, ok := dat[to]; !ok {
dat[to] = dat[from]
}
}
}
}
// Check to see if the peers are in a parsable format, if not then default
// them to the TCP scheme
if peers, ok := dat["Peers"].([]interface{}); ok {
for index, peer := range peers {
uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") {
continue
}
if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:]
}
(dat["Peers"].([]interface{}))[index] = "tcp://" + uri
}
}
// Now do the same with the interface peers
if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok {
for intf, peers := range interfacepeers {
for index, peer := range peers.([]interface{}) {
uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") {
continue
}
if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:]
}
((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri
}
}
}
// Overlay our newly mapped configuration onto the autoconf node config that
// we generated above.
if err = mapstructure.Decode(dat, &cfg); err != nil {
panic(err)
}
// Read the configuration from either stdin or from the filesystem
cfg = readConfig(useconf, useconffile, normaliseconf)
// If the -normaliseconf option was specified then remarshal the above
// configuration and print it back to stdout. This lets the user update
// their configuration file with newly mapped names (like above) or to
@ -214,15 +220,6 @@ func main() {
// Setup the Yggdrasil node itself. The node{} type includes a Core, so we
// don't need to create this manually.
n := node{}
// Check to see if any multicast interface expressions were provided in the
// config. If they were then set them now.
for _, ll := range cfg.MulticastInterfaces {
ifceExpr, err := regexp.Compile(ll)
if err != nil {
panic(err)
}
n.core.AddMulticastInterfaceExpr(ifceExpr)
}
// Now that we have a working configuration, we can now actually start
// Yggdrasil. This will start the router, switch, DHT node, TCP and UDP
// sockets, TUN/TAP adapter and multicast discovery port.
@ -235,27 +232,6 @@ func main() {
for _, pBoxStr := range cfg.AllowedEncryptionPublicKeys {
n.core.AddAllowedEncryptionPublicKey(pBoxStr)
}
// If any static peers were provided in the configuration above then we should
// configure them. The loop ensures that disconnected peers will eventually
// be reconnected with.
go func() {
if len(cfg.Peers) == 0 && len(cfg.InterfacePeers) == 0 {
return
}
for {
for _, peer := range cfg.Peers {
n.core.AddPeer(peer, "")
time.Sleep(time.Second)
}
for intf, intfpeers := range cfg.InterfacePeers {
for _, peer := range intfpeers {
n.core.AddPeer(peer, intf)
time.Sleep(time.Second)
}
}
time.Sleep(time.Minute)
}
}()
// The Stop function ensures that the TUN/TAP adapter is correctly shut down
// before the program exits.
defer func() {
@ -269,7 +245,9 @@ func main() {
logger.Printf("Your IPv6 subnet is %s", subnet.String())
// Catch interrupts from the operating system to exit gracefully.
c := make(chan os.Signal, 1)
r := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
signal.Notify(r, os.Interrupt, syscall.SIGHUP)
// Create a function to capture the service being stopped on Windows.
winTerminate := func() {
c <- os.Interrupt
@ -277,5 +255,18 @@ func main() {
minwinsvc.SetOnExit(winTerminate)
// Wait for the terminate/interrupt signal. Once a signal is received, the
// deferred Stop function above will run which will shut down TUN/TAP.
<-c
for {
select {
case _ = <-r:
if *useconffile != "" {
cfg = readConfig(useconf, useconffile, normaliseconf)
n.core.UpdateConfig(cfg)
} else {
logger.Println("Reloading config at runtime is only possible with -useconffile")
}
case _ = <-c:
goto exit
}
}
exit:
}

View File

@ -3,9 +3,10 @@ package yggdrasil
// Defines the minimum required struct members for an adapter type (this is
// now the base type for tunAdapter in tun.go)
type Adapter struct {
core *Core
send chan<- []byte
recv <-chan []byte
core *Core
send chan<- []byte
recv <-chan []byte
reconfigure chan chan error
}
// Initialises the adapter.
@ -13,4 +14,5 @@ func (adapter *Adapter) init(core *Core, send chan<- []byte, recv <-chan []byte)
adapter.core = core
adapter.send = send
adapter.recv = recv
adapter.reconfigure = make(chan chan error, 1)
}

View File

@ -22,10 +22,11 @@ import (
// TODO: Add authentication
type admin struct {
core *Core
listenaddr string
listener net.Listener
handlers []admin_handlerInfo
core *Core
reconfigure chan chan error
listenaddr string
listener net.Listener
handlers []admin_handlerInfo
}
type admin_info map[string]interface{}
@ -51,9 +52,25 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info)
}
// init runs the initial admin setup.
func (a *admin) init(c *Core, listenaddr string) {
func (a *admin) init(c *Core) {
a.core = c
a.listenaddr = listenaddr
a.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-a.reconfigure
a.core.configMutex.RLock()
if a.core.config.AdminListen != a.core.configOld.AdminListen {
a.listenaddr = a.core.config.AdminListen
a.close()
a.start()
}
a.core.configMutex.RUnlock()
e <- nil
}
}()
a.core.configMutex.RLock()
a.listenaddr = a.core.config.AdminListen
a.core.configMutex.RUnlock()
a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) {
handlers := make(map[string]interface{})
for _, handler := range a.handlers {
@ -331,7 +348,10 @@ func (a *admin) init(c *Core, listenaddr string) {
}
var box_pub_key, coords string
if in["box_pub_key"] == nil && in["coords"] == nil {
nodeinfo := []byte(a.core.nodeinfo.getNodeInfo())
var nodeinfo []byte
a.core.router.doAdmin(func() {
nodeinfo = []byte(a.core.router.nodeinfo.getNodeInfo())
})
var jsoninfo interface{}
if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil {
return admin_info{}, err
@ -842,7 +862,7 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) (
copy(key[:], keyBytes)
}
if !nocache {
if response, err := a.core.nodeinfo.getCachedNodeInfo(key); err == nil {
if response, err := a.core.router.nodeinfo.getCachedNodeInfo(key); err == nil {
return response, nil
}
}
@ -860,14 +880,14 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) (
}
response := make(chan *nodeinfoPayload, 1)
sendNodeInfoRequest := func() {
a.core.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) {
a.core.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) {
defer func() { recover() }()
select {
case response <- nodeinfo:
default:
}
})
a.core.nodeinfo.sendNodeInfo(key, coords, false)
a.core.router.nodeinfo.sendNodeInfo(key, coords, false)
}
a.core.router.doAdmin(sendNodeInfoRequest)
go func() {

View File

@ -18,6 +18,7 @@ import (
type cryptokey struct {
core *Core
enabled bool
reconfigure chan chan error
ipv4routes []cryptokey_route
ipv6routes []cryptokey_route
ipv4cache map[address.Address]cryptokey_route
@ -34,12 +35,75 @@ type cryptokey_route struct {
// Initialise crypto-key routing. This must be done before any other CKR calls.
func (c *cryptokey) init(core *Core) {
c.core = core
c.ipv4routes = make([]cryptokey_route, 0)
c.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-c.reconfigure
var err error
c.core.router.doAdmin(func() {
err = c.core.router.cryptokey.configure()
})
e <- err
}
}()
if err := c.configure(); err != nil {
c.core.log.Println("CKR configuration failed:", err)
}
}
// Configure the CKR routes - this must only ever be called from the router
// goroutine, e.g. through router.doAdmin
func (c *cryptokey) configure() error {
c.core.configMutex.RLock()
defer c.core.configMutex.RUnlock()
// Set enabled/disabled state
c.setEnabled(c.core.config.TunnelRouting.Enable)
// Clear out existing routes
c.ipv6routes = make([]cryptokey_route, 0)
c.ipv4routes = make([]cryptokey_route, 0)
// Add IPv6 routes
for ipv6, pubkey := range c.core.config.TunnelRouting.IPv6Destinations {
if err := c.addRoute(ipv6, pubkey); err != nil {
return err
}
}
// Add IPv4 routes
for ipv4, pubkey := range c.core.config.TunnelRouting.IPv4Destinations {
if err := c.addRoute(ipv4, pubkey); err != nil {
return err
}
}
// Clear out existing sources
c.ipv6sources = make([]net.IPNet, 0)
c.ipv4sources = make([]net.IPNet, 0)
// Add IPv6 sources
c.ipv6sources = make([]net.IPNet, 0)
for _, source := range c.core.config.TunnelRouting.IPv6Sources {
if err := c.addSourceSubnet(source); err != nil {
return err
}
}
// Add IPv4 sources
c.ipv4sources = make([]net.IPNet, 0)
for _, source := range c.core.config.TunnelRouting.IPv4Sources {
if err := c.addSourceSubnet(source); err != nil {
return err
}
}
// Wipe the caches
c.ipv4cache = make(map[address.Address]cryptokey_route, 0)
c.ipv6cache = make(map[address.Address]cryptokey_route, 0)
c.ipv4sources = make([]net.IPNet, 0)
c.ipv6sources = make([]net.IPNet, 0)
return nil
}
// Enable or disable crypto-key routing.

View File

@ -2,11 +2,11 @@ package yggdrasil
import (
"encoding/hex"
"fmt"
"io/ioutil"
"log"
"net"
"regexp"
"sync"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
@ -17,10 +17,20 @@ import (
var buildName string
var buildVersion string
type module interface {
init(*Core, *config.NodeConfig) error
start() error
}
// The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run.
type Core struct {
// This is the main data structure that holds everything else for a node
// We're going to keep our own copy of the provided config - that way we can
// guarantee that it will be covered by the mutex
config config.NodeConfig // Active config
configOld config.NodeConfig // Previous config
configMutex sync.RWMutex // Protects both config and configOld
boxPub crypto.BoxPubKey
boxPriv crypto.BoxPrivKey
sigPub crypto.SigPubKey
@ -33,17 +43,12 @@ type Core struct {
admin admin
searches searches
multicast multicast
nodeinfo nodeinfo
tcp tcpInterface
awdl awdl
log *log.Logger
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
}
func (c *Core) init(bpub *crypto.BoxPubKey,
bpriv *crypto.BoxPrivKey,
spub *crypto.SigPubKey,
spriv *crypto.SigPrivKey) {
func (c *Core) init() error {
// TODO separate init and start functions
// Init sets up structs
// Start launches goroutines that depend on structs being set up
@ -51,20 +56,104 @@ func (c *Core) init(bpub *crypto.BoxPubKey,
if c.log == nil {
c.log = log.New(ioutil.Discard, "", 0)
}
c.boxPub, c.boxPriv = *bpub, *bpriv
c.sigPub, c.sigPriv = *spub, *spriv
c.admin.core = c
boxPubHex, err := hex.DecodeString(c.config.EncryptionPublicKey)
if err != nil {
return err
}
boxPrivHex, err := hex.DecodeString(c.config.EncryptionPrivateKey)
if err != nil {
return err
}
sigPubHex, err := hex.DecodeString(c.config.SigningPublicKey)
if err != nil {
return err
}
sigPrivHex, err := hex.DecodeString(c.config.SigningPrivateKey)
if err != nil {
return err
}
copy(c.boxPub[:], boxPubHex)
copy(c.boxPriv[:], boxPrivHex)
copy(c.sigPub[:], sigPubHex)
copy(c.sigPriv[:], sigPrivHex)
c.admin.init(c)
c.searches.init(c)
c.dht.init(c)
c.sessions.init(c)
c.multicast.init(c)
c.peers.init(c)
c.router.init(c)
c.switchTable.init(c, c.sigPub) // TODO move before peers? before router?
c.switchTable.init(c) // TODO move before peers? before router?
return nil
}
// Get the current build name. This is usually injected if built from git,
// or returns "unknown" otherwise.
// If any static peers were provided in the configuration above then we should
// configure them. The loop ensures that disconnected peers will eventually
// be reconnected with.
func (c *Core) addPeerLoop() {
for {
// Get the peers from the config - these could change!
c.configMutex.RLock()
peers := c.config.Peers
interfacepeers := c.config.InterfacePeers
c.configMutex.RUnlock()
// Add peers from the Peers section
for _, peer := range peers {
c.AddPeer(peer, "")
time.Sleep(time.Second)
}
// Add peers from the InterfacePeers section
for intf, intfpeers := range interfacepeers {
for _, peer := range intfpeers {
c.AddPeer(peer, intf)
time.Sleep(time.Second)
}
}
// Sit for a while
time.Sleep(time.Minute)
}
}
// UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.configMutex.Lock()
c.configOld = c.config
c.config = *config
c.configMutex.Unlock()
components := []chan chan error{
c.admin.reconfigure,
c.searches.reconfigure,
c.dht.reconfigure,
c.sessions.reconfigure,
c.peers.reconfigure,
c.router.reconfigure,
c.router.tun.reconfigure,
c.router.cryptokey.reconfigure,
c.switchTable.reconfigure,
c.tcp.reconfigure,
c.multicast.reconfigure,
}
for _, component := range components {
response := make(chan error)
component <- response
if err := <-response; err != nil {
c.log.Println(err)
}
}
}
// GetBuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func GetBuildName() string {
if buildName == "" {
return "unknown"
@ -97,38 +186,14 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
c.log.Println("Starting up...")
var boxPub crypto.BoxPubKey
var boxPriv crypto.BoxPrivKey
var sigPub crypto.SigPubKey
var sigPriv crypto.SigPrivKey
boxPubHex, err := hex.DecodeString(nc.EncryptionPublicKey)
if err != nil {
return err
}
boxPrivHex, err := hex.DecodeString(nc.EncryptionPrivateKey)
if err != nil {
return err
}
sigPubHex, err := hex.DecodeString(nc.SigningPublicKey)
if err != nil {
return err
}
sigPrivHex, err := hex.DecodeString(nc.SigningPrivateKey)
if err != nil {
return err
}
copy(boxPub[:], boxPubHex)
copy(boxPriv[:], boxPrivHex)
copy(sigPub[:], sigPubHex)
copy(sigPriv[:], sigPrivHex)
c.configMutex.Lock()
c.config = *nc
c.configOld = c.config
c.configMutex.Unlock()
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)
c.admin.init(c, nc.AdminListen)
c.init()
c.nodeinfo.init(c)
c.nodeinfo.setNodeInfo(nc.NodeInfo, nc.NodeInfoPrivacy)
if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil {
if err := c.tcp.init(c); err != nil {
c.log.Println("Failed to start TCP interface")
return err
}
@ -161,31 +226,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err
}
c.router.cryptokey.setEnabled(nc.TunnelRouting.Enable)
if c.router.cryptokey.isEnabled() {
c.log.Println("Crypto-key routing enabled")
for ipv6, pubkey := range nc.TunnelRouting.IPv6Destinations {
if err := c.router.cryptokey.addRoute(ipv6, pubkey); err != nil {
panic(err)
}
}
for _, source := range nc.TunnelRouting.IPv6Sources {
if c.router.cryptokey.addSourceSubnet(source); err != nil {
panic(err)
}
}
for ipv4, pubkey := range nc.TunnelRouting.IPv4Destinations {
if err := c.router.cryptokey.addRoute(ipv4, pubkey); err != nil {
panic(err)
}
}
for _, source := range nc.TunnelRouting.IPv4Sources {
if c.router.cryptokey.addSourceSubnet(source); err != nil {
panic(err)
}
}
}
if err := c.admin.start(); err != nil {
c.log.Println("Failed to start admin socket")
return err
@ -196,12 +236,13 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err
}
ip := net.IP(c.router.addr[:]).String()
if err := c.router.tun.start(nc.IfName, nc.IfTAPMode, fmt.Sprintf("%s/%d", ip, 8*len(address.GetPrefix())-1), nc.IfMTU); err != nil {
if err := c.router.tun.start(); err != nil {
c.log.Println("Failed to start TUN/TAP")
return err
}
go c.addPeerLoop()
c.log.Println("Startup complete")
return nil
}
@ -250,12 +291,12 @@ func (c *Core) GetSubnet() *net.IPNet {
// Gets the nodeinfo.
func (c *Core) GetNodeInfo() nodeinfoPayload {
return c.nodeinfo.getNodeInfo()
return c.router.nodeinfo.getNodeInfo()
}
// Sets the nodeinfo.
func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
c.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
}
// Sets the output logger of the Yggdrasil node after startup. This may be
@ -270,13 +311,6 @@ func (c *Core) AddPeer(addr string, sintf string) error {
return c.admin.addPeer(addr, sintf)
}
// Adds an expression to select multicast interfaces for peer discovery. This
// should be done before calling Start. This function can be called multiple
// times to add multiple search expressions.
func (c *Core) AddMulticastInterfaceExpr(expr *regexp.Regexp) {
c.ifceExpr = append(c.ifceExpr, expr)
}
// Adds an allowed public key. This allow peerings to be restricted only to
// keys that you have selected.
func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error {

View File

@ -16,6 +16,7 @@ import "fmt"
import "net"
import "log"
import "regexp"
import "encoding/hex"
import _ "net/http/pprof"
import "net/http"
@ -23,6 +24,7 @@ import "runtime"
import "os"
import "github.com/yggdrasil-network/yggdrasil-go/src/address"
import "github.com/yggdrasil-network/yggdrasil-go/src/config"
import "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
import "github.com/yggdrasil-network/yggdrasil-go/src/defaults"
@ -52,7 +54,17 @@ func StartProfiler(log *log.Logger) error {
func (c *Core) Init() {
bpub, bpriv := crypto.NewBoxKeys()
spub, spriv := crypto.NewSigKeys()
c.init(bpub, bpriv, spub, spriv)
hbpub := hex.EncodeToString(bpub[:])
hbpriv := hex.EncodeToString(bpriv[:])
hspub := hex.EncodeToString(spub[:])
hspriv := hex.EncodeToString(spriv[:])
c.config = config.NodeConfig{
EncryptionPublicKey: hbpub,
EncryptionPrivateKey: hbpriv,
SigningPublicKey: hspub,
SigningPrivateKey: hspriv,
}
c.init( /*bpub, bpriv, spub, spriv*/ )
c.switchTable.start()
c.router.start()
}
@ -350,7 +362,7 @@ func (c *Core) DEBUG_init(bpub []byte,
bpriv []byte,
spub []byte,
spriv []byte) {
var boxPub crypto.BoxPubKey
/*var boxPub crypto.BoxPubKey
var boxPriv crypto.BoxPrivKey
var sigPub crypto.SigPubKey
var sigPriv crypto.SigPrivKey
@ -358,7 +370,18 @@ func (c *Core) DEBUG_init(bpub []byte,
copy(boxPriv[:], bpriv)
copy(sigPub[:], spub)
copy(sigPriv[:], spriv)
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)*/
hbpub := hex.EncodeToString(bpub[:])
hbpriv := hex.EncodeToString(bpriv[:])
hspub := hex.EncodeToString(spub[:])
hspriv := hex.EncodeToString(spriv[:])
c.config = config.NodeConfig{
EncryptionPublicKey: hbpub,
EncryptionPrivateKey: hbpriv,
SigningPublicKey: hspub,
SigningPrivateKey: hspriv,
}
c.init( /*bpub, bpriv, spub, spriv*/ )
if err := c.router.start(); err != nil {
panic(err)
@ -427,7 +450,8 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) {
//*
func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) {
if err := c.tcp.init(c, addrport, 0); err != nil {
c.config.Listen = addrport
if err := c.tcp.init(c /*, addrport, 0*/); err != nil {
c.log.Println("Failed to start TCP interface:", err)
panic(err)
}
@ -474,7 +498,8 @@ func (c *Core) DEBUG_addKCPConn(saddr string) {
func (c *Core) DEBUG_setupAndStartAdminInterface(addrport string) {
a := admin{}
a.init(c, addrport)
c.config.AdminListen = addrport
a.init(c /*, addrport*/)
c.admin = a
}
@ -492,7 +517,7 @@ func (c *Core) DEBUG_setLogger(log *log.Logger) {
}
func (c *Core) DEBUG_setIfceExpr(expr *regexp.Regexp) {
c.ifceExpr = append(c.ifceExpr, expr)
c.log.Println("DEBUG_setIfceExpr no longer implemented")
}
func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {

View File

@ -65,11 +65,12 @@ type dhtReqKey struct {
// The main DHT struct.
type dht struct {
core *Core
nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
core *Core
reconfigure chan chan error
nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar...
table map[crypto.NodeID]*dhtInfo
imp []*dhtInfo
@ -78,6 +79,13 @@ type dht struct {
// Initializes the DHT.
func (t *dht) init(c *Core) {
t.core = c
t.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-t.reconfigure
e <- nil
}
}()
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey]dht_callbackInfo)

View File

@ -4,30 +4,43 @@ import (
"context"
"fmt"
"net"
"regexp"
"sync"
"time"
"golang.org/x/net/ipv6"
)
type multicast struct {
core *Core
sock *ipv6.PacketConn
groupAddr string
core *Core
reconfigure chan chan error
sock *ipv6.PacketConn
groupAddr string
myAddr *net.TCPAddr
myAddrMutex sync.RWMutex
}
func (m *multicast) init(core *Core) {
m.core = core
m.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-m.reconfigure
m.myAddrMutex.Lock()
m.myAddr = m.core.tcp.getAddr()
m.myAddrMutex.Unlock()
e <- nil
}
}()
m.groupAddr = "[ff02::114]:9001"
// Check if we've been given any expressions
if len(m.core.ifceExpr) == 0 {
return
if count := len(m.interfaces()); count != 0 {
m.core.log.Println("Found", count, "multicast interface(s)")
}
// Ask the system for network interfaces
m.core.log.Println("Found", len(m.interfaces()), "multicast interface(s)")
}
func (m *multicast) start() error {
if len(m.core.ifceExpr) == 0 {
if len(m.interfaces()) == 0 {
m.core.log.Println("Multicast discovery is disabled")
} else {
m.core.log.Println("Multicast discovery is enabled")
@ -55,6 +68,10 @@ func (m *multicast) start() error {
}
func (m *multicast) interfaces() []net.Interface {
// Get interface expressions from config
m.core.configMutex.RLock()
exprs := m.core.config.MulticastInterfaces
m.core.configMutex.RUnlock()
// Ask the system for network interfaces
var interfaces []net.Interface
allifaces, err := net.Interfaces()
@ -75,8 +92,12 @@ func (m *multicast) interfaces() []net.Interface {
// Ignore point-to-point interfaces
continue
}
for _, expr := range m.core.ifceExpr {
if expr.MatchString(iface.Name) {
for _, expr := range exprs {
e, err := regexp.Compile(expr)
if err != nil {
panic(err)
}
if e.MatchString(iface.Name) {
interfaces = append(interfaces, iface)
}
}
@ -85,13 +106,14 @@ func (m *multicast) interfaces() []net.Interface {
}
func (m *multicast) announce() {
var anAddr net.TCPAddr
m.myAddrMutex.Lock()
m.myAddr = m.core.tcp.getAddr()
m.myAddrMutex.Unlock()
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
if err != nil {
panic(err)
}
var anAddr net.TCPAddr
myAddr := m.core.tcp.getAddr()
anAddr.Port = myAddr.Port
destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
if err != nil {
panic(err)
@ -103,6 +125,9 @@ func (m *multicast) announce() {
if err != nil {
panic(err)
}
m.myAddrMutex.RLock()
anAddr.Port = m.myAddr.Port
m.myAddrMutex.RUnlock()
for _, addr := range addrs {
addrIP, _, _ := net.ParseCIDR(addr.String())
if addrIP.To4() != nil {

View File

@ -170,7 +170,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
nodeinfo := nodeinfoReqRes{
SendCoords: table.self.getCoords(),
IsResponse: isResponse,
NodeInfo: m.core.nodeinfo.getNodeInfo(),
NodeInfo: m.getNodeInfo(),
}
bs := nodeinfo.encode()
shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key)

View File

@ -19,6 +19,7 @@ import (
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
type peers struct {
core *Core
reconfigure chan chan error
mutex sync.Mutex // Synchronize writes to atomic
ports atomic.Value //map[switchPort]*peer, use CoW semantics
authMutex sync.RWMutex
@ -31,6 +32,13 @@ func (ps *peers) init(c *Core) {
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c
ps.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-ps.reconfigure
e <- nil
}
}()
ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{})
}

View File

@ -37,19 +37,21 @@ import (
// The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer.
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
type router struct {
core *Core
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
toRecv chan router_recvPacket // packets to handle via recvPacket()
tun tunAdapter // TUN/TAP adapter
adapters []Adapter // Other adapters
recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
core *Core
reconfigure chan chan error
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
toRecv chan router_recvPacket // packets to handle via recvPacket()
tun tunAdapter // TUN/TAP adapter
adapters []Adapter // Other adapters
recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
nodeinfo nodeinfo
}
// Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun.
@ -61,6 +63,7 @@ type router_recvPacket struct {
// Initializes the router struct, which includes setting up channels to/from the tun/tap.
func (r *router) init(core *Core) {
r.core = core
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this...
@ -83,6 +86,9 @@ func (r *router) init(core *Core) {
r.send = send
r.reset = make(chan struct{}, 1)
r.admin = make(chan func(), 32)
r.core.configMutex.RLock()
r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy)
r.core.configMutex.RUnlock()
r.cryptokey.init(r.core)
r.tun.init(r.core, send, recv)
}
@ -124,6 +130,10 @@ func (r *router) mainLoop() {
}
case f := <-r.admin:
f()
case e := <-r.reconfigure:
r.core.configMutex.RLock()
e <- r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy)
r.core.configMutex.RUnlock()
}
}
}
@ -463,7 +473,7 @@ func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
return
}
req.SendPermPub = *fromKey
r.core.nodeinfo.handleNodeInfo(&req)
r.nodeinfo.handleNodeInfo(&req)
}
// Passed a function to call.

View File

@ -42,13 +42,21 @@ type searchInfo struct {
// This stores a map of active searches.
type searches struct {
core *Core
searches map[crypto.NodeID]*searchInfo
core *Core
reconfigure chan chan error
searches map[crypto.NodeID]*searchInfo
}
// Intializes the searches struct.
func (s *searches) init(core *Core) {
s.core = core
s.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-s.reconfigure
e <- nil
}
}()
s.searches = make(map[crypto.NodeID]*searchInfo)
}

View File

@ -7,6 +7,7 @@ package yggdrasil
import (
"bytes"
"encoding/hex"
"sync"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
@ -18,6 +19,7 @@ import (
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct {
core *Core
reconfigure chan chan error
theirAddr address.Address
theirSubnet address.Subnet
theirPermPub crypto.BoxPubKey
@ -101,6 +103,7 @@ func (s *sessionInfo) timedout() bool {
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct {
core *Core
reconfigure chan chan error
lastCleanup time.Time
// Maps known permanent keys to their shared key, used by DHT a lot
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey
@ -113,6 +116,7 @@ type sessions struct {
addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey
// Options from the session firewall
sessionFirewallMutex sync.RWMutex
sessionFirewallEnabled bool
sessionFirewallAllowsDirect bool
sessionFirewallAllowsRemote bool
@ -124,6 +128,24 @@ type sessions struct {
// Initializes the session struct.
func (ss *sessions) init(core *Core) {
ss.core = core
ss.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-ss.reconfigure
responses := make(map[crypto.Handle]chan error)
for index, session := range ss.sinfos {
responses[index] = make(chan error)
session.reconfigure <- responses[index]
}
for _, response := range responses {
if err := <-response; err != nil {
e <- err
continue
}
}
e <- nil
}
}()
ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
ss.sinfos = make(map[crypto.Handle]*sessionInfo)
ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle)
@ -135,12 +157,16 @@ func (ss *sessions) init(core *Core) {
// Enable or disable the session firewall
func (ss *sessions) setSessionFirewallState(enabled bool) {
ss.sessionFirewallMutex.Lock()
defer ss.sessionFirewallMutex.Unlock()
ss.sessionFirewallEnabled = enabled
}
// Set the session firewall defaults (first parameter is whether to allow
// sessions from direct peers, second is whether to allow from remote nodes).
func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote bool, alwaysAllowsOutbound bool) {
ss.sessionFirewallMutex.Lock()
defer ss.sessionFirewallMutex.Unlock()
ss.sessionFirewallAllowsDirect = allowsDirect
ss.sessionFirewallAllowsRemote = allowsRemote
ss.sessionFirewallAlwaysAllowsOutbound = alwaysAllowsOutbound
@ -148,17 +174,24 @@ func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote b
// Set the session firewall whitelist - nodes always allowed to open sessions.
func (ss *sessions) setSessionFirewallWhitelist(whitelist []string) {
ss.sessionFirewallMutex.Lock()
defer ss.sessionFirewallMutex.Unlock()
ss.sessionFirewallWhitelist = whitelist
}
// Set the session firewall blacklist - nodes never allowed to open sessions.
func (ss *sessions) setSessionFirewallBlacklist(blacklist []string) {
ss.sessionFirewallMutex.Lock()
defer ss.sessionFirewallMutex.Unlock()
ss.sessionFirewallBlacklist = blacklist
}
// Determines whether the session with a given publickey is allowed based on
// session firewall rules.
func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool {
ss.sessionFirewallMutex.RLock()
defer ss.sessionFirewallMutex.RUnlock()
// Allow by default if the session firewall is disabled
if !ss.sessionFirewallEnabled {
return true
@ -264,13 +297,12 @@ func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool)
// Creates a new session and lazily cleans up old/timedout existing sessions.
// This includse initializing session info to sane defaults (e.g. lowest supported MTU).
func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
if ss.sessionFirewallEnabled {
if !ss.isSessionAllowed(theirPermKey, true) {
return nil
}
if !ss.isSessionAllowed(theirPermKey, true) {
return nil
}
sinfo := sessionInfo{}
sinfo.core = ss.core
sinfo.reconfigure = make(chan chan error, 1)
sinfo.theirPermPub = *theirPermKey
pub, priv := crypto.NewBoxKeys()
sinfo.mySesPub = *pub
@ -442,11 +474,14 @@ func (ss *sessions) handlePing(ping *sessionPing) {
// Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
// Check the session firewall
ss.sessionFirewallMutex.RLock()
if !isIn && ss.sessionFirewallEnabled {
if !ss.isSessionAllowed(&ping.SendPermPub, false) {
ss.sessionFirewallMutex.RUnlock()
return
}
}
ss.sessionFirewallMutex.RUnlock()
if !isIn || sinfo.timedout() {
if isIn {
sinfo.close()
@ -539,6 +574,8 @@ func (sinfo *sessionInfo) doWorker() {
} else {
return
}
case e := <-sinfo.reconfigure:
e <- nil
}
}
}

View File

@ -162,6 +162,7 @@ type switchData struct {
// All the information stored by the switch.
type switchTable struct {
core *Core
reconfigure chan chan error
key crypto.SigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
@ -181,11 +182,14 @@ type switchTable struct {
const SwitchQueueTotalMinSize = 4 * 1024 * 1024
// Initializes the switchTable struct.
func (t *switchTable) init(core *Core, key crypto.SigPubKey) {
func (t *switchTable) init(core *Core) {
now := time.Now()
t.core = core
t.key = key
locator := switchLocator{root: key, tstamp: now.Unix()}
t.reconfigure = make(chan chan error, 1)
t.core.configMutex.RLock()
t.key = t.core.sigPub
t.core.configMutex.RUnlock()
locator := switchLocator{root: t.key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo)
t.data = switchData{locator: locator, peers: peers}
t.updater.Store(&sync.Once{})
@ -808,6 +812,8 @@ func (t *switchTable) doWorker() {
}
case f := <-t.admin:
f()
case e := <-t.reconfigure:
e <- nil
}
}
}

View File

@ -39,8 +39,11 @@ const tcp_ping_interval = (default_tcp_timeout * 2 / 3)
// The TCP listener and information about active TCP connections, to avoid duplication.
type tcpInterface struct {
core *Core
reconfigure chan chan error
serv net.Listener
serv_stop chan bool
tcp_timeout time.Duration
tcp_addr string
mutex sync.Mutex // Protecting the below
calls map[string]struct{}
conns map[tcpInfo](chan struct{})
@ -81,10 +84,37 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) {
}
// Initializes the struct.
func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err error) {
func (iface *tcpInterface) init(core *Core) (err error) {
iface.core = core
iface.serv_stop = make(chan bool, 1)
iface.reconfigure = make(chan chan error, 1)
go func() {
for {
e := <-iface.reconfigure
iface.core.configMutex.RLock()
updated := iface.core.config.Listen != iface.core.configOld.Listen
iface.core.configMutex.RUnlock()
if updated {
iface.serv_stop <- true
iface.serv.Close()
e <- iface.listen()
} else {
e <- nil
}
}
}()
return iface.listen()
}
func (iface *tcpInterface) listen() error {
var err error
iface.core.configMutex.RLock()
iface.tcp_addr = iface.core.config.Listen
iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond
iface.core.configMutex.RUnlock()
iface.tcp_timeout = time.Duration(readTimeout) * time.Millisecond
if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout {
iface.tcp_timeout = default_tcp_timeout
}
@ -93,11 +123,12 @@ func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err
lc := net.ListenConfig{
Control: iface.tcpContext,
}
iface.serv, err = lc.Listen(ctx, "tcp", addr)
iface.serv, err = lc.Listen(ctx, "tcp", iface.tcp_addr)
if err == nil {
iface.calls = make(map[string]struct{})
iface.conns = make(map[tcpInfo](chan struct{}))
go iface.listener()
return nil
}
return err
@ -110,12 +141,38 @@ func (iface *tcpInterface) listener() {
for {
sock, err := iface.serv.Accept()
if err != nil {
panic(err)
iface.core.log.Println("Failed to accept connection:", err)
return
}
select {
case <-iface.serv_stop:
iface.core.log.Println("Stopping listener")
return
default:
if err != nil {
panic(err)
}
go iface.handler(sock, true)
}
go iface.handler(sock, true)
}
}
// Checks if we already have a connection to this node
func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool {
iface.mutex.Lock()
defer iface.mutex.Unlock()
_, isIn := iface.conns[info]
return isIn
}
// Checks if we already are calling this address
func (iface *tcpInterface) isAlreadyCalling(saddr string) bool {
iface.mutex.Lock()
defer iface.mutex.Unlock()
_, isIn := iface.calls[saddr]
return isIn
}
// Checks if a connection already exists.
// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address.
// If the dial is successful, it launches the handler.
@ -127,25 +184,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
if sintf != "" {
callname = fmt.Sprintf("%s/%s", saddr, sintf)
}
quit := false
iface.mutex.Lock()
if _, isIn := iface.calls[callname]; isIn {
quit = true
} else {
iface.calls[callname] = struct{}{}
defer func() {
// Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock()
delete(iface.calls, callname)
iface.mutex.Unlock()
}()
}
iface.mutex.Unlock()
if quit {
if iface.isAlreadyCalling(saddr) {
return
}
iface.calls[callname] = struct{}{}
defer func() {
// Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock()
delete(iface.calls, callname)
iface.mutex.Unlock()
}()
var conn net.Conn
var err error
if socksaddr != nil {
@ -252,9 +302,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
// TODO? Block forever to prevent future connection attempts? suppress future messages about the same node?
return
}
remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String())
localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String())
if e1 != nil || e2 != nil {
return
}
info := tcpInfo{ // used as a map key, so don't include ephemeral link key
box: meta.box,
sig: meta.sig,
box: meta.box,
sig: meta.sig,
localAddr: localAddr,
remoteAddr: remoteAddr,
}
if iface.isAlreadyConnected(info) {
return
}
// Quit the parent call if this is a connection to ourself
equiv := func(k1, k2 []byte) bool {
@ -265,14 +325,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}
return true
}
if equiv(info.box[:], iface.core.boxPub[:]) {
if equiv(meta.box[:], iface.core.boxPub[:]) {
return
}
if equiv(info.sig[:], iface.core.sigPub[:]) {
if equiv(meta.sig[:], iface.core.sigPub[:]) {
return
}
// Check if we're authorized to connect to this key / IP
if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) {
if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
// Allow unauthorized peers if they're link-local
raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
raddr := net.ParseIP(raddrStr)
@ -281,15 +341,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}
}
// Check if we already have a connection to this node, close and block if yes
info.localAddr, _, _ = net.SplitHostPort(sock.LocalAddr().String())
info.remoteAddr, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
iface.mutex.Lock()
if blockChan, isIn := iface.conns[info]; isIn {
/*if blockChan, isIn := iface.conns[info]; isIn {
iface.mutex.Unlock()
sock.Close()
<-blockChan
return
}
}*/
blockChan := make(chan struct{})
iface.conns[info] = blockChan
iface.mutex.Unlock()
@ -301,7 +359,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}()
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String())
p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String())
p.linkOut = make(chan []byte, 1)
in := func(bs []byte) {
p.handlePacket(bs)
@ -363,16 +421,16 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}()
us, _, _ := net.SplitHostPort(sock.LocalAddr().String())
them, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
themNodeID := crypto.GetNodeID(&info.box)
themNodeID := crypto.GetNodeID(&meta.box)
themAddr := address.AddrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them)
iface.core.log.Println("Connected:", themString, "source", us)
iface.core.log.Printf("Connected: %s, source: %s", themString, us)
err = iface.reader(sock, in) // In this goroutine, because of defers
if err == nil {
iface.core.log.Println("Disconnected:", themString, "source", us)
iface.core.log.Printf("Disconnected: %s, source: %s", themString, us)
} else {
iface.core.log.Println("Disconnected:", themString, "source", us, "with error:", err)
iface.core.log.Printf("Disconnected: %s, source: %s, error: %s", themString, us, err)
}
return
}

View File

@ -5,6 +5,8 @@ package yggdrasil
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"time"
@ -42,11 +44,33 @@ func getSupportedMTU(mtu int) int {
func (tun *tunAdapter) init(core *Core, send chan<- []byte, recv <-chan []byte) {
tun.Adapter.init(core, send, recv)
tun.icmpv6.init(tun)
go func() {
for {
e := <-tun.reconfigure
tun.core.configMutex.RLock()
updated := tun.core.config.IfName != tun.core.configOld.IfName ||
tun.core.config.IfTAPMode != tun.core.configOld.IfTAPMode ||
tun.core.config.IfMTU != tun.core.configOld.IfMTU
tun.core.configMutex.RUnlock()
if updated {
tun.core.log.Println("Reconfiguring TUN/TAP is not supported yet")
e <- nil
} else {
e <- nil
}
}
}()
}
// Starts the setup process for the TUN/TAP adapter, and if successful, starts
// the read/write goroutines to handle packets on that interface.
func (tun *tunAdapter) start(ifname string, iftapmode bool, addr string, mtu int) error {
func (tun *tunAdapter) start() error {
tun.core.configMutex.RLock()
ifname := tun.core.config.IfName
iftapmode := tun.core.config.IfTAPMode
addr := fmt.Sprintf("%s/%d", net.IP(tun.core.router.addr[:]).String(), 8*len(address.GetPrefix())-1)
mtu := tun.core.config.IfMTU
tun.core.configMutex.RUnlock()
if ifname != "none" {
if err := tun.setup(ifname, iftapmode, addr, mtu); err != nil {
return err