From 91fdb038d90a424a6631289148bda833135bb365 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 11:38:48 -0500 Subject: [PATCH 1/8] add threadpool for ntcp dh --- libi2pd/CryptoWorker.h | 82 +++++++++++++++++++++++++++++++++++++++++ libi2pd/NTCPSession.cpp | 21 +++-------- libi2pd/NTCPSession.h | 12 +++++- 3 files changed, 98 insertions(+), 17 deletions(-) create mode 100644 libi2pd/CryptoWorker.h diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h new file mode 100644 index 00000000..f310032f --- /dev/null +++ b/libi2pd/CryptoWorker.h @@ -0,0 +1,82 @@ +#ifndef CRYPTO_WORKER_H_ +#define CRYPTO_WORKER_H_ + +#include +#include +#include +#include +#include +#include +#include + +namespace i2p +{ +namespace worker +{ + template + struct ThreadPool + { + typedef std::function ResultFunc; + typedef std::function WorkFunc; + typedef std::pair, WorkFunc> Job; + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + typedef std::condition_variable cond_t; + ThreadPool(int workers) + { + stop = false; + if(workers > 0) + { + while(workers--) + { + threads.emplace_back([this] { + Job job; + for (;;) + { + { + lock_t lock(this->queue_mutex); + this->condition.wait( + lock, [this] { return this->stop || !this->jobs.empty(); }); + if (this->stop && this->jobs.empty()) return; + job = std::move(this->jobs.front()); + this->jobs.pop_front(); + } + } + job.first->GetService().post(job.second()); + }); + } + } + }; + + void Offer(const Job & job) + { + { + lock_t lock(queue_mutex); + if (stop) return; + jobs.emplace_back(job); + } + condition.notify_one(); + } + + ~ThreadPool() + { + { + lock_t lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(auto &t: threads) t.join(); + } + + std::vector threads; + std::deque jobs; + mtx_t queue_mutex; + cond_t condition; + bool stop; + + }; +} +} + + +#endif diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 85c32743..653c24e5 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -171,27 +171,14 @@ namespace transport return; } } -#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__) -// due the bug in gcc 4.7. std::shared_future.get() is not const - if (!m_DHKeysPair) - m_DHKeysPair = transports.GetNextDHKeysPair (); - CreateAESKey (m_Establisher->phase1.pubKey); - SendPhase2 (); -#else // TODO: check for number of pending keys auto s = shared_from_this (); - auto keyCreated = std::async (std::launch::async, [s] () - { + m_Server.Work(s, [s]() -> std::function { if (!s->m_DHKeysPair) s->m_DHKeysPair = transports.GetNextDHKeysPair (); s->CreateAESKey (s->m_Establisher->phase1.pubKey); - }).share (); - m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); - s->SendPhase2 (); + return std::bind(&NTCPSession::SendPhase2, s); }); -#endif } } @@ -788,12 +775,14 @@ namespace transport } //----------------------------------------- - NTCPServer::NTCPServer (): + NTCPServer::NTCPServer (int workers): m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr), m_SoftLimit(0), m_HardLimit(0) { + if(workers <= 0) workers = 1; + m_CryptoPool = std::make_shared(workers); } NTCPServer::~NTCPServer () diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index b64f63aa..12ca92fc 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -12,6 +12,7 @@ #include "RouterInfo.h" #include "I2NPProtocol.h" #include "TransportSession.h" +#include "CryptoWorker.h" namespace i2p { @@ -131,6 +132,8 @@ namespace transport { public: + typedef i2p::worker::ThreadPool Pool; + enum RemoteAddressType { eIP4Address, @@ -146,7 +149,7 @@ namespace transport }; - NTCPServer (); + NTCPServer (int workers=4); ~NTCPServer (); void Start (); @@ -193,6 +196,11 @@ namespace transport void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); + void Work(std::shared_ptr conn, Pool::WorkFunc work) + { + m_CryptoPool->Offer({conn, work}); + } + private: bool m_IsRunning; @@ -210,6 +218,8 @@ namespace transport boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; + std::shared_ptr m_CryptoPool; + uint16_t m_SoftLimit, m_HardLimit; public: From 8d7fde0287a064877401c674dcf2c3cffb180b4f Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 11:43:13 -0500 Subject: [PATCH 2/8] more --- libi2pd/NTCPSession.cpp | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 653c24e5..549bb2fe 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -196,7 +196,7 @@ namespace transport m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - + m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); @@ -237,24 +237,11 @@ namespace transport } else { -#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__) -// due the bug in gcc 4.7. std::shared_future.get() is not const - CreateAESKey (m_Establisher->phase2.pubKey); - HandlePhase2 (); -#else auto s = shared_from_this (); - // create AES key in separate thread - auto keyCreated = std::async (std::launch::async, [s] () - { - s->CreateAESKey (s->m_Establisher->phase2.pubKey); - }).share (); // TODO: use move capture in C++ 14 instead shared_future - // let other operations execute while a key gets created - m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); // we might wait if no more pending operations - s->HandlePhase2 (); - }); -#endif + m_Server.Work(s, [s]() -> std::function { + s->CreateAESKey (s->m_Establisher->phase2.pubKey); + return std::bind(&NTCPSession::HandlePhase2, s); + }); } } From f2e6fad104901e4e8514cbfeda3a565add845066 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 12:40:28 -0500 Subject: [PATCH 3/8] make it work --- libi2pd/CryptoWorker.h | 8 ++++---- libi2pd/NTCPSession.cpp | 15 +++++++-------- libi2pd/NTCPSession.h | 15 ++++++++------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h index f310032f..3188fa03 100644 --- a/libi2pd/CryptoWorker.h +++ b/libi2pd/CryptoWorker.h @@ -6,7 +6,6 @@ #include #include #include -#include #include namespace i2p @@ -17,7 +16,7 @@ namespace worker struct ThreadPool { typedef std::function ResultFunc; - typedef std::function WorkFunc; + typedef std::function WorkFunc; typedef std::pair, WorkFunc> Job; typedef std::mutex mtx_t; typedef std::unique_lock lock_t; @@ -30,9 +29,9 @@ namespace worker while(workers--) { threads.emplace_back([this] { - Job job; for (;;) { + Job job; { lock_t lock(this->queue_mutex); this->condition.wait( @@ -41,8 +40,9 @@ namespace worker job = std::move(this->jobs.front()); this->jobs.pop_front(); } + ResultFunc result = job.second(); + job.first->GetService().post(result); } - job.first->GetService().post(job.second()); }); } } diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 549bb2fe..ba657bed 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -105,6 +105,11 @@ namespace transport transports.PeerConnected (shared_from_this ()); } + boost::asio::io_service & NTCPSession::GetService() + { + return m_Server.GetService(); + } + void NTCPSession::ClientLogin () { if (!m_DHKeysPair) @@ -196,11 +201,8 @@ namespace transport m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); - boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); - + boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); } void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) @@ -249,7 +251,6 @@ namespace transport { m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240); m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); // verify uint8_t xy[512]; @@ -289,7 +290,6 @@ namespace transport buf += paddingSize; len += paddingSize; } - SignedData s; s.Insert (m_Establisher->phase1.pubKey, 256); // x s.Insert (m_Establisher->phase2.pubKey, 256); // y @@ -297,10 +297,9 @@ namespace transport s.Insert (tsA); // tsA s.Insert (m_Establisher->phase2.encrypted.timestamp, 4); // tsB s.Sign (keys, buf); - m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); + std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); } void NTCPSession::HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA) diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index 12ca92fc..fae9874e 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -56,6 +56,7 @@ namespace transport void Done (); boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; + boost::asio::io_service & GetService(); bool IsEstablished () const { return m_IsEstablished; }; bool IsTerminated () const { return m_IsTerminated; }; @@ -101,7 +102,7 @@ namespace transport void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); private: - + NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; @@ -172,6 +173,11 @@ namespace transport void SetSessionLimits(uint16_t softLimit, uint16_t hardLimit) { m_SoftLimit = softLimit; m_HardLimit = hardLimit; } bool ShouldLimit() const { return ShouldHardLimit() || ShouldSoftLimit(); } + void Work(std::shared_ptr conn, Pool::WorkFunc work) + { + m_CryptoPool->Offer({conn, work}); + } + private: /** @brief return true for hard limit */ @@ -196,13 +202,8 @@ namespace transport void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); - void Work(std::shared_ptr conn, Pool::WorkFunc work) - { - m_CryptoPool->Offer({conn, work}); - } - private: - + bool m_IsRunning; std::thread * m_Thread; boost::asio::io_service m_Service; From cd59ca8376138081c265ab721bd4e8da4ba634f3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 12:59:39 -0500 Subject: [PATCH 4/8] whitespace cleanup --- libi2pd/CryptoWorker.h | 119 ++++++++++++++++++++-------------------- libi2pd/NTCPSession.cpp | 7 ++- libi2pd/NTCPSession.h | 11 ++-- 3 files changed, 68 insertions(+), 69 deletions(-) diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h index 3188fa03..d43e356c 100644 --- a/libi2pd/CryptoWorker.h +++ b/libi2pd/CryptoWorker.h @@ -12,69 +12,68 @@ namespace i2p { namespace worker { - template - struct ThreadPool - { - typedef std::function ResultFunc; - typedef std::function WorkFunc; - typedef std::pair, WorkFunc> Job; - typedef std::mutex mtx_t; - typedef std::unique_lock lock_t; - typedef std::condition_variable cond_t; - ThreadPool(int workers) - { - stop = false; - if(workers > 0) - { - while(workers--) - { - threads.emplace_back([this] { - for (;;) - { - Job job; - { - lock_t lock(this->queue_mutex); - this->condition.wait( - lock, [this] { return this->stop || !this->jobs.empty(); }); - if (this->stop && this->jobs.empty()) return; - job = std::move(this->jobs.front()); - this->jobs.pop_front(); - } - ResultFunc result = job.second(); - job.first->GetService().post(result); - } - }); - } - } - }; + template + struct ThreadPool + { + typedef std::function ResultFunc; + typedef std::function WorkFunc; + typedef std::pair, WorkFunc> Job; + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + typedef std::condition_variable cond_t; + ThreadPool(int workers) + { + stop = false; + if(workers > 0) + { + while(workers--) + { + threads.emplace_back([this] { + for (;;) + { + Job job; + { + lock_t lock(this->queue_mutex); + this->condition.wait( + lock, [this] { return this->stop || !this->jobs.empty(); }); + if (this->stop && this->jobs.empty()) return; + job = std::move(this->jobs.front()); + this->jobs.pop_front(); + } + ResultFunc result = job.second(); + job.first->GetService().post(result); + } + }); + } + } + }; - void Offer(const Job & job) - { - { - lock_t lock(queue_mutex); - if (stop) return; - jobs.emplace_back(job); - } - condition.notify_one(); - } + void Offer(const Job & job) + { + { + lock_t lock(queue_mutex); + if (stop) return; + jobs.emplace_back(job); + } + condition.notify_one(); + } - ~ThreadPool() - { - { - lock_t lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(auto &t: threads) t.join(); - } + ~ThreadPool() + { + { + lock_t lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(auto &t: threads) t.join(); + } - std::vector threads; - std::deque jobs; - mtx_t queue_mutex; - cond_t condition; - bool stop; - - }; + std::vector threads; + std::deque jobs; + mtx_t queue_mutex; + cond_t condition; + bool stop; + }; } } diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index ba657bed..b7268818 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -109,7 +109,7 @@ namespace transport { return m_Server.GetService(); } - + void NTCPSession::ClientLogin () { if (!m_DHKeysPair) @@ -202,7 +202,8 @@ namespace transport m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); - boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); + boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), + std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); } void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) @@ -299,7 +300,7 @@ namespace transport s.Sign (keys, buf); m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); + std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); } void NTCPSession::HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA) diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index fae9874e..5335fbdd 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -102,7 +102,7 @@ namespace transport void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); private: - + NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; @@ -133,7 +133,7 @@ namespace transport { public: - typedef i2p::worker::ThreadPool Pool; + typedef i2p::worker::ThreadPool Pool; enum RemoteAddressType { @@ -177,7 +177,6 @@ namespace transport { m_CryptoPool->Offer({conn, work}); } - private: /** @brief return true for hard limit */ @@ -203,7 +202,7 @@ namespace transport void HandleTerminationTimer (const boost::system::error_code& ecode); private: - + bool m_IsRunning; std::thread * m_Thread; boost::asio::io_service m_Service; @@ -219,8 +218,8 @@ namespace transport boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; - std::shared_ptr m_CryptoPool; - + std::shared_ptr m_CryptoPool; + uint16_t m_SoftLimit, m_HardLimit; public: From 098b2e968e91bda0aa84d7143158f48583132777 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 13:03:01 -0500 Subject: [PATCH 5/8] whitespace cleanup --- libi2pd/NTCPSession.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index b7268818..d3f200ee 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -291,6 +291,7 @@ namespace transport buf += paddingSize; len += paddingSize; } + SignedData s; s.Insert (m_Establisher->phase1.pubKey, 256); // x s.Insert (m_Establisher->phase2.pubKey, 256); // y @@ -298,6 +299,7 @@ namespace transport s.Insert (tsA); // tsA s.Insert (m_Establisher->phase2.encrypted.timestamp, 4); // tsB s.Sign (keys, buf); + m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (), std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); From b980ca4a9ed7b03cb0ea395c4870f07550fafae7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 13:03:34 -0500 Subject: [PATCH 6/8] whitespace cleanup --- libi2pd/NTCPSession.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index d3f200ee..be061740 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -252,6 +252,7 @@ namespace transport { m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240); m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16); + m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); // verify uint8_t xy[512]; From 547a0057e6bd7e6404f4c814af2130bc25b4fa55 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 13:04:39 -0500 Subject: [PATCH 7/8] whitespace cleanup --- libi2pd/NTCPSession.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index be061740..e3d6c004 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -201,6 +201,7 @@ namespace transport m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); + m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); From b469080cd78654c6a86926a684f6e63bcdecbfb5 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 20 Feb 2018 13:18:57 -0500 Subject: [PATCH 8/8] make ntcp worker threads configurable in number --- libi2pd/Config.cpp | 1 + libi2pd/Transports.cpp | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/libi2pd/Config.cpp b/libi2pd/Config.cpp index 636e4986..98b94f33 100644 --- a/libi2pd/Config.cpp +++ b/libi2pd/Config.cpp @@ -73,6 +73,7 @@ namespace config { ("limits.transittunnels", value()->default_value(2500), "Maximum active transit sessions (default:2500)") ("limits.ntcpsoft", value()->default_value(0), "Threshold to start probabalistic backoff with ntcp sessions (default: use system limit)") ("limits.ntcphard", value()->default_value(0), "Maximum number of ntcp sessions (default: use system limit)") + ("limits.ntcpthreads", value()->default_value(1), "Maximum number of threads used by NTCP DH worker (default: 1)") ; options_description httpserver("HTTP Server options"); diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 6dcbe56d..9914c75e 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -153,9 +153,10 @@ namespace transport m_Thread = new std::thread (std::bind (&Transports::Run, this)); std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy); i2p::http::URL proxyurl; - uint16_t softLimit, hardLimit; + uint16_t softLimit, hardLimit, threads; i2p::config::GetOption("limits.ntcpsoft", softLimit); i2p::config::GetOption("limits.ntcphard", hardLimit); + i2p::config::GetOption("limits.ntcpthreads", threads); if(softLimit > 0 && hardLimit > 0 && softLimit >= hardLimit) { LogPrint(eLogError, "ntcp soft limit must be less than ntcp hard limit"); @@ -167,7 +168,7 @@ namespace transport { if(proxyurl.schema == "socks" || proxyurl.schema == "http") { - m_NTCPServer = new NTCPServer(); + m_NTCPServer = new NTCPServer(threads); m_NTCPServer->SetSessionLimits(softLimit, hardLimit); NTCPServer::ProxyType proxytype = NTCPServer::eSocksProxy; @@ -198,7 +199,7 @@ namespace transport if (!address) continue; if (m_NTCPServer == nullptr && enableNTCP) { - m_NTCPServer = new NTCPServer (); + m_NTCPServer = new NTCPServer (threads); m_NTCPServer->SetSessionLimits(softLimit, hardLimit); m_NTCPServer->Start (); if (!(m_NTCPServer->IsBoundV6() || m_NTCPServer->IsBoundV4())) {