diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 96658db5..f971e250 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -158,8 +158,7 @@ namespace transport m_ConnectedRecently.clear (); m_RequestedPeerTests.clear (); - for (auto it: m_ReceivedPacketsQueue) - m_PacketsArrayPool.ReleaseMt (it); + m_PacketsPool.ReleaseMt (m_ReceivedPacketsQueue); m_ReceivedPacketsQueue.clear (); } @@ -368,28 +367,23 @@ namespace transport return; } packet->len = bytes_transferred; + InsertToReceivedPacketsQueue (packet); + size_t numPackets = 1; boost::system::error_code ec; size_t moreBytes = socket.available (ec); if (!ec && moreBytes) { - auto packets = m_PacketsArrayPool.AcquireMt (); - packets->AddPacket (packet); - while (moreBytes && packets->numPackets < SSU2_MAX_NUM_PACKETS_PER_BATCH) - { + do + { packet = m_PacketsPool.AcquireMt (); packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); if (!ec) { i2p::transport::transports.UpdateReceivedBytes (packet->len); + numPackets++; if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE) - { - if (!packets->AddPacket (packet)) - { - LogPrint (eLogError, "SSU2: Received packets array is full"); - m_PacketsPool.ReleaseMt (packet); - } - } + InsertToReceivedPacketsQueue (packet); else // drop too short packets m_PacketsPool.ReleaseMt (packet); moreBytes = socket.available(ec); @@ -402,25 +396,8 @@ namespace transport break; } } - InsertToReceivedPacketsQueue (packets); + while (moreBytes && numPackets < SSU2_MAX_NUM_PACKETS_PER_BATCH); } - else - { - bool added = false; - { - // try to add single packet to existing packets array in queue - std::lock_guard l(m_ReceivedPacketsQueueMutex); - if (!m_ReceivedPacketsQueue.empty ()) - added = m_ReceivedPacketsQueue.back ()->AddPacket (packet); - } - if (!added) - { - // create new packets array for single packet - auto packets = m_PacketsArrayPool.AcquireMt (); - packets->AddPacket (packet); - InsertToReceivedPacketsQueue (packets); - } - } Receive (socket); } else @@ -447,47 +424,39 @@ namespace transport } } - void SSU2Server::HandleReceivedPackets (Packets * packets) + void SSU2Server::HandleReceivedPackets (std::list&& packets) { - if (!packets) return; + if (packets.empty ()) return; if (m_IsThroughProxy) - for (size_t i = 0; i < packets->numPackets; i++) - { - auto& packet = (*packets)[i]; - ProcessNextPacketFromProxy (packet->buf, packet->len); - } + for (auto it: packets) + ProcessNextPacketFromProxy (it->buf, it->len); else - for (size_t i = 0; i < packets->numPackets; i++) - { - auto& packet = (*packets)[i]; - ProcessNextPacket (packet->buf, packet->len, packet->from); - } - m_PacketsPool.ReleaseMt (packets->data (), packets->numPackets); - m_PacketsArrayPool.ReleaseMt (packets); + for (auto it: packets) + ProcessNextPacket (it->buf, it->len, it->from); + m_PacketsPool.ReleaseMt (packets); if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated) m_LastSession->FlushData (); } - void SSU2Server::InsertToReceivedPacketsQueue (Packets * packets) + void SSU2Server::InsertToReceivedPacketsQueue (Packet * packet) { - if (!packets) return; + if (!packet) return; bool empty = false; { std::lock_guard l(m_ReceivedPacketsQueueMutex); empty = m_ReceivedPacketsQueue.empty (); - m_ReceivedPacketsQueue.push_back (packets); + m_ReceivedPacketsQueue.push_back (packet); } if (empty) { GetService ().post([this]() { - std::list receivedPackets; + std::list receivedPackets; { std::lock_guard l(m_ReceivedPacketsQueueMutex); m_ReceivedPacketsQueue.swap (receivedPackets); } - for (auto it: receivedPackets) - HandleReceivedPackets (it); + HandleReceivedPackets (std::move (receivedPackets)); }); } } @@ -1167,7 +1136,6 @@ namespace transport } m_PacketsPool.CleanUpMt (); - m_PacketsArrayPool.CleanUpMt (); m_SentPacketsPool.CleanUp (); m_IncompleteMessagesPool.CleanUp (); m_FragmentsPool.CleanUp (); diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 494c11c6..7a02730b 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -43,7 +43,7 @@ namespace transport const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds const int SSU2_HOLE_PUNCH_EXPIRATION = 150; // in seconds - const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 32; + const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 64; class SSU2Server: private i2p::util::RunnableServiceWithWork { @@ -53,20 +53,6 @@ namespace transport size_t len; boost::asio::ip::udp::endpoint from; }; - - struct Packets: public std::array - { - size_t numPackets = 0; - bool AddPacket (Packet *p) - { - if (p && numPackets < size ()) - { - data()[numPackets] = p; numPackets++; - return true; - } - return false; - } - }; class ReceiveService: public i2p::util::RunnableService { @@ -146,9 +132,9 @@ namespace transport void Receive (boost::asio::ip::udp::socket& socket); void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); - void HandleReceivedPackets (Packets * packets); + void HandleReceivedPackets (std::list&& packets); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); - void InsertToReceivedPacketsQueue (Packets * packets); + void InsertToReceivedPacketsQueue (Packet * packet); void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); @@ -193,7 +179,6 @@ namespace transport std::unordered_map, uint64_t > > m_PeerTests; // nonce->(Alice, timestamp). We are Bob std::list > m_Introducers, m_IntroducersV6; // introducers we are connected to i2p::util::MemoryPoolMt m_PacketsPool; - i2p::util::MemoryPoolMt m_PacketsArrayPool; i2p::util::MemoryPool m_SentPacketsPool; i2p::util::MemoryPool m_IncompleteMessagesPool; i2p::util::MemoryPool m_FragmentsPool; @@ -207,7 +192,7 @@ namespace transport std::mt19937 m_Rng; std::map m_ConnectedRecently; // endpoint -> last activity time in seconds std::unordered_map, uint64_t > > m_RequestedPeerTests; // nonce->(Alice, timestamp) - std::list m_ReceivedPacketsQueue; + std::list m_ReceivedPacketsQueue; mutable std::mutex m_ReceivedPacketsQueueMutex; // proxy