From 62b811c2c150dd502bde36f9960a8ece112c992c Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 28 Sep 2024 09:49:45 -0400 Subject: [PATCH] use memory pool for SSU2 received packets arrays --- libi2pd/SSU2.cpp | 36 ++++++++++++++++++++++++------------ libi2pd/SSU2.h | 18 +++++++++++++++++- libi2pd/util.h | 12 ++++++++++-- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index d661c33b..ec36d6fb 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -368,9 +368,9 @@ namespace transport size_t moreBytes = socket.available (ec); if (!ec && moreBytes) { - std::vector packets; - packets.push_back (packet); - while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH) + auto packets = m_PacketsArrayPool.AcquireMt (); + packets->AddPacket (packet); + while (moreBytes && packets->numPackets < SSU2_MAX_NUM_PACKETS_PER_BATCH) { packet = m_PacketsPool.AcquireMt (); packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); @@ -378,7 +378,13 @@ namespace transport { i2p::transport::transports.UpdateReceivedBytes (packet->len); if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE) - packets.push_back (packet); + { + if (!packets->AddPacket (packet)) + { + LogPrint (eLogError, "SSU2: Received packets array is full"); + m_PacketsPool.ReleaseMt (packet); + } + } else // drop too short packets m_PacketsPool.ReleaseMt (packet); moreBytes = socket.available(ec); @@ -391,10 +397,7 @@ namespace transport break; } } - GetService ().post ([packets = std::move (packets), this]() mutable - { - HandleReceivedPackets (std::move (packets)); - }); + GetService ().post (std::bind (&SSU2Server::HandleReceivedPackets, this, packets)); } else GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet)); @@ -438,15 +441,23 @@ namespace transport } } - void SSU2Server::HandleReceivedPackets (std::vector&& packets) + void SSU2Server::HandleReceivedPackets (Packets * packets) { + if (!packets) return; if (m_IsThroughProxy) - for (auto& packet: packets) + for (size_t i = 0; i < packets->numPackets; i++) + { + auto& packet = (*packets)[i]; ProcessNextPacketFromProxy (packet->buf, packet->len); + } else - for (auto& packet: packets) + for (size_t i = 0; i < packets->numPackets; i++) + { + auto& packet = (*packets)[i]; ProcessNextPacket (packet->buf, packet->len, packet->from); - m_PacketsPool.ReleaseMt (std::move (packets)); + } + m_PacketsPool.ReleaseMt (packets->data (), packets->numPackets); + m_PacketsArrayPool.ReleaseMt (packets); if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated) m_LastSession->FlushData (); } @@ -1098,6 +1109,7 @@ namespace transport } m_PacketsPool.CleanUpMt (); + m_PacketsArrayPool.CleanUpMt (); m_SentPacketsPool.CleanUp (); m_IncompleteMessagesPool.CleanUp (); m_FragmentsPool.CleanUp (); diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 36b946c5..97ef879f 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include "util.h" @@ -51,6 +52,20 @@ namespace transport boost::asio::ip::udp::endpoint from; }; + struct Packets: public std::array + { + size_t numPackets = 0; + bool AddPacket (Packet *p) + { + if (p && numPackets < size ()) + { + data()[numPackets] = p; numPackets++; + return true; + } + return false; + } + }; + class ReceiveService: public i2p::util::RunnableService { public: @@ -127,7 +142,7 @@ namespace transport void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); void HandleReceivedPacket (Packet * packet); - void HandleReceivedPackets (std::vector&& packets); + void HandleReceivedPackets (Packets * packets); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ScheduleTermination (); @@ -172,6 +187,7 @@ namespace transport std::unordered_map > m_Relays; // we are introducer, relay tag -> session std::list > m_Introducers, m_IntroducersV6; // introducers we are connected to i2p::util::MemoryPoolMt m_PacketsPool; + i2p::util::MemoryPoolMt m_PacketsArrayPool; i2p::util::MemoryPool m_SentPacketsPool; i2p::util::MemoryPool m_IncompleteMessagesPool; i2p::util::MemoryPool m_FragmentsPool; diff --git a/libi2pd/util.h b/libi2pd/util.h index dbc69c25..e39a9259 100644 --- a/libi2pd/util.h +++ b/libi2pd/util.h @@ -131,14 +131,22 @@ namespace util this->Release (t); } + void ReleaseMt (T * * arr, size_t num) + { + if (!arr || !num) return; + std::lock_guard l(m_Mutex); + for (size_t i = 0; i < num; i++) + this->Release (arr[i]); + } + templateclass C, typename... R> - void ReleaseMt(C&& c) + void ReleaseMt(const C& c) { std::lock_guard l(m_Mutex); for (auto& it: c) this->Release (it); } - + template std::shared_ptr AcquireSharedMt (TArgs&&... args) {