separate thread for processing router's messages

This commit is contained in:
orignal 2023-02-23 13:58:06 -05:00
parent 5657079355
commit 2e62f9532f
4 changed files with 43 additions and 21 deletions

View File

@ -334,10 +334,12 @@ namespace util
} }
} }
LogPrint(eLogInfo, "Daemon: Starting Tunnels"); LogPrint(eLogInfo, "Daemon: Starting Tunnels");
i2p::tunnel::tunnels.Start(); i2p::tunnel::tunnels.Start();
LogPrint(eLogInfo, "Daemon: Starting Router context");
i2p::context.Start();
LogPrint(eLogInfo, "Daemon: Starting Client"); LogPrint(eLogInfo, "Daemon: Starting Client");
i2p::client::context.Start (); i2p::client::context.Start ();
@ -366,6 +368,8 @@ namespace util
LogPrint(eLogInfo, "Daemon: Shutting down"); LogPrint(eLogInfo, "Daemon: Shutting down");
LogPrint(eLogInfo, "Daemon: Stopping Client"); LogPrint(eLogInfo, "Daemon: Stopping Client");
i2p::client::context.Stop(); i2p::client::context.Stop();
LogPrint(eLogInfo, "Daemon: Stopping Router context");
i2p::context.Stop();
LogPrint(eLogInfo, "Daemon: Stopping Tunnels"); LogPrint(eLogInfo, "Daemon: Stopping Tunnels");
i2p::tunnel::tunnels.Stop(); i2p::tunnel::tunnels.Stop();

View File

@ -26,7 +26,7 @@ namespace i2p
{ {
RouterContext context; RouterContext context;
RouterContext::RouterContext (): RouterContext::RouterContext (): RunnableServiceWithWork ("Router"),
m_LastUpdateTime (0), m_AcceptsTunnels (true), m_IsFloodfill (false), m_LastUpdateTime (0), m_AcceptsTunnels (true), m_IsFloodfill (false),
m_ShareRatio (100), m_Status (eRouterStatusUnknown), m_StatusV6 (eRouterStatusUnknown), m_ShareRatio (100), m_Status (eRouterStatusUnknown), m_StatusV6 (eRouterStatusUnknown),
m_Error (eRouterErrorNone), m_ErrorV6 (eRouterErrorNone), m_NetID (I2PD_NET_ID) m_Error (eRouterErrorNone), m_ErrorV6 (eRouterErrorNone), m_NetID (I2PD_NET_ID)
@ -47,6 +47,16 @@ namespace i2p
m_ECIESSession = std::make_shared<i2p::garlic::RouterIncomingRatchetSession>(m_InitialNoiseState); m_ECIESSession = std::make_shared<i2p::garlic::RouterIncomingRatchetSession>(m_InitialNoiseState);
} }
void RouterContext::Start ()
{
StartIOService ();
}
void RouterContext::Stop ()
{
StopIOService ();
}
void RouterContext::CreateNewRouter () void RouterContext::CreateNewRouter ()
{ {
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519, m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519,
@ -1087,12 +1097,6 @@ namespace i2p
bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID) bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID)
{ {
if (typeID == eI2NPGarlic)
{
// TODO: implement
LogPrint (eLogWarning, "Router: garlic message in garlic clove. Dropped");
return false;
}
auto msg = CreateI2NPMessage (typeID, payload, len, msgID); auto msg = CreateI2NPMessage (typeID, payload, len, msgID);
if (!msg) return false; if (!msg) return false;
i2p::HandleI2NPMessage (msg); i2p::HandleI2NPMessage (msg);
@ -1101,7 +1105,11 @@ namespace i2p
void RouterContext::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg) void RouterContext::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg)
{ {
std::unique_lock<std::mutex> l(m_GarlicMutex); GetIOService ().post (std::bind (&RouterContext::PostGarlicMessage, this, msg));
}
void RouterContext::PostGarlicMessage (std::shared_ptr<I2NPMessage> msg)
{
uint8_t * buf = msg->GetPayload (); uint8_t * buf = msg->GetPayload ();
uint32_t len = bufbe32toh (buf); uint32_t len = bufbe32toh (buf);
if (len > msg->GetLength ()) if (len > msg->GetLength ())
@ -1118,23 +1126,27 @@ namespace i2p
else else
LogPrint (eLogError, "Router: Session is not set for ECIES router"); LogPrint (eLogError, "Router: Session is not set for ECIES router");
} }
} }
void RouterContext::ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg) void RouterContext::ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg)
{ {
if (i2p::data::netdb.GetPublishReplyToken () == bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET)) if (i2p::data::netdb.GetPublishReplyToken () == bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET))
i2p::data::netdb.PostI2NPMsg (msg); i2p::data::netdb.PostI2NPMsg (msg);
else else
{ {
std::unique_lock<std::mutex> l(m_GarlicMutex); GetIOService ().post ([msg, this]()
i2p::garlic::GarlicDestination::ProcessDeliveryStatusMessage (msg); {
this->i2p::garlic::GarlicDestination::ProcessDeliveryStatusMessage (msg);
});
} }
} }
void RouterContext::CleanupDestination () void RouterContext::CleanupDestination ()
{ {
std::unique_lock<std::mutex> l(m_GarlicMutex); GetIOService ().post ([this]()
i2p::garlic::GarlicDestination::CleanupExpiredTags (); {
this->i2p::garlic::GarlicDestination::CleanupExpiredTags ();
});
} }
uint32_t RouterContext::GetUptime () const uint32_t RouterContext::GetUptime () const

View File

@ -12,12 +12,12 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <memory> #include <memory>
#include <mutex>
#include <chrono> #include <chrono>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "Identity.h" #include "Identity.h"
#include "RouterInfo.h" #include "RouterInfo.h"
#include "Garlic.h" #include "Garlic.h"
#include "util.h"
namespace i2p namespace i2p
{ {
@ -52,7 +52,7 @@ namespace garlic
eRouterErrorNoDescriptors = 5 eRouterErrorNoDescriptors = 5
}; };
class RouterContext: public i2p::garlic::GarlicDestination class RouterContext: public i2p::garlic::GarlicDestination, private i2p::util::RunnableServiceWithWork
{ {
private: private:
@ -74,7 +74,9 @@ namespace garlic
RouterContext (); RouterContext ();
void Init (); void Init ();
void Start ();
void Stop ();
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
i2p::data::LocalRouterInfo& GetRouterInfo () { return m_RouterInfo; }; i2p::data::LocalRouterInfo& GetRouterInfo () { return m_RouterInfo; };
std::shared_ptr<i2p::data::RouterInfo> GetSharedRouterInfo () std::shared_ptr<i2p::data::RouterInfo> GetSharedRouterInfo ()
@ -183,7 +185,8 @@ namespace garlic
void PublishNTCP2Address (std::shared_ptr<i2p::data::RouterInfo::Address> address, int port, bool publish) const; void PublishNTCP2Address (std::shared_ptr<i2p::data::RouterInfo::Address> address, int port, bool publish) const;
bool DecryptECIESTunnelBuildRecord (const uint8_t * encrypted, uint8_t * data, size_t clearTextSize); bool DecryptECIESTunnelBuildRecord (const uint8_t * encrypted, uint8_t * data, size_t clearTextSize);
void PostGarlicMessage (std::shared_ptr<I2NPMessage> msg);
private: private:
i2p::data::LocalRouterInfo m_RouterInfo; i2p::data::LocalRouterInfo m_RouterInfo;
@ -198,7 +201,6 @@ namespace garlic
RouterStatus m_Status, m_StatusV6; RouterStatus m_Status, m_StatusV6;
RouterError m_Error, m_ErrorV6; RouterError m_Error, m_ErrorV6;
int m_NetID; int m_NetID;
std::mutex m_GarlicMutex;
std::unique_ptr<NTCP2PrivateKeys> m_NTCP2Keys; std::unique_ptr<NTCP2PrivateKeys> m_NTCP2Keys;
std::unique_ptr<SSU2PrivateKeys> m_SSU2Keys; std::unique_ptr<SSU2PrivateKeys> m_SSU2Keys;
std::unique_ptr<i2p::crypto::X25519Keys> m_NTCP2StaticKeys, m_SSU2StaticKeys; std::unique_ptr<i2p::crypto::X25519Keys> m_NTCP2StaticKeys, m_SSU2StaticKeys;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2023, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -67,11 +67,15 @@ namespace api
i2p::transport::transports.Start(); i2p::transport::transports.Start();
LogPrint(eLogInfo, "API: Starting Tunnels"); LogPrint(eLogInfo, "API: Starting Tunnels");
i2p::tunnel::tunnels.Start(); i2p::tunnel::tunnels.Start();
LogPrint(eLogInfo, "API: Starting Router context");
i2p::context.Start();
} }
void StopI2P () void StopI2P ()
{ {
LogPrint(eLogInfo, "API: Shutting down"); LogPrint(eLogInfo, "API: Shutting down");
LogPrint(eLogInfo, "API: Stopping Router context");
i2p::context.Stop();
LogPrint(eLogInfo, "API: Stopping Tunnels"); LogPrint(eLogInfo, "API: Stopping Tunnels");
i2p::tunnel::tunnels.Stop(); i2p::tunnel::tunnels.Stop();
LogPrint(eLogInfo, "API: Stopping Transports"); LogPrint(eLogInfo, "API: Stopping Transports");