From c3a1631319cb770bef735a5d28ce96f2ea44ba87 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 26 Sep 2024 18:38:17 -0400 Subject: [PATCH] use weak_ptr for Bob's peer tests and relay tags --- libi2pd/SSU2.cpp | 15 +++++++------ libi2pd/SSU2.h | 3 ++- libi2pd/SSU2Session.cpp | 48 ++++++++++++++++++++++------------------- libi2pd/SSU2Session.h | 2 +- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 80ddbed4..6378d7b8 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -370,7 +370,7 @@ namespace transport { std::vector packets; packets.push_back (packet); - while (moreBytes && packets.size () < 32) + while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH) { packet = m_PacketsPool.AcquireMt (); packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); @@ -599,10 +599,13 @@ namespace transport auto it = m_Relays.find (tag); if (it != m_Relays.end ()) { - if (it->second->IsEstablished ()) - return it->second; - else - m_Relays.erase (it); + if (!it->second.expired ()) + { + auto s = it->second.lock (); + if (s && s->IsEstablished ()) + return s; + } + m_Relays.erase (it); } return nullptr; } @@ -1045,7 +1048,7 @@ namespace transport auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = m_Relays.begin (); it != m_Relays.begin ();) { - if (it->second && it->second->GetState () == eSSU2SessionStateTerminated) + if (it->second.expired ()) it = m_Relays.erase (it); else it++; diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 381b90c8..bae33aa2 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -40,6 +40,7 @@ namespace transport const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds const int SSU2_HOLE_PUNCH_EXPIRATION = 150; // in seconds + const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 32; class SSU2Server: private i2p::util::RunnableServiceWithWork { @@ -168,7 +169,7 @@ namespace transport std::map > m_PendingOutgoingSessions; mutable std::mutex m_PendingOutgoingSessionsMutex; std::map > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) - std::unordered_map > m_Relays; // we are introducer, relay tag -> session + std::unordered_map > m_Relays; // we are introducer, relay tag -> session std::list m_Introducers, m_IntroducersV6; // introducers we are connected to i2p::util::MemoryPoolMt m_PacketsPool; i2p::util::MemoryPool m_SentPacketsPool; diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index bc47690d..e9994b9f 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -2252,27 +2252,31 @@ namespace transport case 3: // Bob from Charlie { auto it = m_PeerTests.find (nonce); - if (it != m_PeerTests.end () && it->second.first) + if (it != m_PeerTests.end ()) { - uint8_t payload[SSU2_MAX_PACKET_SIZE]; - // Charlie's RouterInfo - auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); - if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; - size_t payloadSize = r ? CreateRouterInfoBlock (payload, m_MaxPayloadSize - len - 32, r) : 0; - if (!payloadSize && r) - it->second.first->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); - if (payloadSize + len + 16 > m_MaxPayloadSize) - { - // doesn't fit one message, send RouterInfo in separate message - it->second.first->SendData (payload, payloadSize); - payloadSize = 0; - } - // PeerTest to Alice - payloadSize += CreatePeerTestBlock (payload + payloadSize, m_MaxPayloadSize, 4, - (SSU2PeerTestCode)buf[1], GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset); - if (payloadSize < m_MaxPayloadSize) - payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize); - it->second.first->SendData (payload, payloadSize); + auto aliceSession = it->second.first.lock (); + if (aliceSession && aliceSession->IsEstablished ()) + { + uint8_t payload[SSU2_MAX_PACKET_SIZE]; + // Charlie's RouterInfo + auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); + if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; + size_t payloadSize = r ? CreateRouterInfoBlock (payload, m_MaxPayloadSize - len - 32, r) : 0; + if (!payloadSize && r) + aliceSession->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); + if (payloadSize + len + 16 > m_MaxPayloadSize) + { + // doesn't fit one message, send RouterInfo in separate message + aliceSession->SendData (payload, payloadSize); + payloadSize = 0; + } + // PeerTest to Alice + payloadSize += CreatePeerTestBlock (payload + payloadSize, m_MaxPayloadSize, 4, + (SSU2PeerTestCode)buf[1], GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset); + if (payloadSize < m_MaxPayloadSize) + payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize); + aliceSession->SendData (payload, payloadSize); + } m_PeerTests.erase (it); } else @@ -3028,9 +3032,9 @@ namespace transport } for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();) { - if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT) + if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT || it->second.first.expired ()) { - LogPrint (eLogWarning, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds, deleted"); + LogPrint (eLogWarning, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds or session invalid. Deleted"); it = m_PeerTests.erase (it); } else diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index d261bc6d..fdd7cc8e 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -373,7 +373,7 @@ namespace transport std::map > m_SentPackets; // packetNum -> packet std::unordered_map > m_IncompleteMessages; // msgID -> I2NP std::unordered_map, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice - std::unordered_map, uint64_t > > m_PeerTests; // same as for relay sessions + std::unordered_map, uint64_t > > m_PeerTests; // nonce->(Alice, timestamp). We are Bob std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; bool m_IsDataReceived;