diff --git a/libi2pd/Queue.h b/libi2pd/Queue.h index 441f8c3a..daca14c2 100644 --- a/libi2pd/Queue.h +++ b/libi2pd/Queue.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2020, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -107,8 +107,21 @@ namespace util return GetNonThreadSafe (true); } - private: + void GetWholeQueue (std::queue& queue) + { + if (!queue.empty ()) + { + std::queue newQueue; + queue.swap (newQueue); + } + { + std::unique_lock l(m_QueueMutex); + m_Queue.swap (queue); + } + } + private: + Element GetNonThreadSafe (bool peek = false) { if (!m_Queue.empty ()) diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index 1b63b7a7..1aa8528e 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -483,14 +483,17 @@ namespace tunnel { try { - auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec - if (msg) + std::queue > msgs; + if (m_Queue.Wait (1,0)) // 1 sec { + m_Queue.GetWholeQueue (msgs); int numMsgs = 0; uint32_t prevTunnelID = 0, tunnelID = 0; std::shared_ptr prevTunnel; - do + while (!msgs.empty ()) { + auto msg = msgs.front (); msgs.pop (); + if (!msg) continue; std::shared_ptr tunnel; uint8_t typeID = msg->GetTypeID (); switch (typeID) @@ -530,17 +533,18 @@ namespace tunnel LogPrint (eLogWarning, "Tunnel: Unexpected message type ", (int) typeID); } - msg = (numMsgs <= MAX_TUNNEL_MSGS_BATCH_SIZE) ? m_Queue.Get () : nullptr; - if (msg) - { - prevTunnelID = tunnelID; - prevTunnel = tunnel; - numMsgs++; - } - else if (tunnel) - tunnel->FlushTunnelDataMsgs (); + prevTunnelID = tunnelID; + prevTunnel = tunnel; + numMsgs++; + + if (msgs.empty ()) + { + if (numMsgs < MAX_TUNNEL_MSGS_BATCH_SIZE && !m_Queue.IsEmpty ()) + m_Queue.GetWholeQueue (msgs); // try more + else if (tunnel) + tunnel->FlushTunnelDataMsgs (); // otherwise flush last + } } - while (msg); } if (i2p::transport::transports.IsOnline())