Merge pull request #174 from cathugger/develop

Make TCP read timeouts configurable.
This commit is contained in:
Arceliar 2018-07-29 10:39:43 -05:00 committed by GitHub
commit ddab8ecf33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 14 deletions

View File

@ -5,6 +5,7 @@ type NodeConfig struct {
Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."`
AdminListen string `comment:"Listen address for admin connections Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X."` AdminListen string `comment:"Listen address for admin connections Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X."`
Peers []string `comment:"List of connection strings for static peers in URI format, i.e.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j"` Peers []string `comment:"List of connection strings for static peers in URI format, i.e.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j"`
ReadTimeout int32 `comment:"Read timeout for connections, specified in milliseconds. If less than 6000 and not negative, 6000 (the default) is used. If negative, reads won't time out."`
AllowedEncryptionPublicKeys []string `comment:"List of peer encryption public keys to allow or incoming TCP\nconnections from. If left empty/undefined then all connections\nwill be allowed by default."` AllowedEncryptionPublicKeys []string `comment:"List of peer encryption public keys to allow or incoming TCP\nconnections from. If left empty/undefined then all connections\nwill be allowed by default."`
EncryptionPublicKey string `comment:"Your public encryption key. Your peers may ask you for this to put\ninto their AllowedEncryptionPublicKeys configuration."` EncryptionPublicKey string `comment:"Your public encryption key. Your peers may ask you for this to put\ninto their AllowedEncryptionPublicKeys configuration."`
EncryptionPrivateKey string `comment:"Your private encryption key. DO NOT share this with anyone!"` EncryptionPrivateKey string `comment:"Your private encryption key. DO NOT share this with anyone!"`

View File

@ -97,7 +97,7 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)
c.admin.init(c, nc.AdminListen) c.admin.init(c, nc.AdminListen)
if err := c.tcp.init(c, nc.Listen); err != nil { if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil {
c.log.Println("Failed to start TCP interface") c.log.Println("Failed to start TCP interface")
return err return err
} }

View File

@ -28,7 +28,8 @@ import (
) )
const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense
const tcp_timeout = 6 * time.Second const default_tcp_timeout = 6 * time.Second
const tcp_ping_interval = (default_tcp_timeout * 2 / 3)
// Wrapper function for non tcp/ip connections. // Wrapper function for non tcp/ip connections.
func setNoDelay(c net.Conn, delay bool) { func setNoDelay(c net.Conn, delay bool) {
@ -40,11 +41,12 @@ func setNoDelay(c net.Conn, delay bool) {
// The TCP listener and information about active TCP connections, to avoid duplication. // The TCP listener and information about active TCP connections, to avoid duplication.
type tcpInterface struct { type tcpInterface struct {
core *Core core *Core
serv net.Listener serv net.Listener
mutex sync.Mutex // Protecting the below tcp_timeout time.Duration
calls map[string]struct{} mutex sync.Mutex // Protecting the below
conns map[tcpInfo](chan struct{}) calls map[string]struct{}
conns map[tcpInfo](chan struct{})
} }
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring.
@ -72,9 +74,14 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) {
} }
// Initializes the struct. // Initializes the struct.
func (iface *tcpInterface) init(core *Core, addr string) (err error) { func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err error) {
iface.core = core iface.core = core
iface.tcp_timeout = time.Duration(readTimeout) * time.Millisecond
if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout {
iface.tcp_timeout = default_tcp_timeout
}
iface.serv, err = net.Listen("tcp", addr) iface.serv, err = net.Listen("tcp", addr)
if err == nil { if err == nil {
iface.calls = make(map[string]struct{}) iface.calls = make(map[string]struct{})
@ -113,7 +120,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string) {
iface.calls[saddr] = struct{}{} iface.calls[saddr] = struct{}{}
defer func() { defer func() {
// Block new calls for a little while, to mitigate livelock scenarios // Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(tcp_timeout) time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock() iface.mutex.Lock()
delete(iface.calls, saddr) delete(iface.calls, saddr)
@ -168,8 +175,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
if err != nil { if err != nil {
return return
} }
timeout := time.Now().Add(tcp_timeout) if iface.tcp_timeout > 0 {
sock.SetReadDeadline(timeout) sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout))
}
_, err = sock.Read(metaBytes) _, err = sock.Read(metaBytes)
if err != nil { if err != nil {
return return
@ -254,7 +262,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
util_putBytes(msg) util_putBytes(msg)
} }
timerInterval := tcp_timeout * 2 / 3 timerInterval := tcp_ping_interval
timer := time.NewTimer(timerInterval) timer := time.NewTimer(timerInterval)
defer timer.Stop() defer timer.Stop()
for { for {
@ -321,8 +329,9 @@ func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) error {
bs := make([]byte, 2*tcp_msgSize) bs := make([]byte, 2*tcp_msgSize)
frag := bs[:0] frag := bs[:0]
for { for {
timeout := time.Now().Add(tcp_timeout) if iface.tcp_timeout > 0 {
sock.SetReadDeadline(timeout) sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout))
}
n, err := sock.Read(bs[len(frag):]) n, err := sock.Read(bs[len(frag):])
if n > 0 { if n > 0 {
frag = bs[:len(frag)+n] frag = bs[:len(frag)+n]