mirror of
https://github.com/PurpleI2P/i2pd
synced 2024-11-10 00:00:29 +03:00
drop duplicated I2NP messages
This commit is contained in:
parent
0d3ede56cb
commit
55704ece3a
@ -221,6 +221,7 @@ namespace transport
|
|||||||
m_IncompleteMessages.clear ();
|
m_IncompleteMessages.clear ();
|
||||||
m_RelaySessions.clear ();
|
m_RelaySessions.clear ();
|
||||||
m_PeerTests.clear ();
|
m_PeerTests.clear ();
|
||||||
|
m_ReceivedI2NPMsgIDs.clear ();
|
||||||
m_Server.RemoveSession (m_SourceConnID);
|
m_Server.RemoveSession (m_SourceConnID);
|
||||||
transports.PeerDisconnected (shared_from_this ());
|
transports.PeerDisconnected (shared_from_this ());
|
||||||
LogPrint (eLogDebug, "SSU2: Session terminated");
|
LogPrint (eLogDebug, "SSU2: Session terminated");
|
||||||
@ -1450,7 +1451,7 @@ namespace transport
|
|||||||
nextMsg->len = nextMsg->offset + size + 7; // 7 more bytes for full I2NP header
|
nextMsg->len = nextMsg->offset + size + 7; // 7 more bytes for full I2NP header
|
||||||
memcpy (nextMsg->GetNTCP2Header (), buf + offset, size);
|
memcpy (nextMsg->GetNTCP2Header (), buf + offset, size);
|
||||||
nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2
|
nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2
|
||||||
m_Handler.PutNextMessage (std::move (nextMsg));
|
HandleI2NPMsg (std::move (nextMsg));
|
||||||
m_IsDataReceived = true;
|
m_IsDataReceived = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1738,7 +1739,7 @@ namespace transport
|
|||||||
{
|
{
|
||||||
// we have all follow-on fragments already
|
// we have all follow-on fragments already
|
||||||
m->msg->FromNTCP2 ();
|
m->msg->FromNTCP2 ();
|
||||||
m_Handler.PutNextMessage (std::move (m->msg));
|
HandleI2NPMsg (std::move (m->msg));
|
||||||
m_IncompleteMessages.erase (it);
|
m_IncompleteMessages.erase (it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1760,14 +1761,14 @@ namespace transport
|
|||||||
if (isLast)
|
if (isLast)
|
||||||
{
|
{
|
||||||
it->second->msg->FromNTCP2 ();
|
it->second->msg->FromNTCP2 ();
|
||||||
m_Handler.PutNextMessage (std::move (it->second->msg));
|
HandleI2NPMsg (std::move (it->second->msg));
|
||||||
m_IncompleteMessages.erase (it);
|
m_IncompleteMessages.erase (it);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (ConcatOutOfSequenceFragments (it->second))
|
if (ConcatOutOfSequenceFragments (it->second))
|
||||||
{
|
{
|
||||||
m_Handler.PutNextMessage (std::move (it->second->msg));
|
HandleI2NPMsg (std::move (it->second->msg));
|
||||||
m_IncompleteMessages.erase (it);
|
m_IncompleteMessages.erase (it);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -2268,6 +2269,25 @@ namespace transport
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SSU2Session::HandleI2NPMsg (std::shared_ptr<I2NPMessage>&& msg)
|
||||||
|
{
|
||||||
|
if (!msg) return;
|
||||||
|
int32_t msgID = msg->GetMsgID ();
|
||||||
|
if (!m_ReceivedI2NPMsgIDs.count (msgID))
|
||||||
|
{
|
||||||
|
if (!msg->IsExpired ())
|
||||||
|
{
|
||||||
|
// m_LastActivityTimestamp is updated in ProcessData before
|
||||||
|
m_ReceivedI2NPMsgIDs.emplace (msgID, (uint32_t)m_LastActivityTimestamp);
|
||||||
|
m_Handler.PutNextMessage (std::move (msg));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
LogPrint (eLogDebug, "SSU2: Message ", msgID, " expired");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
LogPrint (eLogDebug, "SSU2: Message ", msgID, " already received");
|
||||||
|
}
|
||||||
|
|
||||||
bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep)
|
bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep)
|
||||||
{
|
{
|
||||||
if (size < 2) return false;
|
if (size < 2) return false;
|
||||||
@ -2790,6 +2810,20 @@ namespace transport
|
|||||||
else
|
else
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
|
if (m_ReceivedI2NPMsgIDs.size () > SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS || ts > m_LastActivityTimestamp + SSU2_DECAY_INTERVAL)
|
||||||
|
// decay
|
||||||
|
m_ReceivedI2NPMsgIDs.clear ();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// delete old received msgIDs
|
||||||
|
for (auto it = m_ReceivedI2NPMsgIDs.begin (); it != m_ReceivedI2NPMsgIDs.end ();)
|
||||||
|
{
|
||||||
|
if (ts > it->second + SSU2_RECEIVED_I2NP_MSGIDS_CLEANUP_TIMEOUT)
|
||||||
|
it = m_ReceivedI2NPMsgIDs.erase (it);
|
||||||
|
else
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!m_OutOfSequencePackets.empty ())
|
if (!m_OutOfSequencePackets.empty ())
|
||||||
{
|
{
|
||||||
if (m_OutOfSequencePackets.size () > 2*SSU2_MAX_NUM_ACK_RANGES ||
|
if (m_OutOfSequencePackets.size () > 2*SSU2_MAX_NUM_ACK_RANGES ||
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2022, The PurpleI2P Project
|
* Copyright (c) 2022-2023, The PurpleI2P Project
|
||||||
*
|
*
|
||||||
* This file is part of Purple i2pd project and licensed under BSD3
|
* This file is part of Purple i2pd project and licensed under BSD3
|
||||||
*
|
*
|
||||||
@ -39,6 +39,9 @@ namespace transport
|
|||||||
const int SSU2_RESEND_INTERVAL = 300; // in milliseconds
|
const int SSU2_RESEND_INTERVAL = 300; // in milliseconds
|
||||||
const int SSU2_MAX_NUM_RESENDS = 5;
|
const int SSU2_MAX_NUM_RESENDS = 5;
|
||||||
const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
|
const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
|
||||||
|
const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check
|
||||||
|
const int SSU2_RECEIVED_I2NP_MSGIDS_CLEANUP_TIMEOUT = 10; // in seconds
|
||||||
|
const int SSU2_DECAY_INTERVAL = 20; // in seconds
|
||||||
const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets
|
const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets
|
||||||
const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets
|
const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets
|
||||||
const size_t SSU2_MIN_RTO = 100; // in milliseconds
|
const size_t SSU2_MIN_RTO = 100; // in milliseconds
|
||||||
@ -47,7 +50,7 @@ namespace transport
|
|||||||
const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 500; // in messages
|
const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 500; // in messages
|
||||||
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
|
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
|
||||||
const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64;
|
const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64;
|
||||||
|
|
||||||
// flags
|
// flags
|
||||||
const uint8_t SSU2_FLAG_IMMEDIATE_ACK_REQUESTED = 0x01;
|
const uint8_t SSU2_FLAG_IMMEDIATE_ACK_REQUESTED = 0x01;
|
||||||
|
|
||||||
@ -308,7 +311,8 @@ namespace transport
|
|||||||
void HandleRelayIntro (const uint8_t * buf, size_t len, int attempts = 0);
|
void HandleRelayIntro (const uint8_t * buf, size_t len, int attempts = 0);
|
||||||
void HandleRelayResponse (const uint8_t * buf, size_t len);
|
void HandleRelayResponse (const uint8_t * buf, size_t len);
|
||||||
void HandlePeerTest (const uint8_t * buf, size_t len);
|
void HandlePeerTest (const uint8_t * buf, size_t len);
|
||||||
|
void HandleI2NPMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||||
|
|
||||||
size_t CreateAddressBlock (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep);
|
size_t CreateAddressBlock (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep);
|
||||||
size_t CreateRouterInfoBlock (uint8_t * buf, size_t len, std::shared_ptr<const i2p::data::RouterInfo> r);
|
size_t CreateRouterInfoBlock (uint8_t * buf, size_t len, std::shared_ptr<const i2p::data::RouterInfo> r);
|
||||||
size_t CreateAckBlock (uint8_t * buf, size_t len);
|
size_t CreateAckBlock (uint8_t * buf, size_t len);
|
||||||
@ -351,6 +355,7 @@ namespace transport
|
|||||||
SSU2TerminationReason m_TerminationReason;
|
SSU2TerminationReason m_TerminationReason;
|
||||||
size_t m_MaxPayloadSize;
|
size_t m_MaxPayloadSize;
|
||||||
std::unique_ptr<i2p::data::IdentHash> m_PathChallenge;
|
std::unique_ptr<i2p::data::IdentHash> m_PathChallenge;
|
||||||
|
std::unordered_map<uint32_t, uint32_t> m_ReceivedI2NPMsgIDs; // msgID -> timestamp in seconds
|
||||||
};
|
};
|
||||||
|
|
||||||
inline uint64_t CreateHeaderMask (const uint8_t * kh, const uint8_t * nonce)
|
inline uint64_t CreateHeaderMask (const uint8_t * kh, const uint8_t * nonce)
|
||||||
|
Loading…
Reference in New Issue
Block a user