Some more (inelegant) multiple listener code plus some reconfigure support

This commit is contained in:
Neil Alexander 2019-03-04 18:41:32 +00:00
parent be8db0c120
commit 82bb95b77f
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
5 changed files with 103 additions and 31 deletions

View File

@ -76,3 +76,20 @@ func FuncTimeout(f func(), timeout time.Duration) bool {
return false return false
} }
} }
// This calculates the difference between two arrays and returns items
// that appear in A but not in B - useful somewhat when reconfiguring
// and working out what configuration items changed
func Difference(a, b []string) []string {
ab := []string{}
mb := map[string]bool{}
for _, x := range b {
mb[x] = true
}
for _, x := range a {
if _, ok := mb[x]; !ok {
ab = append(ab, x)
}
}
return ab
}

View File

@ -7,9 +7,10 @@ import (
) )
type awdl struct { type awdl struct {
link *link link *link
mutex sync.RWMutex // protects interfaces below reconfigure chan chan error
interfaces map[string]*awdlInterface mutex sync.RWMutex // protects interfaces below
interfaces map[string]*awdlInterface
} }
type awdlInterface struct { type awdlInterface struct {
@ -49,8 +50,16 @@ func (a *awdl) init(l *link) error {
a.link = l a.link = l
a.mutex.Lock() a.mutex.Lock()
a.interfaces = make(map[string]*awdlInterface) a.interfaces = make(map[string]*awdlInterface)
a.reconfigure = make(chan chan error, 1)
a.mutex.Unlock() a.mutex.Unlock()
go func() {
for {
e := <-a.reconfigure
e <- nil
}
}()
return nil return nil
} }

View File

@ -143,7 +143,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.router.tun.reconfigure, c.router.tun.reconfigure,
c.router.cryptokey.reconfigure, c.router.cryptokey.reconfigure,
c.switchTable.reconfigure, c.switchTable.reconfigure,
// c.link.reconfigure, c.link.reconfigure,
c.multicast.reconfigure, c.multicast.reconfigure,
} }

View File

@ -18,12 +18,13 @@ import (
) )
type link struct { type link struct {
core *Core core *Core
mutex sync.RWMutex // protects interfaces below reconfigure chan chan error
interfaces map[linkInfo]*linkInterface mutex sync.RWMutex // protects interfaces below
handlers map[string]linkListener interfaces map[linkInfo]*linkInterface
awdl awdl // AWDL interface support handlers map[string]linkListener
tcp tcp // TCP interface support awdl awdl // AWDL interface support
tcp tcp // TCP interface support
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
} }
@ -63,6 +64,7 @@ func (l *link) init(c *Core) error {
l.core = c l.core = c
l.mutex.Lock() l.mutex.Lock()
l.interfaces = make(map[linkInfo]*linkInterface) l.interfaces = make(map[linkInfo]*linkInterface)
l.reconfigure = make(chan chan error)
l.mutex.Unlock() l.mutex.Unlock()
if err := l.tcp.init(l); err != nil { if err := l.tcp.init(l); err != nil {
@ -75,6 +77,23 @@ func (l *link) init(c *Core) error {
return err return err
} }
go func() {
for {
e := <-l.reconfigure
tcpresponse := make(chan error)
awdlresponse := make(chan error)
l.tcp.reconfigure <- tcpresponse
l.awdl.reconfigure <- awdlresponse
if err := <-tcpresponse; err != nil {
e <- err
}
if err := <-awdlresponse; err != nil {
e <- err
}
e <- nil
}
}()
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
const default_timeout = 6 * time.Second const default_timeout = 6 * time.Second
@ -32,13 +33,13 @@ const tcp_ping_interval = (default_timeout * 2 / 3)
// The TCP listener and information about active TCP connections, to avoid duplication. // The TCP listener and information about active TCP connections, to avoid duplication.
type tcp struct { type tcp struct {
link *link link *link
reconfigure chan chan error reconfigure chan chan error
stop chan bool mutex sync.Mutex // Protecting the below
mutex sync.Mutex // Protecting the below listeners map[string]net.Listener
listeners map[string]net.Listener listenerstops map[string]chan bool
calls map[string]struct{} calls map[string]struct{}
conns map[tcpInfo](chan struct{}) conns map[tcpInfo](chan struct{})
} }
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring.
@ -81,22 +82,38 @@ func (t *tcp) connectSOCKS(socksaddr, peeraddr string) {
// Initializes the struct. // Initializes the struct.
func (t *tcp) init(l *link) error { func (t *tcp) init(l *link) error {
t.link = l t.link = l
t.stop = make(chan bool, 1)
t.reconfigure = make(chan chan error, 1) t.reconfigure = make(chan chan error, 1)
go func() { go func() {
for { for {
e := <-t.reconfigure e := <-t.reconfigure
t.link.core.configMutex.RLock() t.link.core.configMutex.RLock()
//updated := t.link.core.config.Listen != t.link.core.configOld.Listen added := util.Difference(t.link.core.config.Listen, t.link.core.configOld.Listen)
updated := false deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen)
updated := len(added) > 0 || len(deleted) > 0
t.link.core.configMutex.RUnlock() t.link.core.configMutex.RUnlock()
if updated { if updated {
/* t.stop <- true for _, add := range added {
for _, listener := range t.listeners { if add[:6] != "tcp://" {
continue
}
if err := t.listen(add[6:]); err != nil {
e <- err
continue
}
}
for _, delete := range deleted {
t.link.core.log.Warnln("Removing listener", delete, "not currently implemented")
/*t.mutex.Lock()
if listener, ok := t.listeners[delete]; ok {
listener.Close() listener.Close()
} }
e <- t.listen() */ if listener, ok := t.listenerstops[delete]; ok {
listener <- true
}
t.mutex.Unlock()*/
}
e <- nil
} else { } else {
e <- nil e <- nil
} }
@ -107,6 +124,7 @@ func (t *tcp) init(l *link) error {
t.calls = make(map[string]struct{}) t.calls = make(map[string]struct{})
t.conns = make(map[tcpInfo](chan struct{})) t.conns = make(map[tcpInfo](chan struct{}))
t.listeners = make(map[string]net.Listener) t.listeners = make(map[string]net.Listener)
t.listenerstops = make(map[string]chan bool)
t.mutex.Unlock() t.mutex.Unlock()
t.link.core.configMutex.RLock() t.link.core.configMutex.RLock()
@ -134,6 +152,7 @@ func (t *tcp) listen(listenaddr string) error {
if err == nil { if err == nil {
t.mutex.Lock() t.mutex.Lock()
t.listeners[listenaddr] = listener t.listeners[listenaddr] = listener
t.listenerstops[listenaddr] = make(chan bool, 1)
t.mutex.Unlock() t.mutex.Unlock()
go t.listener(listenaddr) go t.listener(listenaddr)
return nil return nil
@ -149,17 +168,25 @@ func (t *tcp) listener(listenaddr string) {
t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist")
return return
} }
reallistenaddr := listener.Addr().String()
defer listener.Close() defer listener.Close()
t.link.core.log.Infoln("Listening for TCP on:", listener.Addr().String()) t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr)
for { for {
sock, err := listener.Accept() var sock net.Conn
if err != nil { var err error
t.link.core.log.Errorln("Failed to accept connection:", err) accepted := make(chan bool)
return go func() {
} sock, err = listener.Accept()
accepted <- true
}()
select { select {
case <-t.stop: case <-accepted:
t.link.core.log.Errorln("Stopping listener") if err != nil {
t.link.core.log.Errorln("Failed to accept connection:", err)
return
}
case <-t.listenerstops[listenaddr]:
t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr)
return return
default: default:
if err != nil { if err != nil {