2017-12-29 07:16:20 +03:00
package yggdrasil
// This sends packets to peers using TCP as a transport
// It's generally better tested than the UDP implementation
// Using it regularly is insane, but I find TCP easier to test/debug with it
// Updating and optimizing the UDP version is a higher priority
// TODO:
// Something needs to make sure we're getting *valid* packets
// Could be used to DoS (connect, give someone else's keys, spew garbage)
// I guess the "peer" part should watch for link packets, disconnect?
2018-06-10 01:46:19 +03:00
// TCP connections start with a metadata exchange.
// It involves exchanging version numbers and crypto keys
// See version.go for version metadata format
2018-06-13 01:50:08 +03:00
import (
"errors"
"fmt"
2018-07-19 04:03:24 +03:00
"io"
2018-06-21 18:39:43 +03:00
"math/rand"
2018-06-13 01:50:08 +03:00
"net"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/proxy"
2018-12-15 05:49:18 +03:00
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
2018-06-13 01:50:08 +03:00
)
2017-12-29 07:16:20 +03:00
2018-01-05 01:37:51 +03:00
const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense
2018-07-29 17:30:13 +03:00
const default_tcp_timeout = 6 * time . Second
const tcp_ping_interval = ( default_tcp_timeout * 2 / 3 )
2017-12-29 07:16:20 +03:00
2018-06-11 02:03:28 +03:00
// The TCP listener and information about active TCP connections, to avoid duplication.
2017-12-29 07:16:20 +03:00
type tcpInterface struct {
2018-07-29 17:30:13 +03:00
core * Core
2018-12-30 18:21:09 +03:00
reconfigure chan chan error
2018-07-29 17:30:13 +03:00
serv net . Listener
2018-12-30 18:21:09 +03:00
serv_stop chan bool
2018-07-29 17:30:13 +03:00
tcp_timeout time . Duration
2018-12-29 22:14:26 +03:00
tcp_addr string
2018-07-29 17:30:13 +03:00
mutex sync . Mutex // Protecting the below
calls map [ string ] struct { }
conns map [ tcpInfo ] ( chan struct { } )
2017-12-29 07:16:20 +03:00
}
2018-06-11 02:03:28 +03:00
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring.
// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable).
2018-02-18 05:44:23 +03:00
type tcpInfo struct {
2018-12-15 05:49:18 +03:00
box crypto . BoxPubKey
sig crypto . SigPubKey
2018-04-20 15:41:09 +03:00
remoteAddr string
2018-12-30 19:48:34 +03:00
remotePort string
2017-12-29 07:16:20 +03:00
}
2018-12-15 03:15:35 +03:00
// Wrapper function to set additional options for specific connection types.
func ( iface * tcpInterface ) setExtraOptions ( c net . Conn ) {
switch sock := c . ( type ) {
case * net . TCPConn :
sock . SetNoDelay ( true )
// TODO something for socks5
default :
}
}
2018-06-11 02:03:28 +03:00
// Returns the address of the listener.
2018-05-28 00:13:37 +03:00
func ( iface * tcpInterface ) getAddr ( ) * net . TCPAddr {
return iface . serv . Addr ( ) . ( * net . TCPAddr )
}
2018-06-11 02:03:28 +03:00
// Attempts to initiate a connection to the provided address.
2018-09-25 17:32:45 +03:00
func ( iface * tcpInterface ) connect ( addr string , intf string ) {
iface . call ( addr , nil , intf )
2018-05-28 00:13:37 +03:00
}
2018-06-11 02:03:28 +03:00
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address.
2018-05-28 00:13:37 +03:00
func ( iface * tcpInterface ) connectSOCKS ( socksaddr , peeraddr string ) {
2018-09-25 17:32:45 +03:00
iface . call ( peeraddr , & socksaddr , "" )
2018-05-28 00:13:37 +03:00
}
2018-06-11 02:03:28 +03:00
// Initializes the struct.
2018-12-29 22:14:26 +03:00
func ( iface * tcpInterface ) init ( core * Core ) ( err error ) {
2018-01-05 01:37:51 +03:00
iface . core = core
2018-12-30 18:21:09 +03:00
iface . serv_stop = make ( chan bool , 1 )
iface . reconfigure = make ( chan chan error , 1 )
go func ( ) {
for {
select {
case e := <- iface . reconfigure :
iface . core . configMutex . RLock ( )
updated := iface . core . config . Listen != iface . core . configOld . Listen
iface . core . configMutex . RUnlock ( )
if updated {
iface . serv_stop <- true
iface . serv . Close ( )
e <- iface . listen ( )
} else {
e <- nil
}
}
}
} ( )
return iface . listen ( )
}
func ( iface * tcpInterface ) listen ( ) error {
var err error
2018-12-29 22:53:31 +03:00
iface . core . configMutex . RLock ( )
2018-12-29 22:14:26 +03:00
iface . tcp_addr = iface . core . config . Listen
iface . tcp_timeout = time . Duration ( iface . core . config . ReadTimeout ) * time . Millisecond
2018-12-29 22:53:31 +03:00
iface . core . configMutex . RUnlock ( )
2018-12-30 18:21:09 +03:00
2018-07-29 17:30:13 +03:00
if iface . tcp_timeout >= 0 && iface . tcp_timeout < default_tcp_timeout {
iface . tcp_timeout = default_tcp_timeout
}
2018-12-29 22:14:26 +03:00
iface . serv , err = net . Listen ( "tcp" , iface . tcp_addr )
2018-04-19 17:30:40 +03:00
if err == nil {
iface . calls = make ( map [ string ] struct { } )
iface . conns = make ( map [ tcpInfo ] ( chan struct { } ) )
go iface . listener ( )
2018-12-30 18:21:09 +03:00
return nil
2018-01-05 01:37:51 +03:00
}
2018-05-28 00:13:37 +03:00
return err
2017-12-29 07:16:20 +03:00
}
2018-06-11 02:03:28 +03:00
// Runs the listener, which spawns off goroutines for incoming connections.
2017-12-29 07:16:20 +03:00
func ( iface * tcpInterface ) listener ( ) {
2018-01-05 01:37:51 +03:00
defer iface . serv . Close ( )
2018-03-07 12:41:04 +03:00
iface . core . log . Println ( "Listening for TCP on:" , iface . serv . Addr ( ) . String ( ) )
2018-01-05 01:37:51 +03:00
for {
2018-04-19 17:30:40 +03:00
sock , err := iface . serv . Accept ( )
2018-12-30 18:21:09 +03:00
select {
case <- iface . serv_stop :
iface . core . log . Println ( "Stopping listener" )
return
default :
if err != nil {
panic ( err )
}
go iface . handler ( sock , true )
2018-01-05 01:37:51 +03:00
}
}
2017-12-29 07:16:20 +03:00
}
2018-06-11 02:03:28 +03:00
// Checks if a connection already exists.
// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address.
// If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns.
2018-09-25 17:32:45 +03:00
func ( iface * tcpInterface ) call ( saddr string , socksaddr * string , sintf string ) {
2018-01-05 01:37:51 +03:00
go func ( ) {
2018-09-25 20:05:57 +03:00
callname := saddr
if sintf != "" {
callname = fmt . Sprintf ( "%s/%s" , saddr , sintf )
}
2018-01-05 01:37:51 +03:00
quit := false
iface . mutex . Lock ( )
2018-09-25 17:32:45 +03:00
if _ , isIn := iface . calls [ callname ] ; isIn {
2018-01-05 01:37:51 +03:00
quit = true
} else {
2018-09-25 17:32:45 +03:00
iface . calls [ callname ] = struct { } { }
2018-01-05 01:37:51 +03:00
defer func ( ) {
2018-06-16 23:31:25 +03:00
// Block new calls for a little while, to mitigate livelock scenarios
2018-07-29 17:30:13 +03:00
time . Sleep ( default_tcp_timeout )
2018-06-21 18:39:43 +03:00
time . Sleep ( time . Duration ( rand . Intn ( 1000 ) ) * time . Millisecond )
2018-01-05 01:37:51 +03:00
iface . mutex . Lock ( )
2018-09-25 17:32:45 +03:00
delete ( iface . calls , callname )
2018-01-05 01:37:51 +03:00
iface . mutex . Unlock ( )
} ( )
}
iface . mutex . Unlock ( )
2018-06-14 17:11:34 +03:00
if quit {
return
}
var conn net . Conn
var err error
2018-06-14 17:21:35 +03:00
if socksaddr != nil {
2018-09-25 21:46:06 +03:00
if sintf != "" {
return
}
2018-06-14 17:21:35 +03:00
var dialer proxy . Dialer
dialer , err = proxy . SOCKS5 ( "tcp" , * socksaddr , nil , proxy . Direct )
if err != nil {
return
}
2018-06-14 17:11:34 +03:00
conn , err = dialer . Dial ( "tcp" , saddr )
if err != nil {
return
}
conn = & wrappedConn {
c : conn ,
raddr : & wrappedAddr {
network : "tcp" ,
addr : saddr ,
} ,
}
} else {
2018-09-25 17:32:45 +03:00
dialer := net . Dialer { }
if sintf != "" {
ief , err := net . InterfaceByName ( sintf )
2018-10-04 14:26:08 +03:00
if err != nil {
return
} else {
2018-10-07 19:13:41 +03:00
if ief . Flags & net . FlagUp == 0 {
2018-09-27 14:14:55 +03:00
return
2018-09-25 21:46:06 +03:00
}
2018-09-25 17:32:45 +03:00
addrs , err := ief . Addrs ( )
if err == nil {
dst , err := net . ResolveTCPAddr ( "tcp" , saddr )
if err != nil {
return
}
for _ , addr := range addrs {
src , _ , err := net . ParseCIDR ( addr . String ( ) )
if err != nil {
continue
}
if ( src . To4 ( ) != nil ) == ( dst . IP . To4 ( ) != nil ) && src . IsGlobalUnicast ( ) {
dialer . LocalAddr = & net . TCPAddr {
IP : src ,
Port : 0 ,
}
2018-09-25 21:46:06 +03:00
break
2018-09-25 17:32:45 +03:00
}
}
if dialer . LocalAddr == nil {
return
}
}
}
}
conn , err = dialer . Dial ( "tcp" , saddr )
2018-01-05 01:37:51 +03:00
if err != nil {
return
}
}
2018-06-14 17:11:34 +03:00
iface . handler ( conn , false )
2018-01-05 01:37:51 +03:00
} ( )
2017-12-29 07:16:20 +03:00
}
2018-06-11 02:03:28 +03:00
// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.
// It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout).
2018-05-07 00:32:34 +03:00
func ( iface * tcpInterface ) handler ( sock net . Conn , incoming bool ) {
2018-01-05 01:37:51 +03:00
defer sock . Close ( )
2018-12-15 03:15:35 +03:00
iface . setExtraOptions ( sock )
2018-01-05 01:37:51 +03:00
// Get our keys
2018-12-15 05:49:18 +03:00
myLinkPub , myLinkPriv := crypto . NewBoxKeys ( ) // ephemeral link keys
2018-06-10 01:46:19 +03:00
meta := version_getBaseMetadata ( )
meta . box = iface . core . boxPub
meta . sig = iface . core . sigPub
meta . link = * myLinkPub
metaBytes := meta . encode ( )
_ , err := sock . Write ( metaBytes )
2018-01-05 01:37:51 +03:00
if err != nil {
return
}
2018-07-29 17:30:13 +03:00
if iface . tcp_timeout > 0 {
sock . SetReadDeadline ( time . Now ( ) . Add ( iface . tcp_timeout ) )
}
2018-06-10 02:38:30 +03:00
_ , err = sock . Read ( metaBytes )
2018-01-05 01:37:51 +03:00
if err != nil {
return
}
2018-06-10 01:46:19 +03:00
meta = version_metadata { } // Reset to zero value
2018-06-10 02:38:30 +03:00
if ! meta . decode ( metaBytes ) || ! meta . check ( ) {
// Failed to decode and check the metadata
// If it's a version mismatch issue, then print an error message
2018-06-10 01:46:19 +03:00
base := version_getBaseMetadata ( )
if meta . meta == base . meta {
if meta . ver > base . ver {
iface . core . log . Println ( "Failed to connect to node:" , sock . RemoteAddr ( ) . String ( ) , "version:" , meta . ver )
} else if meta . ver == base . ver && meta . minorVer > base . minorVer {
iface . core . log . Println ( "Failed to connect to node:" , sock . RemoteAddr ( ) . String ( ) , "version:" , fmt . Sprintf ( "%d.%d" , meta . ver , meta . minorVer ) )
}
}
2018-06-10 02:38:30 +03:00
// TODO? Block forever to prevent future connection attempts? suppress future messages about the same node?
2018-01-05 01:37:51 +03:00
return
}
2018-06-10 01:46:19 +03:00
info := tcpInfo { // used as a map key, so don't include ephemeral link key
box : meta . box ,
sig : meta . sig ,
}
2018-01-05 01:37:51 +03:00
// Quit the parent call if this is a connection to ourself
equiv := func ( k1 , k2 [ ] byte ) bool {
for idx := range k1 {
if k1 [ idx ] != k2 [ idx ] {
return false
}
}
return true
}
2018-02-18 05:44:23 +03:00
if equiv ( info . box [ : ] , iface . core . boxPub [ : ] ) {
2018-01-05 01:37:51 +03:00
return
2018-06-13 01:50:08 +03:00
}
2018-02-18 05:44:23 +03:00
if equiv ( info . sig [ : ] , iface . core . sigPub [ : ] ) {
2018-01-05 01:37:51 +03:00
return
}
2018-05-07 00:32:34 +03:00
// Check if we're authorized to connect to this key / IP
2018-05-23 13:28:20 +03:00
if incoming && ! iface . core . peers . isAllowedEncryptionPublicKey ( & info . box ) {
2018-05-07 00:32:34 +03:00
// Allow unauthorized peers if they're link-local
raddrStr , _ , _ := net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
raddr := net . ParseIP ( raddrStr )
if ! raddr . IsLinkLocalUnicast ( ) {
return
}
}
2018-02-18 05:44:23 +03:00
// Check if we already have a connection to this node, close and block if yes
2018-12-30 19:48:34 +03:00
info . remoteAddr , info . remotePort , _ = net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
2018-02-18 05:44:23 +03:00
iface . mutex . Lock ( )
if blockChan , isIn := iface . conns [ info ] ; isIn {
iface . mutex . Unlock ( )
sock . Close ( )
<- blockChan
return
}
blockChan := make ( chan struct { } )
iface . conns [ info ] = blockChan
iface . mutex . Unlock ( )
2018-02-20 08:22:36 +03:00
defer func ( ) {
iface . mutex . Lock ( )
delete ( iface . conns , info )
iface . mutex . Unlock ( )
close ( blockChan )
} ( )
2018-01-05 01:37:51 +03:00
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
2018-12-15 05:49:18 +03:00
p := iface . core . peers . newPeer ( & info . box , & info . sig , crypto . GetSharedKey ( myLinkPriv , & meta . link ) , sock . RemoteAddr ( ) . String ( ) )
2018-06-07 01:44:10 +03:00
p . linkOut = make ( chan [ ] byte , 1 )
2018-01-05 01:37:51 +03:00
in := func ( bs [ ] byte ) {
2018-06-07 01:44:10 +03:00
p . handlePacket ( bs )
2018-01-05 01:37:51 +03:00
}
2018-06-24 05:51:32 +03:00
out := make ( chan [ ] byte , 1 )
2018-01-05 01:37:51 +03:00
defer close ( out )
go func ( ) {
2018-06-24 05:51:32 +03:00
// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
2018-06-25 04:20:07 +03:00
send := func ( msg [ ] byte ) {
msgLen := wire_encode_uint64 ( uint64 ( len ( msg ) ) )
buf := net . Buffers { tcp_msg [ : ] , msgLen , msg }
buf . WriteTo ( sock )
atomic . AddUint64 ( & p . bytesSent , uint64 ( len ( tcp_msg ) + len ( msgLen ) + len ( msg ) ) )
2018-12-15 05:49:18 +03:00
util . PutBytes ( msg )
2018-06-25 04:20:07 +03:00
}
2018-07-29 17:30:13 +03:00
timerInterval := tcp_ping_interval
2018-06-08 00:49:51 +03:00
timer := time . NewTimer ( timerInterval )
defer timer . Stop ( )
2018-06-07 01:44:10 +03:00
for {
2018-06-24 05:51:32 +03:00
select {
case msg := <- p . linkOut :
// Always send outgoing link traffic first, if needed
2018-06-25 04:20:07 +03:00
send ( msg )
2018-06-24 05:51:32 +03:00
continue
default :
2018-06-07 23:04:17 +03:00
}
2018-06-24 05:51:32 +03:00
// Otherwise wait reset the timer and wait for something to do
2018-06-08 00:49:51 +03:00
timer . Stop ( )
select {
case <- timer . C :
default :
}
timer . Reset ( timerInterval )
2018-06-07 01:44:10 +03:00
select {
2018-06-08 00:49:51 +03:00
case _ = <- timer . C :
2018-06-25 04:20:07 +03:00
send ( nil ) // TCP keep-alive traffic
2018-06-07 01:44:10 +03:00
case msg := <- p . linkOut :
2018-06-25 04:20:07 +03:00
send ( msg )
2018-06-07 01:44:10 +03:00
case msg , ok := <- out :
if ! ok {
return
}
2018-06-25 04:20:07 +03:00
send ( msg ) // Block until the socket write has finished
2018-06-24 05:51:32 +03:00
// Now inform the switch that we're ready for more traffic
p . core . switchTable . idleIn <- p . port
2018-01-05 01:37:51 +03:00
}
}
} ( )
2018-06-24 05:51:32 +03:00
p . core . switchTable . idleIn <- p . port // Start in the idle state
2018-01-05 01:37:51 +03:00
p . out = func ( msg [ ] byte ) {
defer func ( ) { recover ( ) } ( )
2018-06-23 04:39:57 +03:00
out <- msg
2018-01-05 01:37:51 +03:00
}
2018-05-06 01:14:03 +03:00
p . close = func ( ) { sock . Close ( ) }
2018-06-07 01:44:10 +03:00
go p . linkLoop ( )
2018-01-05 01:37:51 +03:00
defer func ( ) {
// Put all of our cleanup here...
2018-05-06 01:14:03 +03:00
p . core . peers . removePeer ( p . port )
2018-01-05 01:37:51 +03:00
} ( )
2018-09-25 17:32:45 +03:00
us , _ , _ := net . SplitHostPort ( sock . LocalAddr ( ) . String ( ) )
2018-04-20 15:41:09 +03:00
them , _ , _ := net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
2018-12-15 05:49:18 +03:00
themNodeID := crypto . GetNodeID ( & info . box )
themAddr := address . AddrForNodeID ( themNodeID )
2018-01-05 01:37:51 +03:00
themAddrString := net . IP ( themAddr [ : ] ) . String ( )
themString := fmt . Sprintf ( "%s@%s" , themAddrString , them )
2018-12-30 18:21:09 +03:00
iface . core . log . Printf ( "Connected: %s, source: %s" , themString , us )
2018-07-19 04:03:24 +03:00
err = iface . reader ( sock , in ) // In this goroutine, because of defers
if err == nil {
2018-12-30 18:21:09 +03:00
iface . core . log . Printf ( "Disconnected: %s, source: %s" , themString , us )
2018-07-19 04:03:24 +03:00
} else {
2018-12-30 18:21:09 +03:00
iface . core . log . Printf ( "Disconnected: %s, source: %s, error: %s" , themString , us , err )
2018-07-19 04:03:24 +03:00
}
2018-01-05 01:37:51 +03:00
return
2017-12-29 07:16:20 +03:00
}
2018-06-11 02:03:28 +03:00
// This reads from the socket into a []byte buffer for incomping messages.
// It copies completed messages out of the cache into a new slice, and passes them to the peer struct via the provided `in func([]byte)` argument.
// Then it shifts the incomplete fragments of data forward so future reads won't overwrite it.
2018-07-19 04:03:24 +03:00
func ( iface * tcpInterface ) reader ( sock net . Conn , in func ( [ ] byte ) ) error {
2018-01-05 01:37:51 +03:00
bs := make ( [ ] byte , 2 * tcp_msgSize )
frag := bs [ : 0 ]
for {
2018-07-29 17:30:13 +03:00
if iface . tcp_timeout > 0 {
sock . SetReadDeadline ( time . Now ( ) . Add ( iface . tcp_timeout ) )
}
2018-01-05 01:37:51 +03:00
n , err := sock . Read ( bs [ len ( frag ) : ] )
2018-07-20 00:58:53 +03:00
if n > 0 {
frag = bs [ : len ( frag ) + n ]
for {
msg , ok , err2 := tcp_chop_msg ( & frag )
if err2 != nil {
return fmt . Errorf ( "Message error: %v" , err2 )
}
if ! ok {
// We didn't get the whole message yet
break
}
2018-12-15 05:49:18 +03:00
newMsg := append ( util . GetBytes ( ) , msg ... )
2018-07-20 00:58:53 +03:00
in ( newMsg )
2018-12-15 05:49:18 +03:00
util . Yield ( )
2018-07-20 00:58:53 +03:00
}
frag = append ( bs [ : 0 ] , frag ... )
}
2018-01-05 01:37:51 +03:00
if err != nil || n == 0 {
2018-07-19 04:03:24 +03:00
if err != io . EOF {
return err
}
return nil
2018-01-05 01:37:51 +03:00
}
}
2017-12-29 07:16:20 +03:00
}
////////////////////////////////////////////////////////////////////////////////
2018-06-11 02:03:28 +03:00
// These are 4 bytes of padding used to catch if something went horribly wrong with the tcp connection.
2017-12-29 07:16:20 +03:00
var tcp_msg = [ ... ] byte { 0xde , 0xad , 0xb1 , 0x75 } // "dead bits"
2018-06-11 02:03:28 +03:00
// This takes a pointer to a slice as an argument.
// It checks if there's a complete message and, if so, slices out those parts and returns the message, true, and nil.
// If there's no error, but also no complete message, it returns nil, false, and nil.
// If there's an error, it returns nil, false, and the error, which the reader then handles (currently, by returning from the reader, which causes the connection to close).
2017-12-29 07:16:20 +03:00
func tcp_chop_msg ( bs * [ ] byte ) ( [ ] byte , bool , error ) {
2018-01-05 01:37:51 +03:00
// Returns msg, ok, err
if len ( * bs ) < len ( tcp_msg ) {
return nil , false , nil
}
for idx := range tcp_msg {
if ( * bs ) [ idx ] != tcp_msg [ idx ] {
return nil , false , errors . New ( "Bad message!" )
}
}
msgLen , msgLenLen := wire_decode_uint64 ( ( * bs ) [ len ( tcp_msg ) : ] )
if msgLen > tcp_msgSize {
return nil , false , errors . New ( "Oversized message!" )
}
msgBegin := len ( tcp_msg ) + msgLenLen
msgEnd := msgBegin + int ( msgLen )
if msgLenLen == 0 || len ( * bs ) < msgEnd {
// We don't have the full message
// Need to buffer this and wait for the rest to come in
return nil , false , nil
}
msg := ( * bs ) [ msgBegin : msgEnd ]
( * bs ) = ( * bs ) [ msgEnd : ]
return msg , true , nil
2017-12-29 07:16:20 +03:00
}