mirror of
https://github.com/PurpleI2P/i2pd
synced 2024-11-10 08:00:38 +03:00
separate receivers thread
This commit is contained in:
parent
7dd159add2
commit
f9731a76ec
79
SSU.cpp
79
SSU.cpp
@ -9,10 +9,11 @@ namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
SSUServer::SSUServer (int port): m_Thread (nullptr), m_ThreadV6 (nullptr), m_Work (m_Service),
|
||||
m_WorkV6 (m_ServiceV6),m_Endpoint (boost::asio::ip::udp::v4 (), port),
|
||||
m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Socket (m_Service, m_Endpoint),
|
||||
m_SocketV6 (m_ServiceV6), m_IntroducersUpdateTimer (m_Service)
|
||||
SSUServer::SSUServer (int port): m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr),
|
||||
m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService),
|
||||
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
|
||||
m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService),
|
||||
m_IntroducersUpdateTimer (m_Service)
|
||||
{
|
||||
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535));
|
||||
m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535));
|
||||
@ -33,12 +34,13 @@ namespace transport
|
||||
void SSUServer::Start ()
|
||||
{
|
||||
m_IsRunning = true;
|
||||
m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this));
|
||||
m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
|
||||
m_Service.post (std::bind (&SSUServer::Receive, this));
|
||||
m_ReceiversService.post (std::bind (&SSUServer::Receive, this));
|
||||
if (context.SupportsV6 ())
|
||||
{
|
||||
m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this));
|
||||
m_ServiceV6.post (std::bind (&SSUServer::ReceiveV6, this));
|
||||
m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this));
|
||||
}
|
||||
if (i2p::context.IsUnreachable ())
|
||||
ScheduleIntroducersUpdateTimer ();
|
||||
@ -52,6 +54,12 @@ namespace transport
|
||||
m_Socket.close ();
|
||||
m_ServiceV6.stop ();
|
||||
m_SocketV6.close ();
|
||||
if (m_ReceiversThread)
|
||||
{
|
||||
m_ReceiversThread->join ();
|
||||
delete m_ReceiversThread;
|
||||
m_ReceiversThread = nullptr;
|
||||
}
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
@ -95,6 +103,21 @@ namespace transport
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::RunReceivers ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_ReceiversService.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "SSU receivers: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay)
|
||||
{
|
||||
@ -119,55 +142,66 @@ namespace transport
|
||||
|
||||
void SSUServer::Receive ()
|
||||
{
|
||||
m_Socket.async_receive_from (boost::asio::buffer (m_ReceiveBuffer, SSU_MTU_V4), m_SenderEndpoint,
|
||||
boost::bind (&SSUServer::HandleReceivedFrom, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
|
||||
SSUPacket * packet = new SSUPacket ();
|
||||
m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from,
|
||||
std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet));
|
||||
}
|
||||
|
||||
void SSUServer::ReceiveV6 ()
|
||||
{
|
||||
m_SocketV6.async_receive_from (boost::asio::buffer (m_ReceiveBufferV6, SSU_MTU_V6), m_SenderEndpointV6,
|
||||
boost::bind (&SSUServer::HandleReceivedFromV6, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
|
||||
SSUPacket * packet = new SSUPacket ();
|
||||
m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from,
|
||||
std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet));
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
|
||||
{
|
||||
if (!ecode)
|
||||
{
|
||||
HandleReceivedBuffer (m_SenderEndpoint, m_ReceiveBuffer, bytes_transferred);
|
||||
packet->len = bytes_transferred;
|
||||
m_Service.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet));
|
||||
Receive ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint ("SSU receive error: ", ecode.message ());
|
||||
delete packet;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
|
||||
{
|
||||
if (!ecode)
|
||||
{
|
||||
HandleReceivedBuffer (m_SenderEndpointV6, m_ReceiveBufferV6, bytes_transferred);
|
||||
packet->len = bytes_transferred;
|
||||
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet));
|
||||
ReceiveV6 ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint ("SSU V6 receive error: ", ecode.message ());
|
||||
delete packet;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred)
|
||||
void SSUServer::HandleReceivedBuffer (SSUPacket * packet)
|
||||
{
|
||||
std::shared_ptr<SSUSession> session;
|
||||
auto it = m_Sessions.find (from);
|
||||
auto it = m_Sessions.find (packet->from);
|
||||
if (it != m_Sessions.end ())
|
||||
session = it->second;
|
||||
if (!session)
|
||||
{
|
||||
session = std::make_shared<SSUSession> (*this, from);
|
||||
session = std::make_shared<SSUSession> (*this, packet->from);
|
||||
session->WaitForConnect ();
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions[from] = session;
|
||||
m_Sessions[packet->from] = session;
|
||||
}
|
||||
LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created");
|
||||
LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
|
||||
}
|
||||
session->ProcessNextMessage (buf, bytes_transferred, from);
|
||||
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
|
||||
delete packet;
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const
|
||||
@ -256,7 +290,10 @@ namespace transport
|
||||
"] through introducer ", introducer->iHost, ":", introducer->iPort);
|
||||
session->WaitForIntroduction ();
|
||||
if (i2p::context.GetRouterInfo ().UsesIntroducer ()) // if we are unreachable
|
||||
Send (m_ReceiveBuffer, 0, remoteEndpoint); // send HolePunch
|
||||
{
|
||||
uint8_t buf[1];
|
||||
Send (buf, 0, remoteEndpoint); // send HolePunch
|
||||
}
|
||||
introducerSession->Introduce (introducer->iTag, introducer->iKey);
|
||||
}
|
||||
else
|
||||
|
23
SSU.h
23
SSU.h
@ -23,6 +23,13 @@ namespace transport
|
||||
const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds
|
||||
const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
|
||||
const size_t SSU_MAX_NUM_INTRODUCERS = 3;
|
||||
|
||||
struct SSUPacket
|
||||
{
|
||||
i2p::crypto::AESAlignedBuffer<1500> buf;
|
||||
boost::asio::ip::udp::endpoint from;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
class SSUServer
|
||||
{
|
||||
@ -50,11 +57,12 @@ namespace transport
|
||||
|
||||
void Run ();
|
||||
void RunV6 ();
|
||||
void RunReceivers ();
|
||||
void Receive ();
|
||||
void ReceiveV6 ();
|
||||
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred);
|
||||
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
|
||||
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
|
||||
void HandleReceivedBuffer (SSUPacket * packet);
|
||||
|
||||
template<typename Filter>
|
||||
std::shared_ptr<SSUSession> GetRandomSession (Filter filter);
|
||||
@ -66,16 +74,13 @@ namespace transport
|
||||
private:
|
||||
|
||||
bool m_IsRunning;
|
||||
std::thread * m_Thread, * m_ThreadV6;
|
||||
boost::asio::io_service m_Service, m_ServiceV6;
|
||||
boost::asio::io_service::work m_Work, m_WorkV6;
|
||||
std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread;
|
||||
boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService;
|
||||
boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork;
|
||||
boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6;
|
||||
boost::asio::ip::udp::socket m_Socket, m_SocketV6;
|
||||
boost::asio::ip::udp::endpoint m_SenderEndpoint, m_SenderEndpointV6;
|
||||
boost::asio::deadline_timer m_IntroducersUpdateTimer;
|
||||
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
|
||||
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer;
|
||||
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6;
|
||||
std::mutex m_SessionsMutex;
|
||||
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions;
|
||||
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer
|
||||
|
Loading…
Reference in New Issue
Block a user