From 5891b1ceb2a7755b56006118b9b7c26c8f722b4e Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 5 Apr 2022 16:14:13 -0400 Subject: [PATCH] separate receive thread --- libi2pd/SSU2.cpp | 47 +++++++++++++++++++++++++++++++++++++++-------- libi2pd/SSU2.h | 14 +++++++++++++- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 38431f41..b960a366 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -1109,7 +1109,8 @@ namespace transport } SSU2Server::SSU2Server (): - RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), + RunnableServiceWithWork ("SSU2"), m_ReceiveServiceV4 ("SSU2v4"), m_ReceiveServiceV6 ("SSU2v6"), + m_SocketV4 (m_ReceiveServiceV4.GetService ()), m_SocketV6 (m_ReceiveServiceV6.GetService ()), m_TerminationTimer (GetService ()), m_ResendTimer (GetService ()) { } @@ -1139,9 +1140,25 @@ namespace transport if (port) { if (address->IsV4 ()) - Receive (OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v4(), port))); + { + m_ReceiveServiceV4.Start (); + OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v4(), port)); + m_ReceiveServiceV4.GetService ().post( + [this]() + { + Receive (m_SocketV4); + }); + } if (address->IsV6 ()) - Receive (OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v6(), port))); + { + m_ReceiveServiceV6.Start (); + OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v6(), port)); + m_ReceiveServiceV6.GetService ().post( + [this]() + { + Receive (m_SocketV6); + }); + } } else LogPrint (eLogError, "SSU2: Can't start server because port not specified"); @@ -1153,6 +1170,12 @@ namespace transport void SSU2Server::Stop () { + if (context.SupportsV4 ()) + m_ReceiveServiceV4.Stop (); + + if (context.SupportsV6 ()) + m_ReceiveServiceV6.Stop (); + if (IsRunning ()) m_TerminationTimer.cancel (); @@ -1161,7 +1184,7 @@ namespace transport boost::asio::ip::udp::socket& SSU2Server::OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint) { - boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_Socket; + boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_SocketV4; try { socket.open (localEndpoint.protocol ()); @@ -1194,8 +1217,7 @@ namespace transport { i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); packet->len = bytes_transferred; - ProcessNextPacket (packet->buf, packet->len, packet->from); - m_PacketsPool.ReleaseMt (packet); + GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet)); Receive (socket); } else @@ -1211,6 +1233,15 @@ namespace transport } } } + + void SSU2Server::HandleReceivedPacket (Packet * packet) + { + if (packet) + { + ProcessNextPacket (packet->buf, packet->len, packet->from); + m_PacketsPool.ReleaseMt (packet); + } + } void SSU2Server::AddSession (std::shared_ptr session) { @@ -1275,7 +1306,7 @@ namespace transport if (to.address ().is_v6 ()) m_SocketV6.send_to (bufs, to, 0, ec); else - m_Socket.send_to (bufs, to, 0, ec); + m_SocketV4.send_to (bufs, to, 0, ec); i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); } @@ -1292,7 +1323,7 @@ namespace transport if (to.address ().is_v6 ()) m_SocketV6.send_to (bufs, to, 0, ec); else - m_Socket.send_to (bufs, to, 0, ec); + m_SocketV4.send_to (bufs, to, 0, ec); i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen); } diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index abf6a18c..5198a1c5 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -209,6 +209,16 @@ namespace transport size_t len; boost::asio::ip::udp::endpoint from; }; + + class ReceiveService: public i2p::util::RunnableService + { + public: + + ReceiveService (const std::string& name): RunnableService (name) {}; + boost::asio::io_service& GetService () { return GetIOService (); }; + void Start () { StartIOService (); }; + void Stop () { StopIOService (); }; + }; public: @@ -241,6 +251,7 @@ namespace transport void Receive (boost::asio::ip::udp::socket& socket); void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); + void HandleReceivedPacket (Packet * packet); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ScheduleTermination (); @@ -251,7 +262,8 @@ namespace transport private: - boost::asio::ip::udp::socket m_Socket, m_SocketV6; + ReceiveService m_ReceiveServiceV4, m_ReceiveServiceV6; + boost::asio::ip::udp::socket m_SocketV4, m_SocketV6; std::unordered_map > m_Sessions; std::map > m_PendingOutgoingSessions; std::map > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)