diff --git a/libi2pd_client/I2PService.cpp b/libi2pd_client/I2PService.cpp index 27e34b69..5604f950 100644 --- a/libi2pd_client/I2PService.cpp +++ b/libi2pd_client/I2PService.cpp @@ -148,162 +148,6 @@ namespace client } } - SocketsPipe::SocketsPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) - { - boost::asio::socket_base::receive_buffer_size option(SOCKETS_PIPE_BUFFER_SIZE); - upstream->set_option(option); - downstream->set_option(option); - } - - SocketsPipe::~SocketsPipe() - { - Terminate(); - } - - void SocketsPipe::Start() - { - AsyncReceiveUpstream(); - AsyncReceiveDownstream(); - } - - void SocketsPipe::Terminate() - { - if(Kill()) return; - if (m_up) - { - if (m_up->is_open()) - m_up->close(); - m_up = nullptr; - } - if (m_down) - { - if (m_down->is_open()) - m_down->close(); - m_down = nullptr; - } - Done(shared_from_this()); - } - - void SocketsPipe::AsyncReceiveUpstream() - { - if (m_up) - { - m_up->async_read_some(boost::asio::buffer(m_upstream_to_down_buf, SOCKETS_PIPE_BUFFER_SIZE), - std::bind(&SocketsPipe::HandleUpstreamReceived, shared_from_this(), - std::placeholders::_1, std::placeholders::_2)); - } - else - LogPrint(eLogError, "SocketsPipe: Upstream receive: No socket"); - } - - void SocketsPipe::AsyncReceiveDownstream() - { - if (m_down) { - m_down->async_read_some(boost::asio::buffer(m_downstream_to_up_buf, SOCKETS_PIPE_BUFFER_SIZE), - std::bind(&SocketsPipe::HandleDownstreamReceived, shared_from_this(), - std::placeholders::_1, std::placeholders::_2)); - } - else - LogPrint(eLogError, "SocketsPipe: Downstream receive: No socket"); - } - - void SocketsPipe::UpstreamWrite(size_t len) - { - if (m_up) - { - LogPrint(eLogDebug, "SocketsPipe: Upstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_up, boost::asio::buffer(m_upstream_buf, len), - boost::asio::transfer_all(), - std::bind(&SocketsPipe::HandleUpstreamWrite, - shared_from_this(), - std::placeholders::_1)); - } - else - LogPrint(eLogError, "SocketsPipe: Upstream write: no socket"); - } - - void SocketsPipe::DownstreamWrite(size_t len) - { - if (m_down) - { - LogPrint(eLogDebug, "TCPIPPipe: Downstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_down, boost::asio::buffer(m_downstream_buf, len), - boost::asio::transfer_all(), - std::bind(&SocketsPipe::HandleDownstreamWrite, - shared_from_this(), - std::placeholders::_1)); - } - else - LogPrint(eLogError, "TCPIPPipe: Downstream write: No socket"); - } - - - void SocketsPipe::HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint(eLogDebug, "TCPIPPipe: Downstream: ", (int) bytes_transfered, " bytes received"); - if (ecode) - { - LogPrint(eLogWarning, "TCPIPPipe: Downstream read error:" , ecode.message()); - Terminate(); - } - else - { - if (bytes_transfered > 0 ) - memcpy(m_upstream_buf, m_downstream_to_up_buf, bytes_transfered); - UpstreamWrite(bytes_transfered); - } - } - } - - void SocketsPipe::HandleDownstreamWrite(const boost::system::error_code & ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - if (ecode) - { - LogPrint(eLogWarning, "TCPIPPipe: Downstream write error:" , ecode.message()); - Terminate(); - } - else - AsyncReceiveUpstream(); - } - } - - void SocketsPipe::HandleUpstreamWrite(const boost::system::error_code & ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - if (ecode) - { - LogPrint(eLogWarning, "SocketsPipe: Upstream write error:" , ecode.message()); - Terminate(); - } - else - AsyncReceiveDownstream(); - } - } - - void SocketsPipe::HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint(eLogDebug, "SocketsPipe: Upstream ", (int)bytes_transfered, " bytes received"); - if (ecode) - { - LogPrint(eLogWarning, "SocketsPipe: Upstream read error:" , ecode.message()); - Terminate(); - } - else - { - if (bytes_transfered > 0 ) - memcpy(m_downstream_buf, m_upstream_to_down_buf, bytes_transfered); - DownstreamWrite(bytes_transfered); - } - } - } - void TCPIPAcceptor::Start () { m_Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService (), m_LocalEndpoint)); diff --git a/libi2pd_client/I2PService.h b/libi2pd_client/I2PService.h index 4be7f2ca..b8973f9d 100644 --- a/libi2pd_client/I2PService.h +++ b/libi2pd_client/I2PService.h @@ -123,38 +123,91 @@ namespace client const size_t SOCKETS_PIPE_BUFFER_SIZE = 8192 * 8; // bidirectional pipe for 2 stream sockets - class SocketsPipe: public I2PServiceHandler, public std::enable_shared_from_this + template + class SocketsPipe: public I2PServiceHandler, + public std::enable_shared_from_this > { public: - SocketsPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream); - ~SocketsPipe(); - void Start() override; - - protected: - - void Terminate(); - void AsyncReceiveUpstream(); - void AsyncReceiveDownstream(); - void HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transferred); - void HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transferred); - void HandleUpstreamWrite(const boost::system::error_code & ecode); - void HandleDownstreamWrite(const boost::system::error_code & ecode); - void UpstreamWrite(size_t len); - void DownstreamWrite(size_t len); + SocketsPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream): + I2PServiceHandler(owner), m_up(upstream), m_down(downstream) + { + boost::asio::socket_base::receive_buffer_size option(SOCKETS_PIPE_BUFFER_SIZE); + upstream->set_option(option); + downstream->set_option(option); + } + ~SocketsPipe() { Terminate(); } + + void Start() override + { + Transfer (m_up, m_down, m_upstream_to_down_buf, SOCKETS_PIPE_BUFFER_SIZE); // receive from upstream + Transfer (m_down, m_up, m_downstream_to_up_buf, SOCKETS_PIPE_BUFFER_SIZE); // receive from upstream + } private: - uint8_t m_upstream_to_down_buf[SOCKETS_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[SOCKETS_PIPE_BUFFER_SIZE]; - uint8_t m_upstream_buf[SOCKETS_PIPE_BUFFER_SIZE], m_downstream_buf[SOCKETS_PIPE_BUFFER_SIZE]; - std::shared_ptr m_up; - std::shared_ptr m_down; + void Terminate() + { + if(Kill()) return; + if (m_up) + { + if (m_up->is_open()) + m_up->close(); + m_up = nullptr; + } + if (m_down) + { + if (m_down->is_open()) + m_down->close(); + m_down = nullptr; + } + Done(SocketsPipe::shared_from_this()); + } + + template + void Transfer (std::shared_ptr from, std::shared_ptr to, uint8_t * buf, size_t len) + { + if (!from || !to || !buf) return; + auto s = SocketsPipe::shared_from_this (); + from->async_read_some(boost::asio::buffer(buf, len), + [from, to, s, buf, len](const boost::system::error_code& ecode, std::size_t transferred) + { + if (ecode == boost::asio::error::operation_aborted) return; + if (!ecode) + { + boost::asio::async_write(*to, boost::asio::buffer(buf, transferred), boost::asio::transfer_all(), + [from, to, s, buf, len](const boost::system::error_code& ecode, std::size_t transferred) + { + (void) transferred; + if (ecode == boost::asio::error::operation_aborted) return; + if (!ecode) + s->Transfer (from, to, buf, len); + else + { + LogPrint(eLogWarning, "SocketsPipe: Write error:" , ecode.message()); + s->Terminate(); + } + }); + } + else + { + LogPrint(eLogWarning, "SocketsPipe: Read error:" , ecode.message()); + s->Terminate(); + } + }); + } + + private: + + uint8_t m_upstream_to_down_buf[SOCKETS_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[SOCKETS_PIPE_BUFFER_SIZE]; + std::shared_ptr m_up; + std::shared_ptr m_down; }; - template - std::shared_ptr CreateSocketsPipe (I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) + template + std::shared_ptr CreateSocketsPipe (I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) { - return std::make_shared(owner, upstream, downstream); + return std::make_shared >(owner, upstream, downstream); } /* TODO: support IPv6 too */