diff --git a/ClientContext.cpp b/ClientContext.cpp index 31f5e640..0354e708 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -8,6 +8,8 @@ #include "Identity.h" #include "util.h" #include "ClientContext.h" +#include "SOCKS.h" +#include "WebSocks.h" namespace i2p { @@ -424,10 +426,16 @@ namespace client try { std::string type = section.second.get (I2P_TUNNELS_SECTION_TYPE); - if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) + if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT + || type == I2P_TUNNELS_SECTION_TYPE_SOCKS + || type == I2P_TUNNELS_SECTION_TYPE_WEBSOCKS + || type == I2P_TUNNELS_SECTION_TYPE_HTTPPROXY + || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { // mandatory params - std::string dest = section.second.get (I2P_CLIENT_TUNNEL_DESTINATION); + std::string dest; + if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) + dest = section.second.get (I2P_CLIENT_TUNNEL_DESTINATION); int port = section.second.get (I2P_CLIENT_TUNNEL_PORT); // optional params std::string keys = section.second.get (I2P_CLIENT_TUNNEL_KEYS, ""); @@ -466,19 +474,45 @@ namespace client LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); } else { - // tcp client - auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); - if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), - std::unique_ptr(clientTunnel))).second) + boost::asio::ip::tcp::endpoint clientEndpoint; + I2PService * clientTunnel = nullptr; + if (type == I2P_TUNNELS_SECTION_TYPE_SOCKS) + { + // socks proxy + clientTunnel = new i2p::proxy::SOCKSProxy(address, port, "", destinationPort, localDestination); + clientEndpoint = ((i2p::proxy::SOCKSProxy*)clientTunnel)->GetAcceptor().local_endpoint(); + } + else if (type == I2P_TUNNELS_SECTION_TYPE_HTTPPROXY) + { + // http proxy + clientTunnel = new i2p::proxy::HTTPProxy(address, port, localDestination); + clientEndpoint = ((i2p::proxy::HTTPProxy*)clientTunnel)->GetAcceptor().local_endpoint(); + } + else if (type == I2P_TUNNELS_SECTION_TYPE_WEBSOCKS) + { + // websocks proxy + clientTunnel = new WebSocks(address, port, localDestination);; + clientEndpoint = ((WebSocks*)clientTunnel)->GetLocalEndpoint(); + } + else + { + // tcp client + clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); + clientEndpoint = ((I2PClientTunnel*)clientTunnel)->GetAcceptor().local_endpoint(); + } + if (m_ClientTunnels.insert (std::make_pair (clientEndpoint, std::unique_ptr(clientTunnel))).second) { clientTunnel->Start (); numClientTunnels++; } else - LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); + LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientEndpoint, "already exists"); } } - else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) + else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER + || type == I2P_TUNNELS_SECTION_TYPE_HTTP + || type == I2P_TUNNELS_SECTION_TYPE_IRC + || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { // mandatory params std::string host = section.second.get (I2P_SERVER_TUNNEL_HOST); @@ -493,7 +527,8 @@ namespace client i2p::data::SigningKeyType sigType = section.second.get (I2P_SERVER_TUNNEL_SIGNATURE_TYPE, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); uint32_t maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN); std::string address = section.second.get (I2P_SERVER_TUNNEL_ADDRESS, "127.0.0.1"); - + bool isUniqueLocal = section.second.get(I2P_SERVER_TUNNEL_ENABLE_UNIQUE_LOCAL, true); + // I2CP std::map options; ReadI2CPOptions (section, options); @@ -512,6 +547,11 @@ namespace client auto localAddress = boost::asio::ip::address::from_string(address); boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); + if(!isUniqueLocal) + { + LogPrint(eLogInfo, "Clients: disabling loopback address mapping"); + serverTunnel->SetUniqueLocal(isUniqueLocal); + } std::lock_guard lock(m_ForwardsMutex); if(m_ServerForwards.insert( std::make_pair( @@ -538,8 +578,12 @@ namespace client LogPrint(eLogInfo, "Clients: Set Max Conns To ", maxConns); serverTunnel->SetMaxConnsPerMinute(maxConns); - - + if(!isUniqueLocal) + { + LogPrint(eLogInfo, "Clients: disabling loopback address mapping"); + serverTunnel->SetUniqueLocal(isUniqueLocal); + } + if (accessList.length () > 0) { std::set idents; diff --git a/ClientContext.h b/ClientContext.h index e392c8a2..1aa1e7f0 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -26,6 +26,9 @@ namespace client const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; + const char I2P_TUNNELS_SECTION_TYPE_SOCKS[] = "socks"; + const char I2P_TUNNELS_SECTION_TYPE_WEBSOCKS[] = "websocks"; + const char I2P_TUNNELS_SECTION_TYPE_HTTPPROXY[] = "httpproxy"; const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; @@ -42,7 +45,8 @@ namespace client const char I2P_SERVER_TUNNEL_GZIP[] = "gzip"; const char I2P_SERVER_TUNNEL_WEBIRC_PASSWORD[] = "webircpassword"; const char I2P_SERVER_TUNNEL_ADDRESS[] = "address"; - + const char I2P_SERVER_TUNNEL_ENABLE_UNIQUE_LOCAL[] = "enableuniquelocal"; + class ClientContext { public: @@ -92,7 +96,7 @@ namespace client i2p::proxy::HTTPProxy * m_HttpProxy; i2p::proxy::SOCKSProxy * m_SocksProxy; - std::map > m_ClientTunnels; // local endpoint->tunnel + std::map > m_ClientTunnels; // local endpoint->tunnel std::map, std::unique_ptr > m_ServerTunnels; // ->tunnel std::mutex m_ForwardsMutex; diff --git a/Config.cpp b/Config.cpp index 610ab4aa..64333d74 100644 --- a/Config.cpp +++ b/Config.cpp @@ -90,7 +90,8 @@ namespace config { ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") ("httpproxy.latency.min", value()->default_value("0"), "HTTP proxy min latency for tunnels") - ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.outproxy", value()->default_value(""), "HTTP proxy upstream out proxy url") ; options_description socksproxy("SOCKS Proxy options"); @@ -201,7 +202,7 @@ namespace config { ("websockets.enabled", value()->default_value(false), "enable websocket server") ("websockets.address", value()->default_value("127.0.0.1"), "address to bind websocket server on") ("websockets.port", value()->default_value(7666), "port to bind websocket server on"); - + m_OptionsDesc .add(general) .add(limits) diff --git a/Daemon.cpp b/Daemon.cpp index a007995f..c7aaa279 100644 --- a/Daemon.cpp +++ b/Daemon.cpp @@ -307,7 +307,6 @@ namespace i2p d.m_WebsocketServer->Start(); i2p::event::core.SetListener(d.m_WebsocketServer->ToListener()); } - #endif return true; } diff --git a/Datagram.cpp b/Datagram.cpp index db29e83c..705e10ef 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -45,7 +45,8 @@ namespace datagram owner->Sign (buf1, len, signature); auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - ObtainSession(identity)->SendMsg(msg); + auto session = ObtainSession(identity); + session->SendMsg(msg); } @@ -69,7 +70,8 @@ namespace datagram if (verified) { auto h = identity.GetIdentHash(); - ObtainSession(h)->Ack(); + auto session = ObtainSession(h); + session->Ack(); auto r = FindReceiver(toPort); if(r) r(identity, fromPort, toPort, buf + headerLen, len -headerLen); @@ -180,7 +182,8 @@ namespace datagram // we used this session m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); // schedule send - m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); + auto self = shared_from_this(); + m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, self, msg)); } DatagramSession::Info DatagramSession::GetSessionInfo() const @@ -237,11 +240,11 @@ namespace datagram m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel); path->outboundTunnel = m_CurrentOutboundTunnel; } - if(m_CurrentRemoteLease && ! m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { + if(m_CurrentRemoteLease && m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { // bad lease, switch to next one if(m_RemoteLeaseSet) { auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&](const i2p::data::Lease& l) -> bool { - return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway || l.endDate <= m_CurrentRemoteLease->endDate; + return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway; }); auto sz = ls.size(); if (sz) { @@ -332,7 +335,8 @@ namespace datagram { boost::posix_time::milliseconds dlt(100); m_SendQueueTimer.expires_from_now(dlt); - m_SendQueueTimer.async_wait([&](const boost::system::error_code & ec) { if(ec) return; FlushSendQueue(); }); + auto self = shared_from_this(); + m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) { if(ec) return; self->FlushSendQueue(); }); } } } diff --git a/Datagram.h b/Datagram.h index 2eb180d6..a10f2646 100644 --- a/Datagram.h +++ b/Datagram.h @@ -34,7 +34,7 @@ namespace datagram // max 64 messages buffered in send queue for each datagram session const size_t DATAGRAM_SEND_QUEUE_MAX_SIZE = 64; - class DatagramSession + class DatagramSession : public std::enable_shared_from_this { public: DatagramSession(i2p::client::ClientDestination * localDestination, @@ -90,7 +90,9 @@ namespace datagram uint64_t m_LastUse; bool m_RequestingLS; }; - + + typedef std::shared_ptr DatagramSession_ptr; + const size_t MAX_DATAGRAM_SIZE = 32768; class DatagramDestination { @@ -132,7 +134,7 @@ namespace datagram i2p::data::IdentityEx m_Identity; Receiver m_Receiver; // default std::mutex m_SessionsMutex; - std::map > m_Sessions; + std::map m_Sessions; std::mutex m_ReceiversMutex; std::map m_ReceiversByPorts; diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..229d0d53 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,54 @@ +FROM alpine:latest + +MAINTAINER Mikal Villa + +ENV GIT_BRANCH="master" +ENV I2PD_PREFIX="/opt/i2pd-${GIT_BRANCH}" +ENV PATH=${I2PD_PREFIX}/bin:$PATH + +ENV GOSU_VERSION=1.7 +ENV GOSU_SHASUM="34049cfc713e8b74b90d6de49690fa601dc040021980812b2f1f691534be8a50 /usr/local/bin/gosu" + +RUN mkdir /user && adduser -S -h /user i2pd && chown -R i2pd:nobody /user + + +# +# Each RUN is a layer, adding the dependencies and building i2pd in one layer takes around 8-900Mb, so to keep the +# image under 20mb we need to remove all the build dependencies in the same "RUN" / layer. +# + +# 1. install deps, clone and build. +# 2. strip binaries. +# 3. Purge all dependencies and other unrelated packages, including build directory. +RUN apk --no-cache --virtual build-dependendencies add make gcc g++ libtool boost-dev build-base openssl-dev openssl git \ + && mkdir -p /tmp/build \ + && cd /tmp/build && git clone -b ${GIT_BRANCH} https://github.com/PurpleI2P/i2pd.git \ + && cd i2pd \ + && make -j4 \ + && mkdir -p ${I2PD_PREFIX}/bin \ + && mv i2pd ${I2PD_PREFIX}/bin/ \ + && cd ${I2PD_PREFIX}/bin \ + && strip i2pd \ + && rm -fr /tmp/build && apk --purge del build-dependendencies build-base fortify-headers boost-dev zlib-dev openssl-dev \ + boost-python3 python3 gdbm boost-unit_test_framework boost-python linux-headers boost-prg_exec_monitor \ + boost-serialization boost-signals boost-wave boost-wserialization boost-math boost-graph boost-regex git pcre \ + libtool g++ gcc pkgconfig + +# 2. Adding required libraries to run i2pd to ensure it will run. +RUN apk --no-cache add boost-filesystem boost-system boost-program_options boost-date_time boost-thread boost-iostreams openssl musl-utils libstdc++ + +# Gosu is a replacement for su/sudo in docker and not a backdoor :) See https://github.com/tianon/gosu +RUN wget -O /usr/local/bin/gosu https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64 \ + && echo "${GOSU_SHASUM}" | sha256sum -c && chmod +x /usr/local/bin/gosu + +COPY entrypoint.sh /entrypoint.sh + +RUN chmod a+x /entrypoint.sh +RUN echo "export PATH=${PATH}" >> /etc/profile + +VOLUME [ "/var/lib/i2pd" ] + +EXPOSE 7070 4444 4447 7656 2827 7654 7650 + +ENTRYPOINT [ "/entrypoint.sh" ] + diff --git a/Event.cpp b/Event.cpp index e148538e..4bc6d594 100644 --- a/Event.cpp +++ b/Event.cpp @@ -17,15 +17,44 @@ namespace i2p void EventCore::QueueEvent(const EventType & ev) { - if(m_listener) - m_listener->HandleEvent(ev); + if(m_listener) m_listener->HandleEvent(ev); + } + + void EventCore::CollectEvent(const std::string & type, const std::string & ident, uint64_t val) + { + std::unique_lock lock(m_collect_mutex); + std::string key = type + "." + ident; + if (m_collected.find(key) == m_collected.end()) + { + m_collected[key] = {type, key, 0}; + } + m_collected[key].Val += val; + } + + void EventCore::PumpCollected(EventListener * listener) + { + std::unique_lock lock(m_collect_mutex); + if(listener) + { + for(const auto & ev : m_collected) { + listener->HandlePumpEvent({{"type", ev.second.Key}, {"ident", ev.second.Ident}}, ev.second.Val); + } + } + m_collected.clear(); } } } -void EmitEvent(const EventType & e) +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val) { #ifdef WITH_EVENTS + i2p::event::core.CollectEvent(type, ident, val); +#endif +} + +void EmitEvent(const EventType & e) +{ +#if WITH_EVENTS i2p::event::core.QueueEvent(e); #endif } diff --git a/Event.h b/Event.h index 1ab37847..a9f97df2 100644 --- a/Event.h +++ b/Event.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include @@ -16,15 +18,27 @@ namespace i2p public: virtual ~EventListener() {}; virtual void HandleEvent(const EventType & ev) = 0; + /** @brief handle collected event when pumped */ + virtual void HandlePumpEvent(const EventType & ev, const uint64_t & val) = 0; }; class EventCore { public: void QueueEvent(const EventType & ev); + void CollectEvent(const std::string & type, const std::string & ident, uint64_t val); void SetListener(EventListener * l); - + void PumpCollected(EventListener * l); + private: + std::mutex m_collect_mutex; + struct CollectedEvent + { + std::string Key; + std::string Ident; + uint64_t Val; + }; + std::map m_collected; EventListener * m_listener = nullptr; }; #ifdef WITH_EVENTS @@ -32,6 +46,8 @@ namespace i2p #endif } } + +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val); void EmitEvent(const EventType & ev); #endif diff --git a/HTTP.cpp b/HTTP.cpp index 08615e5b..201a4e3d 100644 --- a/HTTP.cpp +++ b/HTTP.cpp @@ -259,16 +259,21 @@ namespace http { return eoh + strlen(HTTP_EOH); } - std::string HTTPReq::to_string() { - std::stringstream ss; - ss << method << " " << uri << " " << version << CRLF; + void HTTPReq::write(std::ostream & o) { + o << method << " " << uri << " " << version << CRLF; for (auto & h : headers) { - ss << h.first << ": " << h.second << CRLF; + o << h.first << ": " << h.second << CRLF; } - ss << CRLF; - return ss.str(); + o << CRLF; } + std::string HTTPReq::to_string() + { + std::stringstream ss; + write(ss); + return ss.str(); + } + bool HTTPRes::is_chunked() { auto it = headers.find("Transfer-Encoding"); if (it == headers.end()) diff --git a/HTTP.h b/HTTP.h index 847cf347..581e4a34 100644 --- a/HTTP.h +++ b/HTTP.h @@ -82,6 +82,9 @@ namespace http { /** @brief Serialize HTTP request to string */ std::string to_string(); + + void write(std::ostream & o); + }; struct HTTPRes : HTTPMsg { @@ -116,6 +119,8 @@ namespace http { */ std::string to_string(); + void write(std::ostream & o); + /** @brief Checks that response declared as chunked data */ bool is_chunked(); diff --git a/HTTPProxy.cpp b/HTTPProxy.cpp index c40989d1..33a2a85c 100644 --- a/HTTPProxy.cpp +++ b/HTTPProxy.cpp @@ -64,15 +64,40 @@ namespace proxy { void HostNotFound(std::string & host); void SendProxyError(std::string & content); + void ForwardToUpstreamProxy(); + void HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec); + void HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec); + + void HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transfered); + void HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transfered); + + typedef std::function ProxyResolvedHandler; + + void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr, ProxyResolvedHandler handler); + + void SocksProxySuccess(); + void HandoverToUpstreamProxy(); + uint8_t m_recv_chunk[8192]; std::string m_recv_buf; // from client std::string m_send_buf; // to upstream std::shared_ptr m_sock; - + std::shared_ptr m_proxysock; + boost::asio::ip::tcp::resolver m_proxy_resolver; + i2p::http::URL m_ProxyURL; + i2p::http::URL m_RequestURL; + uint8_t m_socks_buf[255+8]; // for socks request/response + ssize_t m_req_len; + i2p::http::URL m_ClientRequestURL; + i2p::http::HTTPReq m_ClientRequest; + i2p::http::HTTPRes m_ClientResponse; + std::stringstream m_ClientRequestBuffer; public: HTTPReqHandler(HTTPProxy * parent, std::shared_ptr sock) : - I2PServiceHandler(parent), m_sock(sock) {} + I2PServiceHandler(parent), m_sock(sock), + m_proxysock(std::make_shared(parent->GetService())), + m_proxy_resolver(parent->GetService()) {} ~HTTPReqHandler() { Terminate(); } void Handle () { AsyncSockRead(); } /* overload */ }; @@ -97,6 +122,13 @@ namespace proxy { m_sock->close(); m_sock = nullptr; } + if(m_proxysock) + { + LogPrint(eLogDebug, "HTTPProxy: close proxysock"); + if(m_proxysock->is_open()) + m_proxysock->close(); + m_proxysock = nullptr; + } Done(shared_from_this()); } @@ -142,7 +174,7 @@ namespace proxy { << "\r\n"; res.body = ss.str(); std::string response = res.to_string(); - boost::asio::async_write(*m_sock, boost::asio::buffer(response), + boost::asio::async_write(*m_sock, boost::asio::buffer(response), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::SentHTTPFailed, shared_from_this(), std::placeholders::_1)); } @@ -198,54 +230,51 @@ namespace proxy { */ bool HTTPReqHandler::HandleRequest() { - i2p::http::HTTPReq req; - i2p::http::URL url; std::string b64; - int req_len = 0; - req_len = req.parse(m_recv_buf); + m_req_len = m_ClientRequest.parse(m_recv_buf); - if (req_len == 0) + if (m_req_len == 0) return false; /* need more data */ - if (req_len < 0) { + if (m_req_len < 0) { LogPrint(eLogError, "HTTPProxy: unable to parse request"); GenericProxyError("Invalid request", "Proxy unable to parse your request"); return true; /* parse error */ } /* parsing success, now let's look inside request */ - LogPrint(eLogDebug, "HTTPProxy: requested: ", req.uri); - url.parse(req.uri); + LogPrint(eLogDebug, "HTTPProxy: requested: ", m_ClientRequest.uri); + m_RequestURL.parse(m_ClientRequest.uri); - if (ExtractAddressHelper(url, b64)) { - i2p::client::context.GetAddressBook ().InsertAddress (url.host, b64); - LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", url.host); - std::string full_url = url.to_string(); + if (ExtractAddressHelper(m_RequestURL, b64)) { + i2p::client::context.GetAddressBook ().InsertAddress (m_RequestURL.host, b64); + LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", m_RequestURL.host); + std::string full_url = m_RequestURL.to_string(); std::stringstream ss; - ss << "Host " << url.host << " added to router's addressbook from helper. " + ss << "Host " << m_RequestURL.host << " added to router's addressbook from helper. " << "Click here to proceed."; GenericProxyInfo("Addresshelper found", ss.str().c_str()); return true; /* request processed */ } - SanitizeHTTPRequest(req); + SanitizeHTTPRequest(m_ClientRequest); - std::string dest_host = url.host; - uint16_t dest_port = url.port; + std::string dest_host = m_RequestURL.host; + uint16_t dest_port = m_RequestURL.port; /* always set port, even if missing in request */ if (!dest_port) { - dest_port = (url.schema == "https") ? 443 : 80; + dest_port = (m_RequestURL.schema == "https") ? 443 : 80; } /* detect dest_host, set proper 'Host' header in upstream request */ - auto h = req.headers.find("Host"); + auto h = m_ClientRequest.headers.find("Host"); if (dest_host != "") { /* absolute url, replace 'Host' header */ std::string h = dest_host; if (dest_port != 0 && dest_port != 80) h += ":" + std::to_string(dest_port); - req.add_header("Host", h, true); - } else if (h != req.headers.end()) { + m_ClientRequest.add_header("Host", h, true); + } else if (h != m_ClientRequest.headers.end()) { /* relative url and 'Host' header provided. transparent proxy mode? */ i2p::http::URL u; std::string t = "http://" + h->second; @@ -265,23 +294,31 @@ namespace proxy { HostNotFound(dest_host); return true; /* request processed */ } - /* TODO: outproxy handler here */ } else { - LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": not implemented yet"); - std::string message = "Host" + dest_host + "not inside I2P network, but outproxy support not implemented yet"; - GenericProxyError("Outproxy failure", message.c_str()); + std::string outproxyUrl; i2p::config::GetOption("httpproxy.outproxy", outproxyUrl); + if(outproxyUrl.size()) { + LogPrint (eLogDebug, "HTTPProxy: use outproxy ", outproxyUrl); + if(m_ProxyURL.parse(outproxyUrl)) + ForwardToUpstreamProxy(); + else + GenericProxyError("Outproxy failure", "bad outproxy settings"); + } else { + LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": no outprxy enabled"); + std::string message = "Host" + dest_host + "not inside I2P network, but outproxy is not enabled"; + GenericProxyError("Outproxy failure", message.c_str()); + } return true; } /* make relative url */ - url.schema = ""; - url.host = ""; - req.uri = url.to_string(); + m_RequestURL.schema = ""; + m_RequestURL.host = ""; + m_ClientRequest.uri = m_RequestURL.to_string(); /* drop original request from recv buffer */ - m_recv_buf.erase(0, req_len); + m_recv_buf.erase(0, m_req_len); /* build new buffer from modified request and data from original request */ - m_send_buf = req.to_string(); + m_send_buf = m_ClientRequest.to_string(); m_send_buf.append(m_recv_buf); /* connect to destination */ LogPrint(eLogDebug, "HTTPProxy: connecting to host ", dest_host, ":", dest_port); @@ -290,6 +327,144 @@ namespace proxy { return true; } + void HTTPReqHandler::ForwardToUpstreamProxy() + { + LogPrint(eLogDebug, "HTTPProxy: forward to upstream"); + // build http requset + + m_ClientRequestURL = m_RequestURL; + LogPrint(eLogDebug, "HTTPProxy: ", m_ClientRequestURL.host); + m_ClientRequestURL.schema = ""; + m_ClientRequestURL.host = ""; + m_ClientRequest.uri = m_ClientRequestURL.to_string(); + + m_ClientRequest.write(m_ClientRequestBuffer); + m_ClientRequestBuffer << m_recv_buf.substr(m_req_len); + + // assume http if empty schema + if (m_ProxyURL.schema == "" || m_ProxyURL.schema == "http") { + // handle upstream http proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 80; + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1)); + })); + } else if (m_ProxyURL.schema == "socks") { + // handle upstream socks proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 9050; // default to tor default if not specified + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1)); + })); + } else { + // unknown type, complain + GenericProxyError("unknown outproxy url", m_ProxyURL.to_string().c_str()); + } + } + + void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::iterator it, ProxyResolvedHandler handler) + { + if(ec) GenericProxyError("cannot resolve upstream proxy", ec.message().c_str()); + else handler(*it); + } + + void HTTPReqHandler::HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + if(m_RequestURL.host.size() > 255) { + GenericProxyError("hostname too long", m_RequestURL.host.c_str()); + return; + } + uint16_t port = m_RequestURL.port; + if(!port) port = 80; + LogPrint(eLogDebug, "HTTPProxy: connected to socks upstream"); + + std::string host = m_RequestURL.host; + std::size_t reqsize = 0; + m_socks_buf[0] = '\x04'; + m_socks_buf[1] = 1; + htobe16buf(m_socks_buf+2, port); + m_socks_buf[4] = 0; + m_socks_buf[5] = 0; + m_socks_buf[6] = 0; + m_socks_buf[7] = 1; + // user id + m_socks_buf[8] = 'i'; + m_socks_buf[9] = '2'; + m_socks_buf[10] = 'p'; + m_socks_buf[11] = 'd'; + m_socks_buf[12] = 0; + reqsize += 13; + memcpy(m_socks_buf+ reqsize, host.c_str(), host.size()); + reqsize += host.size(); + m_socks_buf[++reqsize] = 0; + boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_socks_buf, reqsize), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::HandleSocksProxySendHandshake, this, std::placeholders::_1, std::placeholders::_2)); + } else GenericProxyError("cannot connect to upstream socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + LogPrint(eLogDebug, "HTTPProxy: upstream socks handshake sent"); + if(ec) GenericProxyError("Cannot negotiate with socks proxy", ec.message().c_str()); + else m_proxysock->async_read_some(boost::asio::buffer(m_socks_buf, 8), std::bind(&HTTPReqHandler::HandleSocksProxyReply, this, std::placeholders::_1, std::placeholders::_2)); + } + + void HTTPReqHandler::HandoverToUpstreamProxy() + { + LogPrint(eLogDebug, "HTTPProxy: handover to socks proxy"); + auto connection = std::make_shared(GetOwner(), m_proxysock, m_sock); + m_sock = nullptr; + m_proxysock = nullptr; + GetOwner()->AddHandler(connection); + connection->Start(); + Terminate(); + } + + void HTTPReqHandler::SocksProxySuccess() + { + if(m_ClientRequest.method == "CONNECT") { + m_ClientResponse.code = 200; + m_send_buf = m_ClientResponse.to_string(); + boost::asio::async_write(*m_sock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&] (const boost::system::error_code & ec, std::size_t transferred) { + if(ec) GenericProxyError("socks proxy error", ec.message().c_str()); + else HandoverToUpstreamProxy(); + }); + } else { + m_send_buf = m_ClientRequestBuffer.str(); + LogPrint(eLogDebug, "HTTPProxy: send ", m_send_buf.size(), " bytes"); + boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&](const boost::system::error_code & ec, std::size_t transferred) { + if(ec) GenericProxyError("failed to send request to upstream", ec.message().c_str()); + else HandoverToUpstreamProxy(); + }); + } + } + + void HTTPReqHandler::HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + if(!ec) + { + if(m_socks_buf[1] == 90) { + // success + SocksProxySuccess(); + } else { + std::stringstream ss; + ss << "error code: "; + ss << (int) m_socks_buf[1]; + std::string msg = ss.str(); + GenericProxyError("Socks Proxy error", msg.c_str()); + } + } + else GenericProxyError("No Reply From socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + LogPrint(eLogDebug, "HTTPProxy: connected to http upstream"); + GenericProxyError("cannot connect", "http out proxy not implemented"); + } else GenericProxyError("cannot connect to upstream http proxy", ec.message().c_str()); + } + /* will be called after some data received from client */ void HTTPReqHandler::HandleSockRecv(const boost::system::error_code & ecode, std::size_t len) { diff --git a/I2PControl.cpp b/I2PControl.cpp index 1e8546ac..523f10e1 100644 --- a/I2PControl.cpp +++ b/I2PControl.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include #include @@ -14,7 +16,6 @@ #include "Crypto.h" #include "FS.h" #include "Log.h" -#include "HTTP.h" #include "Config.h" #include "NetDb.h" #include "RouterContext.h" @@ -189,66 +190,71 @@ namespace client if (ecode) { LogPrint (eLogError, "I2PControl: read error: ", ecode.message ()); return; - } - /* try to parse received data */ - std::stringstream json; - std::string response; - bool isHTTP = false; - if (memcmp (buf->data (), "POST", 4) == 0) { - long int remains = 0; - isHTTP = true; - i2p::http::HTTPReq req; - std::size_t len = req.parse(buf->data(), bytes_transferred); - if (len <= 0) { - LogPrint(eLogError, "I2PControl: incomplete/malformed POST request"); - return; - } - /* append to json chunk of data from 1st request */ - json.write(buf->data() + len, bytes_transferred - len); - remains = req.content_length(); - /* if request has Content-Length header, fetch rest of data and store to json buffer */ - while (remains > 0) { - len = ((long int) buf->size() < remains) ? buf->size() : remains; - bytes_transferred = boost::asio::read (*socket, boost::asio::buffer (buf->data (), len)); - json.write(buf->data(), bytes_transferred); - remains -= bytes_transferred; - } } else { - json.write(buf->data(), bytes_transferred); - } - //LogPrint(eLogDebug, "I2PControl: json from request: ", json.str()); + try + { + bool isHtml = !memcmp (buf->data (), "POST", 4); + std::stringstream ss; + ss.write (buf->data (), bytes_transferred); + if (isHtml) + { + std::string header; + size_t contentLength = 0; + while (!ss.eof () && header != "\r") + { + std::getline(ss, header); + auto colon = header.find (':'); + if (colon != std::string::npos && header.substr (0, colon) == "Content-Length") + contentLength = std::stoi (header.substr (colon + 1)); + } + if (ss.eof ()) + { + LogPrint (eLogError, "I2PControl: malformed request, HTTP header expected"); + return; // TODO: + } + std::streamoff rem = contentLength + ss.tellg () - bytes_transferred; // more bytes to read + if (rem > 0) + { + bytes_transferred = boost::asio::read (*socket, boost::asio::buffer (buf->data (), rem)); + ss.write (buf->data (), bytes_transferred); + } + } + std::ostringstream response; #if GCC47_BOOST149 - LogPrint (eLogError, "I2PControl: json_read is not supported due bug in boost 1.49 with gcc 4.7"); - BuildErrorResponse(response, 32603, "JSON requests is not supported with this version of boost"); + LogPrint (eLogError, "I2PControl: json_read is not supported due bug in boost 1.49 with gcc 4.7"); + response << "{\"id\":null,\"error\":"; + response << "{\"code\":-32603,\"message\":\"JSON requests is not supported with this version of boost\"},"; + response << "\"jsonrpc\":\"2.0\"}"; #else - /* now try to parse json itself */ - std::string j_str = json.str(); - std::stringstream _json(j_str); - try { - boost::property_tree::ptree pt; - boost::property_tree::read_json (_json, pt); + boost::property_tree::ptree pt; + boost::property_tree::read_json (ss, pt); - std::string id = pt.get("id"); - std::string method = pt.get("method"); - auto it = m_MethodHandlers.find (method); - if (it != m_MethodHandlers.end ()) { - std::ostringstream ss; - ss << "{\"id\":" << id << ",\"result\":{"; - (this->*(it->second))(pt.get_child ("params"), ss); - ss << "},\"jsonrpc\":\"2.0\"}"; - response = ss.str(); - } else { - LogPrint (eLogWarning, "I2PControl: unknown method ", method); - BuildErrorResponse(response, 32601, "Method not found"); - } - } catch (std::exception& ex) { - LogPrint (eLogError, "I2PControl: exception when handle request: ", ex.what ()); - BuildErrorResponse(response, 32603, ex.what()); - } catch (...) { - LogPrint (eLogError, "I2PControl: handle request unknown exception"); - } + std::string id = pt.get("id"); + std::string method = pt.get("method"); + auto it = m_MethodHandlers.find (method); + if (it != m_MethodHandlers.end ()) + { + response << "{\"id\":" << id << ",\"result\":{"; + (this->*(it->second))(pt.get_child ("params"), response); + response << "},\"jsonrpc\":\"2.0\"}"; + } else { + LogPrint (eLogWarning, "I2PControl: unknown method ", method); + response << "{\"id\":null,\"error\":"; + response << "{\"code\":-32601,\"message\":\"Method not found\"},"; + response << "\"jsonrpc\":\"2.0\"}"; + } #endif - SendResponse (socket, buf, response, isHTTP); + SendResponse (socket, buf, response, isHtml); + } + catch (std::exception& ex) + { + LogPrint (eLogError, "I2PControl: exception when handle request: ", ex.what ()); + } + catch (...) + { + LogPrint (eLogError, "I2PControl: handle request unknown exception"); + } + } } void I2PControlService::InsertParam (std::ostringstream& ss, const std::string& name, int value) const @@ -270,28 +276,27 @@ namespace client ss << "\"" << name << "\":" << std::fixed << std::setprecision(2) << value; } - void I2PControlService::BuildErrorResponse (std::string & content, int code, const char *message) { - std::stringstream ss; - ss << "{\"id\":null,\"error\":"; - ss << "{\"code\":" << -code << ",\"message\":\"" << message << "\"},"; - ss << "\"jsonrpc\":\"2.0\"}"; - content = ss.str(); - } - void I2PControlService::SendResponse (std::shared_ptr socket, - std::shared_ptr buf, std::string& content, bool isHTTP) + std::shared_ptr buf, std::ostringstream& response, bool isHtml) { - if (isHTTP) { - i2p::http::HTTPRes res; - res.code = 200; - res.add_header("Content-Type", "application/json"); - res.add_header("Connection", "close"); - res.body = content; - std::string tmp = res.to_string(); - content = tmp; + size_t len = response.str ().length (), offset = 0; + if (isHtml) + { + std::ostringstream header; + header << "HTTP/1.1 200 OK\r\n"; + header << "Connection: close\r\n"; + header << "Content-Length: " << boost::lexical_cast(len) << "\r\n"; + header << "Content-Type: application/json\r\n"; + header << "Date: "; + auto facet = new boost::local_time::local_time_facet ("%a, %d %b %Y %H:%M:%S GMT"); + header.imbue(std::locale (header.getloc(), facet)); + header << boost::posix_time::second_clock::local_time() << "\r\n"; + header << "\r\n"; + offset = header.str ().size (); + memcpy (buf->data (), header.str ().c_str (), offset); } - std::copy(content.begin(), content.end(), buf->begin()); - boost::asio::async_write (*socket, boost::asio::buffer (buf->data (), content.length()), + memcpy (buf->data () + offset, response.str ().c_str (), len); + boost::asio::async_write (*socket, boost::asio::buffer (buf->data (), offset + len), boost::asio::transfer_all (), std::bind(&I2PControlService::HandleResponseSent, this, std::placeholders::_1, std::placeholders::_2, socket, buf)); @@ -318,7 +323,7 @@ namespace client } InsertParam (results, "API", api); results << ","; - std::string token = std::to_string(i2p::util::GetSecondsSinceEpoch ()); + std::string token = boost::lexical_cast(i2p::util::GetSecondsSinceEpoch ()); m_Tokens.insert (token); InsertParam (results, "Token", token); } @@ -344,10 +349,9 @@ namespace client (this->*(it1->second))(it.second.data ()); InsertParam (results, it.first, ""); } - else { + else LogPrint (eLogError, "I2PControl: I2PControl unknown request: ", it.first); - } - } + } } void I2PControlService::PasswordHandler (const std::string& value) @@ -361,20 +365,17 @@ namespace client void I2PControlService::RouterInfoHandler (const boost::property_tree::ptree& params, std::ostringstream& results) { - for (auto it = params.begin (); it != params.end (); ++it) + for (auto it = params.begin (); it != params.end (); it++) { LogPrint (eLogDebug, "I2PControl: RouterInfo request: ", it->first); - if (it != params.begin ()) results << ","; auto it1 = m_RouterInfoHandlers.find (it->first); if (it1 != m_RouterInfoHandlers.end ()) { + if (it != params.begin ()) results << ","; (this->*(it1->second))(results); } else - { - InsertParam(results, it->first, ""); LogPrint (eLogError, "I2PControl: RouterInfo unknown request ", it->first); - } } } @@ -440,20 +441,15 @@ namespace client void I2PControlService::RouterManagerHandler (const boost::property_tree::ptree& params, std::ostringstream& results) { - for (auto it = params.begin (); it != params.end (); ++it) + for (auto it = params.begin (); it != params.end (); it++) { - if (it != params.begin ()) results << ","; + if (it != params.begin ()) results << ","; LogPrint (eLogDebug, "I2PControl: RouterManager request: ", it->first); auto it1 = m_RouterManagerHandlers.find (it->first); - if (it1 != m_RouterManagerHandlers.end ()) - { - (this->*(it1->second))(results); - } - else - { - InsertParam(results, it->first, ""); + if (it1 != m_RouterManagerHandlers.end ()) { + (this->*(it1->second))(results); + } else LogPrint (eLogError, "I2PControl: RouterManager unknown request: ", it->first); - } } } @@ -494,20 +490,15 @@ namespace client // network setting void I2PControlService::NetworkSettingHandler (const boost::property_tree::ptree& params, std::ostringstream& results) { - for (auto it = params.begin (); it != params.end (); ++it) + for (auto it = params.begin (); it != params.end (); it++) { if (it != params.begin ()) results << ","; LogPrint (eLogDebug, "I2PControl: NetworkSetting request: ", it->first); auto it1 = m_NetworkSettingHandlers.find (it->first); - if (it1 != m_NetworkSettingHandlers.end ()) - { - (this->*(it1->second))(it->second.data (), results); - } - else - { - InsertParam(results, it->first, ""); + if (it1 != m_NetworkSettingHandlers.end ()) { + (this->*(it1->second))(it->second.data (), results); + } else LogPrint (eLogError, "I2PControl: NetworkSetting unknown request: ", it->first); - } } } @@ -545,7 +536,7 @@ namespace client X509_gmtime_adj (X509_get_notAfter (x509), I2P_CONTROL_CERTIFICATE_VALIDITY*24*60*60); // expiration X509_set_pubkey (x509, pkey); // public key X509_NAME * name = X509_get_subject_name (x509); - X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *)"RU", -1, -1, 0); // country (Russia by default) + X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *)"A1", -1, -1, 0); // country (Anonymous proxy) X509_NAME_add_entry_by_txt (name, "O", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_ORGANIZATION, -1, -1, 0); // organization X509_NAME_add_entry_by_txt (name, "CN", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_COMMON_NAME, -1, -1, 0); // common name X509_set_issuer_name (x509, name); // set issuer to ourselves diff --git a/I2PControl.h b/I2PControl.h index bd5b9bad..047c2fe2 100644 --- a/I2PControl.h +++ b/I2PControl.h @@ -45,9 +45,8 @@ namespace client void ReadRequest (std::shared_ptr socket); void HandleRequestReceived (const boost::system::error_code& ecode, size_t bytes_transferred, std::shared_ptr socket, std::shared_ptr buf); - void BuildErrorResponse (std::string & content, int code, const char *message); void SendResponse (std::shared_ptr socket, - std::shared_ptr buf, std::string& response, bool isHtml); + std::shared_ptr buf, std::ostringstream& response, bool isHtml); void HandleResponseSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::shared_ptr socket, std::shared_ptr buf); @@ -120,4 +119,3 @@ namespace client } #endif - diff --git a/I2PService.cpp b/I2PService.cpp index f5ebcb0c..efcf61aa 100644 --- a/I2PService.cpp +++ b/I2PService.cpp @@ -53,7 +53,6 @@ namespace client void TCPIPPipe::Terminate() { if(Kill()) return; - Done(shared_from_this()); if (m_up) { if (m_up->is_open()) { m_up->close(); @@ -66,6 +65,7 @@ namespace client } m_down = nullptr; } + Done(shared_from_this()); } void TCPIPPipe::AsyncReceiveUpstream() @@ -90,11 +90,11 @@ namespace client } } - void TCPIPPipe::UpstreamWrite(const uint8_t * buf, size_t len) + void TCPIPPipe::UpstreamWrite(size_t len) { if (m_up) { LogPrint(eLogDebug, "TCPIPPipe: upstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_up, boost::asio::buffer(buf, len), + boost::asio::async_write(*m_up, boost::asio::buffer(m_upstream_buf, len), boost::asio::transfer_all(), std::bind(&TCPIPPipe::HandleUpstreamWrite, shared_from_this(), @@ -105,11 +105,11 @@ namespace client } } - void TCPIPPipe::DownstreamWrite(const uint8_t * buf, size_t len) + void TCPIPPipe::DownstreamWrite(size_t len) { if (m_down) { LogPrint(eLogDebug, "TCPIPPipe: downstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_down, boost::asio::buffer(buf, len), + boost::asio::async_write(*m_down, boost::asio::buffer(m_downstream_buf, len), boost::asio::transfer_all(), std::bind(&TCPIPPipe::HandleDownstreamWrite, shared_from_this(), @@ -131,9 +131,8 @@ namespace client } else { if (bytes_transfered > 0 ) { memcpy(m_upstream_buf, m_downstream_to_up_buf, bytes_transfered); - UpstreamWrite(m_upstream_buf, bytes_transfered); } - AsyncReceiveDownstream(); + UpstreamWrite(bytes_transfered); } } @@ -142,6 +141,8 @@ namespace client LogPrint(eLogError, "TCPIPPipe: downstream write error:" , ecode.message()); if (ecode != boost::asio::error::operation_aborted) Terminate(); + } else { + AsyncReceiveUpstream(); } } @@ -150,6 +151,8 @@ namespace client LogPrint(eLogError, "TCPIPPipe: upstream write error:" , ecode.message()); if (ecode != boost::asio::error::operation_aborted) Terminate(); + } else { + AsyncReceiveDownstream(); } } @@ -162,10 +165,9 @@ namespace client Terminate(); } else { if (bytes_transfered > 0 ) { - memcpy(m_upstream_buf, m_upstream_to_down_buf, bytes_transfered); - DownstreamWrite(m_upstream_buf, bytes_transfered); + memcpy(m_downstream_buf, m_upstream_to_down_buf, bytes_transfered); } - AsyncReceiveUpstream(); + DownstreamWrite(bytes_transfered); } } diff --git a/I2PService.h b/I2PService.h index 59746a6f..096443db 100644 --- a/I2PService.h +++ b/I2PService.h @@ -77,7 +77,7 @@ namespace client std::atomic m_Dead; //To avoid cleaning up multiple times }; - const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192; + const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192 * 8; // bidirectional pipe for 2 tcp/ip sockets class TCPIPPipe: public I2PServiceHandler, public std::enable_shared_from_this { @@ -93,8 +93,8 @@ namespace client void HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transferred); void HandleUpstreamWrite(const boost::system::error_code & ecode); void HandleDownstreamWrite(const boost::system::error_code & ecode); - void UpstreamWrite(const uint8_t * buf, size_t len); - void DownstreamWrite(const uint8_t * buf, size_t len); + void UpstreamWrite(size_t len); + void DownstreamWrite(size_t len); private: uint8_t m_upstream_to_down_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[TCP_IP_PIPE_BUFFER_SIZE]; uint8_t m_upstream_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_buf[TCP_IP_PIPE_BUFFER_SIZE]; @@ -121,10 +121,11 @@ namespace client void Stop (); const boost::asio::ip::tcp::acceptor& GetAcceptor () const { return m_Acceptor; }; - + + virtual const char* GetName() { return "Generic TCP/IP accepting daemon"; } + protected: virtual std::shared_ptr CreateHandler(std::shared_ptr socket) = 0; - virtual const char* GetName() { return "Generic TCP/IP accepting daemon"; } private: void Accept(); void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 1a01a3c4..843d129f 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -58,25 +58,39 @@ namespace client StreamReceive (); Receive (); } - - void I2PTunnelConnection::Connect () + + static boost::asio::ip::address GetLoopbackAddressFor(const i2p::data::IdentHash & addr) + { + boost::asio::ip::address_v4::bytes_type bytes; + const uint8_t * ident = addr; + bytes[0] = 127; + memcpy (bytes.data ()+1, ident, 3); + boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes); + return ourIP; + } + + static void MapToLoopback(const std::shared_ptr & sock, const i2p::data::IdentHash & addr) + { + + // bind to 127.x.x.x address + // where x.x.x are first three bytes from ident + auto ourIP = GetLoopbackAddressFor(addr); + sock->bind (boost::asio::ip::tcp::endpoint (ourIP, 0)); + + } + + void I2PTunnelConnection::Connect (bool isUniqueLocal) { I2PTunnelSetSocketOptions(m_Socket); - if (m_Socket) { -#ifdef __linux__ - // bind to 127.x.x.x address - // where x.x.x are first three bytes from ident - - if (m_RemoteEndpoint.address ().is_v4 () && + if (m_Socket) + { +#ifdef __linux__ + if (isUniqueLocal && m_RemoteEndpoint.address ().is_v4 () && m_RemoteEndpoint.address ().to_v4 ().to_bytes ()[0] == 127) { m_Socket->open (boost::asio::ip::tcp::v4 ()); - boost::asio::ip::address_v4::bytes_type bytes; - const uint8_t * ident = m_Stream->GetRemoteIdentity ()->GetIdentHash (); - bytes[0] = 127; - memcpy (bytes.data ()+1, ident, 3); - boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes); - m_Socket->bind (boost::asio::ip::tcp::endpoint (ourIP, 0)); + auto ident = m_Stream->GetRemoteIdentity()->GetIdentHash(); + MapToLoopback(m_Socket, ident); } #endif m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, @@ -213,7 +227,9 @@ namespace client // send destination first like received from I2P std::string dest = m_Stream->GetRemoteIdentity ()->ToBase64 (); dest += "\n"; - memcpy (m_StreamBuffer, dest.c_str (), dest.size ()); + if(sizeof(m_StreamBuffer) >= dest.size()) { + memcpy (m_StreamBuffer, dest.c_str (), dest.size ()); + } HandleStreamReceive (boost::system::error_code (), dest.size ()); } Receive (); @@ -416,7 +432,7 @@ namespace client I2PServerTunnel::I2PServerTunnel (const std::string& name, const std::string& address, int port, std::shared_ptr localDestination, int inport, bool gzip): - I2PService (localDestination), m_Name (name), m_Address (address), m_Port (port), m_IsAccessList (false) + I2PService (localDestination), m_IsUniqueLocal(true), m_Name (name), m_Address (address), m_Port (port), m_IsAccessList (false) { m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip); } @@ -493,15 +509,17 @@ namespace client return; } } - CreateI2PConnection (stream); + // new connection + auto conn = CreateI2PConnection (stream); + AddHandler (conn); + conn->Connect (m_IsUniqueLocal); } } - void I2PServerTunnel::CreateI2PConnection (std::shared_ptr stream) + std::shared_ptr I2PServerTunnel::CreateI2PConnection (std::shared_ptr stream) { - auto conn = std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint ()); - AddHandler (conn); - conn->Connect (); + return std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint ()); + } I2PServerTunnelHTTP::I2PServerTunnelHTTP (const std::string& name, const std::string& address, @@ -512,12 +530,10 @@ namespace client { } - void I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr stream) + std::shared_ptr I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr stream) { - auto conn = std::make_shared (this, stream, + return std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint (), m_Host); - AddHandler (conn); - conn->Connect (); } I2PServerTunnelIRC::I2PServerTunnelIRC (const std::string& name, const std::string& address, @@ -528,11 +544,9 @@ namespace client { } - void I2PServerTunnelIRC::CreateI2PConnection (std::shared_ptr stream) + std::shared_ptr I2PServerTunnelIRC::CreateI2PConnection (std::shared_ptr stream) { - auto conn = std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint (), this->m_WebircPass); - AddHandler (conn); - conn->Connect (); + return std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint (), this->m_WebircPass); } void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) @@ -541,7 +555,6 @@ namespace client auto session = ObtainUDPSession(from, toPort, fromPort); session->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); session->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - } void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { @@ -555,11 +568,24 @@ namespace client ++itr; } } - - UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + + void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::vector removePorts; + for (const auto & s : m_Sessions) { + if (now - s.second.second >= delta) + removePorts.push_back(s.first); + } + for(auto port : removePorts) { + m_Sessions.erase(port); + } + } + + UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) { auto ih = from.GetIdentHash(); - for ( UDPSession * s : m_Sessions ) + for (auto & s : m_Sessions ) { if ( s->Identity == ih) { @@ -568,10 +594,19 @@ namespace client return s; } } - /** create new udp session */ - boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0); - m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); - return m_Sessions.back(); + boost::asio::ip::address addr; + /** create new udp session */ + if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) + { + auto ident = from.GetIdentHash(); + addr = GetLoopbackAddressFor(ident); + } + else + addr = m_LocalAddress; + boost::asio::ip::udp::endpoint ep(addr, 0); + m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); + auto & back = m_Sessions.back(); + return back; } UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, @@ -579,7 +614,6 @@ namespace client boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to, uint16_t ourPort, uint16_t theirPort) : m_Destination(localDestination->GetDatagramDestination()), - m_Service(localDestination->GetService()), IPSocket(localDestination->GetService(), localEndpoint), SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()), @@ -603,7 +637,7 @@ namespace client { LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - m_Destination->SendDatagramTo(m_Buffer, len, Identity, 0, 0); + m_Destination->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort); Receive(); } else { LogPrint(eLogError, "UDPSession: ", ecode.message()); @@ -613,9 +647,9 @@ namespace client I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + m_IsUniqueLocal(true), m_Name(name), - LocalPort(port), m_LocalAddress(localAddress), m_RemoteEndpoint(forwardTo) { @@ -641,7 +675,8 @@ namespace client { std::vector > sessions; std::lock_guard lock(m_SessionsMutex); - for ( UDPSession * s : m_Sessions ) + + for ( UDPSessionPtr s : m_Sessions ) { if (!s->m_Destination) continue; auto info = s->m_Destination->GetInfoForRemote(s->Identity); @@ -663,13 +698,12 @@ namespace client std::shared_ptr localDestination, uint16_t remotePort) : m_Name(name), - m_Session(nullptr), m_RemoteDest(remoteDest), m_LocalDest(localDestination), m_LocalEndpoint(localEndpoint), m_RemoteIdent(nullptr), m_ResolveThread(nullptr), - LocalPort(localEndpoint.port()), + m_LocalSocket(localDestination->GetService(), localEndpoint), RemotePort(remotePort), m_cancel_resolve(false) { @@ -677,7 +711,7 @@ namespace client dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5)); } @@ -686,38 +720,52 @@ namespace client m_LocalDest->Start(); if (m_ResolveThread == nullptr) m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + RecvFromLocal(); } + void I2PUDPClientTunnel::RecvFromLocal() + { + m_LocalSocket.async_receive_from(boost::asio::buffer(m_RecvBuff, I2P_UDP_MAX_MTU), + m_RecvEndpoint, std::bind(&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); + } + + void I2PUDPClientTunnel::HandleRecvFromLocal(const boost::system::error_code & ec, std::size_t transferred) + { + if(ec) { + LogPrint(eLogError, "UDP Client: ", ec.message()); + return; + } + if(!m_RemoteIdent) { + LogPrint(eLogWarning, "UDP Client: remote endpoint not resolved yet"); + RecvFromLocal(); + return; // drop, remote not resolved + } + auto remotePort = m_RecvEndpoint.port(); + auto itr = m_Sessions.find(remotePort); + if (itr == m_Sessions.end()) { + // track new udp convo + m_Sessions[remotePort] = {boost::asio::ip::udp::endpoint(m_RecvEndpoint), 0}; + } + // send off to remote i2p destination + LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort); + m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort); + // mark convo as active + m_Sessions[remotePort].second = i2p::util::GetMillisecondsSinceEpoch(); + RecvFromLocal(); + } + std::vector > I2PUDPClientTunnel::GetSessions() { + // TODO: implement std::vector > infos; - if(m_Session && m_LocalDest) - { - auto s = m_Session; - if (s->m_Destination) - { - auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity); - if(info) - { - auto sinfo = std::make_shared(); - sinfo->Name = m_Name; - sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); - sinfo->RemoteIdent = std::make_shared(s->Identity.data()); - sinfo->CurrentIBGW = info->IBGW; - sinfo->CurrentOBEP = info->OBEP; - infos.push_back(sinfo); - } - } - } return infos; } void I2PUDPClientTunnel::TryResolving() { LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); - m_RemoteIdent = new i2p::data::IdentHash; - m_RemoteIdent->Fill(0); + i2p::data::IdentHash * h = new i2p::data::IdentHash; - while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve) + while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *h) && !m_cancel_resolve) { LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest); std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -727,27 +775,28 @@ namespace client LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled"); return; } + m_RemoteIdent = h; LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32()); - // delete existing session - if(m_Session) delete m_Session; - - boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0); - m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, m_RemoteIdent, LocalPort, RemotePort); } void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) { - // address match - if(m_Session) + auto itr = m_Sessions.find(toPort); + // found convo ? + if(itr != m_Sessions.end()) { - // tell session - LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); - m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint); + // found convo + if (len > 0) { + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); + m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second.first); + // mark convo as active + itr->second.second = i2p::util::GetMillisecondsSinceEpoch(); + } } else - LogPrint(eLogWarning, "UDP Client: no session"); + LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } else LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); @@ -758,7 +807,11 @@ namespace client auto dgram = m_LocalDest->GetDatagramDestination(); if (dgram) dgram->ResetReceiver(); - if (m_Session) delete m_Session; + m_Sessions.clear(); + + if(m_LocalSocket.is_open()) + m_LocalSocket.close(); + m_cancel_resolve = true; if(m_ResolveThread) diff --git a/I2PTunnel.h b/I2PTunnel.h index e6f0e84f..4b9b2c9b 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -17,7 +18,7 @@ namespace i2p { namespace client { - const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 8192; + const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 65536; const int I2P_TUNNEL_CONNECTION_MAX_IDLE = 3600; // in seconds const int I2P_TUNNEL_DESTINATION_REQUEST_TIMEOUT = 10; // in seconds // for HTTP tunnels @@ -37,7 +38,7 @@ namespace client const boost::asio::ip::tcp::endpoint& target, bool quiet = true); // from I2P ~I2PTunnelConnection (); void I2PConnect (const uint8_t * msg = nullptr, size_t len = 0); - void Connect (); + void Connect (bool isUniqueLocal = true); protected: @@ -141,7 +142,6 @@ namespace client struct UDPSession { i2p::datagram::DatagramDestination * m_Destination; - boost::asio::io_service & m_Service; boost::asio::ip::udp::socket IPSocket; i2p::data::IdentHash Identity; boost::asio::ip::udp::endpoint FromEndpoint; @@ -182,6 +182,8 @@ namespace client /** how long has this converstation been idle in ms */ uint64_t idle; }; + + typedef std::shared_ptr UDPSessionPtr; /** server side udp tunnel, many i2p inbound to 1 ip outbound */ class I2PUDPServerTunnel @@ -189,7 +191,7 @@ namespace client public: I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address & localAddress, + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port); ~I2PUDPServerTunnel(); /** expire stale udp conversations */ @@ -199,18 +201,20 @@ namespace client std::vector > GetSessions(); std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + void SetUniqueLocal(bool isUniqueLocal = true) { m_IsUniqueLocal = isUniqueLocal; } + private: void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + UDPSessionPtr ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); private: + bool m_IsUniqueLocal; const std::string m_Name; - const uint16_t LocalPort; boost::asio::ip::address m_LocalAddress; boost::asio::ip::udp::endpoint m_RemoteEndpoint; std::mutex m_SessionsMutex; - std::vector m_Sessions; + std::vector m_Sessions; std::shared_ptr m_LocalDest; }; @@ -228,18 +232,25 @@ namespace client bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); private: + typedef std::pair UDPConvo; + void RecvFromLocal(); + void HandleRecvFromLocal(const boost::system::error_code & e, std::size_t transferred); void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving(); const std::string m_Name; - UDPSession * m_Session; + std::mutex m_SessionsMutex; + std::map m_Sessions; // maps i2p port -> local udp convo const std::string m_RemoteDest; std::shared_ptr m_LocalDest; const boost::asio::ip::udp::endpoint m_LocalEndpoint; i2p::data::IdentHash * m_RemoteIdent; std::thread * m_ResolveThread; - uint16_t LocalPort; + boost::asio::ip::udp::socket m_LocalSocket; + boost::asio::ip::udp::endpoint m_RecvEndpoint; + uint8_t m_RecvBuff[I2P_UDP_MAX_MTU]; uint16_t RemotePort; bool m_cancel_resolve; }; @@ -256,6 +267,9 @@ namespace client void SetAccessList (const std::set& accessList); + void SetUniqueLocal (bool isUniqueLocal) { m_IsUniqueLocal = isUniqueLocal; } + bool IsUniqueLocal () const { return m_IsUniqueLocal; } + const std::string& GetAddress() const { return m_Address; } int GetPort () const { return m_Port; }; uint16_t GetLocalPort () const { return m_PortDestination->GetLocalPort (); }; @@ -272,10 +286,11 @@ namespace client void Accept (); void HandleAccept (std::shared_ptr stream); - virtual void CreateI2PConnection (std::shared_ptr stream); + virtual std::shared_ptr CreateI2PConnection (std::shared_ptr stream); private: - + + bool m_IsUniqueLocal; std::string m_Name, m_Address; int m_Port; boost::asio::ip::tcp::endpoint m_Endpoint; @@ -294,7 +309,7 @@ namespace client private: - void CreateI2PConnection (std::shared_ptr stream); + std::shared_ptr CreateI2PConnection (std::shared_ptr stream); private: @@ -311,7 +326,7 @@ namespace client private: - void CreateI2PConnection (std::shared_ptr stream); + std::shared_ptr CreateI2PConnection (std::shared_ptr stream); private: diff --git a/Identity.cpp b/Identity.cpp index 19c1a240..4e9bee63 100644 --- a/Identity.cpp +++ b/Identity.cpp @@ -509,7 +509,7 @@ namespace data m_Signer.reset (new i2p::crypto::RSASHA5124096Signer (m_SigningPrivateKey)); break; case SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519: - m_Signer.reset (new i2p::crypto::EDDSA25519Signer (m_SigningPrivateKey)); + m_Signer.reset (new i2p::crypto::EDDSA25519Signer (m_SigningPrivateKey, m_Public->GetStandardIdentity ().certificate - i2p::crypto::EDDSA25519_PUBLIC_KEY_LENGTH)); break; default: LogPrint (eLogError, "Identity: Signing key type ", (int)m_Public->GetSigningKeyType (), " is not supported"); diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 65da5def..8f31e246 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -257,7 +257,7 @@ namespace transport void NTCPSession::SendPhase3 () { - auto keys = i2p::context.GetPrivateKeys (); + auto& keys = i2p::context.GetPrivateKeys (); uint8_t * buf = m_ReceiveBuffer; htobe16buf (buf, keys.GetPublic ()->GetFullLen ()); buf += 2; @@ -403,7 +403,7 @@ namespace transport s.Insert (m_RemoteIdentity->GetIdentHash (), 32); // ident s.Insert (tsA); // tsA s.Insert (tsB); // tsB - auto keys = i2p::context.GetPrivateKeys (); + auto& keys = i2p::context.GetPrivateKeys (); auto signatureLen = keys.GetPublic ()->GetSignatureLen (); s.Sign (keys, m_ReceiveBuffer); size_t paddingSize = signatureLen & 0x0F; // %16 @@ -621,7 +621,7 @@ namespace transport if (!m_NextMessage->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (m_NextMessage); } diff --git a/Queue.h b/Queue.h index 39d139b9..9e95b2a5 100644 --- a/Queue.h +++ b/Queue.h @@ -25,7 +25,8 @@ namespace util m_NonEmpty.notify_one (); } - void Put (const std::vector& vec) + templateclass Container, typename... R> + void Put (const Container& vec) { if (!vec.empty ()) { diff --git a/SAM.cpp b/SAM.cpp index c3b4faf1..929b144d 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -108,10 +108,10 @@ namespace client separator++; std::map params; ExtractParams (separator, params); - auto it = params.find (SAM_PARAM_MAX); + //auto it = params.find (SAM_PARAM_MAX); // TODO: check MIN as well - if (it != params.end ()) - version = it->second; + //if (it != params.end ()) + // version = it->second; } if (version[0] == '3') // we support v3 (3.0 and 3.1) only { @@ -464,21 +464,21 @@ namespace client std::string& name = params[SAM_PARAM_NAME]; std::shared_ptr identity; i2p::data::IdentHash ident; + auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination; if (name == "ME") - SendNamingLookupReply (m_Session->localDestination->GetIdentity ()); + SendNamingLookupReply (dest->GetIdentity ()); else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr) SendNamingLookupReply (identity); - else if (m_Session && m_Session->localDestination && - context.GetAddressBook ().GetIdentHash (name, ident)) + else if (context.GetAddressBook ().GetIdentHash (name, ident)) { - auto leaseSet = m_Session->localDestination->FindLeaseSet (ident); + auto leaseSet = dest->FindLeaseSet (ident); if (leaseSet) SendNamingLookupReply (leaseSet->GetIdentity ()); else - m_Session->localDestination->RequestDestination (ident, + dest->RequestDestination (ident, std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete, - shared_from_this (), std::placeholders::_1, ident)); - } + shared_from_this (), std::placeholders::_1, ident)); + } else { LogPrint (eLogError, "SAM: naming failed, unknown address ", name); diff --git a/SSUData.cpp b/SSUData.cpp index 1782da26..c700d8e6 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -154,8 +154,7 @@ namespace transport { uint32_t msgID = bufbe32toh (buf); // message ID buf += 4; - uint8_t frag[4]; - frag[0] = 0; + uint8_t frag[4] = {0}; memcpy (frag + 1, buf, 3); buf += 3; uint32_t fragmentInfo = bufbe32toh (frag); // fragment info @@ -240,7 +239,7 @@ namespace transport if (!msg->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", m_Session.GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (msg); } @@ -371,7 +370,7 @@ namespace transport void SSUData::SendMsgAck (uint32_t msgID) { - uint8_t buf[48 + 18]; // actual length is 44 = 37 + 7 but pad it to multiple of 16 + uint8_t buf[48 + 18] = {0}; // actual length is 44 = 37 + 7 but pad it to multiple of 16 uint8_t * payload = buf + sizeof (SSUHeader); *payload = DATA_FLAG_EXPLICIT_ACKS_INCLUDED; // flag payload++; @@ -393,7 +392,7 @@ namespace transport LogPrint (eLogWarning, "SSU: Fragment number ", fragmentNum, " exceeds 64"); return; } - uint8_t buf[64 + 18]; + uint8_t buf[64 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); *payload = DATA_FLAG_ACK_BITFIELDS_INCLUDED; // flag payload++; diff --git a/SSUSession.cpp b/SSUSession.cpp index 540cf907..844ed2e4 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -350,7 +350,7 @@ namespace transport void SSUSession::SendSessionRequest () { - uint8_t buf[320 + 18]; // 304 bytes for ipv4, 320 for ipv6 + uint8_t buf[320 + 18] = {0}; // 304 bytes for ipv4, 320 for ipv6 uint8_t * payload = buf + sizeof (SSUHeader); uint8_t flag = 0; // fill extended options, 3 bytes extended options don't change message size @@ -392,7 +392,7 @@ namespace transport return; } - uint8_t buf[96 + 18]; + uint8_t buf[96 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); htobe32buf (payload, introducer.iTag); payload += 4; @@ -427,7 +427,7 @@ namespace transport SignedData s; // x,y, remote IP, remote port, our IP, our port, relayTag, signed on time s.Insert (x, 256); // x - uint8_t buf[384 + 18]; + uint8_t buf[384 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); memcpy (payload, m_DHKeysPair->GetPublicKey (), 256); s.Insert (payload, 256); // y @@ -473,14 +473,18 @@ namespace transport m_SignedData = std::unique_ptr(new SignedData (s)); s.Insert (payload - 4, 4); // BOB's signed on time s.Sign (i2p::context.GetPrivateKeys (), payload); // DSA signature - // TODO: fill padding with random data uint8_t iv[16]; RAND_bytes (iv, 16); // random iv // encrypt signature and padding with newly created session key size_t signatureLen = i2p::context.GetIdentity ()->GetSignatureLen (); size_t paddingSize = signatureLen & 0x0F; // %16 - if (paddingSize > 0) signatureLen += (16 - paddingSize); + if (paddingSize > 0) + { + // fill random padding + RAND_bytes(payload + signatureLen, (16 - paddingSize)); + signatureLen += (16 - paddingSize); + } m_SessionKeyEncryption.SetIV (iv); m_SessionKeyEncryption.Encrypt (payload, signatureLen, payload); payload += signatureLen; @@ -493,7 +497,7 @@ namespace transport void SSUSession::SendSessionConfirmed (const uint8_t * y, const uint8_t * ourAddress, size_t ourAddressLen) { - uint8_t buf[512 + 18]; + uint8_t buf[512 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); *payload = 1; // 1 fragment payload++; // info @@ -508,9 +512,8 @@ namespace transport auto signatureLen = i2p::context.GetIdentity ()->GetSignatureLen (); size_t paddingSize = ((payload - buf) + signatureLen)%16; if (paddingSize > 0) paddingSize = 16 - paddingSize; - // TODO: fill padding + RAND_bytes(payload, paddingSize); // fill padding with random payload += paddingSize; // padding size - // signature SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, our signed on time s.Insert (m_DHKeysPair->GetPublicKey (), 256); // x @@ -559,14 +562,14 @@ namespace transport void SSUSession::SendRelayResponse (uint32_t nonce, const boost::asio::ip::udp::endpoint& from, const uint8_t * introKey, const boost::asio::ip::udp::endpoint& to) { - uint8_t buf[80 + 18]; // 64 Alice's ipv4 and 80 Alice's ipv6 - uint8_t * payload = buf + sizeof (SSUHeader); // Charlie's address always v4 if (!to.address ().is_v4 ()) { LogPrint (eLogWarning, "SSU: Charlie's IP must be v4"); return; } + uint8_t buf[80 + 18] = {0}; // 64 Alice's ipv4 and 80 Alice's ipv6 + uint8_t * payload = buf + sizeof (SSUHeader); *payload = 4; payload++; // size htobe32buf (payload, to.address ().to_v4 ().to_ulong ()); // Charlie's IP @@ -619,7 +622,7 @@ namespace transport LogPrint (eLogWarning, "SSU: Alice's IP must be v4"); return; } - uint8_t buf[48 + 18]; + uint8_t buf[48 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); *payload = 4; payload++; // size @@ -1040,7 +1043,7 @@ namespace transport // toAddress is true for Alice<->Chalie communications only // sendAddress is false if message comes from Alice { - uint8_t buf[80 + 18]; + uint8_t buf[80 + 18] = {0}; uint8_t iv[16]; uint8_t * payload = buf + sizeof (SSUHeader); htobe32buf (payload, nonce); @@ -1096,7 +1099,7 @@ namespace transport // encrypt message with session key FillHeaderAndEncrypt (PAYLOAD_TYPE_PEER_TEST, buf, 80); Send (buf, 80); - } + } } void SSUSession::SendPeerTest () @@ -1121,7 +1124,7 @@ namespace transport { if (m_State == eSessionStateEstablished) { - uint8_t buf[48 + 18]; + uint8_t buf[48 + 18] = {0}; uint8_t * payload = buf + sizeof (SSUHeader); *payload = 0; // flags payload++; @@ -1138,7 +1141,7 @@ namespace transport { if (m_IsSessionKey) { - uint8_t buf[48 + 18]; + uint8_t buf[48 + 18] = {0}; // encrypt message with session key FillHeaderAndEncrypt (PAYLOAD_TYPE_SESSION_DESTROYED, buf, 48); try @@ -1155,7 +1158,7 @@ namespace transport void SSUSession::Send (uint8_t type, const uint8_t * payload, size_t len) { - uint8_t buf[SSU_MTU_V4 + 18]; + uint8_t buf[SSU_MTU_V4 + 18] = {0}; size_t msgSize = len + sizeof (SSUHeader); size_t paddingSize = msgSize & 0x0F; // %16 if (paddingSize > 0) msgSize += (16 - paddingSize); diff --git a/Signature.cpp b/Signature.cpp index 11fc8600..c38b1333 100644 --- a/Signature.cpp +++ b/Signature.cpp @@ -467,17 +467,27 @@ namespace crypto return GetEd25519 ()->Verify (m_PublicKey, digest, signature); } - EDDSA25519Signer::EDDSA25519Signer (const uint8_t * signingPrivateKey) + EDDSA25519Signer::EDDSA25519Signer (const uint8_t * signingPrivateKey, const uint8_t * signingPublicKey) { // expand key SHA512 (signingPrivateKey, EDDSA25519_PRIVATE_KEY_LENGTH, m_ExpandedPrivateKey); m_ExpandedPrivateKey[0] &= 0xF8; // drop last 3 bits - m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0x1F; // drop first 3 bits + m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0x3F; // drop first 2 bits m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] |= 0x40; // set second bit + // generate and encode public key BN_CTX * ctx = BN_CTX_new (); auto publicKey = GetEd25519 ()->GeneratePublicKey (m_ExpandedPrivateKey, ctx); GetEd25519 ()->EncodePublicKey (publicKey, m_PublicKeyEncoded, ctx); + + if (signingPublicKey && memcmp (m_PublicKeyEncoded, signingPublicKey, EDDSA25519_PUBLIC_KEY_LENGTH)) + { + // keys don't match, it means older key with 0x1F + LogPrint (eLogWarning, "Older EdDSA key detected"); + m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0xDF; // drop third bit + publicKey = GetEd25519 ()->GeneratePublicKey (m_ExpandedPrivateKey, ctx); + GetEd25519 ()->EncodePublicKey (publicKey, m_PublicKeyEncoded, ctx); + } BN_CTX_free (ctx); } diff --git a/Signature.h b/Signature.h index a4f37980..c2618f91 100644 --- a/Signature.h +++ b/Signature.h @@ -424,7 +424,8 @@ namespace crypto { public: - EDDSA25519Signer (const uint8_t * signingPrivateKey); + EDDSA25519Signer (const uint8_t * signingPrivateKey, const uint8_t * signingPublicKey = nullptr); + // we pass signingPublicKey to check if it matches private key void Sign (const uint8_t * buf, int len, uint8_t * signature) const; const uint8_t * GetPublicKey () const { return m_PublicKeyEncoded; }; diff --git a/Streaming.cpp b/Streaming.cpp index ff14aadb..343b9b38 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -40,15 +40,15 @@ namespace stream { auto packet = m_ReceiveQueue.front (); m_ReceiveQueue.pop (); - delete packet; + m_LocalDestination.DeletePacket (packet); } for (auto it: m_SentPackets) - delete it; + m_LocalDestination.DeletePacket (it); m_SentPackets.clear (); for (auto it: m_SavedPackets) - delete it; + m_LocalDestination.DeletePacket (it); m_SavedPackets.clear (); LogPrint (eLogDebug, "Streaming: Stream deleted"); @@ -83,7 +83,7 @@ namespace stream { // plain ack LogPrint (eLogDebug, "Streaming: Plain ACK received"); - delete packet; + m_LocalDestination.DeletePacket (packet); return; } @@ -131,7 +131,7 @@ namespace stream // we have received duplicate LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again - delete packet; // packet dropped + m_LocalDestination.DeletePacket (packet); // packet dropped } else { @@ -163,7 +163,7 @@ namespace stream void Stream::SavePacket (Packet * packet) { if (!m_SavedPackets.insert (packet).second) - delete packet; + m_LocalDestination.DeletePacket (packet); } void Stream::ProcessPacket (Packet * packet) @@ -216,7 +216,7 @@ namespace stream m_ReceiveTimer.cancel (); } else - delete packet; + m_LocalDestination.DeletePacket (packet); m_LastReceivedSequenceNumber = receivedSeqn; @@ -278,7 +278,7 @@ namespace stream m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); - delete sentPacket; + m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start @@ -345,7 +345,7 @@ namespace stream std::unique_lock l(m_SendBufferMutex); while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) { - Packet * p = new Packet (); + Packet * p = m_LocalDestination.NewPacket (); uint8_t * packet = p->GetBuffer (); // TODO: implement setters size_t size = 0; @@ -532,7 +532,7 @@ namespace stream void Stream::SendClose () { - Packet * p = new Packet (); + Packet * p = m_LocalDestination.NewPacket (); uint8_t * packet = p->GetBuffer (); size_t size = 0; htobe32buf (packet + size, m_SendStreamID); @@ -574,7 +574,7 @@ namespace stream if (!packet->GetLength ()) { m_ReceiveQueue.pop (); - delete packet; + m_LocalDestination.DeletePacket (packet); } } return pos; @@ -852,7 +852,7 @@ namespace stream { for (auto& it: m_SavedPackets) { - for (auto it1: it.second) delete it1; + for (auto it1: it.second) DeletePacket (it1); it.second.clear (); } m_SavedPackets.clear (); @@ -867,6 +867,7 @@ namespace stream { ResetAcceptor (); m_PendingIncomingTimer.cancel (); + m_PendingIncomingStreams.clear (); m_ConnTrackTimer.cancel(); { std::unique_lock l(m_StreamsMutex); @@ -889,7 +890,7 @@ namespace stream else { LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); - delete packet; + DeletePacket (packet); } } else @@ -901,7 +902,7 @@ namespace stream { // already pending LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); - delete packet; // drop it, because previous should be connected + DeletePacket (packet); // drop it, because previous should be connected return; } auto incomingStream = CreateNewIncomingStream (); @@ -980,7 +981,7 @@ namespace stream auto it = s->m_SavedPackets.find (receiveStreamID); if (it != s->m_SavedPackets.end ()) { - for (auto it1: it->second) delete it1; + for (auto it1: it->second) s->DeletePacket (it1); it->second.clear (); s->m_SavedPackets.erase (it); } @@ -1020,14 +1021,16 @@ namespace stream void StreamingDestination::SetAcceptor (const Acceptor& acceptor) { - m_Owner->GetService ().post([acceptor, this](void) + m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet + auto s = shared_from_this (); + m_Owner->GetService ().post([s](void) { - m_Acceptor = acceptor; - for (auto& it: m_PendingIncomingStreams) + // take care about incoming queue + for (auto& it: s->m_PendingIncomingStreams) if (it->GetStatus () == eStreamStatusOpen) // still open? - m_Acceptor (it); - m_PendingIncomingStreams.clear (); - m_PendingIncomingTimer.cancel (); + s->m_Acceptor (it); + s->m_PendingIncomingStreams.clear (); + s->m_PendingIncomingTimer.cancel (); }); } @@ -1074,13 +1077,13 @@ namespace stream void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) { // unzip it - Packet * uncompressed = new Packet; + Packet * uncompressed = NewPacket (); uncompressed->offset = 0; uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); if (uncompressed->len) HandleNextPacket (uncompressed); else - delete uncompressed; + DeletePacket (uncompressed); } std::shared_ptr StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort) diff --git a/Streaming.h b/Streaming.h index 1593f961..828fccc7 100644 --- a/Streaming.h +++ b/Streaming.h @@ -18,6 +18,7 @@ #include "I2NPProtocol.h" #include "Garlic.h" #include "Tunnel.h" +#include "util.h" // MemoryPool namespace i2p { @@ -234,6 +235,9 @@ namespace stream /** set max connections per minute per destination */ void SetMaxConnsPerMinute(const uint32_t conns); + + Packet * NewPacket () { return m_PacketsPool.Acquire (); }; + void DeletePacket (Packet * p) { if (p) m_PacketsPool.Release (p); }; private: @@ -269,7 +273,9 @@ namespace stream /** banned identities */ std::vector m_Banned; uint64_t m_LastBanClear; - + + i2p::util::MemoryPool m_PacketsPool; + public: i2p::data::GzipInflator m_Inflator; diff --git a/Transports.cpp b/Transports.cpp index 163a7da9..1659eeed 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -264,7 +264,7 @@ namespace transport void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { #ifdef WITH_EVENTS - EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}}); + QueueIntEvent("transport.send", ident.ToBase64(), msgs.size()); #endif m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); } diff --git a/Tunnel.cpp b/Tunnel.cpp index 84c4979f..6f9346d2 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -587,6 +587,7 @@ namespace tunnel for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();) { auto tunnel = it->second; + auto pool = tunnel->GetTunnelPool(); switch (tunnel->GetState ()) { case eTunnelStatePending: @@ -612,6 +613,8 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout); // delete it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; @@ -624,6 +627,9 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected); + it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; break; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 07057918..dfdfbf58 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -81,6 +81,8 @@ namespace tunnel } if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); + + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); } void TunnelPool::TunnelExpired (std::shared_ptr expiredTunnel) @@ -109,6 +111,8 @@ namespace tunnel std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.insert (createdTunnel); } + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); + //CreatePairedInboundTunnel (createdTunnel); } @@ -579,5 +583,11 @@ namespace tunnel } return tun; } + + void TunnelPool::OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result) + { + auto peers = tunnel->GetPeers(); + if(m_CustomPeerSelector) m_CustomPeerSelector->OnBuildResult(peers, tunnel->IsInbound(), result); + } } } diff --git a/TunnelPool.h b/TunnelPool.h index 6a73bd67..9e2a3e24 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -23,12 +23,21 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + + enum TunnelBuildResult { + eBuildResultOkay, // tunnel was built okay + eBuildResultRejected, // tunnel build was explicitly rejected + eBuildResultTimeout // tunnel build timed out + }; + /** interface for custom tunnel peer selection algorithm */ struct ITunnelPeerSelector { typedef std::shared_ptr Peer; typedef std::vector TunnelPath; + virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0; + virtual bool OnBuildResult(TunnelPath & peers, bool isInbound, TunnelBuildResult result) = 0; }; typedef std::shared_ptr TunnelPeerSelector; @@ -79,6 +88,8 @@ namespace tunnel /** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */ std::shared_ptr GetLowestLatencyInboundTunnel(std::shared_ptr exclude=nullptr) const; std::shared_ptr GetLowestLatencyOutboundTunnel(std::shared_ptr exclude=nullptr) const; + + void OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result); private: diff --git a/WebSocks.cpp b/WebSocks.cpp new file mode 100644 index 00000000..b6faa711 --- /dev/null +++ b/WebSocks.cpp @@ -0,0 +1,467 @@ +#include "WebSocks.h" +#include "Log.h" +#include + +#ifdef WITH_EVENTS +#include "ClientContext.h" +#include "Identity.h" +#include "Destination.h" +#include "Streaming.h" +#include + +#include +#include + +#include +#define GCC47_BOOST149 ((BOOST_VERSION == 104900) && (__GNUC__ == 4) && (__GNUC_MINOR__ >= 7)) +#if !GCC47_BOOST149 +#include +#endif + +namespace i2p +{ +namespace client +{ + typedef websocketpp::server WebSocksServerImpl; + + typedef std::function)> StreamConnectFunc; + + + struct IWebSocksConn : public I2PServiceHandler + { + IWebSocksConn(I2PService * parent) : I2PServiceHandler(parent) {} + virtual void Close() = 0; + virtual void GotMessage(const websocketpp::connection_hdl & conn, WebSocksServerImpl::message_ptr msg) = 0; + }; + + typedef std::shared_ptr WebSocksConn_ptr; + + WebSocksConn_ptr CreateWebSocksConn(const websocketpp::connection_hdl & conn, WebSocksImpl * parent); + + class WebSocksImpl + { + + typedef std::mutex mutex_t; + typedef std::unique_lock lock_t; + + typedef std::shared_ptr Destination_t; + public: + + typedef WebSocksServerImpl ServerImpl; + typedef ServerImpl::message_ptr MessagePtr; + + WebSocksImpl(const std::string & addr, int port) : + Parent(nullptr), + m_Run(false), + m_Addr(addr), + m_Port(port), + m_Thread(nullptr) + { + m_Server.init_asio(); + m_Server.set_open_handler(std::bind(&WebSocksImpl::ConnOpened, this, std::placeholders::_1)); + } + + void InitializeDestination(WebSocks * parent) + { + Parent = parent; + m_Dest = Parent->GetLocalDestination(); + } + + ServerImpl::connection_ptr GetConn(const websocketpp::connection_hdl & conn) + { + return m_Server.get_con_from_hdl(conn); + } + + void CloseConn(const websocketpp::connection_hdl & conn) + { + auto c = GetConn(conn); + if(c) c->close(websocketpp::close::status::normal, "closed"); + } + + void CreateStreamTo(const std::string & addr, int port, StreamConnectFunc complete) + { + auto & addressbook = i2p::client::context.GetAddressBook(); + i2p::data::IdentHash ident; + if(addressbook.GetIdentHash(addr, ident)) { + // address found + m_Dest->CreateStream(complete, ident, port); + } else { + // not found + complete(nullptr); + } + } + + void ConnOpened(websocketpp::connection_hdl conn) + { + auto ptr = CreateWebSocksConn(conn, this); + Parent->AddHandler(ptr); + m_Conns.push_back(ptr); + } + + void Start() + { + if(m_Run) return; // already started + m_Server.listen(boost::asio::ip::address::from_string(m_Addr), m_Port); + m_Server.start_accept(); + m_Run = true; + m_Thread = new std::thread([&] (){ + while(m_Run) { + try { + m_Server.run(); + } catch( std::exception & ex) { + LogPrint(eLogError, "Websocks runtime exception: ", ex.what()); + } + } + }); + m_Dest->Start(); + } + + void Stop() + { + for(const auto & conn : m_Conns) + conn->Close(); + + m_Dest->Stop(); + m_Run = false; + m_Server.stop(); + if(m_Thread) { + m_Thread->join(); + delete m_Thread; + } + m_Thread = nullptr; + } + + boost::asio::ip::tcp::endpoint GetLocalEndpoint() + { + return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(m_Addr), m_Port); + } + + WebSocks * Parent; + + private: + std::vector m_Conns; + bool m_Run; + ServerImpl m_Server; + std::string m_Addr; + int m_Port; + std::thread * m_Thread; + Destination_t m_Dest; + }; + + struct WebSocksConn : public IWebSocksConn , public std::enable_shared_from_this + { + enum ConnState + { + eWSCInitial, + eWSCTryConnect, + eWSCFailConnect, + eWSCOkayConnect, + eWSCClose, + eWSCEnd + }; + + typedef WebSocksServerImpl ServerImpl; + typedef ServerImpl::message_ptr Message_t; + typedef websocketpp::connection_hdl ServerConn; + typedef std::shared_ptr Destination_t; + typedef std::shared_ptr StreamDest_t; + typedef std::shared_ptr Stream_t; + + ServerConn m_Conn; + Stream_t m_Stream; + ConnState m_State; + WebSocksImpl * m_Parent; + std::string m_RemoteAddr; + int m_RemotePort; + uint8_t m_RecvBuf[2048]; + + WebSocksConn(const ServerConn & conn, WebSocksImpl * parent) : + IWebSocksConn(parent->Parent), + m_Conn(conn), + m_Stream(nullptr), + m_State(eWSCInitial), + m_Parent(parent) + { + + } + + ~WebSocksConn() + { + Close(); + } + + void EnterState(ConnState state) + { + LogPrint(eLogDebug, "websocks: state ", m_State, " -> ", state); + switch(m_State) + { + case eWSCInitial: + if (state == eWSCClose) { + m_State = eWSCClose; + // connection was opened but never used + LogPrint(eLogInfo, "websocks: connection closed but never used"); + Close(); + return; + } else if (state == eWSCTryConnect) { + // we will try to connect + m_State = eWSCTryConnect; + m_Parent->CreateStreamTo(m_RemoteAddr, m_RemotePort, std::bind(&WebSocksConn::ConnectResult, this, std::placeholders::_1)); + } else { + LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state); + } + return; + case eWSCTryConnect: + if(state == eWSCOkayConnect) { + // we connected okay + LogPrint(eLogDebug, "websocks: connected to ", m_RemoteAddr, ":", m_RemotePort); + SendResponse(""); + m_State = eWSCOkayConnect; + } else if(state == eWSCFailConnect) { + // we did not connect okay + LogPrint(eLogDebug, "websocks: failed to connect to ", m_RemoteAddr, ":", m_RemotePort); + SendResponse("failed to connect"); + m_State = eWSCFailConnect; + EnterState(eWSCInitial); + } else if(state == eWSCClose) { + // premature close + LogPrint(eLogWarning, "websocks: websocket connection closed prematurely"); + m_State = eWSCClose; + } else { + LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state); + } + return; + case eWSCFailConnect: + if (state == eWSCInitial) { + // reset to initial state so we can try connecting again + m_RemoteAddr = ""; + m_RemotePort = 0; + LogPrint(eLogDebug, "websocks: reset websocket conn to initial state"); + m_State = eWSCInitial; + } else if (state == eWSCClose) { + // we are going to close the connection + m_State = eWSCClose; + Close(); + } else { + LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state); + } + return; + case eWSCOkayConnect: + if(state == eWSCClose) { + // graceful close + m_State = eWSCClose; + Close(); + } else { + LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state); + } + case eWSCClose: + if(state == eWSCEnd) { + LogPrint(eLogDebug, "websocks: socket ended"); + Kill(); + auto me = shared_from_this(); + Done(me); + } else { + LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state); + } + return; + default: + LogPrint(eLogError, "websocks: bad state ", m_State); + } + } + + void StartForwarding() + { + LogPrint(eLogDebug, "websocks: begin forwarding data"); + uint8_t b[1]; + m_Stream->Send(b, 0); + AsyncRecv(); + } + + void HandleAsyncRecv(const boost::system::error_code &ec, std::size_t n) + { + if(ec) { + // error + LogPrint(eLogWarning, "websocks: connection error ", ec.message()); + EnterState(eWSCClose); + } else { + // forward data + LogPrint(eLogDebug, "websocks recv ", n); + + std::string str((char*)m_RecvBuf, n); + auto conn = m_Parent->GetConn(m_Conn); + if(!conn) { + LogPrint(eLogWarning, "websocks: connection is gone"); + EnterState(eWSCClose); + return; + } + conn->send(str); + AsyncRecv(); + + } + } + + void AsyncRecv() + { + m_Stream->AsyncReceive( + boost::asio::buffer(m_RecvBuf, sizeof(m_RecvBuf)), + std::bind(&WebSocksConn::HandleAsyncRecv, this, std::placeholders::_1, std::placeholders::_2), 60); + } + + /** @brief send error message or empty string for success */ + void SendResponse(const std::string & errormsg) + { + boost::property_tree::ptree resp; + if(errormsg.size()) { + resp.put("error", errormsg); + resp.put("success", 0); + } else { + resp.put("success", 1); + } + std::ostringstream ss; + write_json(ss, resp); + auto conn = m_Parent->GetConn(m_Conn); + if(conn) conn->send(ss.str()); + } + + void ConnectResult(Stream_t stream) + { + m_Stream = stream; + if(m_State == eWSCClose) { + // premature close of websocket + Close(); + return; + } + if(m_Stream) { + // connect good + EnterState(eWSCOkayConnect); + StartForwarding(); + } else { + // connect failed + EnterState(eWSCFailConnect); + } + } + + virtual void GotMessage(const websocketpp::connection_hdl & conn, WebSocksServerImpl::message_ptr msg) + { + (void) conn; + std::string payload = msg->get_payload(); + if(m_State == eWSCOkayConnect) + { + // forward to server + LogPrint(eLogDebug, "websocks: forward ", payload.size()); + m_Stream->Send((uint8_t*)payload.c_str(), payload.size()); + } else if (m_State == eWSCInitial) { + // recv connect request + auto itr = payload.find(":"); + if(itr == std::string::npos) { + // no port + m_RemotePort = 0; + m_RemoteAddr = payload; + } else { + // includes port + m_RemotePort = std::stoi(payload.substr(itr+1)); + m_RemoteAddr = payload.substr(0, itr); + } + EnterState(eWSCTryConnect); + } else { + // wtf? + LogPrint(eLogWarning, "websocks: got message in invalid state ", m_State); + } + } + + virtual void Close() + { + if(m_State == eWSCClose) { + LogPrint(eLogDebug, "websocks: closing connection"); + if(m_Stream) m_Stream->Close(); + m_Parent->CloseConn(m_Conn); + EnterState(eWSCEnd); + } else { + EnterState(eWSCClose); + } + } + }; + + WebSocksConn_ptr CreateWebSocksConn(const websocketpp::connection_hdl & conn, WebSocksImpl * parent) + { + auto ptr = std::make_shared(conn, parent); + auto c = parent->GetConn(conn); + c->set_message_handler(std::bind(&WebSocksConn::GotMessage, ptr.get(), std::placeholders::_1, std::placeholders::_2)); + return ptr; + } + +} +} +#else + +// no websocket support + +namespace i2p +{ +namespace client +{ + class WebSocksImpl + { + public: + WebSocksImpl(const std::string & addr, int port) : m_Addr(addr), m_Port(port) + { + } + + ~WebSocksImpl() + { + } + + void Start() + { + LogPrint(eLogInfo, "WebSockets not enabled on compile time"); + } + + void Stop() + { + } + + void InitializeDestination(WebSocks * parent) + { + } + + boost::asio::ip::tcp::endpoint GetLocalEndpoint() + { + return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(m_Addr), m_Port); + } + + std::string m_Addr; + int m_Port; + + }; +} +} + +#endif +namespace i2p +{ +namespace client +{ + WebSocks::WebSocks(const std::string & addr, int port, std::shared_ptr localDestination) : m_Impl(new WebSocksImpl(addr, port)) + { + m_Impl->InitializeDestination(this); + } + WebSocks::~WebSocks() { delete m_Impl; } + + void WebSocks::Start() + { + m_Impl->Start(); + GetLocalDestination()->Start(); + } + + boost::asio::ip::tcp::endpoint WebSocks::GetLocalEndpoint() const + { + return m_Impl->GetLocalEndpoint(); + } + + void WebSocks::Stop() + { + m_Impl->Stop(); + GetLocalDestination()->Stop(); + } +} +} + diff --git a/WebSocks.h b/WebSocks.h new file mode 100644 index 00000000..2314659f --- /dev/null +++ b/WebSocks.h @@ -0,0 +1,34 @@ +#ifndef WEBSOCKS_H_ +#define WEBSOCKS_H_ +#include +#include +#include "I2PService.h" +#include "Destination.h" + +namespace i2p +{ +namespace client +{ + + class WebSocksImpl; + + /** @brief websocket socks proxy server */ + class WebSocks : public i2p::client::I2PService + { + public: + WebSocks(const std::string & addr, int port, std::shared_ptr localDestination); + ~WebSocks(); + + void Start(); + void Stop(); + + boost::asio::ip::tcp::endpoint GetLocalEndpoint() const; + + const char * GetName() { return "WebSOCKS Proxy"; } + + private: + WebSocksImpl * m_Impl; + }; +} +} +#endif diff --git a/Websocket.cpp b/Websocket.cpp index 0de44efe..a1a58c5f 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -2,6 +2,7 @@ #include "Log.h" #include +#include #include #include @@ -27,7 +28,11 @@ namespace i2p typedef ServerImpl::message_ptr MessagePtr; public: - WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr) + WebsocketServerImpl(const std::string & addr, int port) : + m_run(false), + m_ws_thread(nullptr), + m_ev_thread(nullptr), + m_WebsocketTicker(m_Service) { m_server.init_asio(); m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1)); @@ -44,7 +49,7 @@ namespace i2p void Start() { m_run = true; m_server.start_accept(); - m_thread = new std::thread([&] () { + m_ws_thread = new std::thread([&] () { while(m_run) { try { m_server.run(); @@ -53,16 +58,35 @@ namespace i2p } } }); + m_ev_thread = new std::thread([&] () { + while(m_run) { + try { + m_Service.run(); + break; + } catch (std::exception & e ) { + LogPrint(eLogError, "Websocket service: ", e.what()); + } + } + }); + ScheduleTick(); } void Stop() { m_run = false; + m_Service.stop(); m_server.stop(); - if(m_thread) { - m_thread->join(); - delete m_thread; + + if(m_ev_thread) { + m_ev_thread->join(); + delete m_ev_thread; } - m_thread = nullptr; + m_ev_thread = nullptr; + + if(m_ws_thread) { + m_ws_thread->join(); + delete m_ws_thread; + } + m_ws_thread = nullptr; } void ConnOpened(ServerConn c) @@ -82,11 +106,40 @@ namespace i2p (void) conn; (void) msg; } + + void HandleTick(const boost::system::error_code & ec) + { + + if(ec != boost::asio::error::operation_aborted) + LogPrint(eLogError, "Websocket ticker: ", ec.message()); + // pump collected events to us + i2p::event::core.PumpCollected(this); + ScheduleTick(); + } + + void ScheduleTick() + { + LogPrint(eLogDebug, "Websocket schedule tick"); + boost::posix_time::seconds dlt(1); + m_WebsocketTicker.expires_from_now(dlt); + m_WebsocketTicker.async_wait(std::bind(&WebsocketServerImpl::HandleTick, this, std::placeholders::_1)); + } + /** @brief called from m_ev_thread */ + void HandlePumpEvent(const EventType & ev, const uint64_t & val) + { + EventType e; + for (const auto & i : ev) + e[i.first] = i.second; + + e["number"] = std::to_string(val); + HandleEvent(e); + } + + /** @brief called from m_ws_thread */ void HandleEvent(const EventType & ev) { std::lock_guard lock(m_connsMutex); - LogPrint(eLogDebug, "websocket event"); boost::property_tree::ptree event; for (const auto & item : ev) { event.put(item.first, item.second); @@ -105,10 +158,13 @@ namespace i2p private: typedef std::set > ConnList; bool m_run; - std::thread * m_thread; + std::thread * m_ws_thread; + std::thread * m_ev_thread; std::mutex m_connsMutex; ConnList m_conns; ServerImpl m_server; + boost::asio::io_service m_Service; + boost::asio::deadline_timer m_WebsocketTicker; }; diff --git a/android/jni/Android.mk b/android/jni/Android.mk index c44594f0..5eaa983e 100755 --- a/android/jni/Android.mk +++ b/android/jni/Android.mk @@ -59,7 +59,8 @@ LOCAL_SRC_FILES := DaemonAndroid.cpp i2pd_android.cpp \ ../../TunnelPool.cpp \ ../../Timestamp.cpp \ ../../Event.cpp \ - ../../BloomFilter.cpp \ + ../../WebSocks.cpp \ +../../BloomFilter.cpp \ ../../util.cpp \ ../../i2pd.cpp ../../UPnP.cpp diff --git a/api.cpp b/api.cpp index 5148ed41..ec109439 100644 --- a/api.cpp +++ b/api.cpp @@ -33,6 +33,10 @@ namespace api #else i2p::crypto::InitCrypto (true); #endif + + int netID; i2p::config::GetOption("netid", netID); + i2p::context.SetNetID (netID); + i2p::context.Init (); } diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index a1fd19c2..2baa02aa 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -98,6 +98,7 @@ set (CLIENT_SRC "${CMAKE_SOURCE_DIR}/HTTP.cpp" "${CMAKE_SOURCE_DIR}/HTTPProxy.cpp" "${CMAKE_SOURCE_DIR}/I2CP.cpp" + "${CMAKE_SOURCE_DIR}/WebSocks.cpp" ) if(WITH_WEBSOCKETS) diff --git a/build/docker/README.md b/build/docker/README.md new file mode 100644 index 00000000..df2cdc01 --- /dev/null +++ b/build/docker/README.md @@ -0,0 +1,34 @@ +Howto build & run +================== + +**Build** + +Assuming you're in the root directory of the anoncoin source code. + +$ `cd build/docker` +$ `docker -t meeh/i2pd:latest .` + +**Run** + +To run either the local build, or if not found - fetched prebuild from hub.docker.io, run the following command. + +$ `docker run --name anonnode -v /path/to/i2pd/datadir/on/host:/var/lib/i2pd -p 7070:7070 -p 4444:4444 -p 4447:4447 -p 7656:7656 -p 2827:2827 -p 7654:7654 -p 7650:7650 -d meeh/i2pd` + +All the ports ( -p HOSTPORT:DOCKERPORT ) is optional. However the command above enable all features (Webconsole, HTTP Proxy, BOB, SAM, i2cp, etc) + +The volume ( -v HOSTDIR:DOCKERDIR ) is also optional, but if you don't use it, your config, routerid and private keys will die along with the container. + +**Options** + +Options are set via docker environment variables. This can be set at run with -e parameters. + +* **ENABLE_IPV6** - Enable IPv6 support. Any value can be used - it triggers as long as it's not empty. +* **LOGLEVEL** - Set the loglevel. +* **ENABLE_AUTH** - Enable auth for the webconsole. Username and password needs to be set manually in i2pd.conf cause security reasons. + +**Logging** + +Logging happens to STDOUT as the best practise with docker containers, since infrastructure systems like kubernetes with ELK integration can automaticly forward the log to say, kibana or greylog without manual setup. :) + + + diff --git a/build/Dockerfile b/build/docker/old-ubuntu-based/Dockerfile similarity index 100% rename from build/Dockerfile rename to build/docker/old-ubuntu-based/Dockerfile diff --git a/docs/configuration.md b/docs/configuration.md index 3212aea9..4894bfe3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,8 +35,9 @@ Windows-specific options: All options below still possible in cmdline, but better write it in config file: +* --http.enabled= - If webconsole is enabled. true by default * --http.address= - The address to listen on (HTTP server) -* --http.port= - The port to listen on (HTTP server) +* --http.port= - The port to listen on (HTTP server) 7070 by default * --http.auth - Enable basic HTTP auth for webconsole * --http.user= - Username for basic auth (default: i2pd) * --http.pass= - Password for basic auth (default: random, see logs) diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 00000000..24a415aa --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,24 @@ +#!/bin/sh + +ARGS="" +if [ "${ENABLE_IPV6}" != "" ]; then + ARGS="${ARGS} –ipv6" +fi + +if [ "${LOGLEVEL}" != "" ]; then + ARGS="${ARGS} –loglevel=${LOGLEVEL}" +fi + +if [ "${ENABLE_AUTH}" != "" ]; then + ARGS="${ARGS} –http.auth" +fi + + +# To make ports exposeable +DEFAULT_ARGS=" –http.address=0.0.0.0 –httpproxy.address=0.0.0.0 -socksproxy.address=0.0.0.0 –sam.address=0.0.0.0 –bob.address=0.0.0.0 –i2cp.address=0.0.0.0 –i2pcontrol.port=0.0.0.0 –upnp.enabled=false -service " + +mkdir -p /var/lib/i2pd && chown -R i2pd:nobody /var/lib/i2pd && chmod u+rw /var/lib/i2pd + +gosu i2pd i2pd $DEFAULT_ARGS $ARGS + + diff --git a/filelist.mk b/filelist.mk index 76f58785..d5d703a6 100644 --- a/filelist.mk +++ b/filelist.mk @@ -9,7 +9,7 @@ LIB_SRC = \ LIB_CLIENT_SRC = \ AddressBook.cpp BOB.cpp ClientContext.cpp I2PTunnel.cpp I2PService.cpp \ - SAM.cpp SOCKS.cpp HTTPProxy.cpp I2CP.cpp + SAM.cpp SOCKS.cpp HTTPProxy.cpp I2CP.cpp WebSocks.cpp # also: Daemon{Linux,Win32}.cpp will be added later DAEMON_SRC = \ diff --git a/util.h b/util.h index b8de8b5a..2ee1c286 100644 --- a/util.h +++ b/util.h @@ -1,9 +1,10 @@ #ifndef UTIL_H #define UTIL_H -#include #include -#include +#include +#include +#include #include #ifdef ANDROID @@ -27,6 +28,88 @@ namespace i2p { namespace util { + + template + class MemoryPool + { + public: + + MemoryPool (): m_Head (nullptr) {}; + ~MemoryPool () + { + while (m_Head) + { + auto tmp = m_Head; + m_Head = static_cast(*(void * *)m_Head); // next + delete tmp; + } + } + + template + T * Acquire (TArgs&&... args) + { + if (!m_Head) return new T(args...); + else + { + auto tmp = m_Head; + m_Head = static_cast(*(void * *)m_Head); // next + return new (tmp)T(args...); + } + } + + void Release (T * t) + { + if (!t) return; + t->~T (); + *(void * *)t = m_Head; // next + m_Head = t; + } + + template + std::unique_ptr > AcquireUnique (TArgs&&... args) + { + return std::unique_ptr >(Acquire (args...), + std::bind (&MemoryPool::Release, this, std::placeholders::_1)); + } + + protected: + + T * m_Head; + }; + + template + class MemoryPoolMt: public MemoryPool + { + public: + + MemoryPoolMt () {}; + template + T * AcquireMt (TArgs&&... args) + { + if (!this->m_Head) return new T(args...); + std::lock_guard l(m_Mutex); + return this->Acquire (args...); + } + + void ReleaseMt (T * t) + { + std::lock_guard l(m_Mutex); + this->Release (t); + } + + templateclass C, typename... R> + void ReleaseMt(const C& c) + { + std::lock_guard l(m_Mutex); + for (auto& it: c) + this->Release (it); + } + + private: + + std::mutex m_Mutex; + }; + namespace net { int GetMTU (const boost::asio::ip::address& localAddress);