From ec1f41b13cef0cef17936bbd6210b9c885d88b91 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 15 Oct 2024 15:05:18 -0400 Subject: [PATCH] insert multiple packets to the queue using splice --- libi2pd/SSU2.cpp | 48 +++++++++++++++++++++++++++++++----------------- libi2pd/SSU2.h | 2 ++ 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index f971e250..574aea55 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -367,23 +367,22 @@ namespace transport return; } packet->len = bytes_transferred; - InsertToReceivedPacketsQueue (packet); - - size_t numPackets = 1; + boost::system::error_code ec; size_t moreBytes = socket.available (ec); if (!ec && moreBytes) { - do + std::list packets; + packets.push_back (packet); + while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH) { packet = m_PacketsPool.AcquireMt (); packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); if (!ec) { i2p::transport::transports.UpdateReceivedBytes (packet->len); - numPackets++; if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE) - InsertToReceivedPacketsQueue (packet); + packets.push_back (packet); else // drop too short packets m_PacketsPool.ReleaseMt (packet); moreBytes = socket.available(ec); @@ -396,8 +395,10 @@ namespace transport break; } } - while (moreBytes && numPackets < SSU2_MAX_NUM_PACKETS_PER_BATCH); + InsertToReceivedPacketsQueue (packets); } + else + InsertToReceivedPacketsQueue (packet); Receive (socket); } else @@ -448,17 +449,30 @@ namespace transport m_ReceivedPacketsQueue.push_back (packet); } if (empty) + GetService ().post([this]() { HandleReceivedPacketsQueue (); }); + } + + void SSU2Server::InsertToReceivedPacketsQueue (std::list& packets) + { + if (packets.empty ()) return; + bool empty = false; { - GetService ().post([this]() - { - std::list receivedPackets; - { - std::lock_guard l(m_ReceivedPacketsQueueMutex); - m_ReceivedPacketsQueue.swap (receivedPackets); - } - HandleReceivedPackets (std::move (receivedPackets)); - }); - } + std::lock_guard l(m_ReceivedPacketsQueueMutex); + empty = m_ReceivedPacketsQueue.empty (); + m_ReceivedPacketsQueue.splice (m_ReceivedPacketsQueue.end (), packets); + } + if (empty) + GetService ().post([this]() { HandleReceivedPacketsQueue (); }); + } + + void SSU2Server::HandleReceivedPacketsQueue () + { + std::list receivedPackets; + { + std::lock_guard l(m_ReceivedPacketsQueueMutex); + m_ReceivedPacketsQueue.swap (receivedPackets); + } + HandleReceivedPackets (std::move (receivedPackets)); } void SSU2Server::AddSession (std::shared_ptr session) diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 7a02730b..426c6a10 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -135,6 +135,8 @@ namespace transport void HandleReceivedPackets (std::list&& packets); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void InsertToReceivedPacketsQueue (Packet * packet); + void InsertToReceivedPacketsQueue (std::list& packets); + void HandleReceivedPacketsQueue (); void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode);