From 595c745c900fcfa16c5731b4ff2f2e50088b9f4f Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2013 11:26:39 -0400 Subject: [PATCH] Transport thread added --- Transports.cpp | 129 +++++++++++++++++++++++++++++++++++++++++++++++++ Transports.h | 54 +++++++++++++++++++++ 2 files changed, 183 insertions(+) create mode 100644 Transports.cpp create mode 100644 Transports.h diff --git a/Transports.cpp b/Transports.cpp new file mode 100644 index 00000000..e5210184 --- /dev/null +++ b/Transports.cpp @@ -0,0 +1,129 @@ +#include +#include "Log.h" +#include "RouterContext.h" +#include "I2NPProtocol.h" +#include "NetDb.h" +#include "Transports.h" + +using namespace i2p::data; + +namespace i2p +{ + Transports transports; + + Transports::Transports (): + m_Thread (0), m_Work (m_Service),m_NTCPAcceptor (0) + { + } + + Transports::~Transports () + { + Stop (); + } + + void Transports::Start () + { + m_Thread = new std::thread (std::bind (&Transports::Run, this)); + // create acceptors + auto addresses = context.GetRouterInfo ().GetAddresses (); + for (auto& address : addresses) + { + if (address.transportStyle == RouterInfo::eTransportNTCP) + { + m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address.port)); + + LogPrint ("Start listening port ", address.port); + auto conn = new i2p::ntcp::NTCPServerConnection (m_Service); + m_NTCPAcceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAccept, this, + conn, boost::asio::placeholders::error)); + } + } + } + + void Transports::Stop () + { + for (auto session: m_NTCPSessions) + delete session.second; + m_NTCPSessions.clear (); + delete m_NTCPAcceptor; + + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } + } + + void Transports::AddNTCPSession (i2p::ntcp::NTCPSession * session) + { + if (session) + m_NTCPSessions[std::string ((const char *)session->GetRemoteRouterInfo ().GetIdentHash (), 32)] = session; + } + + void Transports::RemoveNTCPSession (i2p::ntcp::NTCPSession * session) + { + if (session) + m_NTCPSessions.erase (std::string ((const char *)session->GetRemoteRouterInfo ().GetIdentHash (), 32)); + } + + void Transports::HandleAccept (i2p::ntcp::NTCPServerConnection * conn, const boost::system::error_code& error) + { + if (!error) + { + LogPrint ("Connected from ", conn->GetSocket ().remote_endpoint().address ().to_string ()); + conn->ServerLogin (); + } + else + { + delete conn; + } + + conn = new i2p::ntcp::NTCPServerConnection (m_Service); + m_NTCPAcceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAccept, this, + conn, boost::asio::placeholders::error)); + } + + i2p::ntcp::NTCPSession * Transports::GetNextNTCPSession () + { + for (auto session: m_NTCPSessions) + if (session.second->IsEstablished ()) + return session.second; + return 0; + } + + i2p::ntcp::NTCPSession * Transports::FindNTCPSession (const uint8_t * ident) + { + auto it = m_NTCPSessions.find (std::string ((const char *)ident,32)); + if (it != m_NTCPSessions.end ()) + return it->second; + return 0; + } + + void Transports::SendMessage (const uint8_t * ident, i2p::I2NPMessage * msg) + { + if (!memcmp (ident, i2p::context.GetRouterInfo ().GetIdentHash (), 32)) + // we send it to ourself + i2p::HandleI2NPMessage (msg); + else + { + auto session = FindNTCPSession (ident); + if (!session) + { + RouterInfo * r = netdb.FindRouter (ident); + if (r) + { + auto address = r->GetNTCPAddress (); + if (address) + session = new i2p::ntcp::NTCPClient (m_Service, address->host.c_str (), address->port, *r); + } + } + if (session) + session->SendI2NPMessage (msg); + else + LogPrint ("Session not found"); + } + } +} diff --git a/Transports.h b/Transports.h new file mode 100644 index 00000000..ea5a8057 --- /dev/null +++ b/Transports.h @@ -0,0 +1,54 @@ +#ifndef TRANSPORTS_H__ +#define TRANSPORTS_H__ + +#include +#include +#include +#include +#include +#include "NTCPSession.h" +#include "RouterInfo.h" +#include "I2NPProtocol.h" + +namespace i2p +{ + class Transports + { + public: + + Transports (); + ~Transports (); + + void Start (); + void Stop (); + + boost::asio::io_service& GetService () { return m_Service; }; + + void AddNTCPSession (i2p::ntcp::NTCPSession * session); + void RemoveNTCPSession (i2p::ntcp::NTCPSession * session); + + i2p::ntcp::NTCPSession * GetNextNTCPSession (); + i2p::ntcp::NTCPSession * FindNTCPSession (const uint8_t * ident); + + void SendMessage (const uint8_t * ident, i2p::I2NPMessage * msg); + + private: + + void Run () { m_Service.run (); }; + void HandleAccept (i2p::ntcp::NTCPServerConnection * conn, + const boost::system::error_code& error); + + private: + + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + boost::asio::ip::tcp::acceptor * m_NTCPAcceptor; + + std::map m_NTCPSessions; + }; + + extern Transports transports; +} + +#endif