diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index cf2eafde..38d52012 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -221,6 +221,7 @@ namespace transport m_IncompleteMessages.clear (); m_RelaySessions.clear (); m_PeerTests.clear (); + m_ReceivedI2NPMsgIDs.clear (); m_Server.RemoveSession (m_SourceConnID); transports.PeerDisconnected (shared_from_this ()); LogPrint (eLogDebug, "SSU2: Session terminated"); @@ -1450,7 +1451,7 @@ namespace transport nextMsg->len = nextMsg->offset + size + 7; // 7 more bytes for full I2NP header memcpy (nextMsg->GetNTCP2Header (), buf + offset, size); nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2 - m_Handler.PutNextMessage (std::move (nextMsg)); + HandleI2NPMsg (std::move (nextMsg)); m_IsDataReceived = true; break; } @@ -1738,7 +1739,7 @@ namespace transport { // we have all follow-on fragments already m->msg->FromNTCP2 (); - m_Handler.PutNextMessage (std::move (m->msg)); + HandleI2NPMsg (std::move (m->msg)); m_IncompleteMessages.erase (it); } } @@ -1760,14 +1761,14 @@ namespace transport if (isLast) { it->second->msg->FromNTCP2 (); - m_Handler.PutNextMessage (std::move (it->second->msg)); + HandleI2NPMsg (std::move (it->second->msg)); m_IncompleteMessages.erase (it); } else { if (ConcatOutOfSequenceFragments (it->second)) { - m_Handler.PutNextMessage (std::move (it->second->msg)); + HandleI2NPMsg (std::move (it->second->msg)); m_IncompleteMessages.erase (it); } else @@ -2268,6 +2269,25 @@ namespace transport } } + void SSU2Session::HandleI2NPMsg (std::shared_ptr&& msg) + { + if (!msg) return; + int32_t msgID = msg->GetMsgID (); + if (!m_ReceivedI2NPMsgIDs.count (msgID)) + { + if (!msg->IsExpired ()) + { + // m_LastActivityTimestamp is updated in ProcessData before + m_ReceivedI2NPMsgIDs.emplace (msgID, (uint32_t)m_LastActivityTimestamp); + m_Handler.PutNextMessage (std::move (msg)); + } + else + LogPrint (eLogDebug, "SSU2: Message ", msgID, " expired"); + } + else + LogPrint (eLogDebug, "SSU2: Message ", msgID, " already received"); + } + bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep) { if (size < 2) return false; @@ -2790,6 +2810,20 @@ namespace transport else ++it; } + if (m_ReceivedI2NPMsgIDs.size () > SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS || ts > m_LastActivityTimestamp + SSU2_DECAY_INTERVAL) + // decay + m_ReceivedI2NPMsgIDs.clear (); + else + { + // delete old received msgIDs + for (auto it = m_ReceivedI2NPMsgIDs.begin (); it != m_ReceivedI2NPMsgIDs.end ();) + { + if (ts > it->second + SSU2_RECEIVED_I2NP_MSGIDS_CLEANUP_TIMEOUT) + it = m_ReceivedI2NPMsgIDs.erase (it); + else + ++it; + } + } if (!m_OutOfSequencePackets.empty ()) { if (m_OutOfSequencePackets.size () > 2*SSU2_MAX_NUM_ACK_RANGES || diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index aab02127..b1cc234f 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022, The PurpleI2P Project +* Copyright (c) 2022-2023, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -39,6 +39,9 @@ namespace transport const int SSU2_RESEND_INTERVAL = 300; // in milliseconds const int SSU2_MAX_NUM_RESENDS = 5; const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds + const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check + const int SSU2_RECEIVED_I2NP_MSGIDS_CLEANUP_TIMEOUT = 10; // in seconds + const int SSU2_DECAY_INTERVAL = 20; // in seconds const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets const size_t SSU2_MIN_RTO = 100; // in milliseconds @@ -47,7 +50,7 @@ namespace transport const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 500; // in messages const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64; - + // flags const uint8_t SSU2_FLAG_IMMEDIATE_ACK_REQUESTED = 0x01; @@ -308,7 +311,8 @@ namespace transport void HandleRelayIntro (const uint8_t * buf, size_t len, int attempts = 0); void HandleRelayResponse (const uint8_t * buf, size_t len); void HandlePeerTest (const uint8_t * buf, size_t len); - + void HandleI2NPMsg (std::shared_ptr&& msg); + size_t CreateAddressBlock (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep); size_t CreateRouterInfoBlock (uint8_t * buf, size_t len, std::shared_ptr r); size_t CreateAckBlock (uint8_t * buf, size_t len); @@ -351,6 +355,7 @@ namespace transport SSU2TerminationReason m_TerminationReason; size_t m_MaxPayloadSize; std::unique_ptr m_PathChallenge; + std::unordered_map m_ReceivedI2NPMsgIDs; // msgID -> timestamp in seconds }; inline uint64_t CreateHeaderMask (const uint8_t * kh, const uint8_t * nonce)