diff --git a/AddressBook.cpp b/AddressBook.cpp index b3f0f1ce..b993f456 100644 --- a/AddressBook.cpp +++ b/AddressBook.cpp @@ -546,8 +546,8 @@ namespace client auto datagram = dest->GetDatagramDestination (); if (!datagram) datagram = dest->CreateDatagramDestination (); - datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), + datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), ADDRESS_RESPONSE_DATAGRAM_PORT); } } @@ -558,8 +558,7 @@ namespace client if (dest) { auto datagram = dest->GetDatagramDestination (); - if (datagram) - datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT); + if (datagram) datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT); } } diff --git a/ClientContext.cpp b/ClientContext.cpp index aa18ecdf..b7300ad2 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -39,8 +39,9 @@ namespace client m_SharedLocalDestination->Start (); } + m_AddressBook.Start (); - + std::shared_ptr localDestination; bool httproxy; i2p::config::GetOption("httpproxy.enabled", httproxy); if (httproxy) { @@ -51,8 +52,10 @@ namespace client if (httpProxyKeys.length () > 0) { i2p::data::PrivateKeys keys; - LoadPrivateKeys (keys, httpProxyKeys); - localDestination = CreateNewLocalDestination (keys, false); + if(LoadPrivateKeys (keys, httpProxyKeys)) + localDestination = CreateNewLocalDestination (keys, false); + else + LogPrint(eLogError, "Clients: failed to load HTTP Proxy key"); } try { m_HttpProxy = new i2p::proxy::HTTPProxy(httpProxyAddr, httpProxyPort, localDestination); @@ -61,7 +64,7 @@ namespace client LogPrint(eLogError, "Clients: Exception in HTTP Proxy: ", e.what()); } } - + bool socksproxy; i2p::config::GetOption("socksproxy.enabled", socksproxy); if (socksproxy) { std::string socksProxyKeys; i2p::config::GetOption("socksproxy.keys", socksProxyKeys); @@ -83,10 +86,10 @@ namespace client LogPrint(eLogError, "Clients: Exception in SOCKS Proxy: ", e.what()); } } - - // I2P tunnels - ReadTunnels (); + // I2P tunnels + ReadTunnels (); + // SAM bool sam; i2p::config::GetOption("sam.enabled", sam); if (sam) { @@ -134,6 +137,13 @@ namespace client } m_AddressBook.StartResolvers (); + + // start UDP cleanup + if (!m_ServerForwards.empty ()) + { + m_CleanupUDPTimer.reset (new boost::asio::deadline_timer(m_SharedLocalDestination->GetService ())); + ScheduleCleanupUDP(); + } } void ClientContext::Stop () @@ -193,20 +203,37 @@ namespace client } LogPrint(eLogInfo, "Clients: stopping AddressBook"); - m_AddressBook.Stop (); + m_AddressBook.Stop (); + + { + std::lock_guard lock(m_ForwardsMutex); + m_ServerForwards.clear(); + m_ClientForwards.clear(); + } + + if (m_CleanupUDPTimer) + { + m_CleanupUDPTimer->cancel (); + m_CleanupUDPTimer = nullptr; + } + for (auto& it: m_Destinations) it.second->Stop (); m_Destinations.clear (); - m_SharedLocalDestination = nullptr; + m_SharedLocalDestination = nullptr; } void ClientContext::ReloadConfig () { - ReadTunnels (); // TODO: it reads new tunnels only, should be implemented better + std::string config; i2p::config::GetOption("conf", config); + i2p::config::ParseConfig(config); + Stop(); + Start(); } - void ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType) + bool ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType) { + bool success = true; std::string fullPath = i2p::fs::DataDirPath (filename); std::ifstream s(fullPath, std::ifstream::binary); if (s.is_open ()) @@ -216,9 +243,14 @@ namespace client s.seekg (0, std::ios::beg); uint8_t * buf = new uint8_t[len]; s.read ((char *)buf, len); - keys.FromBuffer (buf, len); + if(!keys.FromBuffer (buf, len)) + { + LogPrint (eLogError, "Clients: failed to load keyfile ", filename); + success = false; + } + else + LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded"); delete[] buf; - LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded"); } else { @@ -232,7 +264,31 @@ namespace client delete[] buf; LogPrint (eLogInfo, "Clients: New private keys file ", fullPath, " for ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " created"); - } + } + return success; + } + + std::vector > ClientContext::GetForwardInfosFor(const i2p::data::IdentHash & destination) + { + std::vector > infos; + std::lock_guard lock(m_ForwardsMutex); + for(const auto & c : m_ClientForwards) + { + if (c.second->IsLocalDestination(destination)) + { + for (auto & i : c.second->GetSessions()) infos.push_back(i); + break; + } + } + for(const auto & s : m_ServerForwards) + { + if(std::get<0>(s.first) == destination) + { + for( auto & i : s.second->GetSessions()) infos.push_back(i); + break; + } + } + return infos; } std::shared_ptr ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType, @@ -337,7 +393,7 @@ namespace client try { std::string type = section.second.get (I2P_TUNNELS_SECTION_TYPE); - if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT) + if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { // mandatory params std::string dest = section.second.get (I2P_CLIENT_TUNNEL_DESTINATION); @@ -355,22 +411,43 @@ namespace client if (keys.length () > 0) { i2p::data::PrivateKeys k; - LoadPrivateKeys (k, keys, sigType); - localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); + if(LoadPrivateKeys (k, keys, sigType)) + { + localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); + if (!localDestination) + localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options); + } + } + if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { + // udp client + // TODO: hostnames + boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); if (!localDestination) - localDestination = CreateNewLocalDestination (k, false, &options); + { + localDestination = m_SharedLocalDestination; + } + auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort); + if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start(); + } + else + LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); + + } else { + // tcp client + auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); + if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), + std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start (); + numClientTunnels++; + } + else + LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); } - auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); - if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), - std::unique_ptr(clientTunnel))).second) - { - clientTunnel->Start (); - numClientTunnels++; - } - else - LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); } - else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC) + else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { // mandatory params std::string host = section.second.get (I2P_SERVER_TUNNEL_HOST); @@ -383,17 +460,43 @@ namespace client std::string webircpass = section.second.get (I2P_SERVER_TUNNEL_WEBIRC_PASSWORD, ""); bool gzip = section.second.get (I2P_SERVER_TUNNEL_GZIP, true); i2p::data::SigningKeyType sigType = section.second.get (I2P_SERVER_TUNNEL_SIGNATURE_TYPE, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); + uint32_t maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN); + std::string address = section.second.get (I2P_SERVER_TUNNEL_ADDRESS, "127.0.0.1"); + // I2CP std::map options; ReadI2CPOptions (section, options); std::shared_ptr localDestination = nullptr; i2p::data::PrivateKeys k; - LoadPrivateKeys (k, keys, sigType); + if(!LoadPrivateKeys (k, keys, sigType)) + continue; localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); if (!localDestination) localDestination = CreateNewLocalDestination (k, true, &options); - + if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) + { + // udp server tunnel + // TODO: hostnames + auto localAddress = boost::asio::ip::address::from_string(address); + boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); + I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); + std::lock_guard lock(m_ForwardsMutex); + if(m_ServerForwards.insert( + std::make_pair( + std::make_pair( + localDestination->GetIdentHash(), port), + std::unique_ptr(serverTunnel))).second) + { + serverTunnel->Start(); + LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32()); + } + else + LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); + + continue; + } + I2PServerTunnel * serverTunnel; if (type == I2P_TUNNELS_SECTION_TYPE_HTTP) serverTunnel = new I2PServerTunnelHTTP (name, host, port, localDestination, hostOverride, inPort, gzip); @@ -402,6 +505,10 @@ namespace client else // regular server tunnel by default serverTunnel = new I2PServerTunnel (name, host, port, localDestination, inPort, gzip); + LogPrint(eLogInfo, "Clients: Set Max Conns To ", maxConns); + serverTunnel->SetMaxConnsPerMinute(maxConns); + + if (accessList.length () > 0) { std::set idents; @@ -439,6 +546,26 @@ namespace client } LogPrint (eLogInfo, "Clients: ", numClientTunnels, " I2P client tunnels created"); LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created"); - } + } + + void ClientContext::ScheduleCleanupUDP() + { + if (m_CleanupUDPTimer) + { + // schedule cleanup in 17 seconds + m_CleanupUDPTimer->expires_from_now (boost::posix_time::seconds (17)); + m_CleanupUDPTimer->async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); + } + } + + void ClientContext::CleanupUDP(const boost::system::error_code & ecode) + { + if(!ecode) + { + std::lock_guard lock(m_ForwardsMutex); + for (auto & s : m_ServerForwards ) s.second->ExpireStale(); + ScheduleCleanupUDP(); + } + } } } diff --git a/ClientContext.h b/ClientContext.h index f5696902..1a41acfc 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -24,6 +24,8 @@ namespace client const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server"; const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http"; const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; + const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; + const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; @@ -39,7 +41,8 @@ namespace client const char I2P_SERVER_TUNNEL_ACCESS_LIST[] = "accesslist"; const char I2P_SERVER_TUNNEL_GZIP[] = "gzip"; const char I2P_SERVER_TUNNEL_WEBIRC_PASSWORD[] = "webircpassword"; - + const char I2P_SERVER_TUNNEL_ADDRESS[] = "address"; + class ClientContext { public: @@ -59,11 +62,13 @@ namespace client const std::map * params = nullptr); void DeleteLocalDestination (std::shared_ptr destination); std::shared_ptr FindLocalDestination (const i2p::data::IdentHash& destination) const; - void LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); + bool LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); AddressBook& GetAddressBook () { return m_AddressBook; }; const SAMBridge * GetSAMBridge () const { return m_SamBridge; }; - + + std::vector > GetForwardInfosFor(const i2p::data::IdentHash & destination); + private: void ReadTunnels (); @@ -72,6 +77,9 @@ namespace client template void ReadI2CPOptions (const Section& section, std::map& options) const; + void CleanupUDP(const boost::system::error_code & ecode); + void ScheduleCleanupUDP(); + private: std::mutex m_DestinationsMutex; @@ -84,15 +92,24 @@ namespace client i2p::proxy::SOCKSProxy * m_SocksProxy; std::map > m_ClientTunnels; // local endpoint->tunnel std::map, std::unique_ptr > m_ServerTunnels; // ->tunnel + + std::mutex m_ForwardsMutex; + std::map > m_ClientForwards; // local endpoint -> udp tunnel + std::map, std::unique_ptr > m_ServerForwards; // -> udp tunnel + SAMBridge * m_SamBridge; BOBCommandChannel * m_BOBCommandChannel; I2CPServer * m_I2CPServer; + std::unique_ptr m_CleanupUDPTimer; + public: // for HTTP const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; }; const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; }; + const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; } + const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; } }; extern ClientContext context; diff --git a/Config.cpp b/Config.cpp index a629f2d4..33231aa4 100644 --- a/Config.cpp +++ b/Config.cpp @@ -151,6 +151,23 @@ namespace config { options_description reseed("Reseed options"); reseed.add_options() ("reseed.file", value()->default_value(""), "Path to .su3 file") +#ifdef MESHNET + ("reseed.urls", value()->default_value("https://reseed.i2p.rocks:8443/"), "Reseed URLs, separated by comma") +#else + ("reseed.urls", value()->default_value( + "https://reseed.i2p-projekt.de/," + "https://i2p.mooo.com/netDb/," + "https://netdb.i2p2.no/," + "https://us.reseed.i2p2.no:444/," + "https://uk.reseed.i2p2.no:444/," + "https://i2p.manas.ca:8443/," + "https://i2p-0.manas.ca:8443/," + "https://reseed.i2p.vzaws.com:8443/," + "https://user.mx24.eu/," + "https://download.xxlspeed.com/," + "https://reseed-ru.lngserv.ru/" + ), "Reseed URLs, separated by comma") +#endif ; options_description trust("Trust options"); diff --git a/DaemonLinux.cpp b/DaemonLinux.cpp index f6664926..22d7dec8 100644 --- a/DaemonLinux.cpp +++ b/DaemonLinux.cpp @@ -20,7 +20,7 @@ void handle_signal(int sig) switch (sig) { case SIGHUP: - LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening log..."); + LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening logs and tunnel configuration..."); i2p::log::Logger().Reopen (); i2p::client::context.ReloadConfig(); break; diff --git a/Datagram.cpp b/Datagram.cpp index 2015622c..b9188864 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -12,72 +12,41 @@ namespace i2p namespace datagram { DatagramDestination::DatagramDestination (std::shared_ptr owner): - m_Owner (owner), m_Receiver (nullptr) + m_Owner (owner.get()), m_Receiver (nullptr) { } DatagramDestination::~DatagramDestination () { + m_Sessions.clear(); } void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) { + auto owner = m_Owner; + auto i = owner->GetIdentity(); uint8_t buf[MAX_DATAGRAM_SIZE]; - auto identityLen = m_Owner->GetIdentity ()->ToBuffer (buf, MAX_DATAGRAM_SIZE); + auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE); uint8_t * signature = buf + identityLen; - auto signatureLen = m_Owner->GetIdentity ()->GetSignatureLen (); + auto signatureLen = i->GetSignatureLen (); uint8_t * buf1 = signature + signatureLen; size_t headerLen = identityLen + signatureLen; memcpy (buf1, payload, len); - if (m_Owner->GetIdentity ()->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) + if (i->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) { uint8_t hash[32]; SHA256(buf1, len, hash); - m_Owner->Sign (hash, 32, signature); + owner->Sign (hash, 32, signature); } else - m_Owner->Sign (buf1, len, signature); + owner->Sign (buf1, len, signature); - auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto remote = m_Owner->FindLeaseSet (ident); - if (remote) - m_Owner->GetService ().post (std::bind (&DatagramDestination::SendMsg, this, msg, remote)); - else - m_Owner->RequestDestination (ident, std::bind (&DatagramDestination::HandleLeaseSetRequestComplete, this, std::placeholders::_1, msg)); + auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); + auto session = ObtainSession(ident); + session->SendMsg(msg); } - void DatagramDestination::HandleLeaseSetRequestComplete (std::shared_ptr remote, std::shared_ptr msg) - { - if (remote) - SendMsg (msg, remote); - } - - void DatagramDestination::SendMsg (std::shared_ptr msg, std::shared_ptr remote) - { - auto outboundTunnel = m_Owner->GetTunnelPool ()->GetNextOutboundTunnel (); - auto leases = remote->GetNonExpiredLeases (); - if (!leases.empty () && outboundTunnel) - { - std::vector msgs; - uint32_t i = rand () % leases.size (); - auto garlic = m_Owner->WrapMessage (remote, msg, true); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { - i2p::tunnel::eDeliveryTypeTunnel, - leases[i]->tunnelGateway, leases[i]->tunnelID, - garlic - }); - outboundTunnel->SendTunnelDataMsg (msgs); - } - else - { - if (outboundTunnel) - LogPrint (eLogWarning, "Failed to send datagram. All leases expired"); - else - LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels"); - } - } void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { @@ -98,18 +67,26 @@ namespace datagram if (verified) { - auto it = m_ReceiversByPorts.find (toPort); - if (it != m_ReceiversByPorts.end ()) - it->second (identity, fromPort, toPort, buf + headerLen, len -headerLen); - else if (m_Receiver != nullptr) - m_Receiver (identity, fromPort, toPort, buf + headerLen, len -headerLen); + auto r = FindReceiver(toPort); + if(r) + r(identity, fromPort, toPort, buf + headerLen, len -headerLen); else - LogPrint (eLogWarning, "Receiver for datagram is not set"); + LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort); } else LogPrint (eLogWarning, "Datagram signature verification failed"); } + DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port) + { + std::lock_guard lock(m_ReceiversMutex); + Receiver r = m_Receiver; + auto itr = m_ReceiversByPorts.find(port); + if (itr != m_ReceiversByPorts.end()) + r = itr->second; + return r; + } + void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { // unzip it @@ -137,7 +114,328 @@ namespace datagram else msg = nullptr; return msg; - } + } + + void DatagramDestination::CleanUp () + { + if (m_Sessions.empty ()) return; + auto now = i2p::util::GetMillisecondsSinceEpoch(); + LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); + std::lock_guard lock(m_SessionsMutex); + // for each session ... + for (auto it = m_Sessions.begin (); it != m_Sessions.end (); ) + { + // check if expired + if (now - it->second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) + { + LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", it->first.ToBase32()); + it = m_Sessions.erase (it); // we are expired + } + else + it++; + } + } + + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + { + std::shared_ptr session = nullptr; + std::lock_guard lock(m_SessionsMutex); + auto itr = m_Sessions.find(ident); + if (itr == m_Sessions.end()) { + // not found, create new session + session = std::make_shared(m_Owner, ident); + m_Sessions[ident] = session; + } else { + session = itr->second; + } + return session; + } + + std::shared_ptr DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote) + { + std::lock_guard lock(m_SessionsMutex); + for ( auto & item : m_Sessions) + { + if(item.first == remote) return std::make_shared(item.second->GetSessionInfo()); + } + return nullptr; + } + + DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent) : + m_LocalDestination(localDestination), + m_RemoteIdentity(remoteIdent), + m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()), + m_LastPathChange(0), + m_LastSuccess(0) + { + } + + void DatagramSession::SendMsg(std::shared_ptr msg) + { + // we used this session + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + // schedule send + m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); + } + + DatagramSession::Info DatagramSession::GetSessionInfo() const + { + if(!m_RoutingSession) + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if (!routingPath) + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + auto lease = routingPath->remoteLease; + auto tunnel = routingPath->outboundTunnel; + if(lease) + { + if(tunnel) + return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + else + return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse, m_LastSuccess); + } + else if(tunnel) + return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + else + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + } + + void DatagramSession::HandleSend(std::shared_ptr msg) + { + if(!m_RoutingSession) + { + // try to get one + if(m_RemoteLeaseSet) m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); + else + { + UpdateLeaseSet(msg); + return; + } + } + // do we have a routing session? + if(m_RoutingSession) + { + // should we switch paths? + if(ShouldUpdateRoutingPath ()) + { + LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); + // try switching paths + auto path = GetNextRoutingPath(); + if(path) + UpdateRoutingPath (path); + else + ResetRoutingPath(); + } + auto routingPath = m_RoutingSession->GetSharedRoutingPath (); + // make sure we have a routing path + if (routingPath) + { + auto outboundTunnel = routingPath->outboundTunnel; + if (outboundTunnel) + { + if(outboundTunnel->IsEstablished()) + { + m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch (); + // we have a routing path and routing session and the outbound tunnel we are using is good + // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW + auto m = m_RoutingSession->WrapSingleMessage(msg); + routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ + i2p::tunnel::eDeliveryTypeTunnel, + routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, + m + }}); + return; + } + } + } + } + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time + if (m_LastPathChange && now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath(); + UpdateLeaseSet(msg); + + } + + void DatagramSession::UpdateRoutingPath(const std::shared_ptr & path) + { + if(m_RoutingSession == nullptr && m_RemoteLeaseSet) + m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); + if(!m_RoutingSession) return; + // set routing path and update time we last updated the routing path + m_RoutingSession->SetSharedRoutingPath (path); + m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch (); + } + + bool DatagramSession::ShouldUpdateRoutingPath() const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // we need to rotate paths becuase the routing path is too old + if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true; + // our path looks dead so we need to rotate paths + if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return true; + // if we have a routing session and routing path we don't need to switch paths + return m_RoutingSession != nullptr && m_RoutingSession->GetSharedRoutingPath () != nullptr; + } + + + bool DatagramSession::ShouldSwitchLease() const + { + std::shared_ptr routingPath = nullptr; + std::shared_ptr currentLease = nullptr; + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath (); + if(routingPath) + currentLease = routingPath->remoteLease; + if(currentLease) // if we have a lease return true if it's about to expire otherwise return false + return currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE ); + // we have no current lease, we should switch + return true; + } + + std::shared_ptr DatagramSession::GetNextRoutingPath() + { + std::shared_ptr outboundTunnel = nullptr; + std::shared_ptr routingPath = nullptr; + // get existing routing path if we have one + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath(); + // do we have an existing outbound tunnel and routing path? + if(routingPath && routingPath->outboundTunnel) + { + // is the outbound tunnel we are using good? + if (routingPath->outboundTunnel->IsEstablished()) + { + // ya so let's stick with it + outboundTunnel = routingPath->outboundTunnel; + } + else + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + } + // do we have an outbound tunnel that works already ? + if(!outboundTunnel) + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started + + if(outboundTunnel) + { + std::shared_ptr lease = nullptr; + // should we switch leases ? + if (ShouldSwitchLease ()) + { + // yes, get next available lease + lease = GetNextLease(); + } + else if (routingPath) + { + if(routingPath->remoteLease) + { + if(routingPath->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE)) + lease = GetNextLease(); + else + lease = routingPath->remoteLease; + } + } + else + lease = GetNextLease(); + if(lease) + { + // we have a valid lease to use and an outbound tunnel + // create new routing path + uint32_t now = i2p::util::GetSecondsSinceEpoch(); + routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ + outboundTunnel, + lease, + 0, + now, + 0 + }); + } + else // we don't have a new routing path to give + routingPath = nullptr; + } + return routingPath; + } + + void DatagramSession::ResetRoutingPath() + { + if(m_RoutingSession) + { + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path + { + // get outbound tunnel on this path + auto outboundTunnel = routingPath->outboundTunnel; + // is this outbound tunnel there and established + if (outboundTunnel && outboundTunnel->IsEstablished()) + m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine + } + // reset the routing path + UpdateRoutingPath(nullptr); + } + } + + std::shared_ptr DatagramSession::GetNextLease() + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + std::shared_ptr next = nullptr; + if(m_RemoteLeaseSet) + { + std::vector exclude; + for(const auto & ident : m_InvalidIBGW) + exclude.push_back(ident); + // find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge + auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool { + if(exclude.size()) + { + auto end = std::end(exclude); + return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool { + return ident == l.tunnelGateway; + }) != end; + } + else + return false; + }); + if(leases.size()) + { + // pick random valid next lease + uint32_t idx = rand() % leases.size(); + next = leases[idx]; + } + else + LogPrint(eLogWarning, "DatagramDestination: no leases to use"); + } + return next; + } + + void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) + { + LogPrint(eLogInfo, "DatagramSession: updating lease set"); + m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); + } + + void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) + { + if(remoteIdent) + { + // update routing session + if(m_RoutingSession) + m_RoutingSession = nullptr; + m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); + // clear invalid IBGW as we have a new lease set + m_InvalidIBGW.clear(); + m_RemoteLeaseSet = remoteIdent; + // update routing path + auto path = GetNextRoutingPath(); + if (path) + UpdateRoutingPath(path); + else + ResetRoutingPath(); + // send the message that was queued if it was provided + if(msg) + HandleSend(msg); + } + } } } diff --git a/Datagram.h b/Datagram.h index c593fad2..f3aae6f9 100644 --- a/Datagram.h +++ b/Datagram.h @@ -9,6 +9,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Garlic.h" namespace i2p { @@ -18,13 +19,97 @@ namespace client } namespace datagram { - const size_t MAX_DATAGRAM_SIZE = 32768; + // milliseconds for max session idle time + const uint64_t DATAGRAM_SESSION_MAX_IDLE = 10 * 60 * 1000; + // milliseconds for how long we try sticking to a dead routing path before trying to switch + const uint64_t DATAGRAM_SESSION_PATH_TIMEOUT = 5000; + // milliseconds interval a routing path is used before switching + const uint64_t DATAGRAM_SESSION_PATH_SWITCH_INTERVAL = 60 * 1000; + // milliseconds before lease expire should we try switching leases + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW = 10 * 1000; + // milliseconds fudge factor for leases handover + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000; + + + class DatagramSession + { + public: + DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent); + + /** send an i2np message to remote endpoint for this session */ + void SendMsg(std::shared_ptr msg); + /** get the last time in milliseconds for when we used this datagram session */ + uint64_t LastActivity() const { return m_LastUse; } + /** get the last time in milliseconds when we successfully sent data */ + uint64_t LastSuccess() const { return m_LastSuccess; } + struct Info + { + std::shared_ptr IBGW; + std::shared_ptr OBEP; + const uint64_t activity; + const uint64_t success; + Info() : IBGW(nullptr), OBEP(nullptr), activity(0), success(0) {} + Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a, const uint64_t s) : + activity(a), + success(s) { + if(ibgw) IBGW = std::make_shared(ibgw); + else IBGW = nullptr; + if(obep) OBEP = std::make_shared(obep); + else OBEP = nullptr; + } + }; + + Info GetSessionInfo() const; + + + private: + + /** update our routing path we are using, mark that we have changed paths */ + void UpdateRoutingPath(const std::shared_ptr & path); + + /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */ + bool ShouldUpdateRoutingPath() const; + + /** return true if we should switch the lease for out routing path otherwise return false */ + bool ShouldSwitchLease() const; + + /** get next usable routing path, try reusing outbound tunnels */ + std::shared_ptr GetNextRoutingPath(); + /** + * mark current routing path as invalid and clear it + * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time + */ + void ResetRoutingPath(); + + /** get next usable lease, does not fetch or update if expired or have no lease set */ + std::shared_ptr GetNextLease(); + + void HandleSend(std::shared_ptr msg); + void HandleGotLeaseSet(std::shared_ptr remoteIdent, + std::shared_ptr msg); + void UpdateLeaseSet(std::shared_ptr msg=nullptr); + + private: + i2p::client::ClientDestination * m_LocalDestination; + i2p::data::IdentHash m_RemoteIdentity; + std::shared_ptr m_RoutingSession; + // Ident hash of IBGW that are invalid + std::vector m_InvalidIBGW; + std::shared_ptr m_RemoteLeaseSet; + uint64_t m_LastUse; + uint64_t m_LastPathChange; + uint64_t m_LastSuccess; + }; + + const size_t MAX_DATAGRAM_SIZE = 32768; class DatagramDestination { typedef std::function Receiver; public: + DatagramDestination (std::shared_ptr owner); ~DatagramDestination (); @@ -34,21 +119,31 @@ namespace datagram void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; }; void ResetReceiver () { m_Receiver = nullptr; }; - void SetReceiver (const Receiver& receiver, uint16_t port) { m_ReceiversByPorts[port] = receiver; }; - void ResetReceiver (uint16_t port) { m_ReceiversByPorts.erase (port); }; + void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; }; + void ResetReceiver (uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); }; + + std::shared_ptr GetInfoForRemote(const i2p::data::IdentHash & remote); + + // clean up stale sessions + void CleanUp (); private: - - void HandleLeaseSetRequestComplete (std::shared_ptr leaseSet, std::shared_ptr msg); + + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); - void SendMsg (std::shared_ptr msg, std::shared_ptr remote); + void HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + /** find a receiver by port, if none by port is found try default receiever, otherwise returns nullptr */ + Receiver FindReceiver(uint16_t port); + private: - - std::shared_ptr m_Owner; + i2p::client::ClientDestination * m_Owner; Receiver m_Receiver; // default + std::mutex m_SessionsMutex; + std::map > m_Sessions; + std::mutex m_ReceiversMutex; std::map m_ReceiversByPorts; i2p::data::GzipInflator m_Inflator; diff --git a/Destination.cpp b/Destination.cpp index 42351787..48717f5f 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -168,14 +168,32 @@ namespace client else return false; } - + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { + std::lock_guard lock(m_RemoteLeaseSetsMutex); auto it = m_RemoteLeaseSets.find (ident); if (it != m_RemoteLeaseSets.end ()) - { + { if (!it->second->IsExpired ()) + { + if (it->second->ExpiresSoon()) + { + LogPrint(eLogDebug, "Destination: Lease Set expires soon, updating before expire"); + // update now before expiration for smooth handover + RequestDestination(ident, [this, ident] (std::shared_ptr ls) { + if(ls && !ls->IsExpired()) + { + ls->PopulateLeases(); + { + std::lock_guard _lock(m_RemoteLeaseSetsMutex); + m_RemoteLeaseSets[ident] = ls; + } + } + }); + } return it->second; + } else LogPrint (eLogWarning, "Destination: remote LeaseSet expired"); } @@ -185,7 +203,10 @@ namespace client if (ls && !ls->IsExpired ()) { ls->PopulateLeases (); // since we don't store them in netdb - m_RemoteLeaseSets[ident] = ls; + { + std::lock_guard lock(m_RemoteLeaseSetsMutex); + m_RemoteLeaseSets[ident] = ls; + } return ls; } } @@ -203,6 +224,7 @@ namespace client void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet) { m_LeaseSet.reset (newLeaseSet); + i2p::garlic::GarlicDestination::SetLeaseSetUpdated (); if (m_IsPublic) { m_PublishVerificationTimer.cancel (); @@ -279,6 +301,7 @@ namespace client if (buf[DATABASE_STORE_TYPE_OFFSET] == 1) // LeaseSet { LogPrint (eLogDebug, "Remote LeaseSet"); + std::lock_guard lock(m_RemoteLeaseSetsMutex); auto it = m_RemoteLeaseSets.find (buf + DATABASE_STORE_KEY_OFFSET); if (it != m_RemoteLeaseSets.end ()) { @@ -391,8 +414,7 @@ namespace client } void LeaseSetDestination::SetLeaseSetUpdated () - { - i2p::garlic::GarlicDestination::SetLeaseSetUpdated (); + { UpdateLeaseSet (); } @@ -619,6 +641,7 @@ namespace client { CleanupExpiredTags (); CleanupRemoteLeaseSets (); + CleanupDestination (); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); @@ -628,6 +651,7 @@ namespace client void LeaseSetDestination::CleanupRemoteLeaseSets () { auto ts = i2p::util::GetMillisecondsSinceEpoch (); + std::lock_guard lock(m_RemoteLeaseSetsMutex); for (auto it = m_RemoteLeaseSets.begin (); it != m_RemoteLeaseSets.end ();) { if (it->second->IsEmpty () || ts > it->second->GetExpirationTime ()) // leaseset expired @@ -642,7 +666,8 @@ namespace client ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), - m_Keys (keys), m_DatagramDestination (nullptr) + m_Keys (keys), m_DatagramDestination (nullptr), + m_ReadyChecker(GetService()) { if (isPublic) PersistTemporaryKeys (); @@ -654,8 +679,6 @@ namespace client ClientDestination::~ClientDestination () { - if (m_DatagramDestination) - delete m_DatagramDestination; } bool ClientDestination::Start () @@ -676,22 +699,44 @@ namespace client { if (LeaseSetDestination::Stop ()) { + m_ReadyChecker.cancel(); m_StreamingDestination->Stop (); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) it.second->Stop (); - if (m_DatagramDestination) - { - auto d = m_DatagramDestination; - m_DatagramDestination = nullptr; - delete d; - } - return true; + if(m_DatagramDestination) + delete m_DatagramDestination; + m_DatagramDestination = nullptr; + return true; } else return false; } + void ClientDestination::Ready(ReadyPromise & p) + { + ScheduleCheckForReady(&p); + } + + void ClientDestination::ScheduleCheckForReady(ReadyPromise * p) + { + // tick every 100ms + m_ReadyChecker.expires_from_now(boost::posix_time::milliseconds(100)); + m_ReadyChecker.async_wait([&, p] (const boost::system::error_code & ecode) { + HandleCheckForReady(ecode, p); + }); + } + + void ClientDestination::HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p) + { + if(ecode) // error happened + p->set_value(nullptr); + else if(IsReady()) // we are ready + p->set_value(std::shared_ptr(this)); + else // we are not ready + ScheduleCheckForReady(p); + } + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); @@ -796,9 +841,9 @@ namespace client return dest; } - i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { - if (!m_DatagramDestination) + if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); return m_DatagramDestination; } @@ -848,5 +893,11 @@ namespace client Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO SetLeaseSet (leaseSet); } + + void ClientDestination::CleanupDestination () + { + if (m_DatagramDestination) m_DatagramDestination->CleanUp (); + } + } } diff --git a/Destination.h b/Destination.h index ac7ef7c9..22ffa603 100644 --- a/Destination.h +++ b/Destination.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "Identity.h" #include "TunnelPool.h" @@ -95,6 +96,7 @@ namespace client protected: void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet); + virtual void CleanupDestination () {}; // additional clean up in derived classes // I2CP virtual void HandleDataMessage (const uint8_t * buf, size_t len) = 0; virtual void CreateNewLeaseSet (std::vector > tunnels) = 0; @@ -122,6 +124,7 @@ namespace client std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; + mutable std::mutex m_RemoteLeaseSetsMutex; std::map > m_RemoteLeaseSets; std::map > m_LeaseSetRequests; @@ -142,13 +145,19 @@ namespace client class ClientDestination: public LeaseSetDestination { public: - + // type for informing that a client destination is ready + typedef std::promise > ReadyPromise; + ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params = nullptr); ~ClientDestination (); bool Start (); bool Stop (); - + + // informs promise with shared_from_this() when this destination is ready to use + // if cancelled before ready, informs promise with nullptr + void Ready(ReadyPromise & p); + const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); }; @@ -163,8 +172,8 @@ namespace client bool IsAcceptingStreams () const; // datagram - i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; - i2p::datagram::DatagramDestination * CreateDatagramDestination (); + i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; + i2p::datagram::DatagramDestination * CreateDatagramDestination (); // implements LocalDestination const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; @@ -172,6 +181,7 @@ namespace client protected: + void CleanupDestination (); // I2CP void HandleDataMessage (const uint8_t * buf, size_t len); void CreateNewLeaseSet (std::vector > tunnels); @@ -182,6 +192,9 @@ namespace client { return std::static_pointer_cast(shared_from_this ()); } void PersistTemporaryKeys (); + void ScheduleCheckForReady(ReadyPromise * p); + void HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p); + private: i2p::data::PrivateKeys m_Keys; @@ -189,8 +202,10 @@ namespace client std::shared_ptr m_StreamingDestination; // default std::map > m_StreamingDestinationsByPorts; - i2p::datagram::DatagramDestination * m_DatagramDestination; + i2p::datagram::DatagramDestination * m_DatagramDestination; + boost::asio::deadline_timer m_ReadyChecker; + public: // for HTTP only diff --git a/FS.cpp b/FS.cpp index 663c1916..a809e8c4 100644 --- a/FS.cpp +++ b/FS.cpp @@ -158,6 +158,13 @@ namespace fs { } void HashedStorage::Traverse(std::vector & files) { + Iterate([&files] (const std::string & fname) { + files.push_back(fname); + }); + } + + void HashedStorage::Iterate(FilenameVisitor v) + { boost::filesystem::path p(root); boost::filesystem::recursive_directory_iterator it(p); boost::filesystem::recursive_directory_iterator end; @@ -166,7 +173,7 @@ namespace fs { if (!boost::filesystem::is_regular_file( it->status() )) continue; const std::string & t = it->path().string(); - files.push_back(t); + v(t); } } } // fs diff --git a/FS.h b/FS.h index 0437ccf9..c476aa63 100644 --- a/FS.h +++ b/FS.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace i2p { namespace fs { @@ -43,6 +44,7 @@ namespace fs { std::string suffix; /**< suffix of file in storage (extension) */ public: + typedef std::function FilenameVisitor; HashedStorage(const char *n, const char *p1, const char *p2, const char *s): name(n), prefix1(p1), prefix2(p2), suffix(s) {}; @@ -58,6 +60,8 @@ namespace fs { void Remove(const std::string & ident); /** find all files in storage and store list in provided vector */ void Traverse(std::vector & files); + /** visit every file in this storage with a visitor */ + void Iterate(FilenameVisitor v); }; /** @brief Returns current application name, default 'i2pd' */ diff --git a/Family.cpp b/Family.cpp index ff09f2f5..c1840e51 100644 --- a/Family.cpp +++ b/Family.cpp @@ -114,6 +114,12 @@ namespace data { uint8_t buf[50], signatureBuf[64]; size_t len = family.length (), signatureLen = strlen (signature); + if (len + 32 > 50) + { + LogPrint (eLogError, "Family: ", family, " is too long"); + return false; + } + memcpy (buf, family.c_str (), len); memcpy (buf + len, (const uint8_t *)ident, 32); len += 32; diff --git a/Garlic.cpp b/Garlic.cpp index c3c0e045..1cd1676e 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -178,7 +178,7 @@ namespace garlic // create message if (!tagFound) // new session { - LogPrint (eLogWarning, "Garlic: No tags available, will use ElGamal"); + LogPrint (eLogInfo, "Garlic: No tags available, will use ElGamal"); if (!m_Destination) { LogPrint (eLogError, "Garlic: Can't use ElGamal for unknown destination"); @@ -247,7 +247,7 @@ namespace garlic size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, std::shared_ptr msg, UnconfirmedTags * newTags) { - uint64_t ts = i2p::util::GetMillisecondsSinceEpoch () + 8000; // 8 sec + uint64_t ts = i2p::util::GetMillisecondsSinceEpoch (); uint32_t msgID; RAND_bytes ((uint8_t *)&msgID, 4); size_t size = 0; @@ -258,9 +258,11 @@ namespace garlic if (m_Owner) { // resubmit non-confirmed LeaseSet - if (m_LeaseSetUpdateStatus == eLeaseSetSubmitted && - i2p::util::GetMillisecondsSinceEpoch () > m_LeaseSetSubmissionTime + LEASET_CONFIRMATION_TIMEOUT) - m_LeaseSetUpdateStatus = eLeaseSetUpdated; + if (m_LeaseSetUpdateStatus == eLeaseSetSubmitted && ts > m_LeaseSetSubmissionTime + LEASET_CONFIRMATION_TIMEOUT) + { + m_LeaseSetUpdateStatus = eLeaseSetUpdated; + SetSharedRoutingPath (nullptr); // invalidate path since leaseset was not confirmed + } // attach DeviveryStatus if necessary if (newTags || m_LeaseSetUpdateStatus == eLeaseSetUpdated) // new tags created or leaseset updated @@ -286,7 +288,7 @@ namespace garlic { m_LeaseSetUpdateStatus = eLeaseSetSubmitted; m_LeaseSetUpdateMsgID = msgID; - m_LeaseSetSubmissionTime = i2p::util::GetMillisecondsSinceEpoch (); + m_LeaseSetSubmissionTime = ts; // clove if our leaseSet must be attached auto leaseSet = CreateDatabaseStoreMsg (m_Owner->GetLeaseSet ()); size += CreateGarlicClove (payload + size, leaseSet, false); @@ -303,7 +305,7 @@ namespace garlic size += 3; htobe32buf (payload + size, msgID); // MessageID size += 4; - htobe64buf (payload + size, ts); // Expiration of message + htobe64buf (payload + size, ts + 8000); // Expiration of message, 8 sec size += 8; return size; } diff --git a/Garlic.h b/Garlic.h index 6a92b94a..76d8eaf3 100644 --- a/Garlic.h +++ b/Garlic.h @@ -104,10 +104,11 @@ namespace garlic { if (m_LeaseSetUpdateStatus != eLeaseSetDoNotSend) m_LeaseSetUpdateStatus = eLeaseSetUpdated; }; + bool IsLeaseSetNonConfirmed () const { return m_LeaseSetUpdateStatus == eLeaseSetSubmitted; }; std::shared_ptr GetSharedRoutingPath (); void SetSharedRoutingPath (std::shared_ptr path); - + private: size_t CreateAESBlock (uint8_t * buf, std::shared_ptr msg); diff --git a/HTTPServer.cpp b/HTTPServer.cpp index fdc5b38f..2e9a02ca 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -337,7 +337,8 @@ namespace http { s << "" << it->GetWindowSize () << ""; s << "" << (int)it->GetStatus () << ""; s << "
\r\n" << std::endl; - } + } + s << ""; } } @@ -422,7 +423,7 @@ namespace http { s << " Accept transit tunnels
\r\n"; #if (!defined(WIN32) && !defined(QT_GUI_LIB) && !defined(ANDROID)) if (Daemon.gracefullShutdownInterval) - s << " Cancel gracefull shutdown
"; + s << " Cancel gracefull shutdown
"; else s << " Start gracefull shutdown
\r\n"; #endif @@ -564,6 +565,32 @@ namespace http { s << ":" << it.second->GetLocalPort (); s << "
\r\n"<< std::endl; } + auto& clientForwards = i2p::client::context.GetClientForwards (); + if (!clientForwards.empty ()) + { + s << "
\r\nClient Forwards:
\r\n
\r\n"; + for (auto& it: clientForwards) + { + auto& ident = it.second->GetLocalDestination ()->GetIdentHash(); + s << ""; + s << it.second->GetName () << " ⇐ "; + s << i2p::client::context.GetAddressBook ().ToAddress(ident); + s << "
\r\n"<< std::endl; + } + } + auto& serverForwards = i2p::client::context.GetServerForwards (); + if (!serverForwards.empty ()) + { + s << "
\r\nServer Forwards:
\r\n
\r\n"; + for (auto& it: serverForwards) + { + auto& ident = it.second->GetLocalDestination ()->GetIdentHash(); + s << ""; + s << it.second->GetName () << " ⇐ "; + s << i2p::client::context.GetAddressBook ().ToAddress(ident); + s << "
\r\n"<< std::endl; + } + } } HTTPConnection::HTTPConnection (std::shared_ptr socket): @@ -762,13 +789,13 @@ namespace http { reply.add_header("Content-Type", "text/html"); reply.body = content; - std::string res = reply.to_string(); - boost::asio::async_write (*m_Socket, boost::asio::buffer(res), + m_SendBuffer = reply.to_string(); + boost::asio::async_write (*m_Socket, boost::asio::buffer(m_SendBuffer), std::bind (&HTTPConnection::Terminate, shared_from_this (), std::placeholders::_1)); } HTTPServer::HTTPServer (const std::string& address, int port): - m_Thread (nullptr), m_Work (m_Service), + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::address::from_string(address), port)) { } @@ -797,6 +824,7 @@ namespace http { i2p::config::SetOption("http.pass", pass); LogPrint(eLogInfo, "HTTPServer: password set to ", pass); } + m_IsRunning = true; m_Thread = std::unique_ptr(new std::thread (std::bind (&HTTPServer::Run, this))); m_Acceptor.listen (); Accept (); @@ -804,9 +832,11 @@ namespace http { void HTTPServer::Stop () { + m_IsRunning = false; m_Acceptor.close(); m_Service.stop (); - if (m_Thread) { + if (m_Thread) + { m_Thread->join (); m_Thread = nullptr; } @@ -814,7 +844,17 @@ namespace http { void HTTPServer::Run () { - m_Service.run (); + while (m_IsRunning) + { + try + { + m_Service.run (); + } + catch (std::exception& ex) + { + LogPrint (eLogError, "HTTPServer: runtime exception: ", ex.what ()); + } + } } void HTTPServer::Accept () @@ -828,7 +868,13 @@ namespace http { std::shared_ptr newSocket) { if (ecode) + { + if(newSocket) newSocket->close(); + LogPrint(eLogError, "HTTP Server: error handling accept ", ecode.message()); + if(ecode != boost::asio::error::operation_aborted) + Accept(); return; + } CreateConnection(newSocket); Accept (); } diff --git a/HTTPServer.h b/HTTPServer.h index bf7f5c65..4a32702d 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -31,6 +31,7 @@ namespace http { boost::asio::deadline_timer m_Timer; char m_Buffer[HTTP_CONNECTION_BUFFER_SIZE + 1]; size_t m_BufferLen; + std::string m_SendBuffer; bool needAuth; std::string user; std::string pass; @@ -56,6 +57,7 @@ namespace http { private: + bool m_IsRunning; std::unique_ptr m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; diff --git a/I2PService.cpp b/I2PService.cpp index 4f907f18..f5ebcb0c 100644 --- a/I2PService.cpp +++ b/I2PService.cpp @@ -32,7 +32,12 @@ namespace client } } - TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) {} + TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) + { + boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE); + upstream->set_option(option); + downstream->set_option(option); + } TCPIPPipe::~TCPIPPipe() { diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index ad06328b..0c58ba9d 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -9,6 +9,17 @@ namespace i2p { namespace client { + + /** set standard socket options */ + static void I2PTunnelSetSocketOptions(std::shared_ptr socket) + { + if (socket && socket->is_open()) + { + boost::asio::socket_base::receive_buffer_size option(I2P_TUNNEL_CONNECTION_BUFFER_SIZE); + socket->set_option(option); + } + } + I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, std::shared_ptr leaseSet, int port): I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()), @@ -34,7 +45,7 @@ namespace client I2PTunnelConnection::~I2PTunnelConnection () { } - + void I2PTunnelConnection::I2PConnect (const uint8_t * msg, size_t len) { if (m_Stream) @@ -50,9 +61,27 @@ namespace client void I2PTunnelConnection::Connect () { - if (m_Socket) - m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, + I2PTunnelSetSocketOptions(m_Socket); + if (m_Socket) { +#ifdef __linux__ + // bind to 127.x.x.x address + // where x.x.x are first three bytes from ident + + if (m_RemoteEndpoint.address ().is_v4 () && + m_RemoteEndpoint.address ().to_v4 ().to_bytes ()[0] == 127) + { + m_Socket->open (boost::asio::ip::tcp::v4 ()); + boost::asio::ip::address_v4::bytes_type bytes; + const uint8_t * ident = m_Stream->GetRemoteIdentity ()->GetIdentHash (); + bytes[0] = 127; + memcpy (bytes.data ()+1, ident, 3); + boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes); + m_Socket->bind (boost::asio::ip::tcp::endpoint (ourIP, 0)); + } +#endif + m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, shared_from_this (), std::placeholders::_1)); + } } void I2PTunnelConnection::Terminate () @@ -384,7 +413,7 @@ namespace client { m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip); } - + void I2PServerTunnel::Start () { m_Endpoint.port (m_Port); @@ -499,5 +528,241 @@ namespace client conn->Connect (); } -} -} + void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + std::lock_guard lock(m_SessionsMutex); + auto session = ObtainUDPSession(from, toPort, fromPort); + session->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); + session->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + + } + + void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession * u) -> bool { + return now - u->LastActivity >= delta; + }); + } + + UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + { + auto ih = from.GetIdentHash(); + for ( UDPSession * s : m_Sessions ) + { + if ( s->Identity == ih) + { + /** found existing session */ + LogPrint(eLogDebug, "UDPServer: found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32()); + return s; + } + } + /** create new udp session */ + boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0); + m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, ih, localPort, remotePort)); + return m_Sessions.back(); + } + + UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash to, + uint16_t ourPort, uint16_t theirPort) : + m_Destination(localDestination->GetDatagramDestination()), + m_Service(localDestination->GetService()), + IPSocket(localDestination->GetService(), localEndpoint), + Identity(to), + SendEndpoint(endpoint), + LastActivity(i2p::util::GetMillisecondsSinceEpoch()), + LocalPort(ourPort), + RemotePort(theirPort) + { + Receive(); + } + + + void UDPSession::Receive() { + LogPrint(eLogDebug, "UDPSession: Receive"); + IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), + FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); + } + + void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) + { + if(!ecode) + { + LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); + LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + uint8_t * data = new uint8_t[len]; + memcpy(data, m_Buffer, len); + m_Service.post([&,len, data] () { + m_Destination->SendDatagramTo(data, len, Identity, 0, 0); + delete [] data; + }); + + Receive(); + } else { + LogPrint(eLogError, "UDPSession: ", ecode.message()); + } + } + + + + I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, + const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + m_Name(name), + LocalPort(port), + m_LocalAddress(localAddress), + m_RemoteEndpoint(forwardTo) + { + m_LocalDest = localDestination; + m_LocalDest->Start(); + auto dgram = m_LocalDest->CreateDatagramDestination(); + dgram->SetReceiver(std::bind(&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); + } + + I2PUDPServerTunnel::~I2PUDPServerTunnel() + { + auto dgram = m_LocalDest->GetDatagramDestination(); + if (dgram) dgram->ResetReceiver(); + + LogPrint(eLogInfo, "UDPServer: done"); + } + + void I2PUDPServerTunnel::Start() { + m_LocalDest->Start(); + } + + std::vector > I2PUDPServerTunnel::GetSessions() + { + std::vector > sessions; + std::lock_guard lock(m_SessionsMutex); + for ( UDPSession * s : m_Sessions ) + { + if (!s->m_Destination) continue; + auto info = s->m_Destination->GetInfoForRemote(s->Identity); + if(!info) continue; + + auto sinfo = std::make_shared(); + sinfo->Name = m_Name; + sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); + sinfo->RemoteIdent = std::make_shared(s->Identity.data()); + sinfo->CurrentIBGW = info->IBGW; + sinfo->CurrentOBEP = info->OBEP; + sessions.push_back(sinfo); + } + return sessions; + } + + I2PUDPClientTunnel::I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, + std::shared_ptr localDestination, + uint16_t remotePort) : + m_Name(name), + m_Session(nullptr), + m_RemoteDest(remoteDest), + m_LocalDest(localDestination), + m_LocalEndpoint(localEndpoint), + m_RemoteIdent(nullptr), + m_ResolveThread(nullptr), + LocalPort(localEndpoint.port()), + RemotePort(remotePort), + m_cancel_resolve(false) + { + auto dgram = m_LocalDest->CreateDatagramDestination(); + dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5)); + } + + + + void I2PUDPClientTunnel::Start() { + m_LocalDest->Start(); + if (m_ResolveThread == nullptr) + m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + } + + std::vector > I2PUDPClientTunnel::GetSessions() + { + std::vector > infos; + if(m_Session && m_LocalDest) + { + auto s = m_Session; + if (s->m_Destination) + { + auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity); + if(info) + { + auto sinfo = std::make_shared(); + sinfo->Name = m_Name; + sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); + sinfo->RemoteIdent = std::make_shared(s->Identity.data()); + sinfo->CurrentIBGW = info->IBGW; + sinfo->CurrentOBEP = info->OBEP; + infos.push_back(sinfo); + } + } + } + return infos; + } + + void I2PUDPClientTunnel::TryResolving() { + LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); + m_RemoteIdent = new i2p::data::IdentHash; + m_RemoteIdent->Fill(0); + + while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve) + { + LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + if(m_cancel_resolve) + { + LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled"); + return; + } + LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32()); + // delete existing session + if(m_Session) delete m_Session; + + boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0); + m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, *m_RemoteIdent, LocalPort, RemotePort); + } + + void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) + { + // address match + if(m_Session) + { + // tell session + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); + m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint); + } + else + LogPrint(eLogWarning, "UDP Client: no session"); + } + else + LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); + + } + + I2PUDPClientTunnel::~I2PUDPClientTunnel() { + auto dgram = m_LocalDest->GetDatagramDestination(); + if (dgram) dgram->ResetReceiver(); + + if (m_Session) delete m_Session; + m_cancel_resolve = true; + + if(m_ResolveThread) + { + m_ResolveThread->join(); + delete m_ResolveThread; + m_ResolveThread = nullptr; + } + if (m_RemoteIdent) delete m_RemoteIdent; + } +} +} diff --git a/I2PTunnel.h b/I2PTunnel.h index bec0f9a4..dce9f812 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -9,6 +9,7 @@ #include #include "Identity.h" #include "Destination.h" +#include "Datagram.h" #include "Streaming.h" #include "I2PService.h" @@ -128,8 +129,121 @@ namespace client std::string m_Name, m_Destination; const i2p::data::IdentHash * m_DestinationIdentHash; int m_DestinationPort; - }; + }; + + /** 2 minute timeout for udp sessions */ + const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2; + + /** max size for i2p udp */ + const size_t I2P_UDP_MAX_MTU = i2p::datagram::MAX_DATAGRAM_SIZE; + + struct UDPSession + { + i2p::datagram::DatagramDestination * m_Destination; + boost::asio::io_service & m_Service; + boost::asio::ip::udp::socket IPSocket; + i2p::data::IdentHash Identity; + boost::asio::ip::udp::endpoint FromEndpoint; + boost::asio::ip::udp::endpoint SendEndpoint; + uint64_t LastActivity; + + uint16_t LocalPort; + uint16_t RemotePort; + + uint8_t m_Buffer[I2P_UDP_MAX_MTU]; + + UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash ident, + uint16_t ourPort, uint16_t theirPort); + void HandleReceived(const boost::system::error_code & ecode, std::size_t len); + void Receive(); + }; + + + /** read only info about a datagram session */ + struct DatagramSessionInfo + { + /** the name of this forward */ + std::string Name; + /** ident hash of local destination */ + std::shared_ptr LocalIdent; + /** ident hash of remote destination */ + std::shared_ptr RemoteIdent; + /** ident hash of IBGW in use currently in this session or nullptr if none is set */ + std::shared_ptr CurrentIBGW; + /** ident hash of OBEP in use for this session or nullptr if none is set */ + std::shared_ptr CurrentOBEP; + /** i2p router's udp endpoint */ + boost::asio::ip::udp::endpoint LocalEndpoint; + /** client's udp endpoint */ + boost::asio::ip::udp::endpoint RemoteEndpoint; + /** how long has this converstation been idle in ms */ + uint64_t idle; + }; + + /** server side udp tunnel, many i2p inbound to 1 ip outbound */ + class I2PUDPServerTunnel + { + public: + I2PUDPServerTunnel(const std::string & name, + std::shared_ptr localDestination, + const boost::asio::ip::address & localAddress, + boost::asio::ip::udp::endpoint forwardTo, uint16_t port); + ~I2PUDPServerTunnel(); + /** expire stale udp conversations */ + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); + void Start(); + const char * GetName() const { return m_Name.c_str(); } + std::vector > GetSessions(); + std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + + private: + + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + + private: + const std::string m_Name; + const uint16_t LocalPort; + boost::asio::ip::address m_LocalAddress; + boost::asio::ip::udp::endpoint m_RemoteEndpoint; + std::mutex m_SessionsMutex; + std::vector m_Sessions; + std::shared_ptr m_LocalDest; + }; + + class I2PUDPClientTunnel + { + public: + I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, + uint16_t remotePort); + ~I2PUDPClientTunnel(); + void Start(); + const char * GetName() const { return m_Name.c_str(); } + std::vector > GetSessions(); + + bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } + + std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + + private: + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void TryResolving(); + const std::string m_Name; + UDPSession * m_Session; + const std::string m_RemoteDest; + std::shared_ptr m_LocalDest; + const boost::asio::ip::udp::endpoint m_LocalEndpoint; + i2p::data::IdentHash * m_RemoteIdent; + std::thread * m_ResolveThread; + uint16_t LocalPort; + uint16_t RemotePort; + bool m_cancel_resolve; + }; + class I2PServerTunnel: public I2PService { public: @@ -148,8 +262,10 @@ namespace client const boost::asio::ip::tcp::endpoint& GetEndpoint () const { return m_Endpoint; } const char* GetName() { return m_Name.c_str (); } - - private: + + void SetMaxConnsPerMinute(const uint32_t conns) { m_PortDestination->SetMaxConnsPerMinute(conns); } + + private: void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, std::shared_ptr resolver); @@ -165,7 +281,7 @@ namespace client boost::asio::ip::tcp::endpoint m_Endpoint; std::shared_ptr m_PortDestination; std::set m_AccessList; - bool m_IsAccessList; + bool m_IsAccessList; }; class I2PServerTunnelHTTP: public I2PServerTunnel diff --git a/Identity.cpp b/Identity.cpp index 71ca007f..6d37d34e 100644 --- a/Identity.cpp +++ b/Identity.cpp @@ -200,7 +200,9 @@ namespace data } memcpy (&m_StandardIdentity, buf, DEFAULT_IDENTITY_SIZE); - delete[] m_ExtendedBuffer; m_ExtendedBuffer = nullptr; + if(m_ExtendedBuffer) delete[] m_ExtendedBuffer; + m_ExtendedBuffer = nullptr; + m_ExtendedLen = bufbe16toh (m_StandardIdentity.certificate + 1); if (m_ExtendedLen) { @@ -410,6 +412,7 @@ namespace data memcpy (m_PrivateKey, buf + ret, 256); // private key always 256 ret += 256; size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + if(signingPrivateKeySize + ret > len) return 0; // overflow memcpy (m_SigningPrivateKey, buf + ret, signingPrivateKeySize); ret += signingPrivateKeySize; m_Signer = nullptr; @@ -422,7 +425,8 @@ namespace data size_t ret = m_Public->ToBuffer (buf, len); memcpy (buf + ret, m_PrivateKey, 256); // private key always 256 ret += 256; - size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + if(ret + signingPrivateKeySize > len) return 0; // overflow memcpy (buf + ret, m_SigningPrivateKey, signingPrivateKeySize); ret += signingPrivateKeySize; return ret; @@ -452,11 +456,12 @@ namespace data void PrivateKeys::Sign (const uint8_t * buf, int len, uint8_t * signature) const { - if (m_Signer) - m_Signer->Sign (buf, len, signature); + if (!m_Signer) + CreateSigner(); + m_Signer->Sign (buf, len, signature); } - void PrivateKeys::CreateSigner () + void PrivateKeys::CreateSigner () const { switch (m_Public->GetSigningKeyType ()) { diff --git a/Identity.h b/Identity.h index 841acf65..8f3e9e9d 100644 --- a/Identity.h +++ b/Identity.h @@ -92,6 +92,8 @@ namespace data CryptoKeyType GetCryptoKeyType () const; void DropVerifier () const; // to save memory + bool operator == (const IdentityEx & other) const { return GetIdentHash() == other.GetIdentHash(); } + private: void CreateVerifier () const; @@ -133,14 +135,14 @@ namespace data private: - void CreateSigner (); + void CreateSigner () const; private: std::shared_ptr m_Public; uint8_t m_PrivateKey[256]; uint8_t m_SigningPrivateKey[1024]; // assume private key doesn't exceed 1024 bytes - std::unique_ptr m_Signer; + mutable std::unique_ptr m_Signer; }; // kademlia diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 89cad0df..04dc77c5 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -162,8 +162,21 @@ namespace data { return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen); } - - const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + + bool LeaseSet::ExpiresSoon(const uint64_t dlt, const uint64_t fudge) const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + if (fudge) now += rand() % fudge; + if (now >= m_ExpirationTime) return true; + return m_ExpirationTime - now <= dlt; + } + + const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + { + return GetNonExpiredLeasesExcluding( [] (const Lease & l) -> bool { return false; }, withThreshold); + } + + const std::vector > LeaseSet::GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold) const { auto ts = i2p::util::GetMillisecondsSinceEpoch (); std::vector > leases; @@ -174,7 +187,7 @@ namespace data endDate += LEASE_ENDDATE_THRESHOLD; else endDate -= LEASE_ENDDATE_THRESHOLD; - if (ts < endDate) + if (ts < endDate && !exclude(*it)) leases.push_back (it); } return leases; diff --git a/LeaseSet.h b/LeaseSet.h index c174ac39..95d8bf90 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -7,6 +7,7 @@ #include #include #include "Identity.h" +#include "Timestamp.h" namespace i2p { @@ -24,7 +25,13 @@ namespace data IdentHash tunnelGateway; uint32_t tunnelID; uint64_t endDate; // 0 means invalid - bool isUpdated; // trasient + bool isUpdated; // trasient + /* return true if this lease expires within t millisecond + fudge factor */ + bool ExpiresWithin( const uint64_t t, const uint64_t fudge = 1000 ) const { + auto expire = i2p::util::GetMillisecondsSinceEpoch (); + if(fudge) expire += rand() % fudge; + return endDate - expire >= t; + } }; struct LeaseCmp @@ -38,6 +45,8 @@ namespace data }; }; + typedef std::function LeaseInspectFunc; + const size_t MAX_LS_BUFFER_SIZE = 3072; const size_t LEASE_SIZE = 44; // 32 + 4 + 8 const uint8_t MAX_NUM_LEASES = 16; @@ -56,10 +65,12 @@ namespace data size_t GetBufferLen () const { return m_BufferLen; }; bool IsValid () const { return m_IsValid; }; const std::vector > GetNonExpiredLeases (bool withThreshold = true) const; + const std::vector > GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold = true) const; bool HasExpiredLeases () const; bool IsExpired () const; bool IsEmpty () const { return m_Leases.empty (); }; uint64_t GetExpirationTime () const { return m_ExpirationTime; }; + bool ExpiresSoon(const uint64_t dlt=1000 * 5, const uint64_t fudge = 0) const ; bool operator== (const LeaseSet& other) const { return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); }; diff --git a/Log.cpp b/Log.cpp index 590f3d0f..5660821f 100644 --- a/Log.cpp +++ b/Log.cpp @@ -10,7 +10,7 @@ namespace i2p { namespace log { - Log logger; + static Log logger; /** * @brief Maps our loglevel to their symbolic name */ diff --git a/Log.h b/Log.h index 6762899f..79bbeb3f 100644 --- a/Log.h +++ b/Log.h @@ -40,8 +40,20 @@ enum LogType { #endif }; +#ifdef _WIN32 + const char LOG_COLOR_ERROR[] = ""; + const char LOG_COLOR_WARNING[] = ""; + const char LOG_COLOR_RESET[] = ""; +#else + const char LOG_COLOR_ERROR[] = "\033[1;31m"; + const char LOG_COLOR_WARNING[] = "\033[1;33m"; + const char LOG_COLOR_RESET[] = "\033[0m"; +#endif + + namespace i2p { namespace log { + struct LogMsg; /* forward declaration */ class Log @@ -150,17 +162,17 @@ namespace log { /** internal usage only -- folding args array to single string */ template -void LogPrint (std::stringstream& s, TValue arg) +void LogPrint (std::stringstream& s, TValue&& arg) noexcept { - s << arg; + s << std::forward(arg); } /** internal usage only -- folding args array to single string */ template -void LogPrint (std::stringstream& s, TValue arg, TArgs... args) +void LogPrint (std::stringstream& s, TValue&& arg, TArgs&&... args) noexcept { - LogPrint (s, arg); - LogPrint (s, args...); + LogPrint (s, std::forward(arg)); + LogPrint (s, std::forward(args)...); } /** @@ -169,7 +181,7 @@ void LogPrint (std::stringstream& s, TValue arg, TArgs... args) * @param args Array of message parts */ template -void LogPrint (LogLevel level, TArgs... args) +void LogPrint (LogLevel level, TArgs&&... args) noexcept { i2p::log::Log &log = i2p::log::Logger(); if (level > log.GetLogLevel ()) @@ -177,8 +189,16 @@ void LogPrint (LogLevel level, TArgs... args) // fold message to single string std::stringstream ss(""); - LogPrint (ss, args ...); + if(level == eLogError) // if log level is ERROR color log message red + ss << LOG_COLOR_ERROR; + else if (level == eLogWarning) // if log level is WARN color log message yellow + ss << LOG_COLOR_WARNING; + LogPrint (ss, std::forward(args)...); + + // reset color + ss << LOG_COLOR_RESET; + auto msg = std::make_shared(level, std::time(nullptr), ss.str()); msg->tid = std::this_thread::get_id(); log.Append(msg); diff --git a/Makefile.linux b/Makefile.linux index b0549f43..ddff76a7 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -32,9 +32,14 @@ ifeq ($(USE_STATIC),yes) # Using 'getaddrinfo' in statically linked applications requires at runtime # the shared libraries from the glibc version used for linking LIBDIR := /usr/lib - LDLIBS = -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options - LDLIBS += -lssl -lcrypto -lz -ldl -lpthread -lrt - LDLIBS += -static-libstdc++ -static-libgcc -static + LDLIBS = $(LIBDIR)/libboost_system.a + LDLIBS += $(LIBDIR)/libboost_date_time.a + LDLIBS += $(LIBDIR)/libboost_filesystem.a + LDLIBS += $(LIBDIR)/libboost_program_options.a + LDLIBS += $(LIBDIR)/libssl.a + LDLIBS += $(LIBDIR)/libcrypto.a + LDLIBS += $(LIBDIR)/libz.a + LDLIBS += -lpthread -static-libstdc++ -static-libgcc -lrt USE_AESNI := no else LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread diff --git a/NTCPSession.cpp b/NTCPSession.cpp index a1b26aca..20dc9ab0 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -22,7 +22,6 @@ namespace transport TransportSession (in_RemoteRouter, NTCP_TERMINATION_TIMEOUT), m_Server (server), m_Socket (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false), - m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()), m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false) { m_Establisher = new Establisher; @@ -748,6 +747,7 @@ namespace transport auto& addresses = context.GetRouterInfo ().GetAddresses (); for (const auto& address: addresses) { + if (!address) continue; if (address->transportStyle == i2p::data::RouterInfo::eTransportNTCP) { if (address->host.is_v4()) @@ -845,6 +845,7 @@ namespace transport if (it != m_NTCPSessions.end ()) { LogPrint (eLogWarning, "NTCP: session to ", ident.ToBase64 (), " already exists"); + session->Terminate(); return false; } m_NTCPSessions.insert (std::pair >(ident, session)); @@ -953,7 +954,7 @@ namespace transport { if (ecode) { - LogPrint (eLogError, "NTCP: Can't connect to ", conn->GetSocket ().remote_endpoint (), ": ", ecode.message ()); + LogPrint (eLogError, "NTCP: Connect error ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true); conn->Terminate (); diff --git a/NTCPSession.h b/NTCPSession.h index adf33553..a3d95c62 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -54,9 +54,7 @@ namespace transport void Done (); boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; - bool IsEstablished () const { return m_IsEstablished; }; - bool IsTerminationTimeoutExpired (uint64_t ts) const - { return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); }; + bool IsEstablished () const { return m_IsEstablished; }; void ClientLogin (); void ServerLogin (); @@ -103,7 +101,6 @@ namespace transport NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; - uint64_t m_LastActivityTimestamp; i2p::crypto::CBCDecryption m_Decryption; i2p::crypto::CBCEncryption m_Encryption; diff --git a/NetDb.cpp b/NetDb.cpp index dc018326..2c9c4395 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -34,11 +34,11 @@ namespace data } void NetDb::Start () - { + { m_Storage.SetPlace(i2p::fs::GetDataDir()); m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); InitProfilesStorage (); - m_Families.LoadCertificates (); + m_Families.LoadCertificates (); Load (); if (m_RouterInfos.size () < 25) // reseed if # of router less than 50 Reseed (); @@ -68,7 +68,7 @@ namespace data m_Requests.Stop (); } } - + void NetDb::Run () { uint32_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0; @@ -329,6 +329,67 @@ namespace data for ( auto & entry : m_LeaseSets) v(entry.first, entry.second); } + + void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v) + { + m_Storage.Iterate([v] (const std::string & filename) { + auto ri = std::make_shared(filename); + v(ri); + }); + } + + void NetDb::VisitRouterInfos(RouterInfoVisitor v) + { + std::unique_lock lock(m_RouterInfosMutex); + for ( const auto & item : m_RouterInfos ) + v(item.second); + } + + size_t NetDb::VisitRandomRouterInfos(RouterInfoFilter filter, RouterInfoVisitor v, size_t n) + { + std::vector > found; + const size_t max_iters_per_cyle = 3; + size_t iters = max_iters_per_cyle; + while(n > 0) + { + std::unique_lock lock(m_RouterInfosMutex); + uint32_t idx = rand () % m_RouterInfos.size (); + uint32_t i = 0; + for (const auto & it : m_RouterInfos) { + if(i >= idx) // are we at the random start point? + { + // yes, check if we want this one + if(filter(it.second)) + { + // we have a match + --n; + found.push_back(it.second); + // reset max iterations per cycle + iters = max_iters_per_cyle; + break; + } + } + else // not there yet + ++i; + } + // we have enough + if(n == 0) break; + --iters; + // have we tried enough this cycle ? + if(!iters) { + // yes let's try the next cycle + --n; + iters = max_iters_per_cyle; + } + } + // visit the ones we found + size_t visited = 0; + for(const auto & ri : found ) { + v(ri); + ++visited; + } + return visited; + } void NetDb::Load () { @@ -495,8 +556,13 @@ namespace data } uint8_t uncompressed[2048]; size_t uncompressedSize = m_Inflator.Inflate (buf + offset, size, uncompressed, 2048); - if (uncompressedSize) + if (uncompressedSize && uncompressedSize < 2048) updated = AddRouterInfo (ident, uncompressed, uncompressedSize); + else + { + LogPrint (eLogError, "NetDb: decompression failed ", uncompressedSize); + return; + } } if (replyToken && context.IsFloodfill () && updated) diff --git a/NetDb.h b/NetDb.h index 43b069b9..d8ee148a 100644 --- a/NetDb.h +++ b/NetDb.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "Base.h" #include "Gzip.h" @@ -35,7 +36,13 @@ namespace data /** function for visiting a leaseset stored in a floodfill */ typedef std::function)> LeaseSetVisitor; - + + /** function for visiting a router info we have locally */ + typedef std::function)> RouterInfoVisitor; + + /** function for visiting a router info and determining if we want to use it */ + typedef std::function)> RouterInfoFilter; + class NetDb { public: @@ -45,7 +52,7 @@ namespace data void Start (); void Stop (); - + bool AddRouterInfo (const uint8_t * buf, int len); bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len); bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from); @@ -86,7 +93,12 @@ namespace data /** visit all lease sets we currently store */ void VisitLeaseSets(LeaseSetVisitor v); - + /** visit all router infos we have currently on disk, usually insanely expensive, does not access in memory RI */ + void VisitStoredRouterInfos(RouterInfoVisitor v); + /** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */ + void VisitRouterInfos(RouterInfoVisitor v); + /** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */ + size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n); private: void Load (); @@ -103,7 +115,7 @@ namespace data std::shared_ptr GetRandomRouter (Filter filter) const; private: - + mutable std::mutex m_LeaseSetsMutex; std::map > m_LeaseSets; mutable std::mutex m_RouterInfosMutex; diff --git a/Reseed.cpp b/Reseed.cpp index 48d7a53f..a51dcad4 100644 --- a/Reseed.cpp +++ b/Reseed.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -22,26 +23,6 @@ namespace i2p { namespace data { - static std::vector httpsReseedHostList = - { -#ifdef MESHNET - // meshnet i2p reseeds - "https://reseed.i2p.rocks:8443/" -#else - // mainline i2p reseeds - "https://reseed.i2p-projekt.de/", // Only HTTPS - "https://i2p.mooo.com/netDb/", - "https://netdb.i2p2.no/", // Only SU3 (v3) support, SNI required - "https://us.reseed.i2p2.no:444/", - "https://uk.reseed.i2p2.no:444/", - "https://i2p.manas.ca:8443/", - "https://i2p-0.manas.ca:8443/", - "https://reseed.i2p.vzaws.com:8443/", // Only SU3 (v3) support - "https://user.mx24.eu/", // Only HTTPS and SU3 (v3) support - "https://download.xxlspeed.com/", // Only HTTPS and SU3 (v3) support - "https://reseed-ru.lngserv.ru/" -#endif - }; Reseeder::Reseeder() { @@ -53,6 +34,10 @@ namespace data int Reseeder::ReseedNowSU3 () { + std::string reseedURLs; i2p::config::GetOption("reseed.urls", reseedURLs); + std::vector httpsReseedHostList; + boost::split(httpsReseedHostList, reseedURLs, boost::is_any_of(","), boost::token_compress_on); + std::string filename; i2p::config::GetOption("reseed.file", filename); if (filename.length() > 0) // reseed file is specified { @@ -345,11 +330,21 @@ namespace data // extract issuer name char name[100]; X509_NAME_oneline (X509_get_issuer_name(cert), name, 100); + char * cn = strstr (name, "CN="); + if (cn) + { + cn += 3; + char * terminator = strchr (cn, '/'); + if (terminator) terminator[0] = 0; + } // extract RSA key (we need n only, e = 65537) RSA * key = X509_get_pubkey (cert)->pkey.rsa; PublicKey value; i2p::crypto::bn2buf (key->n, value, 512); - m_SigningKeys[name] = value; + if (cn) + m_SigningKeys[cn] = value; + else + LogPrint (eLogError, "Reseed: Can't find CN field in ", filename); } SSL_free (ssl); } diff --git a/RouterInfo.cpp b/RouterInfo.cpp index 130355b8..9b2a4195 100644 --- a/RouterInfo.cpp +++ b/RouterInfo.cpp @@ -125,14 +125,6 @@ namespace data m_IsUnreachable = true; return; } - std::stringstream str (std::string ((char *)m_Buffer + identityLen, m_BufferLen - identityLen)); - ReadFromStream (str); - if (!str) - { - LogPrint (eLogError, "RouterInfo: malformed message"); - m_IsUnreachable = true; - return; - } if (verifySignature) { // verify signature @@ -141,9 +133,20 @@ namespace data { LogPrint (eLogError, "RouterInfo: signature verification failed"); m_IsUnreachable = true; + return; } m_RouterIdentity->DropVerifier (); } + // parse RI + std::stringstream str; + str.write ((const char *)m_Buffer + identityLen, m_BufferLen - identityLen); + ReadFromStream (str); + if (!str) + { + LogPrint (eLogError, "RouterInfo: malformed message"); + m_IsUnreachable = true; + } + } void RouterInfo::ReadFromStream (std::istream& s) @@ -182,6 +185,7 @@ namespace data s.seekg (1, std::ios_base::cur); r++; // = r += ReadString (value, 255, s); s.seekg (1, std::ios_base::cur); r++; // ; + if (!s) return; if (!strcmp (key, "host")) { boost::system::error_code ecode; @@ -223,6 +227,11 @@ namespace data size_t l = strlen(key); unsigned char index = key[l-1] - '0'; // TODO: key[l-1] = 0; + if (index > 9) + { + LogPrint (eLogError, "RouterInfo: Unexpected introducer's index ", index, " skipped"); + if (s) continue; else return; + } if (index >= address.introducers.size ()) address.introducers.resize (index + 1); Introducer& introducer = address.introducers.at (index); @@ -262,6 +271,7 @@ namespace data s.seekg (1, std::ios_base::cur); r++; // = r += ReadString (value, 255, s); s.seekg (1, std::ios_base::cur); r++; // ; + if (!s) return; m_Properties[key] = value; // extract caps @@ -270,7 +280,7 @@ namespace data // check netId else if (!strcmp (key, ROUTER_INFO_PROPERTY_NETID) && atoi (value) != I2PD_NET_ID) { - LogPrint (eLogError, "Unexpected ", ROUTER_INFO_PROPERTY_NETID, "=", value); + LogPrint (eLogError, "RouterInfo: Unexpected ", ROUTER_INFO_PROPERTY_NETID, "=", value); m_IsUnreachable = true; } // family @@ -543,6 +553,7 @@ namespace data if (l < len) { s.read (str, l); + if (!s) l = 0; // failed, return empty string str[l] = 0; } else diff --git a/SSU.cpp b/SSU.cpp index 0c3662d3..72091e00 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -17,7 +17,8 @@ namespace transport m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_EndpointV6 (addr, port), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) + m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), + m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.set_option (boost::asio::ip::v6_only (true)); @@ -32,7 +33,8 @@ namespace transport m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) + m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), + m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); @@ -59,11 +61,13 @@ namespace transport { m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); + ScheduleTermination (); } if (context.SupportsV6 ()) { m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); + ScheduleTerminationV6 (); } SchedulePeerTestsCleanupTimer (); ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers @@ -73,6 +77,8 @@ namespace transport { DeleteAllSessions (); m_IsRunning = false; + m_TerminationTimer.cancel (); + m_TerminationTimerV6.cancel (); m_Service.stop (); m_Socket.close (); m_ServiceV6.stop (); @@ -640,6 +646,58 @@ namespace transport SchedulePeerTestsCleanupTimer (); } } + + void SSUServer::ScheduleTermination () + { + m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer, + this, std::placeholders::_1)); + } + + void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto& it: m_Sessions) + if (it.second->IsTerminationTimeoutExpired (ts)) + { + auto session = it.second; + m_Service.post ([session] + { + LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); + session->Failed (); + }); + } + ScheduleTermination (); + } + } + + void SSUServer::ScheduleTerminationV6 () + { + m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6, + this, std::placeholders::_1)); + } + + void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto& it: m_SessionsV6) + if (it.second->IsTerminationTimeoutExpired (ts)) + { + auto session = it.second; + m_ServiceV6.post ([session] + { + LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); + session->Failed (); + }); + } + ScheduleTerminationV6 (); + } + } } } diff --git a/SSU.h b/SSU.h index d779c025..182fa0eb 100644 --- a/SSU.h +++ b/SSU.h @@ -23,6 +23,7 @@ namespace transport const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour + const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU_MAX_NUM_INTRODUCERS = 3; struct SSUPacket @@ -90,6 +91,12 @@ namespace transport void SchedulePeerTestsCleanupTimer (); void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode); + // timer + void ScheduleTermination (); + void HandleTerminationTimer (const boost::system::error_code& ecode); + void ScheduleTerminationV6 (); + void HandleTerminationTimerV6 (const boost::system::error_code& ecode); + private: struct PeerTest @@ -106,7 +113,8 @@ namespace transport boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6; - boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer; + boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer, + m_TerminationTimer, m_TerminationTimerV6; std::list m_Introducers; // introducers we are connected to std::map > m_Sessions, m_SessionsV6; std::map m_Relays; // we are introducer diff --git a/SSUSession.cpp b/SSUSession.cpp index bf4058a5..26c14570 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -14,7 +14,7 @@ namespace transport SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint, std::shared_ptr router, bool peerTest ): TransportSession (router, SSU_TERMINATION_TIMEOUT), - m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Timer (GetService ()), + m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_ConnectTimer (GetService ()), m_IsPeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0),m_Data (*this), m_IsDataReceived (false) { @@ -97,7 +97,7 @@ namespace transport { if (!len) return; // ignore zero-length packets if (m_State == eSessionStateEstablished) - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); if (m_IsSessionKey && Validate (buf, len, m_MacKey)) // try session key first DecryptSessionKey (buf, len); @@ -229,7 +229,7 @@ namespace transport } LogPrint (eLogDebug, "SSU message: session created"); - m_Timer.cancel (); // connect timer + m_ConnectTimer.cancel (); // connect timer SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, signed on time auto headerSize = GetSSUHeaderSize (buf); if (headerSize >= len) @@ -804,9 +804,9 @@ namespace transport void SSUSession::ScheduleConnectTimer () { - m_Timer.cancel (); - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.cancel (); + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } @@ -826,8 +826,8 @@ namespace transport if (m_State == eSessionStateUnknown) { // set connect timer - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } uint32_t nonce; @@ -840,8 +840,8 @@ namespace transport { m_State = eSessionStateIntroduced; // set connect timer - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } @@ -851,7 +851,7 @@ namespace transport SendSesionDestroyed (); transports.PeerDisconnected (shared_from_this ()); m_Data.Stop (); - m_Timer.cancel (); + m_ConnectTimer.cancel (); } void SSUSession::Done () @@ -868,7 +868,7 @@ namespace transport transports.PeerConnected (shared_from_this ()); if (m_IsPeerTest) SendPeerTest (); - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); } void SSUSession::Failed () @@ -880,24 +880,6 @@ namespace transport } } - void SSUSession::ScheduleTermination () - { - m_Timer.cancel (); - m_Timer.expires_from_now (boost::posix_time::seconds(GetTerminationTimeout ())); - m_Timer.async_wait (std::bind (&SSUSession::HandleTerminationTimer, - shared_from_this (), std::placeholders::_1)); - } - - void SSUSession::HandleTerminationTimer (const boost::system::error_code& ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint (eLogWarning, "SSU: no activity with ", m_RemoteEndpoint, " for ", GetTerminationTimeout (), " seconds"); - Failed (); - } - } - - void SSUSession::SendI2NPMessages (const std::vector >& msgs) { GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs)); @@ -1009,7 +991,7 @@ namespace transport else // v6 { boost::asio::ip::address_v6::bytes_type bytes; - memcpy (bytes.data (), address, 6); + memcpy (bytes.data (), address, 16); addr = boost::asio::ip::address_v6 (bytes); } SendPeerTest (nonce, addr, be16toh (port), introKey); // to Alice with her address received from Bob @@ -1051,7 +1033,7 @@ namespace transport } else if (address.is_v6 ()) { - *payload = 6; + *payload = 16; memcpy (payload + 1, address.to_v6 ().to_bytes ().data (), 16); // our IP V6 } else @@ -1126,7 +1108,7 @@ namespace transport FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48); Send (buf, 48); LogPrint (eLogDebug, "SSU: keep-alive sent"); - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); } } diff --git a/SSUSession.h b/SSUSession.h index c2f24ce3..69669187 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -77,6 +77,7 @@ namespace transport void WaitForIntroduction (); void Close (); void Done (); + void Failed (); boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; void SendI2NPMessages (const std::vector >& msgs); @@ -114,7 +115,6 @@ namespace transport void ProcessRelayResponse (const uint8_t * buf, size_t len); void ProcessRelayIntro (const uint8_t * buf, size_t len); void Established (); - void Failed (); void ScheduleConnectTimer (); void HandleConnectTimer (const boost::system::error_code& ecode); void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); @@ -131,15 +131,12 @@ namespace transport void DecryptSessionKey (uint8_t * buf, size_t len); bool Validate (uint8_t * buf, size_t len, const i2p::crypto::MACKey& macKey); - void ScheduleTermination (); - void HandleTerminationTimer (const boost::system::error_code& ecode); - private: friend class SSUData; // TODO: change in later SSUServer& m_Server; boost::asio::ip::udp::endpoint m_RemoteEndpoint; - boost::asio::deadline_timer m_Timer; + boost::asio::deadline_timer m_ConnectTimer; bool m_IsPeerTest; SessionState m_State; bool m_IsSessionKey; diff --git a/Streaming.cpp b/Streaming.cpp index 90c363ea..02e738c8 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -269,9 +269,14 @@ namespace stream } auto sentPacket = *it; uint64_t rtt = ts - sentPacket->sendTime; + if(ts < sentPacket->sendTime) + { + LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); + rtt = 1; + } m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTO = m_RTT*1.5; // TODO: implement it better - LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt); + LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; @@ -658,6 +663,9 @@ namespace stream void Stream::ScheduleResend () { m_ResendTimer.cancel (); + // check for invalid value + if (m_RTO <= 0) + m_RTO = 1; m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO)); m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer, shared_from_this (), std::placeholders::_1)); @@ -733,7 +741,15 @@ namespace stream return; } if (m_Status == eStreamStatusOpen) + { + if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) + { + // seems something went wrong and we should re-select tunnels + m_CurrentOutboundTunnel = nullptr; + m_CurrentRemoteLease = nullptr; + } SendQuickAck (); + } m_IsAckSendScheduled = false; } } @@ -792,7 +808,10 @@ namespace stream StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), - m_PendingIncomingTimer (m_Owner->GetService ()) + m_PendingIncomingTimer (m_Owner->GetService ()), + m_ConnTrackTimer(m_Owner->GetService()), + m_ConnsPerMinute(DEFAULT_MAX_CONNS_PER_MIN), + m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch()) { } @@ -807,17 +826,23 @@ namespace stream } void StreamingDestination::Start () - { + { + ScheduleConnTrack(); } void StreamingDestination::Stop () { ResetAcceptor (); m_PendingIncomingTimer.cancel (); + m_ConnTrackTimer.cancel(); { std::unique_lock l(m_StreamsMutex); m_Streams.clear (); - } + } + { + std::unique_lock l(m_ConnsMutex); + m_Conns.clear (); + } } void StreamingDestination::HandleNextPacket (Packet * packet) @@ -841,6 +866,18 @@ namespace stream auto incomingStream = CreateNewIncomingStream (); uint32_t receiveStreamID = packet->GetReceiveStreamID (); incomingStream->HandleNextPacket (packet); // SYN + auto ident = incomingStream->GetRemoteIdentity(); + if(ident) + { + auto ih = ident->GetIdentHash(); + if(DropNewStream(ih)) + { + // drop + LogPrint(eLogWarning, "Streaming: Dropping connection, too many inbound streams from ", ih.ToBase32()); + incomingStream->Terminate(); + return; + } + } // handle saved packets if any { auto it = m_SavedPackets.find (receiveStreamID); @@ -851,7 +888,7 @@ namespace stream incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); } - } + } // accept if (m_Acceptor != nullptr) m_Acceptor (incomingStream); @@ -1004,6 +1041,64 @@ namespace stream else msg = nullptr; return msg; - } + } + + void StreamingDestination::SetMaxConnsPerMinute(const uint32_t conns) + { + m_ConnsPerMinute = conns; + LogPrint(eLogDebug, "Streaming: Set max conns per minute per destination to ", conns); + } + + bool StreamingDestination::DropNewStream(const i2p::data::IdentHash & ih) + { + std::lock_guard lock(m_ConnsMutex); + if (m_Banned.size() > MAX_BANNED_CONNS) return true; // overload + auto end = std::end(m_Banned); + if ( std::find(std::begin(m_Banned), end, ih) != end) return true; // already banned + auto itr = m_Conns.find(ih); + if (itr == m_Conns.end()) + m_Conns[ih] = 0; + + m_Conns[ih] += 1; + + bool ban = m_Conns[ih] >= m_ConnsPerMinute; + if (ban) + { + m_Banned.push_back(ih); + m_Conns.erase(ih); + LogPrint(eLogWarning, "Streaming: ban ", ih.ToBase32()); + } + return ban; + } + + void StreamingDestination::HandleConnTrack(const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + { // acquire lock + std::lock_guard lock(m_ConnsMutex); + // clear conn tracking + m_Conns.clear(); + // check for ban clear + auto ts = i2p::util::GetMillisecondsSinceEpoch(); + if (ts - m_LastBanClear >= DEFAULT_BAN_INTERVAL) + { + // clear bans + m_Banned.clear(); + m_LastBanClear = ts; + } + } + // reschedule timer + ScheduleConnTrack(); + } + } + + void StreamingDestination::ScheduleConnTrack() + { + m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60)); + m_ConnTrackTimer.async_wait ( + std::bind (&StreamingDestination::HandleConnTrack, + shared_from_this (), std::placeholders::_1)); + } } } diff --git a/Streaming.h b/Streaming.h index c29b62f9..efa4d69e 100644 --- a/Streaming.h +++ b/Streaming.h @@ -51,6 +51,22 @@ namespace stream const int INITIAL_RTO = 9000; // in milliseconds const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const int PENDING_INCOMING_TIMEOUT = 10; // in seconds + + /** i2cp option for limiting inbound stremaing connections */ + const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "maxconns"; + /** default maximum connections attempts per minute per destination */ + const uint32_t DEFAULT_MAX_CONNS_PER_MIN = 600; + + /** + * max banned destinations per local destination + * TODO: make configurable + */ + const uint16_t MAX_BANNED_CONNS = 9999; + /** + * length of a ban in ms + * TODO: make configurable + */ + const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000; struct Packet { @@ -134,10 +150,11 @@ namespace stream size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; int GetWindowSize () const { return m_WindowSize; }; int GetRTT () const { return m_RTT; }; - - private: + /** don't call me */ void Terminate (); + + private: void SendBuffer (); void SendQuickAck (); @@ -210,12 +227,22 @@ namespace stream void HandleDataMessagePayload (const uint8_t * buf, size_t len); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort); + /** set max connections per minute per destination */ + void SetMaxConnsPerMinute(const uint32_t conns); + private: void HandleNextPacket (Packet * packet); std::shared_ptr CreateNewIncomingStream (); void HandlePendingIncomingTimer (const boost::system::error_code& ecode); + /** handle cleaning up connection tracking for ratelimits */ + void HandleConnTrack(const boost::system::error_code& ecode); + + bool DropNewStream(const i2p::data::IdentHash & ident); + + void ScheduleConnTrack(); + private: std::shared_ptr m_Owner; @@ -227,7 +254,16 @@ namespace stream std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN - + + std::mutex m_ConnsMutex; + /** how many connections per minute did each identity have */ + std::map m_Conns; + boost::asio::deadline_timer m_ConnTrackTimer; + uint32_t m_ConnsPerMinute; + /** banned identities */ + std::vector m_Banned; + uint64_t m_LastBanClear; + public: i2p::data::GzipInflator m_Inflator; diff --git a/Tag.h b/Tag.h index b6f94de7..92e2f1a5 100644 --- a/Tag.h +++ b/Tag.h @@ -20,7 +20,7 @@ namespace data { { public: - Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); }; + Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); }; Tag (const Tag& ) = default; #ifndef _WIN32 // FIXME!!! msvs 2013 can't compile it Tag (Tag&& ) = default; @@ -50,6 +50,14 @@ namespace data { return true; } + const uint8_t * data() const { return m_Buf; } + + /** fill with a value */ + void Fill(uint8_t c) + { + memset(m_Buf, c, sz); + } + std::string ToBase64 () const { char str[sz*2]; diff --git a/TransportSession.h b/TransportSession.h index 1b057bc7..9c97d02e 100644 --- a/TransportSession.h +++ b/TransportSession.h @@ -9,6 +9,7 @@ #include "Crypto.h" #include "RouterInfo.h" #include "I2NPProtocol.h" +#include "Timestamp.h" namespace i2p { @@ -54,7 +55,8 @@ namespace transport public: TransportSession (std::shared_ptr router, int terminationTimeout): - m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout) + m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout), + m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()) { if (router) m_RemoteIdentity = router->GetRouterIdentity (); @@ -72,6 +74,8 @@ namespace transport int GetTerminationTimeout () const { return m_TerminationTimeout; }; void SetTerminationTimeout (int terminationTimeout) { m_TerminationTimeout = terminationTimeout; }; + bool IsTerminationTimeoutExpired (uint64_t ts) const + { return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); }; virtual void SendI2NPMessages (const std::vector >& msgs) = 0; @@ -82,6 +86,7 @@ namespace transport size_t m_NumSentBytes, m_NumReceivedBytes; bool m_IsOutgoing; int m_TerminationTimeout; + uint64_t m_LastActivityTimestamp; }; } } diff --git a/Transports.cpp b/Transports.cpp index b1c174f6..a29cac15 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -114,6 +114,7 @@ namespace transport auto& addresses = context.GetRouterInfo ().GetAddresses (); for (const auto& address : addresses) { + if (!address) continue; if (m_NTCPServer == nullptr && enableNTCP) { m_NTCPServer = new NTCPServer (); diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 119556aa..35272f2c 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -15,7 +15,8 @@ namespace tunnel { TunnelPool::TunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels): m_NumInboundHops (numInboundHops), m_NumOutboundHops (numOutboundHops), - m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true) + m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true), + m_CustomPeerSelector(nullptr) { } @@ -327,9 +328,18 @@ namespace tunnel bool TunnelPool::SelectPeers (std::vector >& peers, bool isInbound) { - if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops; - if (numHops <= 0) return true; // peers is empty + // peers is empty + if (numHops <= 0) return true; + // custom peer selector in use ? + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + if (m_CustomPeerSelector) + return m_CustomPeerSelector->SelectPeers(peers, numHops, isInbound); + } + // explicit peers in use + if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); + auto prevHop = i2p::context.GetSharedRouterInfo (); if(i2p::transport::transports.RoutesRestricted()) { @@ -477,6 +487,23 @@ namespace tunnel LogPrint (eLogDebug, "Tunnels: Creating paired inbound tunnel..."); auto tunnel = tunnels.CreateInboundTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); - } + } + + void TunnelPool::SetCustomPeerSelector(TunnelPeerSelector selector) + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + m_CustomPeerSelector = selector; + } + + void TunnelPool::UnsetCustomPeerSelector() + { + SetCustomPeerSelector(nullptr); + } + + bool TunnelPool::HasCustomPeerSelector() + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + return m_CustomPeerSelector != nullptr; + } } } diff --git a/TunnelPool.h b/TunnelPool.h index 0cd2057d..d5bcf18f 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -23,6 +23,16 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + /** interface for custom tunnel peer selection algorithm */ + struct ITunnelPeerSelector + { + typedef std::shared_ptr Peer; + typedef std::vector TunnelPath; + virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0; + }; + + typedef std::shared_ptr TunnelPeerSelector; + class TunnelPool: public std::enable_shared_from_this // per local destination { public: @@ -45,7 +55,6 @@ namespace tunnel std::shared_ptr GetNextOutboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNextInboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNewOutboundTunnel (std::shared_ptr old) const; - void TestTunnels (); void ProcessGarlicMessage (std::shared_ptr msg); void ProcessDeliveryStatus (std::shared_ptr msg); @@ -56,9 +65,12 @@ namespace tunnel int GetNumInboundTunnels () const { return m_NumInboundTunnels; }; int GetNumOutboundTunnels () const { return m_NumOutboundTunnels; }; - - private: + void SetCustomPeerSelector(TunnelPeerSelector selector); + void UnsetCustomPeerSelector(); + bool HasCustomPeerSelector(); + private: + void CreateInboundTunnel (); void CreateOutboundTunnel (); void CreatePairedInboundTunnel (std::shared_ptr outboundTunnel); @@ -80,7 +92,8 @@ namespace tunnel mutable std::mutex m_TestsMutex; std::map, std::shared_ptr > > m_Tests; bool m_IsActive; - + std::mutex m_CustomPeerSelectorMutex; + TunnelPeerSelector m_CustomPeerSelector; public: // for HTTP only diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index f9f21fd4..61b8fdf9 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -16,6 +16,7 @@ option(WITH_UPNP "Include support for UPnP client" OFF) option(WITH_PCH "Use precompiled header" OFF) option(WITH_GUI "Include GUI (currently MS Windows only)" ON) option(WITH_MESHNET "Build for cjdns test network" OFF) +option(WITH_ADDRSANITIZER "Build with address sanitizer (linux only)" OFF) # paths set ( CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules" ) @@ -183,11 +184,27 @@ if (WITH_AESNI) add_definitions ( -DAESNI ) endif() +if (WITH_ADDRSANITIZER) + if (NOT MSVC) + set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-omit-frame-pointer" ) + set( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address" ) + else () + error ("MSVC does not support address sanitizer option") + endif() +endif() + # libraries # TODO: once CMake 3.1+ becomes mainstream, see e.g. http://stackoverflow.com/a/29871891/673826 # use imported Threads::Threads instead set(THREADS_PREFER_PTHREAD_FLAG ON) -find_package ( Threads REQUIRED ) +if (IOS) + set(CMAKE_THREAD_LIBS_INIT "-lpthread") + set(CMAKE_HAVE_THREADS_LIBRARY 1) + set(CMAKE_USE_WIN32_THREADS_INIT 0) + set(CMAKE_USE_PTHREADS_INIT 1) +else() + find_package ( Threads REQUIRED ) +endif() if(THREADS_HAVE_PTHREAD_ARG) # compile time flag set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") endif() @@ -328,6 +345,7 @@ message(STATUS " STATIC BUILD : ${WITH_STATIC}") message(STATUS " UPnP : ${WITH_UPNP}") message(STATUS " PCH : ${WITH_PCH}") message(STATUS " MESHNET : ${WITH_MESHNET}") +message(STATUS " ADDRSANITIZER : ${WITH_ADDRSANITIZER}") message(STATUS "---------------------------------------") #Handle paths nicely diff --git a/docs/build_notes_ios.md b/docs/build_notes_ios.md new file mode 100644 index 00000000..26688d21 --- /dev/null +++ b/docs/build_notes_ios.md @@ -0,0 +1,85 @@ +Building on iOS +=================== + +How to build i2pd for iOS 9 and iOS Simulator 386/x64 + +Prerequisites +-------------- + +XCode7+, cmake 3.2+ + +Dependencies +-------------- +- precompiled openssl +- precompiled boost with modules `filesystem`, `program_options`, `date_time` and `system` +- ios-cmake toolchain from https://github.com/vovasty/ios-cmake.git + +Building +------------------------ +Assume you have folder structure + +``` +lib + libboost_date_time.a + libboost_filesystem.a + libboost_program_options.a + libboost_system.a + libboost.a + libcrypto.a + libssl.a +include + boost + openssl +ios-cmake +i2pd +``` + + +```bash +mkdir -p build/simulator/lib build/ios/lib include/i2pd + +pushd build/simulator && \ +cmake -DIOS_PLATFORM=SIMULATOR \ + -DPATCH=/usr/bin/patch \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_TOOLCHAIN_FILE=../../ios-cmake/toolchain/iOS.cmake \ + -DWITH_STATIC=yes \ + -DWITH_BINARY=no \ + -DBoost_INCLUDE_DIR=../../include \ + -DOPENSSL_INCLUDE_DIR=../../include \ + -DBoost_LIBRARY_DIR=../../lib \ + -DOPENSSL_SSL_LIBRARY=../../lib/libssl.a \ + -DOPENSSL_CRYPTO_LIBRARY=../../lib/libcrypto.a \ + ../../i2pd/build && \ +make -j16 VERBOSE=1 && \ +popd + +pushd build/ios +cmake -DIOS_PLATFORM=OS \ + -DPATCH=/usr/bin/patch \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_TOOLCHAIN_FILE=../../ios-cmake/toolchain/iOS.cmake \ + -DWITH_STATIC=yes \ + -DWITH_BINARY=no \ + -DBoost_INCLUDE_DIR=../../include \ + -DOPENSSL_INCLUDE_DIR=../../include \ + -DBoost_LIBRARY_DIR=../../lib \ + -DOPENSSL_SSL_LIBRARY=../../lib/libssl.a \ + -DOPENSSL_CRYPTO_LIBRARY=../../lib/libcrypto.a \ + ../../i2pd/build && \ +make -j16 VERBOSE=1 && \ +popd + +libtool -static -o lib/libi2pdclient.a build/*/libi2pdclient.a +libtool -static -o lib/libi2pd.a build/*/libi2pd.a + +cp i2pd/*.h include/i2pd +``` + +Include into project +--------------------------- +1. add all libraries in `lib` folder to `Project linked frameworks`. +2. add `libc++` and `libz` libraries from system libraries to `Project linked frameworks`. +3. add path to i2p headers to your `Headers search paths` + +Alternatively you may use swift wrapper https://github.com/vovasty/SwiftyI2P.git \ No newline at end of file diff --git a/docs/configuration.md b/docs/configuration.md index 32cd7f49..31082dc2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -73,6 +73,7 @@ All options below still possible in cmdline, but better write it in config file: * --precomputation.elgamal= - Use ElGamal precomputated tables. false for x64 and true for other platforms by default * --reseed.file - Full path to SU3 file to reseed from +* --reseed.urls - Reseed URLs, separated by comma * --limits.transittunnels= - Override maximum number of transit tunnels. 2500 by default diff --git a/docs/index.rst b/docs/index.rst index 13cd9edb..5507b075 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -35,5 +35,6 @@ Contents: build_notes_android configuration family + usage diff --git a/docs/usage.md b/docs/usage.md new file mode 100644 index 00000000..cb678cb4 --- /dev/null +++ b/docs/usage.md @@ -0,0 +1,130 @@ +Usage and tutorials +=================== + + +i2pd can be used for: + +* [anonymous websites](#browsing-and-hosting-websites) +* [anonymous chats](#using-and-hosting-chat-servers) +* [anonymous file sharing](#file-sharing) + +and many more. + +## Browsing and hosting websites + +### Browse anonymous websites + +To browse anonymous websites inside Invisible Internet, configure your web browser to use HTTP proxy 127.0.0.1:4444 (available by default in i2pd). + +In Firefox: Preferences -> Advanced -> Network tab -> Connection Settings -> choose Manual proxy configuration, Enter HTTP proxy 127.0.0.1, Port 4444 + +In Chromium: run chromium executable with key + + chromium --proxy-server="http://127.0.0.1:4444" + +Note that if you wish to stay anonymous too you'll need to tune your browser for better privacy. Do your own research, [can start here](http://www.howtogeek.com/102032/how-to-optimize-mozilla-firefox-for-maximum-privacy/). + +Big list of Invisible Internet websites can be found at [identiguy.i2p](http://identiguy.i2p). + +### Host anonymous website + + +If you wish to run your own website in Invisible Internet, follow those steps: + +1) Run your webserver and find out which host:port it uses (for example, 127.0.0.1:8080). + +2) Configure i2pd to create HTTP server tunnel. Put in your ~/.i2pd/tunnels.conf file: + + [anon-website] + type = http + host = 127.0.0.1 + port = 8080 + keys = anon-website.dat + +3) Restart i2pd. + +4) Find b32 destination of your website. + +Go to webconsole -> [I2P tunnels page](http://127.0.0.1:7070/?page=i2p_tunnels). Look for Sever tunnels and you will see address like \.b32.i2p next to anon-website. + +Website is now available in Invisible Internet by visiting this address. + +5) (Optional) Register short and rememberable .i2p domain on [inr.i2p](http://inr.i2p). + + +## Using and hosting chat servers + +### Running anonymous IRC server + +1) Run your IRC server software and find out which host:port it uses (for example, 127.0.0.1:5555). + +For small private IRC servers you can use [miniircd](https://github.com/jrosdahl/miniircd), for large public networks [UnreadIRCd](https://www.unrealircd.org/). + +2) Configure i2pd to create IRC server tunnel. + +Simplest case, if your server does not support WebIRC, add this to ~/.i2pd/tunnels.conf: + + [anon-chatserver] + type = irc + host = 127.0.0.1 + port = 5555 + keys = chatserver-key.dat + +And that is it. + +Alternatively, if your IRC server supports WebIRC, for example, UnreadIRCd, put this into UnrealIRCd config: + + webirc { + mask 127.0.0.1; + password your_password; + }; + +Also change line: + + modes-on-connect "+ixw"; + +to + + modes-on-connect "+iw"; + +And this in ~/.i2pd/tunnels.conf: + + [anon-chatserver] + type = irc + host = 127.0.0.1 + port = 5555 + keys = chatserver-key.dat + webircpassword = your_password + +3) Restart i2pd. + +4) Find b32 destination of your anonymous IRC server. + +Go to webconsole -> [I2P tunnels page](http://127.0.0.1:7070/?page=i2p_tunnels). Look for Sever tunnels and you will see address like \.b32.i2p next to anon-chatserver. + +Clients will use this address to connect to your server anonymously. + +### Connect to anonymous IRC server + +To connect to IRC server at *walker.i2p*, add this to ~/.i2pd/tunnels.conf: + + [IRC2] + type = client + address = 127.0.0.1 + port = 6669 + destination = walker.i2p + #keys = walker-keys.dat + +Restart i2pd, then connect to irc://127.0.0.1:6669 with your IRC client. + +## File sharing + +You can share and download torrents with [Transmission-I2P](https://github.com/l-n-s/transmission-i2p). + +Alternative torrent-clients are [Robert](http://en.wikipedia.org/wiki/Robert_%28P2P_Software%29) and [Vuze](https://en.wikipedia.org/wiki/Vuze). + +Robert uses BOB protocol, i2pd must be run with parameter --bob.enabled=true. + +Vuze uses I2CP protocol, i2pd must be run with parameter --i2cp.enabled=true. + +Also, visit [postman tracker](http://tracker2.postman.i2p).