close unclaimed acceptors after 3 seconds

This commit is contained in:
orignal 2023-11-17 18:50:52 -05:00
parent 94255ebaf4
commit d327533b56
2 changed files with 40 additions and 17 deletions

View File

@ -601,11 +601,21 @@ namespace client
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
} }
else if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) else
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts)
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front ();
if (socket)
m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket));
}
if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE)
{ {
// already accepting, queue up // already accepting, queue up
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
session->acceptQueue.push_back (shared_from_this()); session->acceptQueue.push_back (std::make_pair(shared_from_this(), ts));
} }
else else
{ {
@ -613,6 +623,7 @@ namespace client
SendStreamI2PError ("Already accepting"); SendStreamI2PError ("Already accepting");
} }
} }
}
else else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
} }
@ -1065,7 +1076,17 @@ namespace client
if (session && !session->acceptQueue.empty ()) if (session && !session->acceptQueue.empty ())
{ {
// pending acceptors // pending acceptors
auto socket = session->acceptQueue.front (); auto ts = i2p::util::GetSecondsSinceEpoch ();
while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts)
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front ();
if (socket)
m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket));
}
if (!session->acceptQueue.empty ())
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front (); session->acceptQueue.pop_front ();
if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor) if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor)
{ {
@ -1073,6 +1094,7 @@ namespace client
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1)); session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1));
} }
} }
}
if (!m_IsSilent) if (!m_IsSilent)
{ {
// get remote peer address // get remote peer address

View File

@ -31,7 +31,8 @@ namespace client
const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds
const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 64; const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 50;
const size_t SAM_SESSION_MAX_ACCEPT_INTERVAL = 3; // in seconds
const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE[] = "HELLO VERSION";
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
@ -191,7 +192,7 @@ namespace client
std::string Name; std::string Name;
SAMSessionType Type; SAMSessionType Type;
std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint; // TODO: move std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint; // TODO: move
std::list<std::shared_ptr<SAMSocket> > acceptQueue; std::list<std::pair<std::shared_ptr<SAMSocket>, uint64_t> > acceptQueue; // socket, receive time in seconds
SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type); SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type);
virtual ~SAMSession () {}; virtual ~SAMSession () {};