Refactor multicast setup (isolated config, etc)

This commit is contained in:
Neil Alexander 2022-09-03 11:42:05 +01:00
parent dad0b10dfe
commit 493208fb37
10 changed files with 215 additions and 150 deletions

View File

@ -12,6 +12,7 @@ import (
"net"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
@ -325,40 +326,59 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
default:
}
// Setup the Yggdrasil node itself. The node{} type includes a Core, so we
// don't need to create this manually.
sk, err := hex.DecodeString(cfg.PrivateKey)
if err != nil {
panic(err)
}
options := []core.SetupOption{
core.IfName(cfg.IfName),
core.IfMTU(cfg.IfMTU),
}
for _, peer := range cfg.Peers {
options = append(options, core.Peer{URI: peer})
}
for intf, peers := range cfg.InterfacePeers {
for _, peer := range peers {
options = append(options, core.Peer{URI: peer, SourceInterface: intf})
}
}
for _, allowed := range cfg.AllowedPublicKeys {
k, err := hex.DecodeString(allowed)
n := &node{config: cfg}
// Setup the Yggdrasil node itself.
{
sk, err := hex.DecodeString(cfg.PrivateKey)
if err != nil {
panic(err)
}
options := []core.SetupOption{
core.IfName(cfg.IfName),
core.IfMTU(cfg.IfMTU),
}
for _, peer := range cfg.Peers {
options = append(options, core.Peer{URI: peer})
}
for intf, peers := range cfg.InterfacePeers {
for _, peer := range peers {
options = append(options, core.Peer{URI: peer, SourceInterface: intf})
}
}
for _, allowed := range cfg.AllowedPublicKeys {
k, err := hex.DecodeString(allowed)
if err != nil {
panic(err)
}
options = append(options, core.AllowedPublicKey(k[:]))
}
n.core, err = core.New(sk[:], logger, options...)
if err != nil {
panic(err)
}
options = append(options, core.AllowedPublicKey(k[:]))
}
n := node{config: cfg}
n.core, err = core.New(sk[:], options...)
if err != nil {
panic(err)
// Setup the multicast module.
{
options := []multicast.SetupOption{}
for _, intf := range cfg.MulticastInterfaces {
options = append(options, multicast.MulticastInterface{
Regex: regexp.MustCompile(intf.Regex),
Beacon: intf.Beacon,
Listen: intf.Listen,
Port: intf.Port,
})
}
n.multicast, err = multicast.New(n.core, logger, options...)
if err != nil {
panic(err)
}
}
// Register the session firewall gatekeeper function
// Allocate our modules
n.admin = &admin.AdminSocket{}
n.multicast = &multicast.Multicast{}
n.tuntap = &tuntap.TunAdapter{}
// Start the admin socket
if err := n.admin.Init(n.core, cfg, logger, nil); err != nil {
@ -368,10 +388,8 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
}
n.admin.SetupAdminHandlers()
// Start the multicast interface
if err := n.multicast.Init(n.core, cfg, logger, nil); err != nil {
if n.multicast, err = multicast.New(n.core, logger, nil); err != nil {
logger.Errorln("An error occurred initialising multicast:", err)
} else if err := n.multicast.Start(); err != nil {
logger.Errorln("An error occurred starting multicast:", err)
}
n.multicast.SetupAdminHandlers(n.admin)
// Start the TUN/TAP interface

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net"
"regexp"
"github.com/gologme/log"
@ -28,7 +29,7 @@ type Yggdrasil struct {
core *core.Core
iprwc *ipv6rwc.ReadWriteCloser
config *config.NodeConfig
multicast multicast.Multicast
multicast *multicast.Multicast
log MobileLogger
}
@ -49,46 +50,60 @@ func (m *Yggdrasil) StartJSON(configjson []byte) error {
return err
}
// Setup the Yggdrasil node itself.
sk, err := hex.DecodeString(m.config.PrivateKey)
if err != nil {
panic(err)
}
options := []core.SetupOption{
core.IfName("none"),
core.IfMTU(m.config.IfMTU),
}
for _, peer := range m.config.Peers {
options = append(options, core.Peer{URI: peer})
}
for intf, peers := range m.config.InterfacePeers {
for _, peer := range peers {
options = append(options, core.Peer{URI: peer, SourceInterface: intf})
}
}
for _, allowed := range m.config.AllowedPublicKeys {
k, err := hex.DecodeString(allowed)
{
sk, err := hex.DecodeString(m.config.PrivateKey)
if err != nil {
panic(err)
}
options := []core.SetupOption{
core.IfName("none"),
core.IfMTU(m.config.IfMTU),
}
for _, peer := range m.config.Peers {
options = append(options, core.Peer{URI: peer})
}
for intf, peers := range m.config.InterfacePeers {
for _, peer := range peers {
options = append(options, core.Peer{URI: peer, SourceInterface: intf})
}
}
for _, allowed := range m.config.AllowedPublicKeys {
k, err := hex.DecodeString(allowed)
if err != nil {
panic(err)
}
options = append(options, core.AllowedPublicKey(k[:]))
}
m.core, err = core.New(sk[:], logger, options...)
if err != nil {
panic(err)
}
options = append(options, core.AllowedPublicKey(k[:]))
}
m.core, err = core.New(sk[:], options...)
if err != nil {
panic(err)
// Setup the multicast module.
if len(m.config.MulticastInterfaces) > 0 {
var err error
options := []multicast.SetupOption{}
for _, intf := range m.config.MulticastInterfaces {
options = append(options, multicast.MulticastInterface{
Regex: regexp.MustCompile(intf.Regex),
Beacon: intf.Beacon,
Listen: intf.Listen,
Port: intf.Port,
})
}
m.multicast, err = multicast.New(m.core, logger, options...)
if err != nil {
panic(err)
}
}
mtu := m.config.IfMTU
m.iprwc = ipv6rwc.NewReadWriteCloser(m.core)
if m.iprwc.MaxMTU() < mtu {
mtu = m.iprwc.MaxMTU()
}
m.iprwc.SetMTU(mtu)
if len(m.config.MulticastInterfaces) > 0 {
if err := m.multicast.Init(m.core, m.config, logger, nil); err != nil {
logger.Errorln("An error occurred initialising multicast:", err)
} else if err := m.multicast.Start(); err != nil {
logger.Errorln("An error occurred starting multicast:", err)
}
}
return nil
}

View File

@ -15,8 +15,8 @@ import (
//"sort"
//"time"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
//"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
//"github.com/Arceliar/phony"
)
@ -159,7 +159,7 @@ func (c *Core) Subnet() net.IPNet {
// may be useful if you want to redirect the output later. Note that this
// expects a Logger from the github.com/gologme/log package and not from Go's
// built-in log package.
func (c *Core) SetLogger(log *log.Logger) {
func (c *Core) SetLogger(log util.Logger) {
c.log = log
}
@ -239,8 +239,10 @@ func (c *Core) RemovePeer(addr string, sintf string) error {
// CallPeer calls a peer once. This should be specified in the peer URI format,
// e.g.:
// tcp://a.b.c.d:e
// socks://a.b.c.d:e/f.g.h.i:j
//
// tcp://a.b.c.d:e
// socks://a.b.c.d:e/f.g.h.i:j
//
// This does not add the peer to the peer list, so if the connection drops, the
// peer will not be called again automatically.
func (c *Core) CallPeer(u *url.URL, sintf string) error {

View File

@ -7,7 +7,6 @@ import (
"io"
"net"
"net/url"
"os"
"time"
iwe "github.com/Arceliar/ironwood/encrypted"
@ -15,6 +14,7 @@ import (
"github.com/Arceliar/phony"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
//"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
@ -33,7 +33,7 @@ type Core struct {
public ed25519.PublicKey
links links
proto protoHandler
log *log.Logger
log util.Logger
addPeerTimer *time.Timer
config struct {
_peers map[Peer]struct{} // configurable after startup
@ -46,14 +46,14 @@ type Core struct {
}
}
func New(secret ed25519.PrivateKey, opts ...SetupOption) (*Core, error) {
func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*Core, error) {
if len(secret) != ed25519.PrivateKeySize {
return nil, fmt.Errorf("private key is incorrect length")
}
c := &Core{
secret: secret,
public: secret.Public().(ed25519.PublicKey),
log: log.New(os.Stdout, "", 0), // TODO: not this
log: logger,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
var err error

View File

@ -36,10 +36,11 @@ func CreateAndConnectTwo(t testing.TB, verbose bool) (nodeA *Core, nodeB *Core)
if _, skB, err = ed25519.GenerateKey(nil); err != nil {
t.Fatal(err)
}
if nodeA, err = New(skA, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil {
logger := GetLoggerWithPrefix("", false)
if nodeA, err = New(skA, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil {
t.Fatal(err)
}
if nodeB, err = New(skB, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil {
if nodeB, err = New(skB, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil {
t.Fatal(err)
}

View File

@ -30,7 +30,6 @@ type SetupOption interface {
}
type ListenAddress string
type AdminListenAddress string
type Peer struct {
URI string
SourceInterface string
@ -41,11 +40,10 @@ type IfName string
type IfMTU uint16
type AllowedPublicKey ed25519.PublicKey
func (a ListenAddress) isSetupOption() {}
func (a AdminListenAddress) isSetupOption() {}
func (a Peer) isSetupOption() {}
func (a NodeInfo) isSetupOption() {}
func (a NodeInfoPrivacy) isSetupOption() {}
func (a IfName) isSetupOption() {}
func (a IfMTU) isSetupOption() {}
func (a AllowedPublicKey) isSetupOption() {}
func (a ListenAddress) isSetupOption() {}
func (a Peer) isSetupOption() {}
func (a NodeInfo) isSetupOption() {}
func (a NodeInfoPrivacy) isSetupOption() {}
func (a IfName) isSetupOption() {}
func (a IfMTU) isSetupOption() {}
func (a AllowedPublicKey) isSetupOption() {}

View File

@ -9,13 +9,11 @@ import (
"fmt"
"net"
"net/url"
"regexp"
"time"
"github.com/Arceliar/phony"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
"golang.org/x/net/ipv6"
)
@ -27,13 +25,15 @@ import (
type Multicast struct {
phony.Inbox
core *core.Core
config *config.NodeConfig
log *log.Logger
sock *ipv6.PacketConn
groupAddr string
listeners map[string]*listenerInfo
isOpen bool
_interfaces map[string]interfaceInfo
_isOpen bool
_listeners map[string]*listenerInfo
_interfaces map[string]*interfaceInfo
config struct {
_groupAddr GroupAddress
_interfaces map[MulticastInterface]struct{}
}
}
type interfaceInfo struct {
@ -51,40 +51,38 @@ type listenerInfo struct {
port uint16
}
// Init prepares the multicast interface for use.
func (m *Multicast) Init(core *core.Core, nc *config.NodeConfig, log *log.Logger, options interface{}) error {
m.core = core
m.config = nc
m.log = log
m.listeners = make(map[string]*listenerInfo)
m._interfaces = make(map[string]interfaceInfo)
m.groupAddr = "[ff02::114]:9001"
return nil
}
// Start starts the multicast interface. This launches goroutines which will
// listen for multicast beacons from other hosts and will advertise multicast
// beacons out to the network.
func (m *Multicast) Start() error {
func New(core *core.Core, log *log.Logger, opts ...SetupOption) (*Multicast, error) {
m := &Multicast{
core: core,
log: log,
_listeners: make(map[string]*listenerInfo),
_interfaces: make(map[string]*interfaceInfo),
}
m.config._interfaces = map[MulticastInterface]struct{}{}
m.config._groupAddr = GroupAddress("[ff02::114]:9001")
for _, opt := range opts {
m._applyOption(opt)
}
var err error
phony.Block(m, func() {
err = m._start()
})
m.log.Debugln("Started multicast module")
return err
return m, err
}
func (m *Multicast) _start() error {
if m.isOpen {
if m._isOpen {
return fmt.Errorf("multicast module is already started")
}
m.config.RLock()
defer m.config.RUnlock()
if len(m.config.MulticastInterfaces) == 0 {
if len(m.config._interfaces) == 0 {
return nil
}
m.log.Infoln("Starting multicast module")
addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
defer m.log.Infoln("Started multicast module")
addr, err := net.ResolveUDPAddr("udp", string(m.config._groupAddr))
if err != nil {
return err
}
@ -101,7 +99,7 @@ func (m *Multicast) _start() error {
// Windows can't set this flag, so we need to handle it in other ways
}
m.isOpen = true
m._isOpen = true
go m.listen()
m.Act(nil, m._multicastStarted)
m.Act(nil, m._announce)
@ -113,7 +111,7 @@ func (m *Multicast) _start() error {
func (m *Multicast) IsStarted() bool {
var isOpen bool
phony.Block(m, func() {
isOpen = m.isOpen
isOpen = m._isOpen
})
return isOpen
}
@ -130,7 +128,7 @@ func (m *Multicast) Stop() error {
func (m *Multicast) _stop() error {
m.log.Infoln("Stopping multicast module")
m.isOpen = false
m._isOpen = false
if m.sock != nil {
m.sock.Close()
}
@ -138,7 +136,7 @@ func (m *Multicast) _stop() error {
}
func (m *Multicast) _updateInterfaces() {
interfaces := m.getAllowedInterfaces()
interfaces := m._getAllowedInterfaces()
for name, info := range interfaces {
addrs, err := info.iface.Addrs()
if err != nil {
@ -163,10 +161,8 @@ func (m *Multicast) Interfaces() map[string]net.Interface {
}
// getAllowedInterfaces returns the currently known/enabled multicast interfaces.
func (m *Multicast) getAllowedInterfaces() map[string]interfaceInfo {
interfaces := make(map[string]interfaceInfo)
// Get interface expressions from config
ifcfgs := m.config.MulticastInterfaces
func (m *Multicast) _getAllowedInterfaces() map[string]*interfaceInfo {
interfaces := make(map[string]*interfaceInfo)
// Ask the system for network interfaces
allifaces, err := net.Interfaces()
if err != nil {
@ -176,62 +172,55 @@ func (m *Multicast) getAllowedInterfaces() map[string]interfaceInfo {
}
// Work out which interfaces to announce on
for _, iface := range allifaces {
if iface.Flags&net.FlagUp == 0 {
// Ignore interfaces that are down
continue
switch {
case iface.Flags&net.FlagUp == 0:
continue // Ignore interfaces that are down
case iface.Flags&net.FlagMulticast == 0:
continue // Ignore non-multicast interfaces
case iface.Flags&net.FlagPointToPoint != 0:
continue // Ignore point-to-point interfaces
}
if iface.Flags&net.FlagMulticast == 0 {
// Ignore non-multicast interfaces
continue
}
if iface.Flags&net.FlagPointToPoint != 0 {
// Ignore point-to-point interfaces
continue
}
for _, ifcfg := range ifcfgs {
for ifcfg := range m.config._interfaces {
// Compile each regular expression
e, err := regexp.Compile(ifcfg.Regex)
if err != nil {
panic(err)
}
// Does the interface match the regular expression? Store it if so
if e.MatchString(iface.Name) {
if ifcfg.Beacon || ifcfg.Listen {
info := interfaceInfo{
iface: iface,
beacon: ifcfg.Beacon,
listen: ifcfg.Listen,
port: ifcfg.Port,
}
interfaces[iface.Name] = info
}
break
if !ifcfg.Beacon && !ifcfg.Listen {
continue
}
if !ifcfg.Regex.MatchString(iface.Name) {
continue
}
interfaces[iface.Name] = &interfaceInfo{
iface: iface,
beacon: ifcfg.Beacon,
listen: ifcfg.Listen,
port: ifcfg.Port,
}
break
}
}
return interfaces
}
func (m *Multicast) _announce() {
if !m.isOpen {
if !m._isOpen {
return
}
m._updateInterfaces()
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
groupAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr))
if err != nil {
panic(err)
}
destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
destAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr))
if err != nil {
panic(err)
}
// There might be interfaces that we configured listeners for but are no
// longer up - if that's the case then we should stop the listeners
for name, info := range m.listeners {
for name, info := range m._listeners {
// Prepare our stop function!
stop := func() {
info.listener.Stop()
delete(m.listeners, name)
delete(m._listeners, name)
m.log.Debugln("No longer multicasting on", name)
}
// If the interface is no longer visible on the system then stop the
@ -290,7 +279,7 @@ func (m *Multicast) _announce() {
}
// Try and see if we already have a TCP listener for this interface
var linfo *listenerInfo
if nfo, ok := m.listeners[iface.Name]; !ok || nfo.listener.Listener == nil {
if nfo, ok := m._listeners[iface.Name]; !ok || nfo.listener.Listener == nil {
// No listener was found - let's create one
urlString := fmt.Sprintf("tls://[%s]:%d", addrIP, info.port)
u, err := url.Parse(urlString)
@ -301,13 +290,13 @@ func (m *Multicast) _announce() {
m.log.Debugln("Started multicasting on", iface.Name)
// Store the listener so that we can stop it later if needed
linfo = &listenerInfo{listener: li, time: time.Now(), port: info.port}
m.listeners[iface.Name] = linfo
m._listeners[iface.Name] = linfo
} else {
m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err)
}
} else {
// An existing listener was found
linfo = m.listeners[iface.Name]
linfo = m._listeners[iface.Name]
}
// Make sure nothing above failed for some reason
if linfo == nil {
@ -340,7 +329,7 @@ func (m *Multicast) _announce() {
}
func (m *Multicast) listen() {
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
groupAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr))
if err != nil {
panic(err)
}
@ -388,7 +377,7 @@ func (m *Multicast) listen() {
if !from.IP.Equal(addr.IP) {
continue
}
var interfaces map[string]interfaceInfo
var interfaces map[string]*interfaceInfo
phony.Block(m, func() {
interfaces = m._interfaces
})

View File

@ -31,7 +31,7 @@ import (
)
func (m *Multicast) _multicastStarted() {
if !m.isOpen {
if !m._isOpen {
return
}
C.StopAWDLBrowsing()

28
src/multicast/options.go Normal file
View File

@ -0,0 +1,28 @@
package multicast
import "regexp"
func (m *Multicast) _applyOption(opt SetupOption) {
switch v := opt.(type) {
case MulticastInterface:
m.config._interfaces[v] = struct{}{}
case GroupAddress:
m.config._groupAddr = v
}
}
type SetupOption interface {
isSetupOption()
}
type MulticastInterface struct {
Regex *regexp.Regexp
Beacon bool
Listen bool
Port uint16
}
type GroupAddress string
func (a MulticastInterface) isSetupOption() {}
func (a GroupAddress) isSetupOption() {}

View File

@ -8,6 +8,20 @@ import (
"time"
)
// Any logger that satisfies this interface is suitable for Yggdrasil.
type Logger interface {
Printf(string, ...interface{})
Println(...interface{})
Infof(string, ...interface{})
Infoln(...interface{})
Warnf(string, ...interface{})
Warnln(...interface{})
Errorf(string, ...interface{})
Errorln(...interface{})
Debugf(string, ...interface{})
Debugln(...interface{})
}
// TimerStop stops a timer and makes sure the channel is drained, returns true if the timer was stopped before firing.
func TimerStop(t *time.Timer) bool {
stopped := t.Stop()