Merge pull request #742 from majestrate/sam-multiaccept

Multiple stream acceptors with SAM
This commit is contained in:
orignal 2016-12-18 13:16:14 -05:00 committed by GitHub
commit 9fefbb0c4a
3 changed files with 96 additions and 32 deletions

View File

@ -310,7 +310,7 @@ namespace datagram
std::vector<i2p::tunnel::TunnelMessageBlock> send; std::vector<i2p::tunnel::TunnelMessageBlock> send;
auto routingPath = GetSharedRoutingPath(); auto routingPath = GetSharedRoutingPath();
// if we don't have a routing path we will drop all queued messages // if we don't have a routing path we will drop all queued messages
if(routingPath) if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease)
{ {
for (const auto & msg : m_SendQueue) for (const auto & msg : m_SendQueue)
{ {

105
SAM.cpp
View File

@ -54,11 +54,7 @@ namespace client
case eSAMSocketTypeAcceptor: case eSAMSocketTypeAcceptor:
{ {
if (m_Session) if (m_Session)
{
m_Session->DelSocket (shared_from_this ()); m_Session->DelSocket (shared_from_this ());
if (m_Session->localDestination)
m_Session->localDestination->StopAcceptingStreams ();
}
break; break;
} }
default: default:
@ -289,6 +285,11 @@ namespace client
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
} }
else
{
// start accepting streams because we're not a datagram session
m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1));
}
if (m_Session->localDestination->IsReady ()) if (m_Session->localDestination->IsReady ())
SendSessionCreateReplyOk (); SendSessionCreateReplyOk ();
@ -401,20 +402,25 @@ namespace client
m_Session = m_Owner.FindSession (id); m_Session = m_Owner.FindSession (id);
if (m_Session) if (m_Session)
{ {
if (!m_Session->localDestination->IsAcceptingStreams ()) m_SocketType = eSAMSocketTypeAcceptor;
{ m_Session->AddSocket (shared_from_this ());
m_SocketType = eSAMSocketTypeAcceptor; SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
m_Session->AddSocket (shared_from_this ());
m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
else
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
} }
else else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
} }
void SAMSocket::Accept(std::shared_ptr<i2p::stream::Stream> stream)
{
if(stream) {
m_SocketType = eSAMSocketTypeStream;
HandleI2PAccept(stream);
} else {
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
auto s = shared_from_this ();
m_Owner.GetService ().post ([s] { s->Terminate (); });
}
}
size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)
{ {
LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len);
@ -659,10 +665,6 @@ namespace client
LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID);
m_Stream = stream; m_Stream = stream;
context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ());
auto session = m_Owner.FindSession (m_ID);
if (session)
session->localDestination->StopAcceptingStreams ();
m_SocketType = eSAMSocketTypeStream;
if (!m_IsSilent) if (!m_IsSilent)
{ {
// get remote peer address // get remote peer address
@ -704,26 +706,76 @@ namespace client
} }
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest): SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
localDestination (dest) localDestination (dest),
m_BacklogPumper(dest->GetService())
{ {
PumpBacklog();
} }
SAMSession::~SAMSession () void SAMSession::AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream)
{ {
CloseStreams(); if(!stream) return; // fail
i2p::client::context.DeleteLocalDestination (localDestination); std::unique_lock<std::mutex> lock(m_SocketsMutex);
if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) {
stream->Close();
return;
}
m_Backlog.push_back(stream);
}
void SAMSession::PumpBacklog()
{
// pump backlog every 100ms
boost::posix_time::milliseconds dlt(100);
m_BacklogPumper.expires_from_now(dlt);
m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1));
}
std::shared_ptr<SAMSocket> SAMSession::FindAcceptor()
{
for (auto & sock : m_Sockets) {
auto t = sock->GetSocketType();
if(t == eSAMSocketTypeAcceptor) {
return sock;
}
}
return nullptr;
}
void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec)
{
if(ec) return;
{
std::unique_lock<std::mutex> lock(m_SocketsMutex);
auto itr = m_Backlog.begin();
while(itr != m_Backlog.end()) {
auto sock = FindAcceptor();
if (sock) {
sock->Accept(*itr);
itr = m_Backlog.erase(itr);
} else {
++itr;
}
}
}
PumpBacklog();
} }
void SAMSession::CloseStreams () void SAMSession::CloseStreams ()
{ {
{ m_BacklogPumper.cancel();
localDestination->GetService().post([&] () {
std::lock_guard<std::mutex> lock(m_SocketsMutex); std::lock_guard<std::mutex> lock(m_SocketsMutex);
for (auto& sock : m_Sockets) { for (auto& sock : m_Sockets) {
sock->CloseStream(); sock->CloseStream();
} }
} for(auto & stream : m_Backlog) {
// XXX: should this be done inside locked parts? stream->Close();
m_Sockets.clear(); }
m_Sockets.clear();
m_Backlog.clear();
i2p::client::context.DeleteLocalDestination (localDestination);
});
} }
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):
@ -834,8 +886,9 @@ namespace client
auto session = std::make_shared<SAMSession>(localDestination); auto session = std::make_shared<SAMSession>(localDestination);
std::unique_lock<std::mutex> l(m_SessionsMutex); std::unique_lock<std::mutex> l(m_SessionsMutex);
auto ret = m_Sessions.insert (std::make_pair(id, session)); auto ret = m_Sessions.insert (std::make_pair(id, session));
if (!ret.second) if (!ret.second) {
LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); LogPrint (eLogWarning, "SAM: Session ", id, " already exists");
}
return ret.first->second; return ret.first->second;
} }
return nullptr; return nullptr;

13
SAM.h
View File

@ -21,6 +21,7 @@ namespace client
const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
const int SAM_MAX_ACCEPT_BACKLOG = 50;
const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE[] = "HELLO VERSION";
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
@ -84,6 +85,8 @@ namespace client
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
SAMSocketType GetSocketType () const { return m_SocketType; }; SAMSocketType GetSocketType () const { return m_SocketType; };
void Accept(std::shared_ptr<i2p::stream::Stream> stream);
private: private:
void Terminate (); void Terminate ();
@ -134,6 +137,8 @@ namespace client
struct SAMSession struct SAMSession
{ {
std::shared_ptr<ClientDestination> localDestination; std::shared_ptr<ClientDestination> localDestination;
boost::asio::deadline_timer m_BacklogPumper;
std::list<std::shared_ptr<i2p::stream::Stream> > m_Backlog;
std::list<std::shared_ptr<SAMSocket> > m_Sockets; std::list<std::shared_ptr<SAMSocket> > m_Sockets;
std::mutex m_SocketsMutex; std::mutex m_SocketsMutex;
@ -160,7 +165,13 @@ namespace client
} }
SAMSession (std::shared_ptr<ClientDestination> dest); SAMSession (std::shared_ptr<ClientDestination> dest);
~SAMSession ();
void AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream);
std::shared_ptr<SAMSocket> FindAcceptor();
void PumpBacklog();
void HandlePumpBacklog(const boost::system::error_code & ec);
void CloseStreams (); void CloseStreams ();
}; };