don't connect peer test session. Use weak_ptr for seesions by hash

This commit is contained in:
orignal 2024-09-24 20:03:15 -04:00
parent edb2ba7107
commit 0912de5b77
4 changed files with 67 additions and 42 deletions

View File

@ -450,6 +450,7 @@ namespace transport
if (session) if (session)
{ {
m_Sessions.emplace (session->GetConnID (), session); m_Sessions.emplace (session->GetConnID (), session);
if (session->GetState () != eSSU2SessionStatePeerTest)
AddSessionByRouterHash (session); AddSessionByRouterHash (session);
} }
} }
@ -458,14 +459,17 @@ namespace transport
{ {
auto it = m_Sessions.find (connID); auto it = m_Sessions.find (connID);
if (it != m_Sessions.end ()) if (it != m_Sessions.end ())
{
if (it->second->GetState () != eSSU2SessionStatePeerTest)
{ {
auto ident = it->second->GetRemoteIdentity (); auto ident = it->second->GetRemoteIdentity ();
if (ident) if (ident)
{ {
auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ()); auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ());
if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second) if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ())
m_SessionsByRouterHash.erase (it1); m_SessionsByRouterHash.erase (it1);
} }
}
if (m_LastSession == it->second) if (m_LastSession == it->second)
m_LastSession = nullptr; m_LastSession = nullptr;
m_Sessions.erase (it); m_Sessions.erase (it);
@ -480,20 +484,24 @@ namespace transport
if (ident) if (ident)
{ {
auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session); auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session);
if (!ret.second && ret.first->second != session) if (!ret.second)
{
auto oldSession = ret.first->second.lock ();
if (oldSession != session)
{ {
// session already exists // session already exists
LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists"); LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists");
// move unsent msgs to new session // move unsent msgs to new session
ret.first->second->MoveSendQueue (session); oldSession->MoveSendQueue (session);
// terminate existing // terminate existing
GetService ().post (std::bind (&SSU2Session::RequestTermination, ret.first->second, eSSU2TerminationReasonReplacedByNewSession)); GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession));
// update session // update session
ret.first->second = session; ret.first->second = session;
} }
} }
} }
} }
}
bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session) bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session)
{ {
@ -506,7 +514,7 @@ namespace transport
{ {
auto it = m_SessionsByRouterHash.find (ident); auto it = m_SessionsByRouterHash.find (ident);
if (it != m_SessionsByRouterHash.end ()) if (it != m_SessionsByRouterHash.end ())
return it->second; return it->second.lock ();
return nullptr; return nullptr;
} }
@ -761,11 +769,9 @@ namespace transport
if (it != m_SessionsByRouterHash.end ()) if (it != m_SessionsByRouterHash.end ())
{ {
// session with router found, trying to send peer test if requested // session with router found, trying to send peer test if requested
if (peerTest && it->second->IsEstablished ()) auto session = it->second.lock ();
{ if (peerTest && session && session->IsEstablished ())
auto session = it->second;
GetService ().post ([session]() { session->SendPeerTest (); }); GetService ().post ([session]() { session->SendPeerTest (); });
}
return false; return false;
} }
// check is no pending session // check is no pending session
@ -825,13 +831,17 @@ namespace transport
auto it1 = m_SessionsByRouterHash.find (it.iH); auto it1 = m_SessionsByRouterHash.find (it.iH);
if (it1 != m_SessionsByRouterHash.end ()) if (it1 != m_SessionsByRouterHash.end ())
{ {
auto addr = it1->second->GetAddress (); auto s = it1->second.lock ();
if (s)
{
auto addr = s->GetAddress ();
if (addr && addr->IsIntroducer ()) if (addr && addr->IsIntroducer ())
{ {
it1->second->Introduce (session, it.iTag); s->Introduce (session, it.iTag);
return; return;
} }
} }
}
else else
indices.push_back(i); indices.push_back(i);
} }
@ -936,17 +946,19 @@ namespace transport
if (!router) return false; if (!router) return false;
auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address (); auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address ();
if (!addr) return false; if (!addr) return false;
std::shared_ptr<SSU2Session> session;
auto it = m_SessionsByRouterHash.find (router->GetIdentHash ()); auto it = m_SessionsByRouterHash.find (router->GetIdentHash ());
if (it != m_SessionsByRouterHash.end ()) if (it != m_SessionsByRouterHash.end ())
session = it->second.lock ();
if (session)
{ {
auto remoteAddr = it->second->GetAddress (); auto remoteAddr = session->GetAddress ();
if (!remoteAddr || !remoteAddr->IsPeerTesting () || if (!remoteAddr || !remoteAddr->IsPeerTesting () ||
(v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false; (v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false;
auto s = it->second; if (session->IsEstablished ())
if (s->IsEstablished ()) GetService ().post ([session]() { session->SendPeerTest (); });
GetService ().post ([s]() { s->SendPeerTest (); });
else else
s->SetOnEstablished ([s]() { s->SendPeerTest (); }); session->SetOnEstablished ([session]() { session->SendPeerTest (); });
return true; return true;
} }
else else
@ -997,7 +1009,7 @@ namespace transport
for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();) for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();)
{ {
if (it->second && it->second->GetState () == eSSU2SessionStateTerminated) if (it->second.expired ())
it = m_SessionsByRouterHash.erase (it); it = m_SessionsByRouterHash.erase (it);
else else
it++; it++;
@ -1183,7 +1195,7 @@ namespace transport
auto it1 = m_SessionsByRouterHash.find (it); auto it1 = m_SessionsByRouterHash.find (it);
if (it1 != m_SessionsByRouterHash.end ()) if (it1 != m_SessionsByRouterHash.end ())
{ {
session = it1->second; session = it1->second.lock ();
excluded.insert (it); excluded.insert (it);
} }
if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer? if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer?
@ -1217,8 +1229,8 @@ namespace transport
auto it1 = m_SessionsByRouterHash.find (it); auto it1 = m_SessionsByRouterHash.find (it);
if (it1 != m_SessionsByRouterHash.end ()) if (it1 != m_SessionsByRouterHash.end ())
{ {
auto session = it1->second; auto session = it1->second.lock ();
if (session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ()) if (session && session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing ())
{ {
session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION); session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION);
if (std::find (newList.begin (), newList.end (), it) == newList.end ()) if (std::find (newList.begin (), newList.end (), it) == newList.end ())

View File

@ -162,7 +162,7 @@ namespace transport
boost::asio::ip::udp::socket m_SocketV4, m_SocketV6; boost::asio::ip::udp::socket m_SocketV4, m_SocketV6;
boost::asio::ip::address m_AddressV4, m_AddressV6; boost::asio::ip::address m_AddressV4, m_AddressV6;
std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions; std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions;
std::unordered_map<i2p::data::IdentHash, std::shared_ptr<SSU2Session> > m_SessionsByRouterHash; std::unordered_map<i2p::data::IdentHash, std::weak_ptr<SSU2Session> > m_SessionsByRouterHash;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
mutable std::mutex m_PendingOutgoingSessionsMutex; mutable std::mutex m_PendingOutgoingSessionsMutex;
std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)

View File

@ -3140,7 +3140,7 @@ namespace transport
SendPeerTest (7, buf + offset, len - offset); SendPeerTest (7, buf + offset, len - offset);
else else
LogPrint (eLogWarning, "SSU2: Unknown address for peer test 6"); LogPrint (eLogWarning, "SSU2: Unknown address for peer test 6");
GetServer ().RemoveSession (~htobe64 (((uint64_t)nonce << 32) | nonce)); GetServer ().RemoveSession (GetConnID ());
break; break;
} }
case 7: // Alice from Charlie 2 case 7: // Alice from Charlie 2
@ -3148,7 +3148,7 @@ namespace transport
auto addr = GetAddress (); auto addr = GetAddress ();
if (addr && addr->IsV6 ()) if (addr && addr->IsV6 ())
i2p::context.SetStatusV6 (eRouterStatusOK); // set status OK for ipv6 even if from SSU2 i2p::context.SetStatusV6 (eRouterStatusOK); // set status OK for ipv6 even if from SSU2
GetServer ().RemoveSession (htobe64 (((uint64_t)nonce << 32) | nonce)); GetServer ().RemoveSession (GetConnID ());
break; break;
} }
default: default:
@ -3203,5 +3203,16 @@ namespace transport
SetAddress (addr); SetAddress (addr);
SendPeerTest (msg, signedData, signedDataLen); SendPeerTest (msg, signedData, signedDataLen);
} }
void SSU2PeerTestSession::Connect ()
{
LogPrint (eLogError, "SSU2: Can't connect peer test session");
}
bool SSU2PeerTestSession::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len)
{
LogPrint (eLogError, "SSU2: Can't handle incoming message in peer test session");
return false;
}
} }
} }

View File

@ -248,7 +248,7 @@ namespace transport
void SetOnEstablished (OnEstablished e) { m_OnEstablished = e; }; void SetOnEstablished (OnEstablished e) { m_OnEstablished = e; };
OnEstablished GetOnEstablished () const { return m_OnEstablished; }; OnEstablished GetOnEstablished () const { return m_OnEstablished; };
void Connect (); virtual void Connect ();
bool Introduce (std::shared_ptr<SSU2Session> session, uint32_t relayTag); bool Introduce (std::shared_ptr<SSU2Session> session, uint32_t relayTag);
void WaitForIntroduction (); void WaitForIntroduction ();
void SendPeerTest (); // Alice, Data message void SendPeerTest (); // Alice, Data message
@ -268,7 +268,7 @@ namespace transport
SSU2SessionState GetState () const { return m_State; }; SSU2SessionState GetState () const { return m_State; };
void SetState (SSU2SessionState state) { m_State = state; }; void SetState (SSU2SessionState state) { m_State = state; };
bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len); virtual bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len);
bool ProcessSessionCreated (uint8_t * buf, size_t len); bool ProcessSessionCreated (uint8_t * buf, size_t len);
bool ProcessSessionConfirmed (uint8_t * buf, size_t len); bool ProcessSessionConfirmed (uint8_t * buf, size_t len);
bool ProcessRetry (uint8_t * buf, size_t len); bool ProcessRetry (uint8_t * buf, size_t len);
@ -404,6 +404,8 @@ namespace transport
void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen, void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr); std::shared_ptr<const i2p::data::RouterInfo::Address> addr);
bool ProcessPeerTest (uint8_t * buf, size_t len) override; bool ProcessPeerTest (uint8_t * buf, size_t len) override;
void Connect () override; // outgoing
bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) override; // incoming
private: private: