Merge pull request #2029 from Vort/tunnel_test_msg

add tunnel test message
This commit is contained in:
orignal 2024-02-27 08:03:27 -05:00 committed by GitHub
commit 3215125950
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 78 additions and 35 deletions

View File

@ -367,9 +367,11 @@ namespace client
HandleDataMessage (payload, len); HandleDataMessage (payload, len);
break; break;
case eI2NPDeliveryStatus: case eI2NPDeliveryStatus:
// try tunnel test first HandleDeliveryStatusMessage (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET));
if (!m_Pool || !m_Pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET))) break;
HandleDeliveryStatusMessage (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET)); case eI2NPTunnelTest:
if (m_Pool)
m_Pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET));
break; break;
case eI2NPDatabaseStore: case eI2NPDatabaseStore:
HandleDatabaseStoreMessage (payload, len); HandleDatabaseStoreMessage (payload, len);

View File

@ -115,6 +115,17 @@ namespace i2p
return newMsg; return newMsg;
} }
std::shared_ptr<I2NPMessage> CreateTunnelTestMsg (uint32_t msgID)
{
auto m = NewI2NPShortMessage ();
uint8_t * buf = m->GetPayload ();
htobe32buf (buf + TUNNEL_TEST_MSGID_OFFSET, msgID);
htobe64buf (buf + TUNNEL_TEST_TIMESTAMP_OFFSET, i2p::util::GetSteadyMicroseconds ());
m->len += TUNNEL_TEST_SIZE;
m->FillI2NPMessageHeader (eI2NPTunnelTest);
return m;
}
std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID) std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID)
{ {
auto m = NewI2NPShortMessage (); auto m = NewI2NPShortMessage ();
@ -870,6 +881,10 @@ namespace i2p
i2p::context.ProcessDeliveryStatusMessage (msg); i2p::context.ProcessDeliveryStatusMessage (msg);
break; break;
} }
case eI2NPTunnelTest:
if (msg->from && msg->from->GetTunnelPool ())
msg->from->GetTunnelPool ()->ProcessTunnelTest (msg);
break;
case eI2NPVariableTunnelBuild: case eI2NPVariableTunnelBuild:
case eI2NPTunnelBuild: case eI2NPTunnelBuild:
case eI2NPShortTunnelBuild: case eI2NPShortTunnelBuild:

View File

@ -48,6 +48,11 @@ namespace i2p
const size_t DELIVERY_STATUS_TIMESTAMP_OFFSET = DELIVERY_STATUS_MSGID_OFFSET + 4; const size_t DELIVERY_STATUS_TIMESTAMP_OFFSET = DELIVERY_STATUS_MSGID_OFFSET + 4;
const size_t DELIVERY_STATUS_SIZE = DELIVERY_STATUS_TIMESTAMP_OFFSET + 8; const size_t DELIVERY_STATUS_SIZE = DELIVERY_STATUS_TIMESTAMP_OFFSET + 8;
// TunnelTest
const size_t TUNNEL_TEST_MSGID_OFFSET = 0;
const size_t TUNNEL_TEST_TIMESTAMP_OFFSET = TUNNEL_TEST_MSGID_OFFSET + 4;
const size_t TUNNEL_TEST_SIZE = TUNNEL_TEST_TIMESTAMP_OFFSET + 8;
// DatabaseStore // DatabaseStore
const size_t DATABASE_STORE_KEY_OFFSET = 0; const size_t DATABASE_STORE_KEY_OFFSET = 0;
const size_t DATABASE_STORE_TYPE_OFFSET = DATABASE_STORE_KEY_OFFSET + 32; const size_t DATABASE_STORE_TYPE_OFFSET = DATABASE_STORE_KEY_OFFSET + 32;
@ -116,7 +121,8 @@ namespace i2p
eI2NPVariableTunnelBuild = 23, eI2NPVariableTunnelBuild = 23,
eI2NPVariableTunnelBuildReply = 24, eI2NPVariableTunnelBuildReply = 24,
eI2NPShortTunnelBuild = 25, eI2NPShortTunnelBuild = 25,
eI2NPShortTunnelBuildReply = 26 eI2NPShortTunnelBuildReply = 26,
eI2NPTunnelTest = 231
}; };
const uint8_t TUNNEL_BUILD_RECORD_GATEWAY_FLAG = 0x80; const uint8_t TUNNEL_BUILD_RECORD_GATEWAY_FLAG = 0x80;
@ -279,6 +285,7 @@ namespace tunnel
std::shared_ptr<I2NPMessage> CreateI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from = nullptr); std::shared_ptr<I2NPMessage> CreateI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from = nullptr);
std::shared_ptr<I2NPMessage> CopyI2NPMessage (std::shared_ptr<I2NPMessage> msg); std::shared_ptr<I2NPMessage> CopyI2NPMessage (std::shared_ptr<I2NPMessage> msg);
std::shared_ptr<I2NPMessage> CreateTunnelTestMsg (uint32_t msgID);
std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID); std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID);
std::shared_ptr<I2NPMessage> CreateRouterInfoDatabaseLookupMsg (const uint8_t * key, const uint8_t * from, std::shared_ptr<I2NPMessage> CreateRouterInfoDatabaseLookupMsg (const uint8_t * key, const uint8_t * from,
uint32_t replyTunnelID, bool exploratory = false, std::set<i2p::data::IdentHash> * excludedPeers = nullptr); uint32_t replyTunnelID, bool exploratory = false, std::set<i2p::data::IdentHash> * excludedPeers = nullptr);

View File

@ -1152,13 +1152,13 @@ namespace i2p
bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID) bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID)
{ {
if (typeID == eI2NPDeliveryStatus) if (typeID == eI2NPTunnelTest)
{ {
// try tunnel test // try tunnel test
auto pool = GetTunnelPool (); auto pool = GetTunnelPool ();
if (pool && pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET))) if (pool && pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET)))
return true; return true;
} }
auto msg = CreateI2NPMessage (typeID, payload, len, msgID); auto msg = CreateI2NPMessage (typeID, payload, len, msgID);
if (!msg) return false; if (!msg) return false;
i2p::HandleI2NPMessage (msg); i2p::HandleI2NPMessage (msg);

View File

@ -232,6 +232,12 @@ namespace util
return GetLocalHoursSinceEpoch () + g_TimeOffset/3600; return GetLocalHoursSinceEpoch () + g_TimeOffset/3600;
} }
uint64_t GetSteadyMicroseconds()
{
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
void GetCurrentDate (char * date) void GetCurrentDate (char * date)
{ {
GetDateString (GetSecondsSinceEpoch (), date); GetDateString (GetSecondsSinceEpoch (), date);

View File

@ -24,6 +24,8 @@ namespace util
uint32_t GetMinutesSinceEpoch (); uint32_t GetMinutesSinceEpoch ();
uint32_t GetHoursSinceEpoch (); uint32_t GetHoursSinceEpoch ();
uint64_t GetSteadyMicroseconds();
void GetCurrentDate (char * date); // returns date as YYYYMMDD string, 9 bytes void GetCurrentDate (char * date); // returns date as YYYYMMDD string, 9 bytes
void GetDateString (uint64_t timestamp, char * date); // timestamp is seconds since epoch, returns date as YYYYMMDD string, 9 bytes void GetDateString (uint64_t timestamp, char * date); // timestamp is seconds since epoch, returns date as YYYYMMDD string, 9 bytes
void AdjustTimeOffset (int64_t offset); // in seconds from current void AdjustTimeOffset (int64_t offset); // in seconds from current

View File

@ -33,7 +33,7 @@ namespace tunnel
TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()), TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()),
m_Config (config), m_IsShortBuildMessage (false), m_Pool (nullptr), m_Config (config), m_IsShortBuildMessage (false), m_Pool (nullptr),
m_State (eTunnelStatePending), m_FarEndTransports (i2p::data::RouterInfo::eAllTransports), m_State (eTunnelStatePending), m_FarEndTransports (i2p::data::RouterInfo::eAllTransports),
m_IsRecreated (false), m_Latency (0) m_IsRecreated (false), m_Latency (UNKNOWN_LATENCY)
{ {
} }
@ -198,10 +198,10 @@ namespace tunnel
return established; return established;
} }
bool Tunnel::LatencyFitsRange(uint64_t lower, uint64_t upper) const bool Tunnel::LatencyFitsRange(int lowerbound, int upperbound) const
{ {
auto latency = GetMeanLatency(); auto latency = GetMeanLatency();
return latency >= lower && latency <= upper; return latency >= lowerbound && latency <= upperbound;
} }
void Tunnel::EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out) void Tunnel::EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out)

View File

@ -39,6 +39,7 @@ namespace tunnel
const int TUNNEL_CREATION_TIMEOUT = 30; // 30 seconds const int TUNNEL_CREATION_TIMEOUT = 30; // 30 seconds
const int STANDARD_NUM_RECORDS = 4; // in VariableTunnelBuild message const int STANDARD_NUM_RECORDS = 4; // in VariableTunnelBuild message
const int MAX_NUM_RECORDS = 8; const int MAX_NUM_RECORDS = 8;
const int UNKNOWN_LATENCY = -1;
const int HIGH_LATENCY_PER_HOP = 250; // in milliseconds const int HIGH_LATENCY_PER_HOP = 250; // in milliseconds
const int MAX_TUNNEL_MSGS_BATCH_SIZE = 100; // handle messages without interrupt const int MAX_TUNNEL_MSGS_BATCH_SIZE = 100; // handle messages without interrupt
const uint16_t DEFAULT_MAX_NUM_TRANSIT_TUNNELS = 5000; const uint16_t DEFAULT_MAX_NUM_TRANSIT_TUNNELS = 5000;
@ -108,14 +109,14 @@ namespace tunnel
void EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out) override; void EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out) override;
/** @brief add latency sample */ /** @brief add latency sample */
void AddLatencySample(const uint64_t ms) { m_Latency = (m_Latency + ms) >> 1; } void AddLatencySample(const int us) { m_Latency = LatencyIsKnown() ? (m_Latency + us) >> 1 : us; }
/** @brief get this tunnel's estimated latency */ /** @brief get this tunnel's estimated latency */
uint64_t GetMeanLatency() const { return m_Latency; } int GetMeanLatency() const { return (m_Latency + 500) / 1000; }
/** @brief return true if this tunnel's latency fits in range [lowerbound, upperbound] */ /** @brief return true if this tunnel's latency fits in range [lowerbound, upperbound] */
bool LatencyFitsRange(uint64_t lowerbound, uint64_t upperbound) const; bool LatencyFitsRange(int lowerbound, int upperbound) const;
bool LatencyIsKnown() const { return m_Latency > 0; } bool LatencyIsKnown() const { return m_Latency != UNKNOWN_LATENCY; }
bool IsSlow () const { return LatencyIsKnown() && (int)m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); } bool IsSlow () const { return LatencyIsKnown() && m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); }
/** visit all hops we currently store */ /** visit all hops we currently store */
void VisitTunnelHops(TunnelHopVisitor v); void VisitTunnelHops(TunnelHopVisitor v);
@ -129,7 +130,7 @@ namespace tunnel
TunnelState m_State; TunnelState m_State;
i2p::data::RouterInfo::CompatibleTransports m_FarEndTransports; i2p::data::RouterInfo::CompatibleTransports m_FarEndTransports;
bool m_IsRecreated; // if tunnel is replaced by new, or new tunnel requested to replace bool m_IsRecreated; // if tunnel is replaced by new, or new tunnel requested to replace
uint64_t m_Latency; // in milliseconds int m_Latency; // in microseconds
}; };
class OutboundTunnel: public Tunnel class OutboundTunnel: public Tunnel

View File

@ -399,7 +399,7 @@ namespace tunnel
std::unique_lock<std::mutex> l(m_TestsMutex); std::unique_lock<std::mutex> l(m_TestsMutex);
m_Tests[msgID] = it; m_Tests[msgID] = it;
} }
auto msg = CreateDeliveryStatusMsg (msgID); auto msg = CreateTunnelTestMsg (msgID);
auto outbound = it.first; auto outbound = it.first;
auto s = shared_from_this (); auto s = shared_from_this ();
msg->onDrop = [msgID, outbound, s]() msg->onDrop = [msgID, outbound, s]()
@ -452,16 +452,23 @@ namespace tunnel
buf += 4; buf += 4;
uint64_t timestamp = bufbe64toh (buf); uint64_t timestamp = bufbe64toh (buf);
if (!ProcessDeliveryStatus (msgID, timestamp)) if (m_LocalDestination)
{ m_LocalDestination->ProcessDeliveryStatusMessage (msg);
if (m_LocalDestination) else
m_LocalDestination->ProcessDeliveryStatusMessage (msg); LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped");
else
LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped");
}
} }
bool TunnelPool::ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp) void TunnelPool::ProcessTunnelTest (std::shared_ptr<I2NPMessage> msg)
{
const uint8_t * buf = msg->GetPayload ();
uint32_t msgID = bufbe32toh (buf);
buf += 4;
uint64_t timestamp = bufbe64toh (buf);
ProcessTunnelTest (msgID, timestamp);
}
bool TunnelPool::ProcessTunnelTest (uint32_t msgID, uint64_t timestamp)
{ {
decltype(m_Tests)::mapped_type test; decltype(m_Tests)::mapped_type test;
bool found = false; bool found = false;
@ -477,8 +484,10 @@ namespace tunnel
} }
if (found) if (found)
{ {
uint64_t dlt = i2p::util::GetMillisecondsSinceEpoch () - timestamp; int dlt = (int)((int64_t)i2p::util::GetSteadyMicroseconds () - (int64_t)timestamp);
LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " milliseconds"); LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " microseconds");
if (dlt < 0) // should not happen
dlt = 0;
int numHops = 0; int numHops = 0;
if (test.first) numHops += test.first->GetNumHops (); if (test.first) numHops += test.first->GetNumHops ();
if (test.second) numHops += test.second->GetNumHops (); if (test.second) numHops += test.second->GetNumHops ();
@ -488,20 +497,20 @@ namespace tunnel
if (test.first->GetState () != eTunnelStateExpiring) if (test.first->GetState () != eTunnelStateExpiring)
test.first->SetState (eTunnelStateEstablished); test.first->SetState (eTunnelStateEstablished);
// update latency // update latency
uint64_t latency = 0; int latency = 0;
if (numHops) latency = dlt*test.first->GetNumHops ()/numHops; if (numHops) latency = dlt*test.first->GetNumHops ()/numHops;
if (!latency) latency = dlt/2; if (!latency) latency = dlt/2;
test.first->AddLatencySample(latency); test.first->AddLatencySample (latency);
} }
if (test.second) if (test.second)
{ {
if (test.second->GetState () != eTunnelStateExpiring) if (test.second->GetState () != eTunnelStateExpiring)
test.second->SetState (eTunnelStateEstablished); test.second->SetState (eTunnelStateEstablished);
// update latency // update latency
uint64_t latency = 0; int latency = 0;
if (numHops) latency = dlt*test.second->GetNumHops ()/numHops; if (numHops) latency = dlt*test.second->GetNumHops ()/numHops;
if (!latency) latency = dlt/2; if (!latency) latency = dlt/2;
test.second->AddLatencySample(latency); test.second->AddLatencySample (latency);
} }
} }
return found; return found;

View File

@ -85,7 +85,8 @@ namespace tunnel
void ManageTunnels (uint64_t ts); void ManageTunnels (uint64_t ts);
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg); void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg); void ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg);
bool ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp); void ProcessTunnelTest (std::shared_ptr<I2NPMessage> msg);
bool ProcessTunnelTest (uint32_t msgID, uint64_t timestamp);
bool IsExploratory () const; bool IsExploratory () const;
bool IsActive () const { return m_IsActive; }; bool IsActive () const { return m_IsActive; };
@ -105,7 +106,7 @@ namespace tunnel
bool HasCustomPeerSelector(); bool HasCustomPeerSelector();
/** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */ /** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */
void RequireLatency(uint64_t min, uint64_t max) { m_MinLatency = min; m_MaxLatency = max; } void RequireLatency(int min, int max) { m_MinLatency = min; m_MaxLatency = max; }
/** @brief return true if this tunnel pool has a latency requirement */ /** @brief return true if this tunnel pool has a latency requirement */
bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; } bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; }
@ -150,8 +151,8 @@ namespace tunnel
std::mutex m_CustomPeerSelectorMutex; std::mutex m_CustomPeerSelectorMutex;
ITunnelPeerSelector * m_CustomPeerSelector; ITunnelPeerSelector * m_CustomPeerSelector;
uint64_t m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms int m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms
uint64_t m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms int m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms
std::random_device m_Rd; std::random_device m_Rd;
std::mt19937 m_Rng; std::mt19937 m_Rng;