From 23e66671c2fcd1fc11abd11f07ec18f93a346aab Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 20:36:50 -0400 Subject: [PATCH] intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages --- libi2pd/NTCP2.cpp | 40 ++++++++++++++++++++++++++++---------- libi2pd/NTCP2.h | 9 ++++++--- libi2pd/NetDb.cpp | 2 +- libi2pd/SSU2Session.cpp | 25 +++++++++++++++++++----- libi2pd/SSU2Session.h | 6 ++++-- libi2pd/TransportSession.h | 8 ++++++-- libi2pd/Transports.cpp | 36 ++++++++++++++++++++++++---------- libi2pd/Transports.h | 7 ++++--- libi2pd/TunnelGateway.cpp | 2 +- 9 files changed, 98 insertions(+), 37 deletions(-) diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index a05469f1..747dc0b0 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -375,6 +375,8 @@ namespace transport m_Socket.close (); transports.PeerDisconnected (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ()); + if (!m_IntermediateQueue.empty ()) + m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -1207,7 +1209,7 @@ namespace transport void NTCP2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::vector > msgs; + std::list > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -1216,7 +1218,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->PostI2NPMessages (msgs); + other->SendI2NPMessages (msgs); } size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) @@ -1297,20 +1299,38 @@ namespace transport m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go } - void NTCP2Session::SendI2NPMessages (const std::vector >& msgs) + void NTCP2Session::SendI2NPMessages (std::list >& msgs) { - m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs)); + if (m_IsTerminated || msgs.empty ()) return; + bool empty = false; + { + std::lock_guard l(m_IntermediateQueueMutex); + empty = m_IntermediateQueue.empty (); + m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); + } + if (empty) + m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ())); } - void NTCP2Session::PostI2NPMessages (std::vector > msgs) + void NTCP2Session::PostI2NPMessages () { if (m_IsTerminated) return; + std::list > msgs; + { + std::lock_guard l(m_IntermediateQueueMutex); + m_IntermediateQueue.swap (msgs); + } bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2; - for (auto it: msgs) - if (isSemiFull && it->onDrop) - it->Drop (); // drop earlier because we can handle it - else - m_SendQueue.push_back (std::move (it)); + if (isSemiFull) + { + for (auto it: msgs) + if (it->onDrop) + it->Drop (); // drop earlier because we can handle it + else + m_SendQueue.push_back (std::move (it)); + } + else + m_SendQueue.splice (m_SendQueue.end (), msgs); if (!m_IsSending && m_IsEstablished) SendQueue (); diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index f7912b54..27acb529 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -153,7 +153,7 @@ namespace transport void ServerLogin (); // Bob void SendLocalRouterInfo (bool update) override; // after handshake or by update - void SendI2NPMessages (const std::vector >& msgs) override; + void SendI2NPMessages (std::list >& msgs) override; void MoveSendQueue (std::shared_ptr other); private: @@ -196,7 +196,7 @@ namespace transport void SendRouterInfo (); void SendTermination (NTCP2TerminationReason reason); void SendTerminationAndTerminate (NTCP2TerminationReason reason); - void PostI2NPMessages (std::vector > msgs); + void PostI2NPMessages (); private: @@ -229,7 +229,10 @@ namespace transport bool m_IsSending, m_IsReceiving; std::list > m_SendQueue; uint64_t m_NextRouterInfoResendTime; // seconds since epoch - + + std::list > m_IntermediateQueue; // from transports + mutable std::mutex m_IntermediateQueueMutex; + uint16_t m_PaddingSizes[16]; int m_NextPaddingSize; }; diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 341d617e..c96bcf95 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -480,7 +480,7 @@ namespace data void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) { LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64()); - std::vector > requests; + std::list > requests; i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash(); i2p::data::IdentHash ih = ri.GetIdentHash(); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 3489a6ba..9ceb491e 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -293,6 +293,8 @@ namespace transport m_SentHandshakePacket.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr); m_PathChallenge.reset (nullptr); + if (!m_IntermediateQueue.empty ()) + m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -372,14 +374,27 @@ namespace transport } - void SSU2Session::SendI2NPMessages (const std::vector >& msgs) + void SSU2Session::SendI2NPMessages (std::list >& msgs) { - m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); + if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) return; + bool empty = false; + { + std::lock_guard l(m_IntermediateQueueMutex); + empty = m_IntermediateQueue.empty (); + m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); + } + if (empty) + m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ())); } - void SSU2Session::PostI2NPMessages (std::vector > msgs) + void SSU2Session::PostI2NPMessages () { if (m_State == eSSU2SessionStateTerminated) return; + std::list > msgs; + { + std::lock_guard l(m_IntermediateQueueMutex); + m_IntermediateQueue.swap (msgs); + } uint64_t mts = i2p::util::GetMonotonicMicroseconds (); bool isSemiFull = false; if (m_SendQueue.size ()) @@ -415,7 +430,7 @@ namespace transport void SSU2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::vector > msgs; + std::list > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -424,7 +439,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->PostI2NPMessages (msgs); + other->SendI2NPMessages (msgs); } bool SSU2Session::SendQueue () diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index d54731dc..4b3139a7 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -261,7 +261,7 @@ namespace transport void FlushData (); void Done () override; void SendLocalRouterInfo (bool update) override; - void SendI2NPMessages (const std::vector >& msgs) override; + void SendI2NPMessages (std::list >& msgs) override; void MoveSendQueue (std::shared_ptr other); uint32_t GetRelayTag () const override { return m_RelayTag; }; size_t Resend (uint64_t ts); // return number of resent packets @@ -307,7 +307,7 @@ namespace transport void Established (); void ScheduleConnectTimer (); void HandleConnectTimer (const boost::system::error_code& ecode); - void PostI2NPMessages (std::vector > msgs); + void PostI2NPMessages (); bool SendQueue (); // returns true if ack block was sent bool SendFragmentedMessage (std::shared_ptr msg); void ResendHandshakePacket (); @@ -381,6 +381,8 @@ namespace transport std::unordered_map, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; + std::list > m_IntermediateQueue; // from transports + mutable std::mutex m_IntermediateQueueMutex; bool m_IsDataReceived; double m_RTT; int m_MsgLocalExpirationTimeout; diff --git a/libi2pd/TransportSession.h b/libi2pd/TransportSession.h index c6bf0de3..6c878d11 100644 --- a/libi2pd/TransportSession.h +++ b/libi2pd/TransportSession.h @@ -144,8 +144,12 @@ namespace transport void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; }; virtual uint32_t GetRelayTag () const { return 0; }; - virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); }; - virtual void SendI2NPMessages (const std::vector >& msgs) = 0; + virtual void SendLocalRouterInfo (bool update = false) + { + std::list > msgs{ CreateDatabaseStoreMsg () }; + SendI2NPMessages (msgs); + }; + virtual void SendI2NPMessages (std::list >& msgs) = 0; virtual bool IsEstablished () const = 0; private: diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 34bc6142..3954e2cf 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -450,15 +450,23 @@ namespace transport void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { if (m_IsOnline) - SendMessages (ident, std::vector > {msg }); + SendMessages (ident, { msg }); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs) { m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); } - void Transports::PostMessages (i2p::data::IdentHash ident, std::vector > msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) + { + m_Service->post ([this, ident, msgs = std::move(msgs)] () + { + PostMessages (ident, msgs); + }); + } + + void Transports::PostMessages (i2p::data::IdentHash ident, std::list > msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { @@ -517,11 +525,16 @@ namespace transport return; } } - for (auto& it1: msgs) - if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) - it1->Drop (); // drop earlier because we can handle it - else - peer->delayedMessages.push_back (it1); + if (sz > MAX_NUM_DELAYED_MESSAGES/2) + { + for (auto& it1: msgs) + if (it1->onDrop) + it1->Drop (); // drop earlier because we can handle it + else + peer->delayedMessages.push_back (it1); + } + else + peer->delayedMessages.splice (peer->delayedMessages.end (), msgs); } else { @@ -865,7 +878,7 @@ namespace transport if (it->second->delayedMessages.size () > 0) { // check if first message is our DatabaseStore (publishing) - auto firstMsg = peer->delayedMessages[0]; + auto firstMsg = peer->delayedMessages.front (); if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) sendDatabaseStore = false; // we have it in the list already @@ -887,7 +900,10 @@ namespace transport return; } if (!session->IsOutgoing ()) // incoming - session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore + { + std::list > msgs{ CreateDatabaseStoreMsg () }; + session->SendI2NPMessages (msgs); // send DatabaseStore + } auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed if (r) r->GetProfile ()->Connected (); auto ts = i2p::util::GetSecondsSinceEpoch (); diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 70273094..e18ec29d 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -71,7 +71,7 @@ namespace transport std::shared_ptr router; std::list > sessions; uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime; - std::vector > delayedMessages; + std::list > delayedMessages; std::vector priority; bool isHighBandwidth, isEligible; @@ -141,7 +141,8 @@ namespace transport void ReuseX25519KeysPair (std::shared_ptr pair); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs); + void SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs); + void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); @@ -185,7 +186,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (i2p::data::IdentHash ident, std::vector > msgs); + void PostMessages (i2p::data::IdentHash ident, std::list > msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 85ff224e..77110c39 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -221,7 +221,7 @@ namespace tunnel void TunnelGateway::SendBuffer () { m_Buffer.CompleteCurrentTunnelDataMessage (); - std::vector > newTunnelMsgs; + std::list > newTunnelMsgs; const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); for (auto& tunnelMsg : tunnelDataMsgs) {