diff --git a/ClientContext.cpp b/ClientContext.cpp index 7ec699cb..31f5e640 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -372,6 +372,8 @@ namespace client options[I2CP_PARAM_INBOUND_TUNNELS_QUANTITY] = GetI2CPOption (section, I2CP_PARAM_INBOUND_TUNNELS_QUANTITY, DEFAULT_INBOUND_TUNNELS_QUANTITY); options[I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY] = GetI2CPOption (section, I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY, DEFAULT_OUTBOUND_TUNNELS_QUANTITY); options[I2CP_PARAM_TAGS_TO_SEND] = GetI2CPOption (section, I2CP_PARAM_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND); + options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MIN_TUNNEL_LATENCY, DEFAULT_MIN_TUNNEL_LATENCY); + options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MAX_TUNNEL_LATENCY, DEFAULT_MAX_TUNNEL_LATENCY); } void ClientContext::ReadI2CPOptionsFromConfig (const std::string& prefix, std::map& options) const @@ -384,7 +386,11 @@ namespace client if (i2p::config::GetOption(prefix + I2CP_PARAM_OUTBOUND_TUNNEL_LENGTH, value)) options[I2CP_PARAM_OUTBOUND_TUNNEL_LENGTH] = value; if (i2p::config::GetOption(prefix + I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY, value)) - options[I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY] = value; + options[I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY] = value; + if (i2p::config::GetOption(prefix + I2CP_PARAM_MIN_TUNNEL_LATENCY, value)) + options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = value; + if (i2p::config::GetOption(prefix + I2CP_PARAM_MAX_TUNNEL_LATENCY, value)) + options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = value; } void ClientContext::ReadTunnels () diff --git a/Config.cpp b/Config.cpp index 8d932048..ed22c561 100644 --- a/Config.cpp +++ b/Config.cpp @@ -86,7 +86,9 @@ namespace config { ("httpproxy.inbound.length", value()->default_value("3"), "HTTP proxy inbound tunnel length") ("httpproxy.outbound.length", value()->default_value("3"), "HTTP proxy outbound tunnel length") ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") - ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") + ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") + ("httpproxy.latency.min", value()->default_value(0), "HTTP proxy min latency for tunnels") + ("httpproxy.latency.max", value()->default_value(0), "HTTP proxy max latency for tunnels") ; options_description socksproxy("SOCKS Proxy options"); @@ -98,7 +100,9 @@ namespace config { ("socksproxy.inbound.length", value()->default_value("3"), "SOCKS proxy inbound tunnel length") ("socksproxy.outbound.length", value()->default_value("3"), "SOCKS proxy outbound tunnel length") ("socksproxy.inbound.quantity", value()->default_value("5"), "SOCKS proxy inbound tunnels quantity") - ("socksproxy.outbound.quantity", value()->default_value("5"), "SOCKS proxy outbound tunnels quantity") + ("socksproxy.outbound.quantity", value()->default_value("5"), "SOCKS proxy outbound tunnels quantity") + ("socksproxy.latency.min", value()->default_value(0), "SOCKS proxy min latency for tunnels") + ("socksproxy.latency.max", value()->default_value(0), "SOCKS proxy max latency for tunnels") ("socksproxy.outproxy", value()->default_value("127.0.0.1"), "Upstream outproxy address for SOCKS Proxy") ("socksproxy.outproxyport", value()->default_value(9050), "Upstream outproxy port for SOCKS Proxy") ; diff --git a/Destination.cpp b/Destination.cpp index 9df3b438..f0bc52cb 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -63,6 +63,22 @@ namespace client m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (inLen, outLen, inQty, outQty); if (explicitPeers) m_Pool->SetExplicitPeers (explicitPeers); + if(params) + { + auto itr = params->find(I2CP_PARAM_MAX_TUNNEL_LATENCY); + if (itr != params->end()) { + auto maxlatency = std::stoi(itr->second); + itr = params->find(I2CP_PARAM_MIN_TUNNEL_LATENCY); + if (itr != params->end()) { + auto minlatency = std::stoi(itr->second); + if ( minlatency > 0 && maxlatency > 0 ) { + // set tunnel pool latency + LogPrint(eLogInfo, "Destination: requiring tunnel latency [", minlatency, "ms, ", maxlatency, "ms]"); + m_Pool->RequireLatency(minlatency, maxlatency); + } + } + } + } } LeaseSetDestination::~LeaseSetDestination () diff --git a/Destination.h b/Destination.h index 121b7e16..0d3b3b4a 100644 --- a/Destination.h +++ b/Destination.h @@ -50,6 +50,12 @@ namespace client const char I2CP_PARAM_TAGS_TO_SEND[] = "crypto.tagsToSend"; const int DEFAULT_TAGS_TO_SEND = 40; + // latency + const char I2CP_PARAM_MIN_TUNNEL_LATENCY[] = "latency.min"; + const int DEFAULT_MIN_TUNNEL_LATENCY = 0; + const char I2CP_PARAM_MAX_TUNNEL_LATENCY[] = "latency.max"; + const int DEFAULT_MAX_TUNNEL_LATENCY = 0; + typedef std::function stream)> StreamRequestComplete; class LeaseSetDestination: public i2p::garlic::GarlicDestination, diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 556051aa..bf0a3159 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -300,12 +300,16 @@ namespace http { s << "Inbound tunnels:
\r\n"; for (auto & it : pool->GetInboundTunnels ()) { it->Print(s); + if(it->LatencyIsKnown()) + s << " ( " << it->GetMeanLatency() << "ms )"; ShowTunnelDetails(s, it->GetState (), it->GetNumReceivedBytes ()); } s << "
\r\n"; s << "Outbound tunnels:
\r\n"; for (auto & it : pool->GetOutboundTunnels ()) { it->Print(s); + if(it->LatencyIsKnown()) + s << " ( " << it->GetMeanLatency() << "ms )"; ShowTunnelDetails(s, it->GetState (), it->GetNumSentBytes ()); } } @@ -401,12 +405,16 @@ namespace http { s << "Inbound tunnels:
\r\n"; for (auto & it : i2p::tunnel::tunnels.GetInboundTunnels ()) { it->Print(s); + if(it->LatencyIsKnown()) + s << " ( " << it->GetMeanLatency() << "ms )"; ShowTunnelDetails(s, it->GetState (), it->GetNumReceivedBytes ()); } s << "
\r\n"; s << "Outbound tunnels:
\r\n"; for (auto & it : i2p::tunnel::tunnels.GetOutboundTunnels ()) { it->Print(s); + if(it->LatencyIsKnown()) + s << " ( " << it->GetMeanLatency() << "ms )"; ShowTunnelDetails(s, it->GetState (), it->GetNumSentBytes ()); } s << "
\r\n"; diff --git a/Tunnel.cpp b/Tunnel.cpp index faba99d3..e1f5c035 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -21,6 +21,31 @@ namespace i2p namespace tunnel { + void TunnelLatency::AddSample(Sample s) + { + std::unique_lock l(m_access); + m_samples.push_back(s); + } + + bool TunnelLatency::HasSamples() const + { + std::unique_lock l(m_access); + return m_samples.size() > 0; + } + + TunnelLatency::Latency TunnelLatency::GetMeanLatency() const + { + std::unique_lock lock(m_access); + if (m_samples.size() > 0) { + Latency l = 0; + for(auto s : m_samples) + l += s; + return l / m_samples.size(); + } + return 0; + } + + Tunnel::Tunnel (std::shared_ptr config): TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()), m_Config (config), m_Pool (nullptr), m_State (eTunnelStatePending), m_IsRecreated (false) @@ -162,6 +187,12 @@ namespace tunnel return established; } + bool Tunnel::LatencyFitsRange(uint64_t lower, uint64_t upper) const + { + auto latency = GetMeanLatency(); + return latency >= lower && latency <= upper; + } + void Tunnel::EncryptTunnelMsg (std::shared_ptr in, std::shared_ptr out) { const uint8_t * inPayload = in->GetPayload () + 4; diff --git a/Tunnel.h b/Tunnel.h index 53c93e90..f3c31461 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -79,6 +79,21 @@ namespace tunnel eTunnelStateExpiring }; + /** @brief for storing latency history */ + struct TunnelLatency + { + typedef uint64_t Sample; + typedef uint64_t Latency; + + + void AddSample(Sample s); + bool HasSamples() const; + Latency GetMeanLatency() const; + + std::vector m_samples; + mutable std::mutex m_access; + }; + class OutboundTunnel; class InboundTunnel; class Tunnel: public TunnelBase @@ -118,6 +133,14 @@ namespace tunnel void SendTunnelDataMsg (std::shared_ptr msg); void EncryptTunnelMsg (std::shared_ptr in, std::shared_ptr out); + /** @brief add latency sample */ + void AddLatencySample(const uint64_t ms) { m_Latency.AddSample(ms); } + /** @brief get this tunnel's estimated latency */ + uint64_t GetMeanLatency() const { return m_Latency.GetMeanLatency(); } + /** @breif return true if this tunnel's latency fits in range [lowerbound, upperbound] */ + bool LatencyFitsRange(uint64_t lowerbound, uint64_t upperbound) const; + + bool LatencyIsKnown() const { return m_Latency.HasSamples(); } protected: void PrintHops (std::stringstream& s) const; @@ -129,6 +152,7 @@ namespace tunnel std::shared_ptr m_Pool; // pool, tunnel belongs to, or null TunnelState m_State; bool m_IsRecreated; + TunnelLatency m_Latency; }; class OutboundTunnel: public Tunnel diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 56d72bfb..ccd4c12c 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -147,13 +147,13 @@ namespace tunnel std::shared_ptr TunnelPool::GetNextOutboundTunnel (std::shared_ptr excluded) const { - std::unique_lock l(m_OutboundTunnelsMutex); + std::unique_lock l(m_OutboundTunnelsMutex); return GetNextTunnel (m_OutboundTunnels, excluded); } std::shared_ptr TunnelPool::GetNextInboundTunnel (std::shared_ptr excluded) const { - std::unique_lock l(m_InboundTunnelsMutex); + std::unique_lock l(m_InboundTunnelsMutex); return GetNextTunnel (m_InboundTunnels, excluded); } @@ -167,11 +167,27 @@ namespace tunnel { if (it->IsEstablished () && it != excluded) { + if(HasLatencyRequirement() && it->LatencyIsKnown() && !it->LatencyFitsRange(m_MinLatency, m_MaxLatency)) { + i ++; + continue; + } tunnel = it; i++; } if (i > ind && tunnel) break; } + if(HasLatencyRequirement() && !tunnel) { + ind = rand () % (tunnels.size ()/2 + 1), i = 0; + for (const auto& it: tunnels) + { + if (it->IsEstablished () && it != excluded) + { + tunnel = it; + i++; + } + if (i > ind && tunnel) break; + } + } if (!tunnel && excluded && excluded->IsEstablished ()) tunnel = excluded; return tunnel; } @@ -322,7 +338,12 @@ namespace tunnel test.first->SetState (eTunnelStateEstablished); if (test.second->GetState () == eTunnelStateTestFailed) test.second->SetState (eTunnelStateEstablished); - LogPrint (eLogDebug, "Tunnels: test of ", msgID, " successful. ", i2p::util::GetMillisecondsSinceEpoch () - timestamp, " milliseconds"); + uint64_t dlt = i2p::util::GetMillisecondsSinceEpoch () - timestamp; + LogPrint (eLogDebug, "Tunnels: test of ", msgID, " successful. ", dlt, " milliseconds"); + // update latency + uint64_t latency = dlt / 2; + test.first->AddLatencySample(latency); + test.second->AddLatencySample(latency); } else { @@ -523,5 +544,37 @@ namespace tunnel std::lock_guard lock(m_CustomPeerSelectorMutex); return m_CustomPeerSelector != nullptr; } + + std::shared_ptr TunnelPool::GetLowestLatencyInboundTunnel(std::shared_ptr exclude) const + { + std::shared_ptr tun = nullptr; + std::unique_lock lock(m_InboundTunnelsMutex); + uint64_t min = 1000000; + for (const auto & itr : m_InboundTunnels) { + if(!itr->LatencyIsKnown()) continue; + auto l = itr->GetMeanLatency(); + if (l >= min) continue; + tun = itr; + if(tun == exclude) continue; + min = l; + } + return tun; + } + + std::shared_ptr TunnelPool::GetLowestLatencyOutboundTunnel(std::shared_ptr exclude) const + { + std::shared_ptr tun = nullptr; + std::unique_lock lock(m_OutboundTunnelsMutex); + uint64_t min = 1000000; + for (const auto & itr : m_OutboundTunnels) { + if(!itr->LatencyIsKnown()) continue; + auto l = itr->GetMeanLatency(); + if (l >= min) continue; + tun = itr; + if(tun == exclude) continue; + min = l; + } + return tun; + } } } diff --git a/TunnelPool.h b/TunnelPool.h index d5bcf18f..6a73bd67 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -69,6 +69,17 @@ namespace tunnel void SetCustomPeerSelector(TunnelPeerSelector selector); void UnsetCustomPeerSelector(); bool HasCustomPeerSelector(); + + /** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */ + void RequireLatency(uint64_t min, uint64_t max) { m_MinLatency = min; m_MaxLatency = max; } + + /** @brief return true if this tunnel pool has a latency requirement */ + bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; } + + /** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */ + std::shared_ptr GetLowestLatencyInboundTunnel(std::shared_ptr exclude=nullptr) const; + std::shared_ptr GetLowestLatencyOutboundTunnel(std::shared_ptr exclude=nullptr) const; + private: void CreateInboundTunnel (); @@ -94,6 +105,10 @@ namespace tunnel bool m_IsActive; std::mutex m_CustomPeerSelectorMutex; TunnelPeerSelector m_CustomPeerSelector; + + uint64_t m_MinLatency=0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms + uint64_t m_MaxLatency=0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms + public: // for HTTP only