From aa3a93b6a09943596a7e28d2d5976d0e8c8477ce Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 28 Jul 2016 11:16:29 -0400 Subject: [PATCH] implement streaming limiting (initial) --- ClientContext.cpp | 2 ++ I2PTunnel.cpp | 4 +-- I2PTunnel.h | 6 ++-- Streaming.cpp | 90 ++++++++++++++++++++++++++++++++++++++++++++--- Streaming.h | 37 ++++++++++++++++++- 5 files changed, 129 insertions(+), 10 deletions(-) diff --git a/ClientContext.cpp b/ClientContext.cpp index bf0465e1..623abab8 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -437,6 +437,8 @@ namespace client std::unique_ptr(serverTunnel))).second) { serverTunnel->Start (); + auto maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN); + serverTunnel->SetMaxConnsPerMinute(maxConns); numServerTunnels++; } else diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index d680fee9..87764a84 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -45,7 +45,7 @@ namespace client I2PTunnelConnection::~I2PTunnelConnection () { } - + void I2PTunnelConnection::I2PConnect (const uint8_t * msg, size_t len) { if (m_Stream) @@ -396,7 +396,7 @@ namespace client { m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip); } - + void I2PServerTunnel::Start () { m_Endpoint.port (m_Port); diff --git a/I2PTunnel.h b/I2PTunnel.h index bec0f9a4..f0332722 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -148,8 +148,10 @@ namespace client const boost::asio::ip::tcp::endpoint& GetEndpoint () const { return m_Endpoint; } const char* GetName() { return m_Name.c_str (); } - - private: + + void SetMaxConnsPerMinute(const uint32_t conns) { m_PortDestination->SetMaxConnsPerMinute(conns); } + + private: void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, std::shared_ptr resolver); diff --git a/Streaming.cpp b/Streaming.cpp index 9dfef0fe..887bdecc 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -797,7 +797,10 @@ namespace stream StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), - m_PendingIncomingTimer (m_Owner->GetService ()) + m_PendingIncomingTimer (m_Owner->GetService ()), + m_ConnTrackTimer(m_Owner->GetService()), + m_ConnsPerMinute(DEFAULT_MAX_CONNS_PER_MIN), + m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch()) { } @@ -812,17 +815,23 @@ namespace stream } void StreamingDestination::Start () - { + { + ScheduleConnTrack(); } void StreamingDestination::Stop () { ResetAcceptor (); m_PendingIncomingTimer.cancel (); + m_ConnTrackTimer.cancel(); { std::unique_lock l(m_StreamsMutex); m_Streams.clear (); - } + } + { + std::unique_lock l(m_ConnsMutex); + m_Conns.clear (); + } } void StreamingDestination::HandleNextPacket (Packet * packet) @@ -856,7 +865,22 @@ namespace stream incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); } - } + } + auto ident = incomingStream->GetRemoteIdentity(); + if(ident) + { + auto ih = ident->GetIdentHash(); + if(DropNewStream(ih)) + { + // drop + LogPrint(eLogWarning, "Streaming: Too many inbound streams from ", ih.ToBase32()); + DeleteStream(incomingStream); + incomingStream = nullptr; + delete packet; + return; + } + } else + LogPrint(eLogWarning, "Streaming: Inbound stream has no identity"); // accept if (m_Acceptor != nullptr) m_Acceptor (incomingStream); @@ -1009,6 +1033,62 @@ namespace stream else msg = nullptr; return msg; - } + } + + void StreamingDestination::SetMaxConnsPerMinute(const uint32_t conns) + { + m_ConnsPerMinute = conns; + } + + bool StreamingDestination::DropNewStream(const i2p::data::IdentHash & ih) + { + std::lock_guard lock(m_ConnsMutex); + if (m_Banned.size() > MAX_BANNED_CONNS) return true; // overload + auto end = m_Banned.end(); + if ( std::find(m_Banned.begin(), end, ih) != end) return true; // already banned + auto itr = m_Conns.find(ih); + if (itr == m_Conns.end()) + m_Conns[ih] = 0; + + m_Conns[ih] = m_Conns[ih] + 1; + + bool ban = m_Conns[ih] <= m_ConnsPerMinute; + if (ban) + { + m_Banned.push_back(ih); + m_Conns.erase(ih); + } + return ban; + } + + void StreamingDestination::HandleConnTrack(const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + { // acquire lock + std::lock_guard lock(m_ConnsMutex); + // clear conn tracking + m_Conns.clear(); + // check for ban clear + auto ts = i2p::util::GetMillisecondsSinceEpoch(); + if (ts - m_LastBanClear >= DEFAULT_BAN_INTERVAL) + { + // clear bans + m_Banned.clear(); + m_LastBanClear = ts; + } + } + // reschedule timer + ScheduleConnTrack(); + } + } + + void StreamingDestination::ScheduleConnTrack() + { + m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60)); + m_ConnTrackTimer.async_wait ( + std::bind (&StreamingDestination::HandleConnTrack, + shared_from_this (), std::placeholders::_1)); + } } } diff --git a/Streaming.h b/Streaming.h index c29b62f9..d21dc16b 100644 --- a/Streaming.h +++ b/Streaming.h @@ -51,6 +51,22 @@ namespace stream const int INITIAL_RTO = 9000; // in milliseconds const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const int PENDING_INCOMING_TIMEOUT = 10; // in seconds + + /** i2cp option for limiting inbound stremaing connections */ + const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "i2p.streaming.maxConnsPerMinute"; + /** default maximum connections attempts per minute per destination */ + const uint32_t DEFAULT_MAX_CONNS_PER_MIN = 600; + + /** + * max banned destinations per local destination + * TODO: make configurable + */ + const uint16_t MAX_BANNED_CONNS = 9999; + /** + * length of a ban in ms + * TODO: make configurable + */ + const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000; struct Packet { @@ -210,12 +226,22 @@ namespace stream void HandleDataMessagePayload (const uint8_t * buf, size_t len); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort); + /** set max connections per minute per destination */ + void SetMaxConnsPerMinute(const uint32_t conns); + private: void HandleNextPacket (Packet * packet); std::shared_ptr CreateNewIncomingStream (); void HandlePendingIncomingTimer (const boost::system::error_code& ecode); + /** handle cleaning up connection tracking for ratelimits */ + void HandleConnTrack(const boost::system::error_code& ecode); + + bool DropNewStream(const i2p::data::IdentHash & ident); + + void ScheduleConnTrack(); + private: std::shared_ptr m_Owner; @@ -227,7 +253,16 @@ namespace stream std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN - + + std::mutex m_ConnsMutex; + /** how many connections per minute did each identity have */ + std::map m_Conns; + boost::asio::deadline_timer m_ConnTrackTimer; + uint32_t m_ConnsPerMinute; + /** banned identities */ + std::vector m_Banned; + uint64_t m_LastBanClear; + public: i2p::data::GzipInflator m_Inflator;