From 48f7131a7dc6d2f6b0eac98fba4d4d45eba52bd0 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 14 Oct 2024 18:55:41 -0400 Subject: [PATCH] received packets queue --- libi2pd/SSU2.cpp | 61 +++++++++++++++++++++++++++++++++++------------- libi2pd/SSU2.h | 9 ++++--- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 83d23dd2..96658db5 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -157,6 +157,10 @@ namespace transport m_IntroducersV6.clear (); m_ConnectedRecently.clear (); m_RequestedPeerTests.clear (); + + for (auto it: m_ReceivedPacketsQueue) + m_PacketsArrayPool.ReleaseMt (it); + m_ReceivedPacketsQueue.clear (); } void SSU2Server::SetLocalAddress (const boost::asio::ip::address& localAddress) @@ -398,10 +402,25 @@ namespace transport break; } } - GetService ().post (std::bind (&SSU2Server::HandleReceivedPackets, this, packets)); + InsertToReceivedPacketsQueue (packets); } else - GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet)); + { + bool added = false; + { + // try to add single packet to existing packets array in queue + std::lock_guard l(m_ReceivedPacketsQueueMutex); + if (!m_ReceivedPacketsQueue.empty ()) + added = m_ReceivedPacketsQueue.back ()->AddPacket (packet); + } + if (!added) + { + // create new packets array for single packet + auto packets = m_PacketsArrayPool.AcquireMt (); + packets->AddPacket (packet); + InsertToReceivedPacketsQueue (packets); + } + } Receive (socket); } else @@ -428,20 +447,6 @@ namespace transport } } - void SSU2Server::HandleReceivedPacket (Packet * packet) - { - if (packet) - { - if (m_IsThroughProxy) - ProcessNextPacketFromProxy (packet->buf, packet->len); - else - ProcessNextPacket (packet->buf, packet->len, packet->from); - m_PacketsPool.ReleaseMt (packet); - if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated) - m_LastSession->FlushData (); - } - } - void SSU2Server::HandleReceivedPackets (Packets * packets) { if (!packets) return; @@ -463,6 +468,30 @@ namespace transport m_LastSession->FlushData (); } + void SSU2Server::InsertToReceivedPacketsQueue (Packets * packets) + { + if (!packets) return; + bool empty = false; + { + std::lock_guard l(m_ReceivedPacketsQueueMutex); + empty = m_ReceivedPacketsQueue.empty (); + m_ReceivedPacketsQueue.push_back (packets); + } + if (empty) + { + GetService ().post([this]() + { + std::list receivedPackets; + { + std::lock_guard l(m_ReceivedPacketsQueueMutex); + m_ReceivedPacketsQueue.swap (receivedPackets); + } + for (auto it: receivedPackets) + HandleReceivedPackets (it); + }); + } + } + void SSU2Server::AddSession (std::shared_ptr session) { if (session) diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 09f764c6..494c11c6 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -64,7 +65,7 @@ namespace transport return true; } return false; - } + } }; class ReceiveService: public i2p::util::RunnableService @@ -145,10 +146,10 @@ namespace transport void Receive (boost::asio::ip::udp::socket& socket); void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); - void HandleReceivedPacket (Packet * packet); void HandleReceivedPackets (Packets * packets); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); - + void InsertToReceivedPacketsQueue (Packets * packets); + void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); @@ -206,6 +207,8 @@ namespace transport std::mt19937 m_Rng; std::map m_ConnectedRecently; // endpoint -> last activity time in seconds std::unordered_map, uint64_t > > m_RequestedPeerTests; // nonce->(Alice, timestamp) + std::list m_ReceivedPacketsQueue; + mutable std::mutex m_ReceivedPacketsQueueMutex; // proxy bool m_IsThroughProxy;