From b2a2e251ad173686c752463f38efdf77e6814487 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Aug 2019 18:53:11 -0500 Subject: [PATCH] more TunAdapter migration --- src/tuntap/conn.go | 12 ++++--- src/tuntap/iface.go | 78 +++++++++++++++++++-------------------------- src/tuntap/tun.go | 72 ++++++++++++++++++++++++----------------- 3 files changed, 84 insertions(+), 78 deletions(-) diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index cdb8c32e..afb0f09e 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -27,12 +27,10 @@ type tunConn struct { } func (s *tunConn) close() { - s.tun.mutex.Lock() - defer s.tun.mutex.Unlock() - s._close_nomutex() + s.tun.RecvFrom(s, s._close_from_tun) } -func (s *tunConn) _close_nomutex() { +func (s *tunConn) _close_from_tun() { s.conn.Close() delete(s.tun.addrToConn, s.addr) delete(s.tun.subnetToConn, s.snet) @@ -118,6 +116,12 @@ func (s *tunConn) _read(bs []byte) (err error) { return } +func (s *tunConn) writeFrom(from phony.Actor, bs []byte) { + s.RecvFrom(from, func() { + s._write(bs) + }) +} + func (s *tunConn) _write(bs []byte) (err error) { select { case <-s.stop: diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 3f923513..16e8d25d 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -90,13 +90,11 @@ func (w *tunWriter) _write(b []byte) { util.PutBytes(b) } if err != nil { - w.tun.mutex.Lock() - open := w.tun.isOpen - w.tun.mutex.Unlock() - if !open { - return - } - w.tun.log.Errorln("TUN/TAP iface write error:", err) + w.tun.RecvFrom(w, func() { + if !w.tun.isOpen { + w.tun.log.Errorln("TUN/TAP iface write error:", err) + } + }) } if written != n { w.tun.log.Errorln("TUN/TAP iface write mismatch:", written, "bytes written vs", n, "bytes given") @@ -221,7 +219,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { } // Do we have an active connection for this node address? var dstNodeID, dstNodeIDMask *crypto.NodeID - tun.mutex.RLock() session, isIn := tun.addrToConn[dstAddr] if !isIn || session == nil { session, isIn = tun.subnetToConn[dstSnet] @@ -235,7 +232,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { } } } - tun.mutex.RUnlock() // If we don't have a connection then we should open one if !isIn || session == nil { // Check we haven't been given empty node ID, really this shouldn't ever @@ -243,45 +239,37 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { if dstNodeID == nil || dstNodeIDMask == nil { panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") } - // Dial to the remote node - go func() { - // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes - tun.mutex.Lock() - _, known := tun.dials[*dstNodeID] - tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs) - for len(tun.dials[*dstNodeID]) > 32 { - util.PutBytes(tun.dials[*dstNodeID][0]) - tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] - } - tun.mutex.Unlock() - if known { - return - } - var tc *tunConn - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - // We've been given a connection so prepare the session wrapper - if tc, err = tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) + _, known := tun.dials[*dstNodeID] + tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs) + for len(tun.dials[*dstNodeID]) > 32 { + util.PutBytes(tun.dials[*dstNodeID][0]) + tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] + } + if !known { + go func() { + if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { + tun.RecvFrom(nil, func() { + // We've been given a connection so prepare the session wrapper + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + var tc *tunConn + var err error + if tc, err = tun._wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + return + } + for _, packet := range packets { + tc.writeFrom(nil, packet) + } + }) } - } - tun.mutex.Lock() - packets := tun.dials[*dstNodeID] - delete(tun.dials, *dstNodeID) - tun.mutex.Unlock() - if tc != nil { - for _, packet := range packets { - p := packet // Possibly required because of how range - <-tc.SyncExec(func() { tc._write(p) }) - } - } - }() - // While the dial is going on we can't do much else - return + }() + } } // If we have a connection now, try writing to it if isIn && session != nil { - session.RecvFrom(tun, func() { session._write(bs) }) + session.writeFrom(tun, bs) } } diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 7bb3a59f..2b163e3b 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -13,7 +13,7 @@ import ( "errors" "fmt" "net" - "sync" + //"sync" "github.com/Arceliar/phony" "github.com/gologme/log" @@ -34,21 +34,21 @@ const tun_ETHER_HEADER_LENGTH = 14 // you should pass this object to the yggdrasil.SetRouterAdapter() function // before calling yggdrasil.Start(). type TunAdapter struct { - writer tunWriter - reader tunReader - config *config.NodeState - log *log.Logger - reconfigure chan chan error - listener *yggdrasil.Listener - dialer *yggdrasil.Dialer - addr address.Address - subnet address.Subnet - ckr cryptokey - icmpv6 ICMPv6 - mtu int - iface *water.Interface - phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below - mutex sync.RWMutex // Protects the below + writer tunWriter + reader tunReader + config *config.NodeState + log *log.Logger + reconfigure chan chan error + listener *yggdrasil.Listener + dialer *yggdrasil.Dialer + addr address.Address + subnet address.Subnet + ckr cryptokey + icmpv6 ICMPv6 + mtu int + iface *water.Interface + phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below + //mutex sync.RWMutex // Protects the below addrToConn map[address.Address]*tunConn subnetToConn map[address.Subnet]*tunConn dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes @@ -122,8 +122,16 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener } // Start the setup process for the TUN/TAP adapter. If successful, starts the -// read/write goroutines to handle packets on that interface. +// reader actor to handle packets on that interface. func (tun *TunAdapter) Start() error { + var err error + <-tun.SyncExec(func() { + err = tun._start() + }) + return err +} + +func (tun *TunAdapter) _start() error { current := tun.config.GetCurrent() if tun.config == nil || tun.listener == nil || tun.dialer == nil { return errors.New("No configuration available to TUN/TAP") @@ -150,10 +158,8 @@ func (tun *TunAdapter) Start() error { tun.log.Debugln("Not starting TUN/TAP as ifname is none or dummy") return nil } - tun.mutex.Lock() tun.isOpen = true tun.reconfigure = make(chan chan error) - tun.mutex.Unlock() go func() { for { e := <-tun.reconfigure @@ -173,6 +179,14 @@ func (tun *TunAdapter) Start() error { // Start the setup process for the TUN/TAP adapter. If successful, starts the // read/write goroutines to handle packets on that interface. func (tun *TunAdapter) Stop() error { + var err error + <-tun.SyncExec(func() { + err = tun._stop() + }) + return err +} + +func (tun *TunAdapter) _stop() error { tun.isOpen = false // TODO: we have nothing that cleanly stops all the various goroutines opened // by TUN/TAP, e.g. readers/writers, sessions @@ -219,15 +233,17 @@ func (tun *TunAdapter) handler() error { tun.log.Errorln("TUN/TAP connection accept error:", err) return err } - if _, err := tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP handler wrap:", err) - } + <-tun.SyncExec(func() { + if _, err := tun._wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP handler wrap:", err) + } + }) } } -func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { +func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { // Prepare a session wrapper for the given connection s := tunConn{ tun: tun, @@ -240,17 +256,15 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { s.addr = *address.AddrForNodeID(&remoteNodeID) s.snet = *address.SubnetForNodeID(&remoteNodeID) // Work out if this is already a destination we already know about - tun.mutex.Lock() - defer tun.mutex.Unlock() atc, aok := tun.addrToConn[s.addr] stc, sok := tun.subnetToConn[s.snet] // If we know about a connection for this destination already then assume it // is no longer valid and close it if aok { - atc._close_nomutex() + atc._close_from_tun() err = errors.New("replaced connection for address") } else if sok { - stc._close_nomutex() + stc._close_from_tun() err = errors.New("replaced connection for subnet") } // Save the session wrapper so that we can look it up quickly next time