fixed possible race conditions with m_SessionsByRouterHash

This commit is contained in:
orignal 2024-09-25 14:34:52 -04:00
parent 98669eff4f
commit 32ad4b4858
2 changed files with 78 additions and 78 deletions

View File

@ -465,6 +465,7 @@ namespace transport
auto ident = it->second->GetRemoteIdentity (); auto ident = it->second->GetRemoteIdentity ();
if (ident) if (ident)
{ {
std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ()); auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ());
if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ()) if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ())
m_SessionsByRouterHash.erase (it1); m_SessionsByRouterHash.erase (it1);
@ -488,22 +489,26 @@ namespace transport
auto ident = session->GetRemoteIdentity (); auto ident = session->GetRemoteIdentity ();
if (ident) if (ident)
{ {
auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session); std::shared_ptr<SSU2Session> oldSession;
if (!ret.second)
{ {
auto oldSession = ret.first->second.lock (); std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
if (oldSession != session) auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session);
{ if (!ret.second)
// session already exists {
LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists"); oldSession = ret.first->second.lock ();
// move unsent msgs to new session
oldSession->MoveSendQueue (session);
// terminate existing
GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession));
// update session // update session
ret.first->second = session; ret.first->second = session;
} }
} }
if (oldSession && oldSession != session)
{
// session already exists
LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists");
// move unsent msgs to new session
oldSession->MoveSendQueue (session);
// terminate existing
GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession));
}
} }
} }
} }
@ -511,12 +516,13 @@ namespace transport
bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session) bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session)
{ {
if (!session) return false; if (!session) return false;
std::unique_lock<std::mutex> l(m_PendingOutgoingSessionsMutex); std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second; return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second;
} }
std::shared_ptr<SSU2Session> SSU2Server::FindSession (const i2p::data::IdentHash& ident) std::shared_ptr<SSU2Session> SSU2Server::FindSession (const i2p::data::IdentHash& ident)
{ {
std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
auto it = m_SessionsByRouterHash.find (ident); auto it = m_SessionsByRouterHash.find (ident);
if (it != m_SessionsByRouterHash.end ()) if (it != m_SessionsByRouterHash.end ())
{ {
@ -533,7 +539,7 @@ namespace transport
std::shared_ptr<SSU2Session> SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const std::shared_ptr<SSU2Session> SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const
{ {
std::unique_lock<std::mutex> l(m_PendingOutgoingSessionsMutex); std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
auto it = m_PendingOutgoingSessions.find (ep); auto it = m_PendingOutgoingSessions.find (ep);
if (it != m_PendingOutgoingSessions.end ()) if (it != m_PendingOutgoingSessions.end ())
return it->second; return it->second;
@ -542,7 +548,7 @@ namespace transport
void SSU2Server::RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) void SSU2Server::RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep)
{ {
std::unique_lock<std::mutex> l(m_PendingOutgoingSessionsMutex); std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
m_PendingOutgoingSessions.erase (ep); m_PendingOutgoingSessions.erase (ep);
} }
@ -681,7 +687,7 @@ namespace transport
if (it1->second->GetState () == eSSU2SessionStateSessionRequestSent && if (it1->second->GetState () == eSSU2SessionStateSessionRequestSent &&
it1->second->ProcessSessionCreated (buf, len)) it1->second->ProcessSessionCreated (buf, len))
{ {
std::unique_lock<std::mutex> l(m_PendingOutgoingSessionsMutex); std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint
} }
else else
@ -778,13 +784,12 @@ namespace transport
if (router && address) if (router && address)
{ {
// check if no session // check if no session
auto it = m_SessionsByRouterHash.find (router->GetIdentHash ()); auto existingSession = FindSession (router->GetIdentHash ());
if (it != m_SessionsByRouterHash.end ()) if (existingSession)
{ {
// session with router found, trying to send peer test if requested // session with router found, trying to send peer test if requested
auto session = it->second.lock (); if (peerTest && existingSession->IsEstablished ())
if (peerTest && session && session->IsEstablished ()) GetService ().post ([existingSession]() { existingSession->SendPeerTest (); });
GetService ().post ([session]() { session->SendPeerTest (); });
return false; return false;
} }
// check is no pending session // check is no pending session
@ -841,19 +846,15 @@ namespace transport
{ {
if (it.iTag && ts < it.iExp) if (it.iTag && ts < it.iExp)
{ {
auto it1 = m_SessionsByRouterHash.find (it.iH); auto s = FindSession (it.iH);
if (it1 != m_SessionsByRouterHash.end ()) if (s)
{ {
auto s = it1->second.lock (); auto addr = s->GetAddress ();
if (s) if (addr && addr->IsIntroducer ())
{ {
auto addr = s->GetAddress (); s->Introduce (session, it.iTag);
if (addr && addr->IsIntroducer ()) return;
{ }
s->Introduce (session, it.iTag);
return;
}
}
} }
else else
indices.push_back(i); indices.push_back(i);
@ -959,10 +960,7 @@ namespace transport
if (!router) return false; if (!router) return false;
auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address (); auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address ();
if (!addr) return false; if (!addr) return false;
std::shared_ptr<SSU2Session> session; auto session = FindSession (router->GetIdentHash ());
auto it = m_SessionsByRouterHash.find (router->GetIdentHash ());
if (it != m_SessionsByRouterHash.end ())
session = it->second.lock ();
if (session) if (session)
{ {
auto remoteAddr = session->GetAddress (); auto remoteAddr = session->GetAddress ();
@ -992,17 +990,20 @@ namespace transport
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();)
{ {
if (it->second->IsTerminationTimeoutExpired (ts)) std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();)
{ {
//it->second->Terminate (); if (it->second->IsTerminationTimeoutExpired (ts))
std::unique_lock<std::mutex> l(m_PendingOutgoingSessionsMutex); {
it = m_PendingOutgoingSessions.erase (it); //it->second->Terminate ();
it = m_PendingOutgoingSessions.erase (it);
}
else
it++;
} }
else }
it++;
}
for (auto it: m_Sessions) for (auto it: m_Sessions)
{ {
@ -1020,14 +1021,6 @@ namespace transport
it.second->CleanUp (ts); it.second->CleanUp (ts);
} }
for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();)
{
if (it->second.expired ())
it = m_SessionsByRouterHash.erase (it);
else
it++;
}
ScheduleTermination (); ScheduleTermination ();
} }
} }
@ -1083,6 +1076,17 @@ namespace transport
else else
it++; it++;
} }
{
std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();)
{
if (it->second.expired ())
it = m_SessionsByRouterHash.erase (it);
else
it++;
}
}
m_PacketsPool.CleanUpMt (); m_PacketsPool.CleanUpMt ();
m_SentPacketsPool.CleanUp (); m_SentPacketsPool.CleanUp ();
@ -1204,27 +1208,26 @@ namespace transport
std::unordered_set<i2p::data::IdentHash> excluded; std::unordered_set<i2p::data::IdentHash> excluded;
for (const auto& it : introducers) for (const auto& it : introducers)
{ {
std::shared_ptr<SSU2Session> session; std::shared_ptr<SSU2Session> session = FindSession (it);
auto it1 = m_SessionsByRouterHash.find (it); if (session)
if (it1 != m_SessionsByRouterHash.end ())
{
session = it1->second.lock ();
excluded.insert (it); excluded.insert (it);
} if (session)
if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer?
ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION)
{ {
session->SendKeepAlive (); if (session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer?
if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION) ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION)
newList.push_back (it);
else
{ {
impliedList.push_back (it); // keep in introducers list, but not publish session->SendKeepAlive ();
session = nullptr; if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION)
} newList.push_back (it);
else
{
impliedList.push_back (it); // keep in introducers list, but not publish
session = nullptr;
}
}
else
session = nullptr;
} }
else
session = nullptr;
if (!session) if (!session)
i2p::context.RemoveSSU2Introducer (it, v4); i2p::context.RemoveSSU2Introducer (it, v4);
@ -1239,16 +1242,12 @@ namespace transport
impliedList.clear (); impliedList.clear ();
for (auto& it : introducers) for (auto& it : introducers)
{ {
auto it1 = m_SessionsByRouterHash.find (it); auto session = FindSession (it);
if (it1 != m_SessionsByRouterHash.end ()) if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ())
{ {
auto session = it1->second.lock (); session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION);
if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ()) if (std::find (newList.begin (), newList.end (), it) == newList.end ())
{ sessions.push_back (session);
session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION);
if (std::find (newList.begin (), newList.end (), it) == newList.end ())
sessions.push_back (session);
}
} }
} }
} }

View File

@ -164,6 +164,7 @@ namespace transport
boost::asio::ip::address m_AddressV4, m_AddressV6; boost::asio::ip::address m_AddressV4, m_AddressV6;
std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions; std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions;
std::unordered_map<i2p::data::IdentHash, std::weak_ptr<SSU2Session> > m_SessionsByRouterHash; std::unordered_map<i2p::data::IdentHash, std::weak_ptr<SSU2Session> > m_SessionsByRouterHash;
mutable std::mutex m_SessionsByRouterHashMutex;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
mutable std::mutex m_PendingOutgoingSessionsMutex; mutable std::mutex m_PendingOutgoingSessionsMutex;
std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)