diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 7a6c5c12..917acdb8 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -290,13 +290,13 @@ namespace http { s << "" << tr("Tunnel creation success rate") << ": " << i2p::tunnel::tunnels.GetTunnelCreationSuccessRate () << "%
\r\n"; s << "" << tr("Received") << ": "; ShowTraffic (s, i2p::transport::transports.GetTotalReceivedBytes ()); - s << " (" << (double) i2p::transport::transports.GetInBandwidth () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; + s << " (" << (double) i2p::transport::transports.GetInBandwidth15s () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; s << "" << tr("Sent") << ": "; ShowTraffic (s, i2p::transport::transports.GetTotalSentBytes ()); - s << " (" << (double) i2p::transport::transports.GetOutBandwidth () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; + s << " (" << (double) i2p::transport::transports.GetOutBandwidth15s () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; s << "" << tr("Transit") << ": "; ShowTraffic (s, i2p::transport::transports.GetTotalTransitTransmittedBytes ()); - s << " (" << (double) i2p::transport::transports.GetTransitBandwidth () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; + s << " (" << (double) i2p::transport::transports.GetTransitBandwidth15s () / 1024 << " " << tr(/* tr: Kibibit/s */ "KiB/s") << ")
\r\n"; s << "" << tr("Data path") << ": " << i2p::fs::GetUTF8DataDir() << "
\r\n"; s << "
"; if ((outputFormat == OutputFormatEnum::forWebConsole) || !includeHiddenContent) { diff --git a/daemon/I2PControlHandlers.cpp b/daemon/I2PControlHandlers.cpp index 15763948..f3ea7f61 100644 --- a/daemon/I2PControlHandlers.cpp +++ b/daemon/I2PControlHandlers.cpp @@ -33,7 +33,9 @@ namespace client m_RouterInfoHandlers["i2p.router.netdb.knownpeers"] = &I2PControlHandlers::NetDbKnownPeersHandler; m_RouterInfoHandlers["i2p.router.netdb.activepeers"] = &I2PControlHandlers::NetDbActivePeersHandler; m_RouterInfoHandlers["i2p.router.net.bw.inbound.1s"] = &I2PControlHandlers::InboundBandwidth1S; + m_RouterInfoHandlers["i2p.router.net.bw.inbound.15s"] = &I2PControlHandlers::InboundBandwidth15S; m_RouterInfoHandlers["i2p.router.net.bw.outbound.1s"] = &I2PControlHandlers::OutboundBandwidth1S; + m_RouterInfoHandlers["i2p.router.net.bw.outbound.15s"] = &I2PControlHandlers::OutboundBandwidth15S; m_RouterInfoHandlers["i2p.router.net.status"] = &I2PControlHandlers::NetStatusHandler; m_RouterInfoHandlers["i2p.router.net.tunnels.participating"] = &I2PControlHandlers::TunnelsParticipatingHandler; m_RouterInfoHandlers["i2p.router.net.tunnels.successrate"] = &I2PControlHandlers::TunnelsSuccessRateHandler; @@ -153,12 +155,24 @@ namespace client InsertParam (results, "i2p.router.net.bw.inbound.1s", bw); } + void I2PControlHandlers::InboundBandwidth15S (std::ostringstream& results) + { + double bw = i2p::transport::transports.GetInBandwidth15s (); + InsertParam (results, "i2p.router.net.bw.inbound.15s", bw); + } + void I2PControlHandlers::OutboundBandwidth1S (std::ostringstream& results) { double bw = i2p::transport::transports.GetOutBandwidth (); InsertParam (results, "i2p.router.net.bw.outbound.1s", bw); } + void I2PControlHandlers::OutboundBandwidth15S (std::ostringstream& results) + { + double bw = i2p::transport::transports.GetOutBandwidth15s (); + InsertParam (results, "i2p.router.net.bw.outbound.15s", bw); + } + void I2PControlHandlers::NetTotalReceivedBytes (std::ostringstream& results) { InsertParam (results, "i2p.router.net.total.received.bytes", (double)i2p::transport::transports.GetTotalReceivedBytes ()); diff --git a/daemon/I2PControlHandlers.h b/daemon/I2PControlHandlers.h index e33a19fc..d106f288 100644 --- a/daemon/I2PControlHandlers.h +++ b/daemon/I2PControlHandlers.h @@ -50,7 +50,9 @@ namespace client void TunnelsParticipatingHandler (std::ostringstream& results); void TunnelsSuccessRateHandler (std::ostringstream& results); void InboundBandwidth1S (std::ostringstream& results); + void InboundBandwidth15S (std::ostringstream& results); void OutboundBandwidth1S (std::ostringstream& results); + void OutboundBandwidth15S (std::ostringstream& results); void NetTotalReceivedBytes (std::ostringstream& results); void NetTotalSentBytes (std::ostringstream& results); diff --git a/libi2pd/Identity.cpp b/libi2pd/Identity.cpp index 64d87f74..310e6081 100644 --- a/libi2pd/Identity.cpp +++ b/libi2pd/Identity.cpp @@ -49,29 +49,21 @@ namespace data IdentityEx::IdentityEx(const uint8_t * publicKey, const uint8_t * signingKey, SigningKeyType type, CryptoKeyType cryptoType) { - /*uint8_t randomPaddingBlock[32]; - RAND_bytes (randomPaddingBlock, 32);*/ + uint8_t randomPaddingBlock[32]; + RAND_bytes (randomPaddingBlock, 32); if (cryptoType == CRYPTO_KEY_TYPE_ECIES_X25519_AEAD) { - /*memcpy (m_StandardIdentity.publicKey, publicKey ? publicKey : randomPaddingBlock, 32); + memcpy (m_StandardIdentity.publicKey, publicKey ? publicKey : randomPaddingBlock, 32); for (int i = 0; i < 7; i++) // 224 bytes - memcpy (m_StandardIdentity.publicKey + 32*(i + 1), randomPaddingBlock, 32);*/ - if (publicKey) - { - memcpy (m_StandardIdentity.publicKey, publicKey, 32); - RAND_bytes (m_StandardIdentity.publicKey + 32, 224); - } - else - RAND_bytes (m_StandardIdentity.publicKey, 256); + memcpy (m_StandardIdentity.publicKey + 32*(i + 1), randomPaddingBlock, 32); } else { if (publicKey) memcpy (m_StandardIdentity.publicKey, publicKey, 256); else - RAND_bytes (m_StandardIdentity.publicKey, 256); - /*for (int i = 0; i < 8; i++) // 256 bytes - memcpy (m_StandardIdentity.publicKey + 32*i, randomPaddingBlock, 32);*/ + for (int i = 0; i < 8; i++) // 256 bytes + memcpy (m_StandardIdentity.publicKey + 32*i, randomPaddingBlock, 32); } if (type != SIGNING_KEY_TYPE_DSA_SHA1) { @@ -110,9 +102,8 @@ namespace data case SIGNING_KEY_TYPE_REDDSA_SHA512_ED25519: { size_t padding = 128 - i2p::crypto::EDDSA25519_PUBLIC_KEY_LENGTH; // 96 = 128 - 32 - /*for (int i = 0; i < 3; i++) // 96 bytes - memcpy (m_StandardIdentity.signingKey + 32*i, randomPaddingBlock, 32);*/ - RAND_bytes (m_StandardIdentity.signingKey, 96); + for (int i = 0; i < 3; i++) // 96 bytes + memcpy (m_StandardIdentity.signingKey + 32*i, randomPaddingBlock, 32); memcpy (m_StandardIdentity.signingKey + padding, signingKey, i2p::crypto::EDDSA25519_PUBLIC_KEY_LENGTH); break; } diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index f4fdfa04..dba228fd 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -47,21 +47,21 @@ namespace transport { found = true; if (address->IsV6 ()) - { + { uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu); if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE) mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; i2p::context.SetMTU (mtu, false); - } - else - { + } + else + { uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu); if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE) mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; i2p::context.SetMTU (mtu, true); - } + } continue; // we don't need port for proxy - } + } auto port = address->port; if (!port) { @@ -103,11 +103,11 @@ namespace transport } } if (found) - { + { if (m_IsThroughProxy) ConnectToProxy (); m_ReceiveService.Start (); - } + } ScheduleTermination (); ScheduleResend (false); } @@ -136,11 +136,11 @@ namespace transport m_SocketV6.close (); if (m_UDPAssociateSocket) - { + { m_UDPAssociateSocket->close (); m_UDPAssociateSocket.reset (nullptr); - } - + } + StopIOService (); m_Sessions.clear (); @@ -168,11 +168,11 @@ namespace transport m_AddressV6 = localAddress; uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu); if (!mtu) - { + { int maxMTU = i2p::util::net::GetMaxMTU (localAddress.to_v6 ()); mtu = i2p::util::net::GetMTU (localAddress); if (mtu > maxMTU) mtu = maxMTU; - } + } else if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE; if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE; @@ -236,7 +236,19 @@ namespace transport void SSU2Server::HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket) { - if (!ecode) + if (!ecode + || ecode == boost::asio::error::connection_refused + || ecode == boost::asio::error::connection_reset + || ecode == boost::asio::error::network_unreachable + || ecode == boost::asio::error::host_unreachable +#ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO + || ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_ + || ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_ + || ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_ +#endif + ) + // just try continue reading when received ICMP response otherwise socket can crash, + // but better to find out which host were sent it and mark that router as unreachable { i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); packet->len = bytes_transferred; @@ -285,12 +297,12 @@ namespace transport ConnectToProxy (); } else - { + { auto ep = socket.local_endpoint (); socket.close (); OpenSocket (ep); Receive (socket); - } + } } } } @@ -301,7 +313,7 @@ namespace transport { if (m_IsThroughProxy) ProcessNextPacketFromProxy (packet->buf, packet->len); - else + else ProcessNextPacket (packet->buf, packet->len, packet->from); m_PacketsPool.ReleaseMt (packet); if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated) @@ -314,7 +326,7 @@ namespace transport if (m_IsThroughProxy) for (auto& packet: packets) ProcessNextPacketFromProxy (packet->buf, packet->len); - else + else for (auto& packet: packets) ProcessNextPacket (packet->buf, packet->len, packet->from); m_PacketsPool.ReleaseMt (packets); @@ -446,7 +458,7 @@ namespace transport } return nullptr; } - + void SSU2Server::ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint) { if (len < 24) return; @@ -499,7 +511,7 @@ namespace transport if (m_LastSession && m_LastSession->GetState () == eSSU2SessionStateClosing) m_LastSession->RequestTermination (eSSU2TerminationReasonIdleTimeout); // send termination again break; - case eSSU2SessionStateClosingConfirmed: + case eSSU2SessionStateClosingConfirmed: case eSSU2SessionStateTerminated: m_LastSession = nullptr; break; @@ -518,20 +530,22 @@ namespace transport { std::unique_lock l(m_PendingOutgoingSessionsMutex); m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint - } + } else it1->second->ProcessRetry (buf, len); } - else + else if (!i2p::util::net::IsInReservedRange(senderEndpoint.address ()) && senderEndpoint.port ()) { // assume new incoming session auto session = std::make_shared (*this); session->SetRemoteEndpoint (senderEndpoint); session->ProcessFirstIncomingMessage (connID, buf, len); } + else + LogPrint (eLogError, "SSU2: Incoming packet received from invalid endpoint ", senderEndpoint); } } - + void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to) { @@ -539,7 +553,7 @@ namespace transport { SendThroughProxy (header, headerLen, nullptr, 0, payload, payloadLen, to); return; - } + } std::vector bufs { boost::asio::buffer (header, headerLen), @@ -859,11 +873,11 @@ namespace transport if (it != m_OutgoingTokens.end ()) { if (i2p::util::GetSecondsSinceEpoch () + SSU2_TOKEN_EXPIRATION_THRESHOLD > it->second.second) - { + { // token expired m_OutgoingTokens.erase (it); return 0; - } + } return it->second.first; } return 0; @@ -879,7 +893,7 @@ namespace transport return it->second.first; else // token expired m_IncomingTokens.erase (it); - } + } uint64_t token; RAND_bytes ((uint8_t *)&token, 8); m_IncomingTokens.emplace (ep, std::make_pair (token, ts + SSU2_TOKEN_EXPIRATION_TIMEOUT)); @@ -1125,23 +1139,23 @@ namespace transport size_t requestHeaderSize = 0; memset (m_UDPRequestHeader, 0, 3); if (to.address ().is_v6 ()) - { + { m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV6; memcpy (m_UDPRequestHeader + 4, to.address ().to_v6().to_bytes().data(), 16); requestHeaderSize = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; - } + } else - { + { m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV4; memcpy (m_UDPRequestHeader + 4, to.address ().to_v4().to_bytes().data(), 4); requestHeaderSize = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; - } + } htobe16buf (m_UDPRequestHeader + requestHeaderSize - 2, to.port ()); - + std::vector bufs; bufs.push_back (boost::asio::buffer (m_UDPRequestHeader, requestHeaderSize)); bufs.push_back (boost::asio::buffer (header, headerLen)); - if (headerX) bufs.push_back (boost::asio::buffer (headerX, headerXLen)); + if (headerX) bufs.push_back (boost::asio::buffer (headerX, headerXLen)); bufs.push_back (boost::asio::buffer (payload, payloadLen)); boost::system::error_code ec; @@ -1150,7 +1164,7 @@ namespace transport i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); else LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to); - } + } void SSU2Server::ProcessNextPacketFromProxy (uint8_t * buf, size_t len) { @@ -1158,11 +1172,11 @@ namespace transport { LogPrint (eLogWarning, "SSU2: Proxy packet fragmentation is not supported"); return; - } + } size_t offset = 0; boost::asio::ip::udp::endpoint ep; switch (buf[3]) // ATYP - { + { case SOCKS5_ATYP_IPV4: { offset = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; @@ -1172,7 +1186,7 @@ namespace transport uint16_t port = bufbe16toh (buf + 8); ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4 (bytes), port); break; - } + } case SOCKS5_ATYP_IPV6: { offset = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; @@ -1182,15 +1196,15 @@ namespace transport uint16_t port = bufbe16toh (buf + 20); ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v6 (bytes), port); break; - } + } default: { LogPrint (eLogWarning, "SSU2: Unknown ATYP ", (int)buf[3], " from proxy relay"); return; - } - } + } + } ProcessNextPacket (buf + offset, len - offset, ep); - } + } void SSU2Server::ConnectToProxy () { @@ -1208,7 +1222,7 @@ namespace transport else HandshakeWithProxy (); }); - } + } void SSU2Server::HandshakeWithProxy () { @@ -1223,13 +1237,13 @@ namespace transport if (ecode) { LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message()); - m_UDPAssociateSocket.reset (nullptr); + m_UDPAssociateSocket.reset (nullptr); ReconnectToProxy (); - } + } else ReadHandshakeWithProxyReply (); }); - } + } void SSU2Server::ReadHandshakeWithProxyReply () { @@ -1243,7 +1257,7 @@ namespace transport LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message()); m_UDPAssociateSocket.reset (nullptr); ReconnectToProxy (); - } + } else { if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1]) @@ -1252,10 +1266,10 @@ namespace transport { LogPrint(eLogError, "SSU2: Invalid proxy reply"); m_UDPAssociateSocket.reset (nullptr); - } - } - }); - } + } + } + }); + } void SSU2Server::SendUDPAssociateRequest () { @@ -1272,13 +1286,13 @@ namespace transport if (ecode) { LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message()); - m_UDPAssociateSocket.reset (nullptr); + m_UDPAssociateSocket.reset (nullptr); ReconnectToProxy (); - } + } else ReadUDPAssociateReply (); }); - } + } void SSU2Server::ReadUDPAssociateReply () { @@ -1292,11 +1306,11 @@ namespace transport LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message()); m_UDPAssociateSocket.reset (nullptr); ReconnectToProxy (); - } + } else { if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1]) - { + { if (m_UDPRequestHeader[3] == SOCKS5_ATYP_IPV4) { boost::asio::ip::address_v4::bytes_type bytes; @@ -1306,21 +1320,21 @@ namespace transport m_SocketV4.open (boost::asio::ip::udp::v4 ()); Receive (m_SocketV4); ReadUDPAssociateSocket (); - } + } else { LogPrint(eLogError, "SSU2: Proxy UDP associate unsupported ATYP ", (int)m_UDPRequestHeader[3]); m_UDPAssociateSocket.reset (nullptr); - } - } + } + } else { LogPrint(eLogError, "SSU2: Proxy UDP associate error ", (int)m_UDPRequestHeader[1]); m_UDPAssociateSocket.reset (nullptr); - } - } - }); - } + } + } + }); + } void SSU2Server::ReadUDPAssociateSocket () { @@ -1336,18 +1350,18 @@ namespace transport m_ProxyRelayEndpoint.reset (nullptr); m_SocketV4.close (); ConnectToProxy (); // try to reconnect immediately - } + } else ReadUDPAssociateSocket (); - }); - } + }); + } void SSU2Server::ReconnectToProxy () { LogPrint(eLogInfo, "SSU2: Reconnect to proxy after ", SSU2_PROXY_CONNECT_RETRY_TIMEOUT, " seconds"); if (m_ProxyConnectRetryTimer) m_ProxyConnectRetryTimer->cancel (); - else + else m_ProxyConnectRetryTimer.reset (new boost::asio::deadline_timer (m_ReceiveService.GetService ())); m_ProxyConnectRetryTimer->expires_from_now (boost::posix_time::seconds (SSU2_PROXY_CONNECT_RETRY_TIMEOUT)); m_ProxyConnectRetryTimer->async_wait ( @@ -1361,8 +1375,8 @@ namespace transport ConnectToProxy (); } }); - } - + } + bool SSU2Server::SetProxy (const std::string& address, uint16_t port) { boost::system::error_code ecode; @@ -1371,14 +1385,14 @@ namespace transport { m_IsThroughProxy = true; m_ProxyEndpoint.reset (new boost::asio::ip::tcp::endpoint (addr, port)); - } + } else { if (ecode) LogPrint (eLogError, "SSU2: Invalid proxy address ", address, " ", ecode.message()); return false; - } + } return true; - } + } } } diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index ac184506..85c91f71 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -611,7 +611,7 @@ namespace transport void SSU2Session::ProcessSessionRequest (Header& header, uint8_t * buf, size_t len) { // we are Bob - if (len < 80) + if (len < 88) { LogPrint (eLogWarning, "SSU2: SessionRequest message too short ", len); return; @@ -2477,14 +2477,9 @@ namespace transport size_t paddingSize = rand () & 0x0F; // 0 - 15 if (paddingSize + 3 > len) paddingSize = len - 3; else if (paddingSize + 3 < minSize) paddingSize = minSize - 3; - if (paddingSize) - { - buf[0] = eSSU2BlkPadding; - htobe16buf (buf + 1, paddingSize); - memset (buf + 3, 0, paddingSize); - } - else - return 0; + buf[0] = eSSU2BlkPadding; + htobe16buf (buf + 1, paddingSize); + memset (buf + 3, 0, paddingSize); return paddingSize + 3; } diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 34fcc065..c571d45a 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -136,12 +136,14 @@ namespace transport Transports::Transports (): m_IsOnline (true), m_IsRunning (false), m_IsNAT (true), m_CheckReserved(true), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_PeerCleanupTimer (nullptr), m_PeerTestTimer (nullptr), - m_SSU2Server (nullptr), m_NTCP2Server (nullptr), + m_UpdateBandwidthTimer (nullptr), m_SSU2Server (nullptr), m_NTCP2Server (nullptr), m_X25519KeysPairSupplier (15), // 15 pre-generated keys - m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_TotalTransitTransmittedBytes (0), - m_InBandwidth (0), m_OutBandwidth (0), m_TransitBandwidth(0), - m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), - m_LastTransitBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0) + m_TotalSentBytes (0), m_TotalReceivedBytes (0), m_TotalTransitTransmittedBytes (0), + m_InBandwidth (0), m_OutBandwidth (0), m_TransitBandwidth (0), + m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), m_LastTransitBandwidthUpdateBytes (0), + m_InBandwidth15s (0), m_OutBandwidth15s (0), m_TransitBandwidth15s (0), + m_LastInBandwidth15sUpdateBytes (0), m_LastOutBandwidth15sUpdateBytes (0), m_LastTransitBandwidth15sUpdateBytes (0), + m_LastBandwidth15sUpdateTime (0) { } @@ -152,6 +154,7 @@ namespace transport { delete m_PeerCleanupTimer; m_PeerCleanupTimer = nullptr; delete m_PeerTestTimer; m_PeerTestTimer = nullptr; + delete m_UpdateBandwidthTimer; m_UpdateBandwidthTimer = nullptr; delete m_Work; m_Work = nullptr; delete m_Service; m_Service = nullptr; } @@ -165,6 +168,7 @@ namespace transport m_Work = new boost::asio::io_service::work (*m_Service); m_PeerCleanupTimer = new boost::asio::deadline_timer (*m_Service); m_PeerTestTimer = new boost::asio::deadline_timer (*m_Service); + m_UpdateBandwidthTimer = new boost::asio::deadline_timer (*m_Service); } bool ipv4; i2p::config::GetOption("ipv4", ipv4); @@ -206,8 +210,8 @@ namespace transport } // create SSU2 server - if (enableSSU2) - { + if (enableSSU2) + { m_SSU2Server = new SSU2Server (); std::string ssu2proxy; i2p::config::GetOption("ssu2.proxy", ssu2proxy); if (!ssu2proxy.empty()) @@ -215,18 +219,18 @@ namespace transport if (proxyurl.parse (ssu2proxy) && proxyurl.schema == "socks") { if (m_SSU2Server->SetProxy (proxyurl.host, proxyurl.port)) - { + { i2p::context.SetStatus (eRouterStatusProxy); if (ipv6) i2p::context.SetStatusV6 (eRouterStatusProxy); - } + } else LogPrint(eLogError, "Transports: Can't set SSU2 proxy ", ssu2proxy); - } + } else LogPrint(eLogError, "Transports: Invalid SSU2 proxy URL ", ssu2proxy); - } - } + } + } // bind to interfaces if (ipv4) @@ -247,12 +251,12 @@ namespace transport { uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu); if (mtu) - { + { if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE; if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE; i2p::context.SetMTU (mtu, true); - } - } + } + } } if (ipv6) @@ -273,12 +277,12 @@ namespace transport { uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu); if (mtu) - { + { if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE; if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE; i2p::context.SetMTU (mtu, false); - } - } + } + } } bool ygg; i2p::config::GetOption("meshnets.yggdrasil", ygg); @@ -299,9 +303,12 @@ namespace transport if (m_SSU2Server) m_SSU2Server->Start (); if (m_SSU2Server) DetectExternalIP (); - m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); + m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5 * SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); + m_UpdateBandwidthTimer->expires_from_now (boost::posix_time::seconds(1)); + m_UpdateBandwidthTimer->async_wait (std::bind (&Transports::HandleUpdateBandwidthTimer, this, std::placeholders::_1)); + if (m_IsNAT) { m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); @@ -357,29 +364,44 @@ namespace transport } } - void Transports::UpdateBandwidth () + void Transports::HandleUpdateBandwidthTimer (const boost::system::error_code& ecode) { - uint64_t ts = i2p::util::GetMillisecondsSinceEpoch (); - if (m_LastBandwidthUpdateTime > 0) + if (ecode != boost::asio::error::operation_aborted) { - auto delta = ts - m_LastBandwidthUpdateTime; - if (delta > 0) + uint64_t ts = i2p::util::GetMillisecondsSinceEpoch (); + + // updated every second + m_InBandwidth = m_TotalReceivedBytes - m_LastInBandwidthUpdateBytes; + m_OutBandwidth = m_TotalSentBytes - m_LastOutBandwidthUpdateBytes; + m_TransitBandwidth = m_TotalTransitTransmittedBytes - m_LastTransitBandwidthUpdateBytes; + + m_LastInBandwidthUpdateBytes = m_TotalReceivedBytes; + m_LastOutBandwidthUpdateBytes = m_TotalSentBytes; + m_LastTransitBandwidthUpdateBytes = m_TotalTransitTransmittedBytes; + + // updated every 15 seconds + auto delta = ts - m_LastBandwidth15sUpdateTime; + if (delta > 15 * 1000) { - m_InBandwidth = (m_TotalReceivedBytes - m_LastInBandwidthUpdateBytes)*1000/delta; // per second - m_OutBandwidth = (m_TotalSentBytes - m_LastOutBandwidthUpdateBytes)*1000/delta; // per second - m_TransitBandwidth = (m_TotalTransitTransmittedBytes - m_LastTransitBandwidthUpdateBytes)*1000/delta; + m_InBandwidth15s = (m_TotalReceivedBytes - m_LastInBandwidth15sUpdateBytes) * 1000 / delta; + m_OutBandwidth15s = (m_TotalSentBytes - m_LastOutBandwidth15sUpdateBytes) * 1000 / delta; + m_TransitBandwidth15s = (m_TotalTransitTransmittedBytes - m_LastTransitBandwidth15sUpdateBytes) * 1000 / delta; + + m_LastBandwidth15sUpdateTime = ts; + m_LastInBandwidth15sUpdateBytes = m_TotalReceivedBytes; + m_LastOutBandwidth15sUpdateBytes = m_TotalSentBytes; + m_LastTransitBandwidth15sUpdateBytes = m_TotalTransitTransmittedBytes; } + + m_UpdateBandwidthTimer->expires_from_now (boost::posix_time::seconds(1)); + m_UpdateBandwidthTimer->async_wait (std::bind (&Transports::HandleUpdateBandwidthTimer, this, std::placeholders::_1)); } - m_LastBandwidthUpdateTime = ts; - m_LastInBandwidthUpdateBytes = m_TotalReceivedBytes; - m_LastOutBandwidthUpdateBytes = m_TotalSentBytes; - m_LastTransitBandwidthUpdateBytes = m_TotalTransitTransmittedBytes; } bool Transports::IsBandwidthExceeded () const { auto limit = i2p::context.GetBandwidthLimit() * 1024; // convert to bytes - auto bw = std::max (m_InBandwidth, m_OutBandwidth); + auto bw = std::max (m_InBandwidth15s, m_OutBandwidth15s); return bw > limit; } @@ -745,7 +767,7 @@ namespace transport { it->second.sessions.remove_if ( [](std::shared_ptr session)->bool - { + { return !session || !session->IsEstablished (); }); if (it->second.sessions.empty () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT) @@ -772,13 +794,12 @@ namespace transport ++it; } } - UpdateBandwidth (); // TODO: use separate timer(s) for it bool ipv4Testing = i2p::context.GetStatus () == eRouterStatusTesting; bool ipv6Testing = i2p::context.GetStatusV6 () == eRouterStatusTesting; // if still testing, repeat peer test if (ipv4Testing || ipv6Testing) PeerTest (ipv4Testing, ipv6Testing); - m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(3*SESSION_CREATION_TIMEOUT)); + m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(3 * SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); } } diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 05fc5458..4723b752 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -124,6 +124,9 @@ namespace transport uint32_t GetInBandwidth () const { return m_InBandwidth; }; uint32_t GetOutBandwidth () const { return m_OutBandwidth; }; uint32_t GetTransitBandwidth () const { return m_TransitBandwidth; }; + uint32_t GetInBandwidth15s () const { return m_InBandwidth15s; }; + uint32_t GetOutBandwidth15s () const { return m_OutBandwidth15s; }; + uint32_t GetTransitBandwidth15s () const { return m_TransitBandwidth15s; }; bool IsBandwidthExceeded () const; bool IsTransitBandwidthExceeded () const; size_t GetNumPeers () const { return m_Peers.size (); }; @@ -155,8 +158,8 @@ namespace transport void SetPriority (Peer& peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); void HandlePeerTestTimer (const boost::system::error_code& ecode); + void HandleUpdateBandwidthTimer (const boost::system::error_code& ecode); - void UpdateBandwidth (); void DetectExternalIP (); private: @@ -166,7 +169,7 @@ namespace transport std::thread * m_Thread; boost::asio::io_service * m_Service; boost::asio::io_service::work * m_Work; - boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer; + boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer, * m_UpdateBandwidthTimer; SSU2Server * m_SSU2Server; NTCP2Server * m_NTCP2Server; @@ -176,9 +179,15 @@ namespace transport X25519KeysPairSupplier m_X25519KeysPairSupplier; std::atomic m_TotalSentBytes, m_TotalReceivedBytes, m_TotalTransitTransmittedBytes; - uint32_t m_InBandwidth, m_OutBandwidth, m_TransitBandwidth; // bytes per second + + // Bandwidth per second + uint32_t m_InBandwidth, m_OutBandwidth, m_TransitBandwidth; uint64_t m_LastInBandwidthUpdateBytes, m_LastOutBandwidthUpdateBytes, m_LastTransitBandwidthUpdateBytes; - uint64_t m_LastBandwidthUpdateTime; + + // Bandwidth every 15 seconds + uint32_t m_InBandwidth15s, m_OutBandwidth15s, m_TransitBandwidth15s; + uint64_t m_LastInBandwidth15sUpdateBytes, m_LastOutBandwidth15sUpdateBytes, m_LastTransitBandwidth15sUpdateBytes; + uint64_t m_LastBandwidth15sUpdateTime; /** which router families to trust for first hops */ std::vector m_TrustedFamilies;