eliminate extra lookups for sequential fragments

This commit is contained in:
orignal 2021-06-26 07:18:42 -04:00
parent 3c07665479
commit d0c5732e16
2 changed files with 113 additions and 42 deletions

View File

@ -52,10 +52,12 @@ namespace tunnel
bool isFollowOnFragment = flag & 0x80, isLastFragment = true; bool isFollowOnFragment = flag & 0x80, isLastFragment = true;
uint32_t msgID = 0; uint32_t msgID = 0;
int fragmentNum = 0; int fragmentNum = 0;
TunnelMessageBlockEx m; TunnelMessageBlockEx& m = m_CurrentMessage;
if (!isFollowOnFragment) if (!isFollowOnFragment)
{ {
// first fragment // first fragment
if (m_CurrentMsgID)
AddIncompleteCurrentMessage (); // we have got a new message while previous is not complete
m.deliveryType = (TunnelDeliveryType)((flag >> 5) & 0x03); m.deliveryType = (TunnelDeliveryType)((flag >> 5) & 0x03);
switch (m.deliveryType) switch (m.deliveryType)
@ -81,6 +83,7 @@ namespace tunnel
// Message ID // Message ID
msgID = bufbe32toh (fragment); msgID = bufbe32toh (fragment);
fragment += 4; fragment += 4;
m_CurrentMsgID = msgID;
isLastFragment = false; isLastFragment = false;
} }
} }
@ -96,11 +99,20 @@ namespace tunnel
uint16_t size = bufbe16toh (fragment); uint16_t size = bufbe16toh (fragment);
fragment += 2; fragment += 2;
if (isFollowOnFragment && m_CurrentMsgID && m_CurrentMsgID == msgID &&
m_CurrentMessage.nextFragmentNum == fragmentNum)
{
HandleCurrenMessageFollowOnFragment (fragment, size, isLastFragment);
fragment += size;
continue;
}
msg->offset = fragment - msg->buf; msg->offset = fragment - msg->buf;
msg->len = msg->offset + size; msg->len = msg->offset + size;
if (msg->len > msg->maxLen) if (msg->len > msg->maxLen)
{ {
LogPrint (eLogError, "TunnelMessage: fragment is too long ", (int)size); LogPrint (eLogError, "TunnelMessage: fragment is too long ", (int)size);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
return; return;
} }
if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE) if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE)
@ -114,30 +126,30 @@ namespace tunnel
else else
m.data = msg; m.data = msg;
if (!isFollowOnFragment && isLastFragment) if (!isFollowOnFragment)
HandleNextMessage (m);
else
{ {
if (msgID) // msgID is presented, assume message is fragmented if (isLastFragment)
{ {
if (!isFollowOnFragment) // create new incomlete message HandleNextMessage (m);
{ m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
m.nextFragmentNum = 1; }
m.receiveTime = i2p::util::GetMillisecondsSinceEpoch (); else if (msgID)
auto ret = m_IncompleteMessages.insert (std::pair<uint32_t, TunnelMessageBlockEx>(msgID, m)); {
if (ret.second) m_CurrentMessage.nextFragmentNum = 1;
HandleOutOfSequenceFragments (msgID, ret.first->second); m_CurrentMessage.receiveTime = i2p::util::GetMillisecondsSinceEpoch ();
else HandleOutOfSequenceFragments (msgID, m_CurrentMessage);
LogPrint (eLogError, "TunnelMessage: Incomplete message ", msgID, " already exists");
}
else
{
m.nextFragmentNum = fragmentNum;
HandleFollowOnFragment (msgID, isLastFragment, m);
}
} }
else else
{
LogPrint (eLogError, "TunnelMessage: Message is fragmented, but msgID is not presented"); LogPrint (eLogError, "TunnelMessage: Message is fragmented, but msgID is not presented");
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
}
}
else
{
m.nextFragmentNum = fragmentNum;
HandleFollowOnFragment (msgID, isLastFragment, m);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
} }
fragment += size; fragment += size;
@ -157,17 +169,8 @@ namespace tunnel
auto& msg = it->second; auto& msg = it->second;
if (m.nextFragmentNum == msg.nextFragmentNum) if (m.nextFragmentNum == msg.nextFragmentNum)
{ {
if (msg.data->len + size < I2NP_MAX_MESSAGE_SIZE) // check if message is not too long if (ConcatFollowOnFragment (msg, fragment, size))
{ {
if (msg.data->len + size > msg.data->maxLen)
{
// LogPrint (eLogWarning, "TunnelMessage: I2NP message size ", msg.data->maxLen, " is not enough");
auto newMsg = NewI2NPMessage ();
*newMsg = *(msg.data);
msg.data = newMsg;
}
if (msg.data->Concat (fragment, size) < size) // concatenate fragment
LogPrint (eLogError, "TunnelMessage: I2NP buffer overflow ", msg.data->maxLen);
if (isLastFragment) if (isLastFragment)
{ {
// message complete // message complete
@ -199,9 +202,67 @@ namespace tunnel
} }
} }
bool TunnelEndpoint::ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const
{
if (msg.data->len + size < I2NP_MAX_MESSAGE_SIZE) // check if message is not too long
{
if (msg.data->len + size > msg.data->maxLen)
{
// LogPrint (eLogWarning, "TunnelMessage: I2NP message size ", msg.data->maxLen, " is not enough");
auto newMsg = NewI2NPMessage ();
*newMsg = *(msg.data);
msg.data = newMsg;
}
if (msg.data->Concat (fragment, size) < size) // concatenate fragment
{
LogPrint (eLogError, "TunnelMessage: I2NP buffer overflow ", msg.data->maxLen);
return false;
}
}
else
return false;
return true;
}
void TunnelEndpoint::HandleCurrenMessageFollowOnFragment (const uint8_t * fragment, size_t size, bool isLastFragment)
{
if (ConcatFollowOnFragment (m_CurrentMessage, fragment, size))
{
if (isLastFragment)
{
// message complete
HandleNextMessage (m_CurrentMessage);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
}
else
{
m_CurrentMessage.nextFragmentNum++;
HandleOutOfSequenceFragments (m_CurrentMsgID, m_CurrentMessage);
}
}
else
{
LogPrint (eLogError, "TunnelMessage: Fragment ", m_CurrentMessage.nextFragmentNum, " of message ", m_CurrentMsgID, " exceeds max I2NP message size, message dropped");
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
}
}
void TunnelEndpoint::AddIncompleteCurrentMessage ()
{
if (m_CurrentMsgID)
{
auto ret = m_IncompleteMessages.emplace (m_CurrentMsgID, m_CurrentMessage);
if (!ret.second)
LogPrint (eLogError, "TunnelMessage: Incomplete message ", m_CurrentMsgID, " already exists");
m_CurrentMessage.data = nullptr;
m_CurrentMsgID = 0;
}
}
void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data) void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data)
{ {
if (!m_OutOfSequenceFragments.insert ({{msgID, fragmentNum}, {isLastFragment, data, i2p::util::GetMillisecondsSinceEpoch () }}).second) if (!m_OutOfSequenceFragments.insert ({(uint64_t)msgID << 32 | fragmentNum,
{isLastFragment, data, i2p::util::GetMillisecondsSinceEpoch () }}).second)
LogPrint (eLogInfo, "TunnelMessage: duplicate out-of-sequence fragment ", fragmentNum, " of message ", msgID); LogPrint (eLogInfo, "TunnelMessage: duplicate out-of-sequence fragment ", fragmentNum, " of message ", msgID);
} }
@ -212,7 +273,13 @@ namespace tunnel
if (!msg.nextFragmentNum) // message complete if (!msg.nextFragmentNum) // message complete
{ {
HandleNextMessage (msg); HandleNextMessage (msg);
m_IncompleteMessages.erase (msgID); if (&msg == &m_CurrentMessage)
{
m_CurrentMsgID = 0;
m_CurrentMessage.data = nullptr;
}
else
m_IncompleteMessages.erase (msgID);
break; break;
} }
} }
@ -220,7 +287,7 @@ namespace tunnel
bool TunnelEndpoint::ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg) bool TunnelEndpoint::ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg)
{ {
auto it = m_OutOfSequenceFragments.find ({msgID, msg.nextFragmentNum}); auto it = m_OutOfSequenceFragments.find ((uint64_t)msgID << 32 | msg.nextFragmentNum);
if (it != m_OutOfSequenceFragments.end ()) if (it != m_OutOfSequenceFragments.end ())
{ {
LogPrint (eLogDebug, "TunnelMessage: Out-of-sequence fragment ", (int)msg.nextFragmentNum, " of message ", msgID, " found"); LogPrint (eLogDebug, "TunnelMessage: Out-of-sequence fragment ", (int)msg.nextFragmentNum, " of message ", msgID, " found");

View File

@ -10,7 +10,6 @@
#define TUNNEL_ENDPOINT_H__ #define TUNNEL_ENDPOINT_H__
#include <inttypes.h> #include <inttypes.h>
#include <map>
#include <unordered_map> #include <unordered_map>
#include <string> #include <string>
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
@ -37,7 +36,7 @@ namespace tunnel
public: public:
TunnelEndpoint (bool isInbound): m_IsInbound (isInbound), m_NumReceivedBytes (0) {}; TunnelEndpoint (bool isInbound): m_IsInbound (isInbound), m_NumReceivedBytes (0), m_CurrentMsgID (0) {};
~TunnelEndpoint (); ~TunnelEndpoint ();
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
void Cleanup (); void Cleanup ();
@ -47,18 +46,23 @@ namespace tunnel
private: private:
void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m); void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m);
bool ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const; // true if success
void HandleCurrenMessageFollowOnFragment (const uint8_t * frgament, size_t size, bool isLastFragment);
void HandleNextMessage (const TunnelMessageBlock& msg); void HandleNextMessage (const TunnelMessageBlock& msg);
void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data); void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data);
bool ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); // true if something added bool ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); // true if something added
void HandleOutOfSequenceFragments (uint32_t msgID, TunnelMessageBlockEx& msg); void HandleOutOfSequenceFragments (uint32_t msgID, TunnelMessageBlockEx& msg);
void AddIncompleteCurrentMessage ();
private: private:
std::unordered_map<uint32_t, TunnelMessageBlockEx> m_IncompleteMessages; std::unordered_map<uint32_t, TunnelMessageBlockEx> m_IncompleteMessages;
std::map<std::pair<uint32_t, uint8_t>, Fragment> m_OutOfSequenceFragments; // (msgID, fragment#)->fragment std::unordered_map<uint64_t, Fragment> m_OutOfSequenceFragments; // ((msgID << 8) + fragment#)->fragment
bool m_IsInbound; bool m_IsInbound;
size_t m_NumReceivedBytes; size_t m_NumReceivedBytes;
TunnelMessageBlockEx m_CurrentMessage;
uint32_t m_CurrentMsgID;
}; };
} }
} }