From 32ad4b4858535472116b11de6981cc8cd21f67ed Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 25 Sep 2024 14:34:52 -0400 Subject: [PATCH] fixed possible race conditions with m_SessionsByRouterHash --- libi2pd/SSU2.cpp | 155 +++++++++++++++++++++++------------------------ libi2pd/SSU2.h | 1 + 2 files changed, 78 insertions(+), 78 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 17bc1912..b5b9d25c 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -465,6 +465,7 @@ namespace transport auto ident = it->second->GetRemoteIdentity (); if (ident) { + std::lock_guard l(m_SessionsByRouterHashMutex); auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ()); if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ()) m_SessionsByRouterHash.erase (it1); @@ -488,22 +489,26 @@ namespace transport auto ident = session->GetRemoteIdentity (); if (ident) { - auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session); - if (!ret.second) + std::shared_ptr oldSession; { - auto oldSession = ret.first->second.lock (); - if (oldSession != session) - { - // session already exists - LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists"); - // move unsent msgs to new session - oldSession->MoveSendQueue (session); - // terminate existing - GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession)); + std::lock_guard l(m_SessionsByRouterHashMutex); + auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session); + if (!ret.second) + { + oldSession = ret.first->second.lock (); // update session ret.first->second = session; } - } + } + if (oldSession && oldSession != session) + { + // session already exists + LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists"); + // move unsent msgs to new session + oldSession->MoveSendQueue (session); + // terminate existing + GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession)); + } } } } @@ -511,12 +516,13 @@ namespace transport bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr session) { if (!session) return false; - std::unique_lock l(m_PendingOutgoingSessionsMutex); + std::lock_guard l(m_PendingOutgoingSessionsMutex); return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second; } std::shared_ptr SSU2Server::FindSession (const i2p::data::IdentHash& ident) { + std::lock_guard l(m_SessionsByRouterHashMutex); auto it = m_SessionsByRouterHash.find (ident); if (it != m_SessionsByRouterHash.end ()) { @@ -533,7 +539,7 @@ namespace transport std::shared_ptr SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const { - std::unique_lock l(m_PendingOutgoingSessionsMutex); + std::lock_guard l(m_PendingOutgoingSessionsMutex); auto it = m_PendingOutgoingSessions.find (ep); if (it != m_PendingOutgoingSessions.end ()) return it->second; @@ -542,7 +548,7 @@ namespace transport void SSU2Server::RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) { - std::unique_lock l(m_PendingOutgoingSessionsMutex); + std::lock_guard l(m_PendingOutgoingSessionsMutex); m_PendingOutgoingSessions.erase (ep); } @@ -681,7 +687,7 @@ namespace transport if (it1->second->GetState () == eSSU2SessionStateSessionRequestSent && it1->second->ProcessSessionCreated (buf, len)) { - std::unique_lock l(m_PendingOutgoingSessionsMutex); + std::lock_guard l(m_PendingOutgoingSessionsMutex); m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint } else @@ -778,13 +784,12 @@ namespace transport if (router && address) { // check if no session - auto it = m_SessionsByRouterHash.find (router->GetIdentHash ()); - if (it != m_SessionsByRouterHash.end ()) + auto existingSession = FindSession (router->GetIdentHash ()); + if (existingSession) { // session with router found, trying to send peer test if requested - auto session = it->second.lock (); - if (peerTest && session && session->IsEstablished ()) - GetService ().post ([session]() { session->SendPeerTest (); }); + if (peerTest && existingSession->IsEstablished ()) + GetService ().post ([existingSession]() { existingSession->SendPeerTest (); }); return false; } // check is no pending session @@ -841,19 +846,15 @@ namespace transport { if (it.iTag && ts < it.iExp) { - auto it1 = m_SessionsByRouterHash.find (it.iH); - if (it1 != m_SessionsByRouterHash.end ()) + auto s = FindSession (it.iH); + if (s) { - auto s = it1->second.lock (); - if (s) + auto addr = s->GetAddress (); + if (addr && addr->IsIntroducer ()) { - auto addr = s->GetAddress (); - if (addr && addr->IsIntroducer ()) - { - s->Introduce (session, it.iTag); - return; - } - } + s->Introduce (session, it.iTag); + return; + } } else indices.push_back(i); @@ -959,10 +960,7 @@ namespace transport if (!router) return false; auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address (); if (!addr) return false; - std::shared_ptr session; - auto it = m_SessionsByRouterHash.find (router->GetIdentHash ()); - if (it != m_SessionsByRouterHash.end ()) - session = it->second.lock (); + auto session = FindSession (router->GetIdentHash ()); if (session) { auto remoteAddr = session->GetAddress (); @@ -992,17 +990,20 @@ namespace transport if (ecode != boost::asio::error::operation_aborted) { auto ts = i2p::util::GetSecondsSinceEpoch (); - for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();) + { - if (it->second->IsTerminationTimeoutExpired (ts)) + std::lock_guard l(m_PendingOutgoingSessionsMutex); + for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();) { - //it->second->Terminate (); - std::unique_lock l(m_PendingOutgoingSessionsMutex); - it = m_PendingOutgoingSessions.erase (it); + if (it->second->IsTerminationTimeoutExpired (ts)) + { + //it->second->Terminate (); + it = m_PendingOutgoingSessions.erase (it); + } + else + it++; } - else - it++; - } + } for (auto it: m_Sessions) { @@ -1020,14 +1021,6 @@ namespace transport it.second->CleanUp (ts); } - for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();) - { - if (it->second.expired ()) - it = m_SessionsByRouterHash.erase (it); - else - it++; - } - ScheduleTermination (); } } @@ -1083,6 +1076,17 @@ namespace transport else it++; } + + { + std::lock_guard l(m_SessionsByRouterHashMutex); + for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();) + { + if (it->second.expired ()) + it = m_SessionsByRouterHash.erase (it); + else + it++; + } + } m_PacketsPool.CleanUpMt (); m_SentPacketsPool.CleanUp (); @@ -1204,27 +1208,26 @@ namespace transport std::unordered_set excluded; for (const auto& it : introducers) { - std::shared_ptr session; - auto it1 = m_SessionsByRouterHash.find (it); - if (it1 != m_SessionsByRouterHash.end ()) - { - session = it1->second.lock (); + std::shared_ptr session = FindSession (it); + if (session) excluded.insert (it); - } - if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer? - ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION) + if (session) { - session->SendKeepAlive (); - if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION) - newList.push_back (it); - else + if (session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer? + ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION) { - impliedList.push_back (it); // keep in introducers list, but not publish - session = nullptr; - } + session->SendKeepAlive (); + if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION) + newList.push_back (it); + else + { + impliedList.push_back (it); // keep in introducers list, but not publish + session = nullptr; + } + } + else + session = nullptr; } - else - session = nullptr; if (!session) i2p::context.RemoveSSU2Introducer (it, v4); @@ -1239,16 +1242,12 @@ namespace transport impliedList.clear (); for (auto& it : introducers) { - auto it1 = m_SessionsByRouterHash.find (it); - if (it1 != m_SessionsByRouterHash.end ()) + auto session = FindSession (it); + if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ()) { - auto session = it1->second.lock (); - if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ()) - { - session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION); - if (std::find (newList.begin (), newList.end (), it) == newList.end ()) - sessions.push_back (session); - } + session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION); + if (std::find (newList.begin (), newList.end (), it) == newList.end ()) + sessions.push_back (session); } } } diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 2c5e8cc6..06541010 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -164,6 +164,7 @@ namespace transport boost::asio::ip::address m_AddressV4, m_AddressV6; std::unordered_map > m_Sessions; std::unordered_map > m_SessionsByRouterHash; + mutable std::mutex m_SessionsByRouterHashMutex; std::map > m_PendingOutgoingSessions; mutable std::mutex m_PendingOutgoingSessionsMutex; std::map > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)