diff --git a/libi2pd/I2NPProtocol.cpp b/libi2pd/I2NPProtocol.cpp index 4cceda8f..3b2c204e 100644 --- a/libi2pd/I2NPProtocol.cpp +++ b/libi2pd/I2NPProtocol.cpp @@ -932,14 +932,8 @@ namespace i2p void I2NPMessagesHandler::Flush () { if (!m_TunnelMsgs.empty ()) - { i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs); - m_TunnelMsgs.clear (); - } if (!m_TunnelGatewayMsgs.empty ()) - { i2p::tunnel::tunnels.PostTunnelData (m_TunnelGatewayMsgs); - m_TunnelGatewayMsgs.clear (); - } } } diff --git a/libi2pd/I2NPProtocol.h b/libi2pd/I2NPProtocol.h index 4e26fc94..5971ce17 100644 --- a/libi2pd/I2NPProtocol.h +++ b/libi2pd/I2NPProtocol.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "Crypto.h" #include "I2PEndian.h" @@ -328,7 +329,7 @@ namespace tunnel private: - std::vector > m_TunnelMsgs, m_TunnelGatewayMsgs; + std::list > m_TunnelMsgs, m_TunnelGatewayMsgs; }; } diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 8abcb4d2..20b2fd42 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -122,7 +122,7 @@ namespace data uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup; int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0; - std::queue > msgs; + std::list > msgs; while (m_IsRunning) { try @@ -132,7 +132,7 @@ namespace data m_Queue.GetWholeQueue (msgs); while (!msgs.empty ()) { - auto msg = msgs.front (); msgs.pop (); + auto msg = msgs.front (); msgs.pop_front (); if (!msg) continue; LogPrint(eLogDebug, "NetDb: Got request with type ", (int) msg->GetTypeID ()); switch (msg->GetTypeID ()) diff --git a/libi2pd/Queue.h b/libi2pd/Queue.h index daca14c2..ec62bddf 100644 --- a/libi2pd/Queue.h +++ b/libi2pd/Queue.h @@ -9,8 +9,7 @@ #ifndef QUEUE_H__ #define QUEUE_H__ -#include -#include +#include #include #include #include @@ -29,22 +28,20 @@ namespace util void Put (Element e) { std::unique_lock l(m_QueueMutex); - m_Queue.push (std::move(e)); + m_Queue.push_back (std::move(e)); m_NonEmpty.notify_one (); } - templateclass Container, typename... R> - void Put (const Container& vec) + void Put (std::list& list) { - if (!vec.empty ()) + if (!list.empty ()) { std::unique_lock l(m_QueueMutex); - for (const auto& it: vec) - m_Queue.push (std::move(it)); + m_Queue.splice (m_Queue.end (), list); m_NonEmpty.notify_one (); - } - } - + } + } + Element GetNext () { std::unique_lock l(m_QueueMutex); @@ -107,11 +104,11 @@ namespace util return GetNonThreadSafe (true); } - void GetWholeQueue (std::queue& queue) + void GetWholeQueue (std::list& queue) { if (!queue.empty ()) { - std::queue newQueue; + std::list newQueue; queue.swap (newQueue); } { @@ -128,7 +125,7 @@ namespace util { auto el = m_Queue.front (); if (!peek) - m_Queue.pop (); + m_Queue.pop_front (); return el; } return nullptr; @@ -136,7 +133,7 @@ namespace util private: - std::queue m_Queue; + std::list m_Queue; std::mutex m_QueueMutex; std::condition_variable m_NonEmpty; }; diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index 37d5a9b7..ca110ccc 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -479,7 +479,7 @@ namespace tunnel std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready uint64_t lastTs = 0, lastPoolsTs = 0, lastMemoryPoolTs = 0; - std::queue > msgs; + std::list > msgs; while (m_IsRunning) { try @@ -492,7 +492,7 @@ namespace tunnel std::shared_ptr prevTunnel; while (!msgs.empty ()) { - auto msg = msgs.front (); msgs.pop (); + auto msg = msgs.front (); msgs.pop_front (); if (!msg) continue; std::shared_ptr tunnel; uint8_t typeID = msg->GetTypeID (); @@ -830,7 +830,7 @@ namespace tunnel if (msg) m_Queue.Put (msg); } - void Tunnels::PostTunnelData (const std::vector >& msgs) + void Tunnels::PostTunnelData (std::list >& msgs) { m_Queue.Put (msgs); } diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 6b014af2..00a05386 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -229,7 +229,7 @@ namespace tunnel std::shared_ptr CreateInboundTunnel (std::shared_ptr config, std::shared_ptr pool, std::shared_ptr outboundTunnel); std::shared_ptr CreateOutboundTunnel (std::shared_ptr config, std::shared_ptr pool); void PostTunnelData (std::shared_ptr msg); - void PostTunnelData (const std::vector >& msgs); + void PostTunnelData (std::list >& msgs); // and cleanup msgs void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); std::shared_ptr CreateTunnelPool (int numInboundHops,