Merge pull request #779 from PurpleI2P/openssl

recent changes
This commit is contained in:
orignal 2017-01-16 22:06:32 -05:00 committed by GitHub
commit e0879fbccb
44 changed files with 1510 additions and 357 deletions

View File

@ -8,6 +8,8 @@
#include "Identity.h"
#include "util.h"
#include "ClientContext.h"
#include "SOCKS.h"
#include "WebSocks.h"
namespace i2p
{
@ -424,10 +426,16 @@ namespace client
try
{
std::string type = section.second.get<std::string> (I2P_TUNNELS_SECTION_TYPE);
if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT)
if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT
|| type == I2P_TUNNELS_SECTION_TYPE_SOCKS
|| type == I2P_TUNNELS_SECTION_TYPE_WEBSOCKS
|| type == I2P_TUNNELS_SECTION_TYPE_HTTPPROXY
|| type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT)
{
// mandatory params
std::string dest = section.second.get<std::string> (I2P_CLIENT_TUNNEL_DESTINATION);
std::string dest;
if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT)
dest = section.second.get<std::string> (I2P_CLIENT_TUNNEL_DESTINATION);
int port = section.second.get<int> (I2P_CLIENT_TUNNEL_PORT);
// optional params
std::string keys = section.second.get (I2P_CLIENT_TUNNEL_KEYS, "");
@ -466,19 +474,45 @@ namespace client
LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists");
} else {
// tcp client
auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort);
if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (),
std::unique_ptr<I2PClientTunnel>(clientTunnel))).second)
boost::asio::ip::tcp::endpoint clientEndpoint;
I2PService * clientTunnel = nullptr;
if (type == I2P_TUNNELS_SECTION_TYPE_SOCKS)
{
// socks proxy
clientTunnel = new i2p::proxy::SOCKSProxy(address, port, "", destinationPort, localDestination);
clientEndpoint = ((i2p::proxy::SOCKSProxy*)clientTunnel)->GetAcceptor().local_endpoint();
}
else if (type == I2P_TUNNELS_SECTION_TYPE_HTTPPROXY)
{
// http proxy
clientTunnel = new i2p::proxy::HTTPProxy(address, port, localDestination);
clientEndpoint = ((i2p::proxy::HTTPProxy*)clientTunnel)->GetAcceptor().local_endpoint();
}
else if (type == I2P_TUNNELS_SECTION_TYPE_WEBSOCKS)
{
// websocks proxy
clientTunnel = new WebSocks(address, port, localDestination);;
clientEndpoint = ((WebSocks*)clientTunnel)->GetLocalEndpoint();
}
else
{
// tcp client
clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort);
clientEndpoint = ((I2PClientTunnel*)clientTunnel)->GetAcceptor().local_endpoint();
}
if (m_ClientTunnels.insert (std::make_pair (clientEndpoint, std::unique_ptr<I2PService>(clientTunnel))).second)
{
clientTunnel->Start ();
numClientTunnels++;
}
else
LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists");
LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientEndpoint, "already exists");
}
}
else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER
|| type == I2P_TUNNELS_SECTION_TYPE_HTTP
|| type == I2P_TUNNELS_SECTION_TYPE_IRC
|| type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
{
// mandatory params
std::string host = section.second.get<std::string> (I2P_SERVER_TUNNEL_HOST);
@ -493,7 +527,8 @@ namespace client
i2p::data::SigningKeyType sigType = section.second.get (I2P_SERVER_TUNNEL_SIGNATURE_TYPE, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256);
uint32_t maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN);
std::string address = section.second.get<std::string> (I2P_SERVER_TUNNEL_ADDRESS, "127.0.0.1");
bool isUniqueLocal = section.second.get(I2P_SERVER_TUNNEL_ENABLE_UNIQUE_LOCAL, true);
// I2CP
std::map<std::string, std::string> options;
ReadI2CPOptions (section, options);
@ -512,6 +547,11 @@ namespace client
auto localAddress = boost::asio::ip::address::from_string(address);
boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port);
I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port);
if(!isUniqueLocal)
{
LogPrint(eLogInfo, "Clients: disabling loopback address mapping");
serverTunnel->SetUniqueLocal(isUniqueLocal);
}
std::lock_guard<std::mutex> lock(m_ForwardsMutex);
if(m_ServerForwards.insert(
std::make_pair(
@ -538,8 +578,12 @@ namespace client
LogPrint(eLogInfo, "Clients: Set Max Conns To ", maxConns);
serverTunnel->SetMaxConnsPerMinute(maxConns);
if(!isUniqueLocal)
{
LogPrint(eLogInfo, "Clients: disabling loopback address mapping");
serverTunnel->SetUniqueLocal(isUniqueLocal);
}
if (accessList.length () > 0)
{
std::set<i2p::data::IdentHash> idents;

View File

@ -26,6 +26,9 @@ namespace client
const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc";
const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient";
const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver";
const char I2P_TUNNELS_SECTION_TYPE_SOCKS[] = "socks";
const char I2P_TUNNELS_SECTION_TYPE_WEBSOCKS[] = "websocks";
const char I2P_TUNNELS_SECTION_TYPE_HTTPPROXY[] = "httpproxy";
const char I2P_CLIENT_TUNNEL_PORT[] = "port";
const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address";
const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination";
@ -42,7 +45,8 @@ namespace client
const char I2P_SERVER_TUNNEL_GZIP[] = "gzip";
const char I2P_SERVER_TUNNEL_WEBIRC_PASSWORD[] = "webircpassword";
const char I2P_SERVER_TUNNEL_ADDRESS[] = "address";
const char I2P_SERVER_TUNNEL_ENABLE_UNIQUE_LOCAL[] = "enableuniquelocal";
class ClientContext
{
public:
@ -92,7 +96,7 @@ namespace client
i2p::proxy::HTTPProxy * m_HttpProxy;
i2p::proxy::SOCKSProxy * m_SocksProxy;
std::map<boost::asio::ip::tcp::endpoint, std::unique_ptr<I2PClientTunnel> > m_ClientTunnels; // local endpoint->tunnel
std::map<boost::asio::ip::tcp::endpoint, std::unique_ptr<I2PService> > m_ClientTunnels; // local endpoint->tunnel
std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PServerTunnel> > m_ServerTunnels; // <destination,port>->tunnel
std::mutex m_ForwardsMutex;

View File

@ -90,7 +90,8 @@ namespace config {
("httpproxy.inbound.quantity", value<std::string>()->default_value("5"), "HTTP proxy inbound tunnels quantity")
("httpproxy.outbound.quantity", value<std::string>()->default_value("5"), "HTTP proxy outbound tunnels quantity")
("httpproxy.latency.min", value<std::string>()->default_value("0"), "HTTP proxy min latency for tunnels")
("httpproxy.latency.max", value<std::string>()->default_value("0"), "HTTP proxy max latency for tunnels")
("httpproxy.latency.max", value<std::string>()->default_value("0"), "HTTP proxy max latency for tunnels")
("httpproxy.outproxy", value<std::string>()->default_value(""), "HTTP proxy upstream out proxy url")
;
options_description socksproxy("SOCKS Proxy options");
@ -201,7 +202,7 @@ namespace config {
("websockets.enabled", value<bool>()->default_value(false), "enable websocket server")
("websockets.address", value<std::string>()->default_value("127.0.0.1"), "address to bind websocket server on")
("websockets.port", value<uint16_t>()->default_value(7666), "port to bind websocket server on");
m_OptionsDesc
.add(general)
.add(limits)

View File

@ -307,7 +307,6 @@ namespace i2p
d.m_WebsocketServer->Start();
i2p::event::core.SetListener(d.m_WebsocketServer->ToListener());
}
#endif
return true;
}

View File

@ -45,7 +45,8 @@ namespace datagram
owner->Sign (buf1, len, signature);
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
ObtainSession(identity)->SendMsg(msg);
auto session = ObtainSession(identity);
session->SendMsg(msg);
}
@ -69,7 +70,8 @@ namespace datagram
if (verified)
{
auto h = identity.GetIdentHash();
ObtainSession(h)->Ack();
auto session = ObtainSession(h);
session->Ack();
auto r = FindReceiver(toPort);
if(r)
r(identity, fromPort, toPort, buf + headerLen, len -headerLen);
@ -180,7 +182,8 @@ namespace datagram
// we used this session
m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
// schedule send
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg));
auto self = shared_from_this();
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, self, msg));
}
DatagramSession::Info DatagramSession::GetSessionInfo() const
@ -237,11 +240,11 @@ namespace datagram
m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel);
path->outboundTunnel = m_CurrentOutboundTunnel;
}
if(m_CurrentRemoteLease && ! m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) {
if(m_CurrentRemoteLease && m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) {
// bad lease, switch to next one
if(m_RemoteLeaseSet) {
auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&](const i2p::data::Lease& l) -> bool {
return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway || l.endDate <= m_CurrentRemoteLease->endDate;
return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway;
});
auto sz = ls.size();
if (sz) {
@ -332,7 +335,8 @@ namespace datagram
{
boost::posix_time::milliseconds dlt(100);
m_SendQueueTimer.expires_from_now(dlt);
m_SendQueueTimer.async_wait([&](const boost::system::error_code & ec) { if(ec) return; FlushSendQueue(); });
auto self = shared_from_this();
m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) { if(ec) return; self->FlushSendQueue(); });
}
}
}

View File

@ -34,7 +34,7 @@ namespace datagram
// max 64 messages buffered in send queue for each datagram session
const size_t DATAGRAM_SEND_QUEUE_MAX_SIZE = 64;
class DatagramSession
class DatagramSession : public std::enable_shared_from_this<DatagramSession>
{
public:
DatagramSession(i2p::client::ClientDestination * localDestination,
@ -90,7 +90,9 @@ namespace datagram
uint64_t m_LastUse;
bool m_RequestingLS;
};
typedef std::shared_ptr<DatagramSession> DatagramSession_ptr;
const size_t MAX_DATAGRAM_SIZE = 32768;
class DatagramDestination
{
@ -132,7 +134,7 @@ namespace datagram
i2p::data::IdentityEx m_Identity;
Receiver m_Receiver; // default
std::mutex m_SessionsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<DatagramSession> > m_Sessions;
std::map<i2p::data::IdentHash, DatagramSession_ptr > m_Sessions;
std::mutex m_ReceiversMutex;
std::map<uint16_t, Receiver> m_ReceiversByPorts;

54
Dockerfile Normal file
View File

@ -0,0 +1,54 @@
FROM alpine:latest
MAINTAINER Mikal Villa <mikal@sigterm.no>
ENV GIT_BRANCH="master"
ENV I2PD_PREFIX="/opt/i2pd-${GIT_BRANCH}"
ENV PATH=${I2PD_PREFIX}/bin:$PATH
ENV GOSU_VERSION=1.7
ENV GOSU_SHASUM="34049cfc713e8b74b90d6de49690fa601dc040021980812b2f1f691534be8a50 /usr/local/bin/gosu"
RUN mkdir /user && adduser -S -h /user i2pd && chown -R i2pd:nobody /user
#
# Each RUN is a layer, adding the dependencies and building i2pd in one layer takes around 8-900Mb, so to keep the
# image under 20mb we need to remove all the build dependencies in the same "RUN" / layer.
#
# 1. install deps, clone and build.
# 2. strip binaries.
# 3. Purge all dependencies and other unrelated packages, including build directory.
RUN apk --no-cache --virtual build-dependendencies add make gcc g++ libtool boost-dev build-base openssl-dev openssl git \
&& mkdir -p /tmp/build \
&& cd /tmp/build && git clone -b ${GIT_BRANCH} https://github.com/PurpleI2P/i2pd.git \
&& cd i2pd \
&& make -j4 \
&& mkdir -p ${I2PD_PREFIX}/bin \
&& mv i2pd ${I2PD_PREFIX}/bin/ \
&& cd ${I2PD_PREFIX}/bin \
&& strip i2pd \
&& rm -fr /tmp/build && apk --purge del build-dependendencies build-base fortify-headers boost-dev zlib-dev openssl-dev \
boost-python3 python3 gdbm boost-unit_test_framework boost-python linux-headers boost-prg_exec_monitor \
boost-serialization boost-signals boost-wave boost-wserialization boost-math boost-graph boost-regex git pcre \
libtool g++ gcc pkgconfig
# 2. Adding required libraries to run i2pd to ensure it will run.
RUN apk --no-cache add boost-filesystem boost-system boost-program_options boost-date_time boost-thread boost-iostreams openssl musl-utils libstdc++
# Gosu is a replacement for su/sudo in docker and not a backdoor :) See https://github.com/tianon/gosu
RUN wget -O /usr/local/bin/gosu https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64 \
&& echo "${GOSU_SHASUM}" | sha256sum -c && chmod +x /usr/local/bin/gosu
COPY entrypoint.sh /entrypoint.sh
RUN chmod a+x /entrypoint.sh
RUN echo "export PATH=${PATH}" >> /etc/profile
VOLUME [ "/var/lib/i2pd" ]
EXPOSE 7070 4444 4447 7656 2827 7654 7650
ENTRYPOINT [ "/entrypoint.sh" ]

View File

@ -17,15 +17,44 @@ namespace i2p
void EventCore::QueueEvent(const EventType & ev)
{
if(m_listener)
m_listener->HandleEvent(ev);
if(m_listener) m_listener->HandleEvent(ev);
}
void EventCore::CollectEvent(const std::string & type, const std::string & ident, uint64_t val)
{
std::unique_lock<std::mutex> lock(m_collect_mutex);
std::string key = type + "." + ident;
if (m_collected.find(key) == m_collected.end())
{
m_collected[key] = {type, key, 0};
}
m_collected[key].Val += val;
}
void EventCore::PumpCollected(EventListener * listener)
{
std::unique_lock<std::mutex> lock(m_collect_mutex);
if(listener)
{
for(const auto & ev : m_collected) {
listener->HandlePumpEvent({{"type", ev.second.Key}, {"ident", ev.second.Ident}}, ev.second.Val);
}
}
m_collected.clear();
}
}
}
void EmitEvent(const EventType & e)
void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val)
{
#ifdef WITH_EVENTS
i2p::event::core.CollectEvent(type, ident, val);
#endif
}
void EmitEvent(const EventType & e)
{
#if WITH_EVENTS
i2p::event::core.QueueEvent(e);
#endif
}

18
Event.h
View File

@ -3,6 +3,8 @@
#include <map>
#include <string>
#include <memory>
#include <mutex>
#include <tuple>
#include <boost/asio.hpp>
@ -16,15 +18,27 @@ namespace i2p
public:
virtual ~EventListener() {};
virtual void HandleEvent(const EventType & ev) = 0;
/** @brief handle collected event when pumped */
virtual void HandlePumpEvent(const EventType & ev, const uint64_t & val) = 0;
};
class EventCore
{
public:
void QueueEvent(const EventType & ev);
void CollectEvent(const std::string & type, const std::string & ident, uint64_t val);
void SetListener(EventListener * l);
void PumpCollected(EventListener * l);
private:
std::mutex m_collect_mutex;
struct CollectedEvent
{
std::string Key;
std::string Ident;
uint64_t Val;
};
std::map<std::string, CollectedEvent> m_collected;
EventListener * m_listener = nullptr;
};
#ifdef WITH_EVENTS
@ -32,6 +46,8 @@ namespace i2p
#endif
}
}
void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val);
void EmitEvent(const EventType & ev);
#endif

View File

@ -259,16 +259,21 @@ namespace http {
return eoh + strlen(HTTP_EOH);
}
std::string HTTPReq::to_string() {
std::stringstream ss;
ss << method << " " << uri << " " << version << CRLF;
void HTTPReq::write(std::ostream & o) {
o << method << " " << uri << " " << version << CRLF;
for (auto & h : headers) {
ss << h.first << ": " << h.second << CRLF;
o << h.first << ": " << h.second << CRLF;
}
ss << CRLF;
return ss.str();
o << CRLF;
}
std::string HTTPReq::to_string()
{
std::stringstream ss;
write(ss);
return ss.str();
}
bool HTTPRes::is_chunked() {
auto it = headers.find("Transfer-Encoding");
if (it == headers.end())

5
HTTP.h
View File

@ -82,6 +82,9 @@ namespace http {
/** @brief Serialize HTTP request to string */
std::string to_string();
void write(std::ostream & o);
};
struct HTTPRes : HTTPMsg {
@ -116,6 +119,8 @@ namespace http {
*/
std::string to_string();
void write(std::ostream & o);
/** @brief Checks that response declared as chunked data */
bool is_chunked();

View File

@ -64,15 +64,40 @@ namespace proxy {
void HostNotFound(std::string & host);
void SendProxyError(std::string & content);
void ForwardToUpstreamProxy();
void HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec);
void HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec);
void HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transfered);
void HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transfered);
typedef std::function<void(boost::asio::ip::tcp::endpoint)> ProxyResolvedHandler;
void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr, ProxyResolvedHandler handler);
void SocksProxySuccess();
void HandoverToUpstreamProxy();
uint8_t m_recv_chunk[8192];
std::string m_recv_buf; // from client
std::string m_send_buf; // to upstream
std::shared_ptr<boost::asio::ip::tcp::socket> m_sock;
std::shared_ptr<boost::asio::ip::tcp::socket> m_proxysock;
boost::asio::ip::tcp::resolver m_proxy_resolver;
i2p::http::URL m_ProxyURL;
i2p::http::URL m_RequestURL;
uint8_t m_socks_buf[255+8]; // for socks request/response
ssize_t m_req_len;
i2p::http::URL m_ClientRequestURL;
i2p::http::HTTPReq m_ClientRequest;
i2p::http::HTTPRes m_ClientResponse;
std::stringstream m_ClientRequestBuffer;
public:
HTTPReqHandler(HTTPProxy * parent, std::shared_ptr<boost::asio::ip::tcp::socket> sock) :
I2PServiceHandler(parent), m_sock(sock) {}
I2PServiceHandler(parent), m_sock(sock),
m_proxysock(std::make_shared<boost::asio::ip::tcp::socket>(parent->GetService())),
m_proxy_resolver(parent->GetService()) {}
~HTTPReqHandler() { Terminate(); }
void Handle () { AsyncSockRead(); } /* overload */
};
@ -97,6 +122,13 @@ namespace proxy {
m_sock->close();
m_sock = nullptr;
}
if(m_proxysock)
{
LogPrint(eLogDebug, "HTTPProxy: close proxysock");
if(m_proxysock->is_open())
m_proxysock->close();
m_proxysock = nullptr;
}
Done(shared_from_this());
}
@ -142,7 +174,7 @@ namespace proxy {
<< "</html>\r\n";
res.body = ss.str();
std::string response = res.to_string();
boost::asio::async_write(*m_sock, boost::asio::buffer(response),
boost::asio::async_write(*m_sock, boost::asio::buffer(response), boost::asio::transfer_all(),
std::bind(&HTTPReqHandler::SentHTTPFailed, shared_from_this(), std::placeholders::_1));
}
@ -198,54 +230,51 @@ namespace proxy {
*/
bool HTTPReqHandler::HandleRequest()
{
i2p::http::HTTPReq req;
i2p::http::URL url;
std::string b64;
int req_len = 0;
req_len = req.parse(m_recv_buf);
m_req_len = m_ClientRequest.parse(m_recv_buf);
if (req_len == 0)
if (m_req_len == 0)
return false; /* need more data */
if (req_len < 0) {
if (m_req_len < 0) {
LogPrint(eLogError, "HTTPProxy: unable to parse request");
GenericProxyError("Invalid request", "Proxy unable to parse your request");
return true; /* parse error */
}
/* parsing success, now let's look inside request */
LogPrint(eLogDebug, "HTTPProxy: requested: ", req.uri);
url.parse(req.uri);
LogPrint(eLogDebug, "HTTPProxy: requested: ", m_ClientRequest.uri);
m_RequestURL.parse(m_ClientRequest.uri);
if (ExtractAddressHelper(url, b64)) {
i2p::client::context.GetAddressBook ().InsertAddress (url.host, b64);
LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", url.host);
std::string full_url = url.to_string();
if (ExtractAddressHelper(m_RequestURL, b64)) {
i2p::client::context.GetAddressBook ().InsertAddress (m_RequestURL.host, b64);
LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", m_RequestURL.host);
std::string full_url = m_RequestURL.to_string();
std::stringstream ss;
ss << "Host " << url.host << " added to router's addressbook from helper. "
ss << "Host " << m_RequestURL.host << " added to router's addressbook from helper. "
<< "Click <a href=\"" << full_url << "\">here</a> to proceed.";
GenericProxyInfo("Addresshelper found", ss.str().c_str());
return true; /* request processed */
}
SanitizeHTTPRequest(req);
SanitizeHTTPRequest(m_ClientRequest);
std::string dest_host = url.host;
uint16_t dest_port = url.port;
std::string dest_host = m_RequestURL.host;
uint16_t dest_port = m_RequestURL.port;
/* always set port, even if missing in request */
if (!dest_port) {
dest_port = (url.schema == "https") ? 443 : 80;
dest_port = (m_RequestURL.schema == "https") ? 443 : 80;
}
/* detect dest_host, set proper 'Host' header in upstream request */
auto h = req.headers.find("Host");
auto h = m_ClientRequest.headers.find("Host");
if (dest_host != "") {
/* absolute url, replace 'Host' header */
std::string h = dest_host;
if (dest_port != 0 && dest_port != 80)
h += ":" + std::to_string(dest_port);
req.add_header("Host", h, true);
} else if (h != req.headers.end()) {
m_ClientRequest.add_header("Host", h, true);
} else if (h != m_ClientRequest.headers.end()) {
/* relative url and 'Host' header provided. transparent proxy mode? */
i2p::http::URL u;
std::string t = "http://" + h->second;
@ -265,23 +294,31 @@ namespace proxy {
HostNotFound(dest_host);
return true; /* request processed */
}
/* TODO: outproxy handler here */
} else {
LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": not implemented yet");
std::string message = "Host" + dest_host + "not inside I2P network, but outproxy support not implemented yet";
GenericProxyError("Outproxy failure", message.c_str());
std::string outproxyUrl; i2p::config::GetOption("httpproxy.outproxy", outproxyUrl);
if(outproxyUrl.size()) {
LogPrint (eLogDebug, "HTTPProxy: use outproxy ", outproxyUrl);
if(m_ProxyURL.parse(outproxyUrl))
ForwardToUpstreamProxy();
else
GenericProxyError("Outproxy failure", "bad outproxy settings");
} else {
LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": no outprxy enabled");
std::string message = "Host" + dest_host + "not inside I2P network, but outproxy is not enabled";
GenericProxyError("Outproxy failure", message.c_str());
}
return true;
}
/* make relative url */
url.schema = "";
url.host = "";
req.uri = url.to_string();
m_RequestURL.schema = "";
m_RequestURL.host = "";
m_ClientRequest.uri = m_RequestURL.to_string();
/* drop original request from recv buffer */
m_recv_buf.erase(0, req_len);
m_recv_buf.erase(0, m_req_len);
/* build new buffer from modified request and data from original request */
m_send_buf = req.to_string();
m_send_buf = m_ClientRequest.to_string();
m_send_buf.append(m_recv_buf);
/* connect to destination */
LogPrint(eLogDebug, "HTTPProxy: connecting to host ", dest_host, ":", dest_port);
@ -290,6 +327,144 @@ namespace proxy {
return true;
}
void HTTPReqHandler::ForwardToUpstreamProxy()
{
LogPrint(eLogDebug, "HTTPProxy: forward to upstream");
// build http requset
m_ClientRequestURL = m_RequestURL;
LogPrint(eLogDebug, "HTTPProxy: ", m_ClientRequestURL.host);
m_ClientRequestURL.schema = "";
m_ClientRequestURL.host = "";
m_ClientRequest.uri = m_ClientRequestURL.to_string();
m_ClientRequest.write(m_ClientRequestBuffer);
m_ClientRequestBuffer << m_recv_buf.substr(m_req_len);
// assume http if empty schema
if (m_ProxyURL.schema == "" || m_ProxyURL.schema == "http") {
// handle upstream http proxy
if (!m_ProxyURL.port) m_ProxyURL.port = 80;
boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port));
m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) {
m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1));
}));
} else if (m_ProxyURL.schema == "socks") {
// handle upstream socks proxy
if (!m_ProxyURL.port) m_ProxyURL.port = 9050; // default to tor default if not specified
boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port));
m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) {
m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1));
}));
} else {
// unknown type, complain
GenericProxyError("unknown outproxy url", m_ProxyURL.to_string().c_str());
}
}
void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::iterator it, ProxyResolvedHandler handler)
{
if(ec) GenericProxyError("cannot resolve upstream proxy", ec.message().c_str());
else handler(*it);
}
void HTTPReqHandler::HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec)
{
if(!ec) {
if(m_RequestURL.host.size() > 255) {
GenericProxyError("hostname too long", m_RequestURL.host.c_str());
return;
}
uint16_t port = m_RequestURL.port;
if(!port) port = 80;
LogPrint(eLogDebug, "HTTPProxy: connected to socks upstream");
std::string host = m_RequestURL.host;
std::size_t reqsize = 0;
m_socks_buf[0] = '\x04';
m_socks_buf[1] = 1;
htobe16buf(m_socks_buf+2, port);
m_socks_buf[4] = 0;
m_socks_buf[5] = 0;
m_socks_buf[6] = 0;
m_socks_buf[7] = 1;
// user id
m_socks_buf[8] = 'i';
m_socks_buf[9] = '2';
m_socks_buf[10] = 'p';
m_socks_buf[11] = 'd';
m_socks_buf[12] = 0;
reqsize += 13;
memcpy(m_socks_buf+ reqsize, host.c_str(), host.size());
reqsize += host.size();
m_socks_buf[++reqsize] = 0;
boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_socks_buf, reqsize), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::HandleSocksProxySendHandshake, this, std::placeholders::_1, std::placeholders::_2));
} else GenericProxyError("cannot connect to upstream socks proxy", ec.message().c_str());
}
void HTTPReqHandler::HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transferred)
{
LogPrint(eLogDebug, "HTTPProxy: upstream socks handshake sent");
if(ec) GenericProxyError("Cannot negotiate with socks proxy", ec.message().c_str());
else m_proxysock->async_read_some(boost::asio::buffer(m_socks_buf, 8), std::bind(&HTTPReqHandler::HandleSocksProxyReply, this, std::placeholders::_1, std::placeholders::_2));
}
void HTTPReqHandler::HandoverToUpstreamProxy()
{
LogPrint(eLogDebug, "HTTPProxy: handover to socks proxy");
auto connection = std::make_shared<i2p::client::TCPIPPipe>(GetOwner(), m_proxysock, m_sock);
m_sock = nullptr;
m_proxysock = nullptr;
GetOwner()->AddHandler(connection);
connection->Start();
Terminate();
}
void HTTPReqHandler::SocksProxySuccess()
{
if(m_ClientRequest.method == "CONNECT") {
m_ClientResponse.code = 200;
m_send_buf = m_ClientResponse.to_string();
boost::asio::async_write(*m_sock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&] (const boost::system::error_code & ec, std::size_t transferred) {
if(ec) GenericProxyError("socks proxy error", ec.message().c_str());
else HandoverToUpstreamProxy();
});
} else {
m_send_buf = m_ClientRequestBuffer.str();
LogPrint(eLogDebug, "HTTPProxy: send ", m_send_buf.size(), " bytes");
boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&](const boost::system::error_code & ec, std::size_t transferred) {
if(ec) GenericProxyError("failed to send request to upstream", ec.message().c_str());
else HandoverToUpstreamProxy();
});
}
}
void HTTPReqHandler::HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transferred)
{
if(!ec)
{
if(m_socks_buf[1] == 90) {
// success
SocksProxySuccess();
} else {
std::stringstream ss;
ss << "error code: ";
ss << (int) m_socks_buf[1];
std::string msg = ss.str();
GenericProxyError("Socks Proxy error", msg.c_str());
}
}
else GenericProxyError("No Reply From socks proxy", ec.message().c_str());
}
void HTTPReqHandler::HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec)
{
if(!ec) {
LogPrint(eLogDebug, "HTTPProxy: connected to http upstream");
GenericProxyError("cannot connect", "http out proxy not implemented");
} else GenericProxyError("cannot connect to upstream http proxy", ec.message().c_str());
}
/* will be called after some data received from client */
void HTTPReqHandler::HandleSockRecv(const boost::system::error_code & ecode, std::size_t len)
{

View File

@ -2,6 +2,8 @@
#include <sstream>
#include <openssl/x509.h>
#include <openssl/pem.h>
#include <boost/lexical_cast.hpp>
#include <boost/date_time/local_time/local_time.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/property_tree/ini_parser.hpp>
@ -14,7 +16,6 @@
#include "Crypto.h"
#include "FS.h"
#include "Log.h"
#include "HTTP.h"
#include "Config.h"
#include "NetDb.h"
#include "RouterContext.h"
@ -189,66 +190,71 @@ namespace client
if (ecode) {
LogPrint (eLogError, "I2PControl: read error: ", ecode.message ());
return;
}
/* try to parse received data */
std::stringstream json;
std::string response;
bool isHTTP = false;
if (memcmp (buf->data (), "POST", 4) == 0) {
long int remains = 0;
isHTTP = true;
i2p::http::HTTPReq req;
std::size_t len = req.parse(buf->data(), bytes_transferred);
if (len <= 0) {
LogPrint(eLogError, "I2PControl: incomplete/malformed POST request");
return;
}
/* append to json chunk of data from 1st request */
json.write(buf->data() + len, bytes_transferred - len);
remains = req.content_length();
/* if request has Content-Length header, fetch rest of data and store to json buffer */
while (remains > 0) {
len = ((long int) buf->size() < remains) ? buf->size() : remains;
bytes_transferred = boost::asio::read (*socket, boost::asio::buffer (buf->data (), len));
json.write(buf->data(), bytes_transferred);
remains -= bytes_transferred;
}
} else {
json.write(buf->data(), bytes_transferred);
}
//LogPrint(eLogDebug, "I2PControl: json from request: ", json.str());
try
{
bool isHtml = !memcmp (buf->data (), "POST", 4);
std::stringstream ss;
ss.write (buf->data (), bytes_transferred);
if (isHtml)
{
std::string header;
size_t contentLength = 0;
while (!ss.eof () && header != "\r")
{
std::getline(ss, header);
auto colon = header.find (':');
if (colon != std::string::npos && header.substr (0, colon) == "Content-Length")
contentLength = std::stoi (header.substr (colon + 1));
}
if (ss.eof ())
{
LogPrint (eLogError, "I2PControl: malformed request, HTTP header expected");
return; // TODO:
}
std::streamoff rem = contentLength + ss.tellg () - bytes_transferred; // more bytes to read
if (rem > 0)
{
bytes_transferred = boost::asio::read (*socket, boost::asio::buffer (buf->data (), rem));
ss.write (buf->data (), bytes_transferred);
}
}
std::ostringstream response;
#if GCC47_BOOST149
LogPrint (eLogError, "I2PControl: json_read is not supported due bug in boost 1.49 with gcc 4.7");
BuildErrorResponse(response, 32603, "JSON requests is not supported with this version of boost");
LogPrint (eLogError, "I2PControl: json_read is not supported due bug in boost 1.49 with gcc 4.7");
response << "{\"id\":null,\"error\":";
response << "{\"code\":-32603,\"message\":\"JSON requests is not supported with this version of boost\"},";
response << "\"jsonrpc\":\"2.0\"}";
#else
/* now try to parse json itself */
std::string j_str = json.str();
std::stringstream _json(j_str);
try {
boost::property_tree::ptree pt;
boost::property_tree::read_json (_json, pt);
boost::property_tree::ptree pt;
boost::property_tree::read_json (ss, pt);
std::string id = pt.get<std::string>("id");
std::string method = pt.get<std::string>("method");
auto it = m_MethodHandlers.find (method);
if (it != m_MethodHandlers.end ()) {
std::ostringstream ss;
ss << "{\"id\":" << id << ",\"result\":{";
(this->*(it->second))(pt.get_child ("params"), ss);
ss << "},\"jsonrpc\":\"2.0\"}";
response = ss.str();
} else {
LogPrint (eLogWarning, "I2PControl: unknown method ", method);
BuildErrorResponse(response, 32601, "Method not found");
}
} catch (std::exception& ex) {
LogPrint (eLogError, "I2PControl: exception when handle request: ", ex.what ());
BuildErrorResponse(response, 32603, ex.what());
} catch (...) {
LogPrint (eLogError, "I2PControl: handle request unknown exception");
}
std::string id = pt.get<std::string>("id");
std::string method = pt.get<std::string>("method");
auto it = m_MethodHandlers.find (method);
if (it != m_MethodHandlers.end ())
{
response << "{\"id\":" << id << ",\"result\":{";
(this->*(it->second))(pt.get_child ("params"), response);
response << "},\"jsonrpc\":\"2.0\"}";
} else {
LogPrint (eLogWarning, "I2PControl: unknown method ", method);
response << "{\"id\":null,\"error\":";
response << "{\"code\":-32601,\"message\":\"Method not found\"},";
response << "\"jsonrpc\":\"2.0\"}";
}
#endif
SendResponse (socket, buf, response, isHTTP);
SendResponse (socket, buf, response, isHtml);
}
catch (std::exception& ex)
{
LogPrint (eLogError, "I2PControl: exception when handle request: ", ex.what ());
}
catch (...)
{
LogPrint (eLogError, "I2PControl: handle request unknown exception");
}
}
}
void I2PControlService::InsertParam (std::ostringstream& ss, const std::string& name, int value) const
@ -270,28 +276,27 @@ namespace client
ss << "\"" << name << "\":" << std::fixed << std::setprecision(2) << value;
}
void I2PControlService::BuildErrorResponse (std::string & content, int code, const char *message) {
std::stringstream ss;
ss << "{\"id\":null,\"error\":";
ss << "{\"code\":" << -code << ",\"message\":\"" << message << "\"},";
ss << "\"jsonrpc\":\"2.0\"}";
content = ss.str();
}
void I2PControlService::SendResponse (std::shared_ptr<ssl_socket> socket,
std::shared_ptr<I2PControlBuffer> buf, std::string& content, bool isHTTP)
std::shared_ptr<I2PControlBuffer> buf, std::ostringstream& response, bool isHtml)
{
if (isHTTP) {
i2p::http::HTTPRes res;
res.code = 200;
res.add_header("Content-Type", "application/json");
res.add_header("Connection", "close");
res.body = content;
std::string tmp = res.to_string();
content = tmp;
size_t len = response.str ().length (), offset = 0;
if (isHtml)
{
std::ostringstream header;
header << "HTTP/1.1 200 OK\r\n";
header << "Connection: close\r\n";
header << "Content-Length: " << boost::lexical_cast<std::string>(len) << "\r\n";
header << "Content-Type: application/json\r\n";
header << "Date: ";
auto facet = new boost::local_time::local_time_facet ("%a, %d %b %Y %H:%M:%S GMT");
header.imbue(std::locale (header.getloc(), facet));
header << boost::posix_time::second_clock::local_time() << "\r\n";
header << "\r\n";
offset = header.str ().size ();
memcpy (buf->data (), header.str ().c_str (), offset);
}
std::copy(content.begin(), content.end(), buf->begin());
boost::asio::async_write (*socket, boost::asio::buffer (buf->data (), content.length()),
memcpy (buf->data () + offset, response.str ().c_str (), len);
boost::asio::async_write (*socket, boost::asio::buffer (buf->data (), offset + len),
boost::asio::transfer_all (),
std::bind(&I2PControlService::HandleResponseSent, this,
std::placeholders::_1, std::placeholders::_2, socket, buf));
@ -318,7 +323,7 @@ namespace client
}
InsertParam (results, "API", api);
results << ",";
std::string token = std::to_string(i2p::util::GetSecondsSinceEpoch ());
std::string token = boost::lexical_cast<std::string>(i2p::util::GetSecondsSinceEpoch ());
m_Tokens.insert (token);
InsertParam (results, "Token", token);
}
@ -344,10 +349,9 @@ namespace client
(this->*(it1->second))(it.second.data ());
InsertParam (results, it.first, "");
}
else {
else
LogPrint (eLogError, "I2PControl: I2PControl unknown request: ", it.first);
}
}
}
}
void I2PControlService::PasswordHandler (const std::string& value)
@ -361,20 +365,17 @@ namespace client
void I2PControlService::RouterInfoHandler (const boost::property_tree::ptree& params, std::ostringstream& results)
{
for (auto it = params.begin (); it != params.end (); ++it)
for (auto it = params.begin (); it != params.end (); it++)
{
LogPrint (eLogDebug, "I2PControl: RouterInfo request: ", it->first);
if (it != params.begin ()) results << ",";
auto it1 = m_RouterInfoHandlers.find (it->first);
if (it1 != m_RouterInfoHandlers.end ())
{
if (it != params.begin ()) results << ",";
(this->*(it1->second))(results);
}
else
{
InsertParam(results, it->first, "");
LogPrint (eLogError, "I2PControl: RouterInfo unknown request ", it->first);
}
}
}
@ -440,20 +441,15 @@ namespace client
void I2PControlService::RouterManagerHandler (const boost::property_tree::ptree& params, std::ostringstream& results)
{
for (auto it = params.begin (); it != params.end (); ++it)
for (auto it = params.begin (); it != params.end (); it++)
{
if (it != params.begin ()) results << ",";
if (it != params.begin ()) results << ",";
LogPrint (eLogDebug, "I2PControl: RouterManager request: ", it->first);
auto it1 = m_RouterManagerHandlers.find (it->first);
if (it1 != m_RouterManagerHandlers.end ())
{
(this->*(it1->second))(results);
}
else
{
InsertParam(results, it->first, "");
if (it1 != m_RouterManagerHandlers.end ()) {
(this->*(it1->second))(results);
} else
LogPrint (eLogError, "I2PControl: RouterManager unknown request: ", it->first);
}
}
}
@ -494,20 +490,15 @@ namespace client
// network setting
void I2PControlService::NetworkSettingHandler (const boost::property_tree::ptree& params, std::ostringstream& results)
{
for (auto it = params.begin (); it != params.end (); ++it)
for (auto it = params.begin (); it != params.end (); it++)
{
if (it != params.begin ()) results << ",";
LogPrint (eLogDebug, "I2PControl: NetworkSetting request: ", it->first);
auto it1 = m_NetworkSettingHandlers.find (it->first);
if (it1 != m_NetworkSettingHandlers.end ())
{
(this->*(it1->second))(it->second.data (), results);
}
else
{
InsertParam(results, it->first, "");
if (it1 != m_NetworkSettingHandlers.end ()) {
(this->*(it1->second))(it->second.data (), results);
} else
LogPrint (eLogError, "I2PControl: NetworkSetting unknown request: ", it->first);
}
}
}
@ -545,7 +536,7 @@ namespace client
X509_gmtime_adj (X509_get_notAfter (x509), I2P_CONTROL_CERTIFICATE_VALIDITY*24*60*60); // expiration
X509_set_pubkey (x509, pkey); // public key
X509_NAME * name = X509_get_subject_name (x509);
X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *)"RU", -1, -1, 0); // country (Russia by default)
X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *)"A1", -1, -1, 0); // country (Anonymous proxy)
X509_NAME_add_entry_by_txt (name, "O", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_ORGANIZATION, -1, -1, 0); // organization
X509_NAME_add_entry_by_txt (name, "CN", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_COMMON_NAME, -1, -1, 0); // common name
X509_set_issuer_name (x509, name); // set issuer to ourselves

View File

@ -45,9 +45,8 @@ namespace client
void ReadRequest (std::shared_ptr<ssl_socket> socket);
void HandleRequestReceived (const boost::system::error_code& ecode, size_t bytes_transferred,
std::shared_ptr<ssl_socket> socket, std::shared_ptr<I2PControlBuffer> buf);
void BuildErrorResponse (std::string & content, int code, const char *message);
void SendResponse (std::shared_ptr<ssl_socket> socket,
std::shared_ptr<I2PControlBuffer> buf, std::string& response, bool isHtml);
std::shared_ptr<I2PControlBuffer> buf, std::ostringstream& response, bool isHtml);
void HandleResponseSent (const boost::system::error_code& ecode, std::size_t bytes_transferred,
std::shared_ptr<ssl_socket> socket, std::shared_ptr<I2PControlBuffer> buf);
@ -120,4 +119,3 @@ namespace client
}
#endif

View File

@ -53,7 +53,6 @@ namespace client
void TCPIPPipe::Terminate()
{
if(Kill()) return;
Done(shared_from_this());
if (m_up) {
if (m_up->is_open()) {
m_up->close();
@ -66,6 +65,7 @@ namespace client
}
m_down = nullptr;
}
Done(shared_from_this());
}
void TCPIPPipe::AsyncReceiveUpstream()
@ -90,11 +90,11 @@ namespace client
}
}
void TCPIPPipe::UpstreamWrite(const uint8_t * buf, size_t len)
void TCPIPPipe::UpstreamWrite(size_t len)
{
if (m_up) {
LogPrint(eLogDebug, "TCPIPPipe: upstream: ", (int) len, " bytes written");
boost::asio::async_write(*m_up, boost::asio::buffer(buf, len),
boost::asio::async_write(*m_up, boost::asio::buffer(m_upstream_buf, len),
boost::asio::transfer_all(),
std::bind(&TCPIPPipe::HandleUpstreamWrite,
shared_from_this(),
@ -105,11 +105,11 @@ namespace client
}
}
void TCPIPPipe::DownstreamWrite(const uint8_t * buf, size_t len)
void TCPIPPipe::DownstreamWrite(size_t len)
{
if (m_down) {
LogPrint(eLogDebug, "TCPIPPipe: downstream: ", (int) len, " bytes written");
boost::asio::async_write(*m_down, boost::asio::buffer(buf, len),
boost::asio::async_write(*m_down, boost::asio::buffer(m_downstream_buf, len),
boost::asio::transfer_all(),
std::bind(&TCPIPPipe::HandleDownstreamWrite,
shared_from_this(),
@ -131,9 +131,8 @@ namespace client
} else {
if (bytes_transfered > 0 ) {
memcpy(m_upstream_buf, m_downstream_to_up_buf, bytes_transfered);
UpstreamWrite(m_upstream_buf, bytes_transfered);
}
AsyncReceiveDownstream();
UpstreamWrite(bytes_transfered);
}
}
@ -142,6 +141,8 @@ namespace client
LogPrint(eLogError, "TCPIPPipe: downstream write error:" , ecode.message());
if (ecode != boost::asio::error::operation_aborted)
Terminate();
} else {
AsyncReceiveUpstream();
}
}
@ -150,6 +151,8 @@ namespace client
LogPrint(eLogError, "TCPIPPipe: upstream write error:" , ecode.message());
if (ecode != boost::asio::error::operation_aborted)
Terminate();
} else {
AsyncReceiveDownstream();
}
}
@ -162,10 +165,9 @@ namespace client
Terminate();
} else {
if (bytes_transfered > 0 ) {
memcpy(m_upstream_buf, m_upstream_to_down_buf, bytes_transfered);
DownstreamWrite(m_upstream_buf, bytes_transfered);
memcpy(m_downstream_buf, m_upstream_to_down_buf, bytes_transfered);
}
AsyncReceiveUpstream();
DownstreamWrite(bytes_transfered);
}
}

View File

@ -77,7 +77,7 @@ namespace client
std::atomic<bool> m_Dead; //To avoid cleaning up multiple times
};
const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192;
const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192 * 8;
// bidirectional pipe for 2 tcp/ip sockets
class TCPIPPipe: public I2PServiceHandler, public std::enable_shared_from_this<TCPIPPipe> {
@ -93,8 +93,8 @@ namespace client
void HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transferred);
void HandleUpstreamWrite(const boost::system::error_code & ecode);
void HandleDownstreamWrite(const boost::system::error_code & ecode);
void UpstreamWrite(const uint8_t * buf, size_t len);
void DownstreamWrite(const uint8_t * buf, size_t len);
void UpstreamWrite(size_t len);
void DownstreamWrite(size_t len);
private:
uint8_t m_upstream_to_down_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[TCP_IP_PIPE_BUFFER_SIZE];
uint8_t m_upstream_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_buf[TCP_IP_PIPE_BUFFER_SIZE];
@ -121,10 +121,11 @@ namespace client
void Stop ();
const boost::asio::ip::tcp::acceptor& GetAcceptor () const { return m_Acceptor; };
virtual const char* GetName() { return "Generic TCP/IP accepting daemon"; }
protected:
virtual std::shared_ptr<I2PServiceHandler> CreateHandler(std::shared_ptr<boost::asio::ip::tcp::socket> socket) = 0;
virtual const char* GetName() { return "Generic TCP/IP accepting daemon"; }
private:
void Accept();
void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<boost::asio::ip::tcp::socket> socket);

View File

@ -58,25 +58,39 @@ namespace client
StreamReceive ();
Receive ();
}
void I2PTunnelConnection::Connect ()
static boost::asio::ip::address GetLoopbackAddressFor(const i2p::data::IdentHash & addr)
{
boost::asio::ip::address_v4::bytes_type bytes;
const uint8_t * ident = addr;
bytes[0] = 127;
memcpy (bytes.data ()+1, ident, 3);
boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes);
return ourIP;
}
static void MapToLoopback(const std::shared_ptr<boost::asio::ip::tcp::socket> & sock, const i2p::data::IdentHash & addr)
{
// bind to 127.x.x.x address
// where x.x.x are first three bytes from ident
auto ourIP = GetLoopbackAddressFor(addr);
sock->bind (boost::asio::ip::tcp::endpoint (ourIP, 0));
}
void I2PTunnelConnection::Connect (bool isUniqueLocal)
{
I2PTunnelSetSocketOptions(m_Socket);
if (m_Socket) {
#ifdef __linux__
// bind to 127.x.x.x address
// where x.x.x are first three bytes from ident
if (m_RemoteEndpoint.address ().is_v4 () &&
if (m_Socket)
{
#ifdef __linux__
if (isUniqueLocal && m_RemoteEndpoint.address ().is_v4 () &&
m_RemoteEndpoint.address ().to_v4 ().to_bytes ()[0] == 127)
{
m_Socket->open (boost::asio::ip::tcp::v4 ());
boost::asio::ip::address_v4::bytes_type bytes;
const uint8_t * ident = m_Stream->GetRemoteIdentity ()->GetIdentHash ();
bytes[0] = 127;
memcpy (bytes.data ()+1, ident, 3);
boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes);
m_Socket->bind (boost::asio::ip::tcp::endpoint (ourIP, 0));
auto ident = m_Stream->GetRemoteIdentity()->GetIdentHash();
MapToLoopback(m_Socket, ident);
}
#endif
m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect,
@ -213,7 +227,9 @@ namespace client
// send destination first like received from I2P
std::string dest = m_Stream->GetRemoteIdentity ()->ToBase64 ();
dest += "\n";
memcpy (m_StreamBuffer, dest.c_str (), dest.size ());
if(sizeof(m_StreamBuffer) >= dest.size()) {
memcpy (m_StreamBuffer, dest.c_str (), dest.size ());
}
HandleStreamReceive (boost::system::error_code (), dest.size ());
}
Receive ();
@ -416,7 +432,7 @@ namespace client
I2PServerTunnel::I2PServerTunnel (const std::string& name, const std::string& address,
int port, std::shared_ptr<ClientDestination> localDestination, int inport, bool gzip):
I2PService (localDestination), m_Name (name), m_Address (address), m_Port (port), m_IsAccessList (false)
I2PService (localDestination), m_IsUniqueLocal(true), m_Name (name), m_Address (address), m_Port (port), m_IsAccessList (false)
{
m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip);
}
@ -493,15 +509,17 @@ namespace client
return;
}
}
CreateI2PConnection (stream);
// new connection
auto conn = CreateI2PConnection (stream);
AddHandler (conn);
conn->Connect (m_IsUniqueLocal);
}
}
void I2PServerTunnel::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
std::shared_ptr<I2PTunnelConnection> I2PServerTunnel::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
{
auto conn = std::make_shared<I2PTunnelConnection> (this, stream, std::make_shared<boost::asio::ip::tcp::socket> (GetService ()), GetEndpoint ());
AddHandler (conn);
conn->Connect ();
return std::make_shared<I2PTunnelConnection> (this, stream, std::make_shared<boost::asio::ip::tcp::socket> (GetService ()), GetEndpoint ());
}
I2PServerTunnelHTTP::I2PServerTunnelHTTP (const std::string& name, const std::string& address,
@ -512,12 +530,10 @@ namespace client
{
}
void I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
std::shared_ptr<I2PTunnelConnection> I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
{
auto conn = std::make_shared<I2PTunnelConnectionHTTP> (this, stream,
return std::make_shared<I2PTunnelConnectionHTTP> (this, stream,
std::make_shared<boost::asio::ip::tcp::socket> (GetService ()), GetEndpoint (), m_Host);
AddHandler (conn);
conn->Connect ();
}
I2PServerTunnelIRC::I2PServerTunnelIRC (const std::string& name, const std::string& address,
@ -528,11 +544,9 @@ namespace client
{
}
void I2PServerTunnelIRC::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
std::shared_ptr<I2PTunnelConnection> I2PServerTunnelIRC::CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream)
{
auto conn = std::make_shared<I2PTunnelConnectionIRC> (this, stream, std::make_shared<boost::asio::ip::tcp::socket> (GetService ()), GetEndpoint (), this->m_WebircPass);
AddHandler (conn);
conn->Connect ();
return std::make_shared<I2PTunnelConnectionIRC> (this, stream, std::make_shared<boost::asio::ip::tcp::socket> (GetService ()), GetEndpoint (), this->m_WebircPass);
}
void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
@ -541,7 +555,6 @@ namespace client
auto session = ObtainUDPSession(from, toPort, fromPort);
session->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint);
session->LastActivity = i2p::util::GetMillisecondsSinceEpoch();
}
void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) {
@ -555,11 +568,24 @@ namespace client
++itr;
}
}
UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort)
void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) {
std::lock_guard<std::mutex> lock(m_SessionsMutex);
uint64_t now = i2p::util::GetMillisecondsSinceEpoch();
std::vector<uint16_t> removePorts;
for (const auto & s : m_Sessions) {
if (now - s.second.second >= delta)
removePorts.push_back(s.first);
}
for(auto port : removePorts) {
m_Sessions.erase(port);
}
}
UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort)
{
auto ih = from.GetIdentHash();
for ( UDPSession * s : m_Sessions )
for (auto & s : m_Sessions )
{
if ( s->Identity == ih)
{
@ -568,10 +594,19 @@ namespace client
return s;
}
}
/** create new udp session */
boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0);
m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort));
return m_Sessions.back();
boost::asio::ip::address addr;
/** create new udp session */
if(m_IsUniqueLocal && m_LocalAddress.is_loopback())
{
auto ident = from.GetIdentHash();
addr = GetLoopbackAddressFor(ident);
}
else
addr = m_LocalAddress;
boost::asio::ip::udp::endpoint ep(addr, 0);
m_Sessions.push_back(std::make_shared<UDPSession>(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort));
auto & back = m_Sessions.back();
return back;
}
UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint,
@ -579,7 +614,6 @@ namespace client
boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to,
uint16_t ourPort, uint16_t theirPort) :
m_Destination(localDestination->GetDatagramDestination()),
m_Service(localDestination->GetService()),
IPSocket(localDestination->GetService(), localEndpoint),
SendEndpoint(endpoint),
LastActivity(i2p::util::GetMillisecondsSinceEpoch()),
@ -603,7 +637,7 @@ namespace client
{
LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint);
LastActivity = i2p::util::GetMillisecondsSinceEpoch();
m_Destination->SendDatagramTo(m_Buffer, len, Identity, 0, 0);
m_Destination->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort);
Receive();
} else {
LogPrint(eLogError, "UDPSession: ", ecode.message());
@ -613,9 +647,9 @@ namespace client
I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr<i2p::client::ClientDestination> localDestination,
const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) :
boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) :
m_IsUniqueLocal(true),
m_Name(name),
LocalPort(port),
m_LocalAddress(localAddress),
m_RemoteEndpoint(forwardTo)
{
@ -641,7 +675,8 @@ namespace client
{
std::vector<std::shared_ptr<DatagramSessionInfo> > sessions;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
for ( UDPSession * s : m_Sessions )
for ( UDPSessionPtr s : m_Sessions )
{
if (!s->m_Destination) continue;
auto info = s->m_Destination->GetInfoForRemote(s->Identity);
@ -663,13 +698,12 @@ namespace client
std::shared_ptr<i2p::client::ClientDestination> localDestination,
uint16_t remotePort) :
m_Name(name),
m_Session(nullptr),
m_RemoteDest(remoteDest),
m_LocalDest(localDestination),
m_LocalEndpoint(localEndpoint),
m_RemoteIdent(nullptr),
m_ResolveThread(nullptr),
LocalPort(localEndpoint.port()),
m_LocalSocket(localDestination->GetService(), localEndpoint),
RemotePort(remotePort),
m_cancel_resolve(false)
{
@ -677,7 +711,7 @@ namespace client
dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5));
}
@ -686,38 +720,52 @@ namespace client
m_LocalDest->Start();
if (m_ResolveThread == nullptr)
m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this));
RecvFromLocal();
}
void I2PUDPClientTunnel::RecvFromLocal()
{
m_LocalSocket.async_receive_from(boost::asio::buffer(m_RecvBuff, I2P_UDP_MAX_MTU),
m_RecvEndpoint, std::bind(&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2));
}
void I2PUDPClientTunnel::HandleRecvFromLocal(const boost::system::error_code & ec, std::size_t transferred)
{
if(ec) {
LogPrint(eLogError, "UDP Client: ", ec.message());
return;
}
if(!m_RemoteIdent) {
LogPrint(eLogWarning, "UDP Client: remote endpoint not resolved yet");
RecvFromLocal();
return; // drop, remote not resolved
}
auto remotePort = m_RecvEndpoint.port();
auto itr = m_Sessions.find(remotePort);
if (itr == m_Sessions.end()) {
// track new udp convo
m_Sessions[remotePort] = {boost::asio::ip::udp::endpoint(m_RecvEndpoint), 0};
}
// send off to remote i2p destination
LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort);
m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort);
// mark convo as active
m_Sessions[remotePort].second = i2p::util::GetMillisecondsSinceEpoch();
RecvFromLocal();
}
std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPClientTunnel::GetSessions()
{
// TODO: implement
std::vector<std::shared_ptr<DatagramSessionInfo> > infos;
if(m_Session && m_LocalDest)
{
auto s = m_Session;
if (s->m_Destination)
{
auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity);
if(info)
{
auto sinfo = std::make_shared<DatagramSessionInfo>();
sinfo->Name = m_Name;
sinfo->LocalIdent = std::make_shared<i2p::data::IdentHash>(m_LocalDest->GetIdentHash().data());
sinfo->RemoteIdent = std::make_shared<i2p::data::IdentHash>(s->Identity.data());
sinfo->CurrentIBGW = info->IBGW;
sinfo->CurrentOBEP = info->OBEP;
infos.push_back(sinfo);
}
}
}
return infos;
}
void I2PUDPClientTunnel::TryResolving() {
LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest);
m_RemoteIdent = new i2p::data::IdentHash;
m_RemoteIdent->Fill(0);
i2p::data::IdentHash * h = new i2p::data::IdentHash;
while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve)
while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *h) && !m_cancel_resolve)
{
LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest);
std::this_thread::sleep_for(std::chrono::seconds(1));
@ -727,27 +775,28 @@ namespace client
LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled");
return;
}
m_RemoteIdent = h;
LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32());
// delete existing session
if(m_Session) delete m_Session;
boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0);
m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, m_RemoteIdent, LocalPort, RemotePort);
}
void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent)
{
// address match
if(m_Session)
auto itr = m_Sessions.find(toPort);
// found convo ?
if(itr != m_Sessions.end())
{
// tell session
LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32());
m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint);
// found convo
if (len > 0) {
LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32());
m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second.first);
// mark convo as active
itr->second.second = i2p::util::GetMillisecondsSinceEpoch();
}
}
else
LogPrint(eLogWarning, "UDP Client: no session");
LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort);
}
else
LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32());
@ -758,7 +807,11 @@ namespace client
auto dgram = m_LocalDest->GetDatagramDestination();
if (dgram) dgram->ResetReceiver();
if (m_Session) delete m_Session;
m_Sessions.clear();
if(m_LocalSocket.is_open())
m_LocalSocket.close();
m_cancel_resolve = true;
if(m_ResolveThread)

View File

@ -4,6 +4,7 @@
#include <inttypes.h>
#include <string>
#include <set>
#include <tuple>
#include <memory>
#include <sstream>
#include <boost/asio.hpp>
@ -17,7 +18,7 @@ namespace i2p
{
namespace client
{
const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 8192;
const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 65536;
const int I2P_TUNNEL_CONNECTION_MAX_IDLE = 3600; // in seconds
const int I2P_TUNNEL_DESTINATION_REQUEST_TIMEOUT = 10; // in seconds
// for HTTP tunnels
@ -37,7 +38,7 @@ namespace client
const boost::asio::ip::tcp::endpoint& target, bool quiet = true); // from I2P
~I2PTunnelConnection ();
void I2PConnect (const uint8_t * msg = nullptr, size_t len = 0);
void Connect ();
void Connect (bool isUniqueLocal = true);
protected:
@ -141,7 +142,6 @@ namespace client
struct UDPSession
{
i2p::datagram::DatagramDestination * m_Destination;
boost::asio::io_service & m_Service;
boost::asio::ip::udp::socket IPSocket;
i2p::data::IdentHash Identity;
boost::asio::ip::udp::endpoint FromEndpoint;
@ -182,6 +182,8 @@ namespace client
/** how long has this converstation been idle in ms */
uint64_t idle;
};
typedef std::shared_ptr<UDPSession> UDPSessionPtr;
/** server side udp tunnel, many i2p inbound to 1 ip outbound */
class I2PUDPServerTunnel
@ -189,7 +191,7 @@ namespace client
public:
I2PUDPServerTunnel(const std::string & name,
std::shared_ptr<i2p::client::ClientDestination> localDestination,
const boost::asio::ip::address & localAddress,
boost::asio::ip::address localAddress,
boost::asio::ip::udp::endpoint forwardTo, uint16_t port);
~I2PUDPServerTunnel();
/** expire stale udp conversations */
@ -199,18 +201,20 @@ namespace client
std::vector<std::shared_ptr<DatagramSessionInfo> > GetSessions();
std::shared_ptr<ClientDestination> GetLocalDestination () const { return m_LocalDest; }
void SetUniqueLocal(bool isUniqueLocal = true) { m_IsUniqueLocal = isUniqueLocal; }
private:
void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort);
UDPSessionPtr ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort);
private:
bool m_IsUniqueLocal;
const std::string m_Name;
const uint16_t LocalPort;
boost::asio::ip::address m_LocalAddress;
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
std::mutex m_SessionsMutex;
std::vector<UDPSession*> m_Sessions;
std::vector<UDPSessionPtr> m_Sessions;
std::shared_ptr<i2p::client::ClientDestination> m_LocalDest;
};
@ -228,18 +232,25 @@ namespace client
bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); }
std::shared_ptr<ClientDestination> GetLocalDestination () const { return m_LocalDest; }
void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT);
private:
typedef std::pair<boost::asio::ip::udp::endpoint, uint64_t> UDPConvo;
void RecvFromLocal();
void HandleRecvFromLocal(const boost::system::error_code & e, std::size_t transferred);
void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
void TryResolving();
const std::string m_Name;
UDPSession * m_Session;
std::mutex m_SessionsMutex;
std::map<uint16_t, UDPConvo > m_Sessions; // maps i2p port -> local udp convo
const std::string m_RemoteDest;
std::shared_ptr<i2p::client::ClientDestination> m_LocalDest;
const boost::asio::ip::udp::endpoint m_LocalEndpoint;
i2p::data::IdentHash * m_RemoteIdent;
std::thread * m_ResolveThread;
uint16_t LocalPort;
boost::asio::ip::udp::socket m_LocalSocket;
boost::asio::ip::udp::endpoint m_RecvEndpoint;
uint8_t m_RecvBuff[I2P_UDP_MAX_MTU];
uint16_t RemotePort;
bool m_cancel_resolve;
};
@ -256,6 +267,9 @@ namespace client
void SetAccessList (const std::set<i2p::data::IdentHash>& accessList);
void SetUniqueLocal (bool isUniqueLocal) { m_IsUniqueLocal = isUniqueLocal; }
bool IsUniqueLocal () const { return m_IsUniqueLocal; }
const std::string& GetAddress() const { return m_Address; }
int GetPort () const { return m_Port; };
uint16_t GetLocalPort () const { return m_PortDestination->GetLocalPort (); };
@ -272,10 +286,11 @@ namespace client
void Accept ();
void HandleAccept (std::shared_ptr<i2p::stream::Stream> stream);
virtual void CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
virtual std::shared_ptr<I2PTunnelConnection> CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
private:
bool m_IsUniqueLocal;
std::string m_Name, m_Address;
int m_Port;
boost::asio::ip::tcp::endpoint m_Endpoint;
@ -294,7 +309,7 @@ namespace client
private:
void CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
std::shared_ptr<I2PTunnelConnection> CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
private:
@ -311,7 +326,7 @@ namespace client
private:
void CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
std::shared_ptr<I2PTunnelConnection> CreateI2PConnection (std::shared_ptr<i2p::stream::Stream> stream);
private:

View File

@ -509,7 +509,7 @@ namespace data
m_Signer.reset (new i2p::crypto::RSASHA5124096Signer (m_SigningPrivateKey));
break;
case SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519:
m_Signer.reset (new i2p::crypto::EDDSA25519Signer (m_SigningPrivateKey));
m_Signer.reset (new i2p::crypto::EDDSA25519Signer (m_SigningPrivateKey, m_Public->GetStandardIdentity ().certificate - i2p::crypto::EDDSA25519_PUBLIC_KEY_LENGTH));
break;
default:
LogPrint (eLogError, "Identity: Signing key type ", (int)m_Public->GetSigningKeyType (), " is not supported");

View File

@ -257,7 +257,7 @@ namespace transport
void NTCPSession::SendPhase3 ()
{
auto keys = i2p::context.GetPrivateKeys ();
auto& keys = i2p::context.GetPrivateKeys ();
uint8_t * buf = m_ReceiveBuffer;
htobe16buf (buf, keys.GetPublic ()->GetFullLen ());
buf += 2;
@ -403,7 +403,7 @@ namespace transport
s.Insert (m_RemoteIdentity->GetIdentHash (), 32); // ident
s.Insert (tsA); // tsA
s.Insert (tsB); // tsB
auto keys = i2p::context.GetPrivateKeys ();
auto& keys = i2p::context.GetPrivateKeys ();
auto signatureLen = keys.GetPublic ()->GetSignatureLen ();
s.Sign (keys, m_ReceiveBuffer);
size_t paddingSize = signatureLen & 0x0F; // %16
@ -621,7 +621,7 @@ namespace transport
if (!m_NextMessage->IsExpired ())
{
#ifdef WITH_EVENTS
EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}});
QueueIntEvent("transport.recvmsg", GetIdentHashBase64(), 1);
#endif
m_Handler.PutNextMessage (m_NextMessage);
}

View File

@ -25,7 +25,8 @@ namespace util
m_NonEmpty.notify_one ();
}
void Put (const std::vector<Element>& vec)
template<template<typename, typename...>class Container, typename... R>
void Put (const Container<Element, R...>& vec)
{
if (!vec.empty ())
{

20
SAM.cpp
View File

@ -108,10 +108,10 @@ namespace client
separator++;
std::map<std::string, std::string> params;
ExtractParams (separator, params);
auto it = params.find (SAM_PARAM_MAX);
//auto it = params.find (SAM_PARAM_MAX);
// TODO: check MIN as well
if (it != params.end ())
version = it->second;
//if (it != params.end ())
// version = it->second;
}
if (version[0] == '3') // we support v3 (3.0 and 3.1) only
{
@ -464,21 +464,21 @@ namespace client
std::string& name = params[SAM_PARAM_NAME];
std::shared_ptr<const i2p::data::IdentityEx> identity;
i2p::data::IdentHash ident;
auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination;
if (name == "ME")
SendNamingLookupReply (m_Session->localDestination->GetIdentity ());
SendNamingLookupReply (dest->GetIdentity ());
else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr)
SendNamingLookupReply (identity);
else if (m_Session && m_Session->localDestination &&
context.GetAddressBook ().GetIdentHash (name, ident))
else if (context.GetAddressBook ().GetIdentHash (name, ident))
{
auto leaseSet = m_Session->localDestination->FindLeaseSet (ident);
auto leaseSet = dest->FindLeaseSet (ident);
if (leaseSet)
SendNamingLookupReply (leaseSet->GetIdentity ());
else
m_Session->localDestination->RequestDestination (ident,
dest->RequestDestination (ident,
std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete,
shared_from_this (), std::placeholders::_1, ident));
}
shared_from_this (), std::placeholders::_1, ident));
}
else
{
LogPrint (eLogError, "SAM: naming failed, unknown address ", name);

View File

@ -154,8 +154,7 @@ namespace transport
{
uint32_t msgID = bufbe32toh (buf); // message ID
buf += 4;
uint8_t frag[4];
frag[0] = 0;
uint8_t frag[4] = {0};
memcpy (frag + 1, buf, 3);
buf += 3;
uint32_t fragmentInfo = bufbe32toh (frag); // fragment info
@ -240,7 +239,7 @@ namespace transport
if (!msg->IsExpired ())
{
#ifdef WITH_EVENTS
EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}});
QueueIntEvent("transport.recvmsg", m_Session.GetIdentHashBase64(), 1);
#endif
m_Handler.PutNextMessage (msg);
}
@ -371,7 +370,7 @@ namespace transport
void SSUData::SendMsgAck (uint32_t msgID)
{
uint8_t buf[48 + 18]; // actual length is 44 = 37 + 7 but pad it to multiple of 16
uint8_t buf[48 + 18] = {0}; // actual length is 44 = 37 + 7 but pad it to multiple of 16
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_EXPLICIT_ACKS_INCLUDED; // flag
payload++;
@ -393,7 +392,7 @@ namespace transport
LogPrint (eLogWarning, "SSU: Fragment number ", fragmentNum, " exceeds 64");
return;
}
uint8_t buf[64 + 18];
uint8_t buf[64 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_ACK_BITFIELDS_INCLUDED; // flag
payload++;

View File

@ -350,7 +350,7 @@ namespace transport
void SSUSession::SendSessionRequest ()
{
uint8_t buf[320 + 18]; // 304 bytes for ipv4, 320 for ipv6
uint8_t buf[320 + 18] = {0}; // 304 bytes for ipv4, 320 for ipv6
uint8_t * payload = buf + sizeof (SSUHeader);
uint8_t flag = 0;
// fill extended options, 3 bytes extended options don't change message size
@ -392,7 +392,7 @@ namespace transport
return;
}
uint8_t buf[96 + 18];
uint8_t buf[96 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
htobe32buf (payload, introducer.iTag);
payload += 4;
@ -427,7 +427,7 @@ namespace transport
SignedData s; // x,y, remote IP, remote port, our IP, our port, relayTag, signed on time
s.Insert (x, 256); // x
uint8_t buf[384 + 18];
uint8_t buf[384 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
memcpy (payload, m_DHKeysPair->GetPublicKey (), 256);
s.Insert (payload, 256); // y
@ -473,14 +473,18 @@ namespace transport
m_SignedData = std::unique_ptr<SignedData>(new SignedData (s));
s.Insert (payload - 4, 4); // BOB's signed on time
s.Sign (i2p::context.GetPrivateKeys (), payload); // DSA signature
// TODO: fill padding with random data
uint8_t iv[16];
RAND_bytes (iv, 16); // random iv
// encrypt signature and padding with newly created session key
size_t signatureLen = i2p::context.GetIdentity ()->GetSignatureLen ();
size_t paddingSize = signatureLen & 0x0F; // %16
if (paddingSize > 0) signatureLen += (16 - paddingSize);
if (paddingSize > 0)
{
// fill random padding
RAND_bytes(payload + signatureLen, (16 - paddingSize));
signatureLen += (16 - paddingSize);
}
m_SessionKeyEncryption.SetIV (iv);
m_SessionKeyEncryption.Encrypt (payload, signatureLen, payload);
payload += signatureLen;
@ -493,7 +497,7 @@ namespace transport
void SSUSession::SendSessionConfirmed (const uint8_t * y, const uint8_t * ourAddress, size_t ourAddressLen)
{
uint8_t buf[512 + 18];
uint8_t buf[512 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = 1; // 1 fragment
payload++; // info
@ -508,9 +512,8 @@ namespace transport
auto signatureLen = i2p::context.GetIdentity ()->GetSignatureLen ();
size_t paddingSize = ((payload - buf) + signatureLen)%16;
if (paddingSize > 0) paddingSize = 16 - paddingSize;
// TODO: fill padding
RAND_bytes(payload, paddingSize); // fill padding with random
payload += paddingSize; // padding size
// signature
SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, our signed on time
s.Insert (m_DHKeysPair->GetPublicKey (), 256); // x
@ -559,14 +562,14 @@ namespace transport
void SSUSession::SendRelayResponse (uint32_t nonce, const boost::asio::ip::udp::endpoint& from,
const uint8_t * introKey, const boost::asio::ip::udp::endpoint& to)
{
uint8_t buf[80 + 18]; // 64 Alice's ipv4 and 80 Alice's ipv6
uint8_t * payload = buf + sizeof (SSUHeader);
// Charlie's address always v4
if (!to.address ().is_v4 ())
{
LogPrint (eLogWarning, "SSU: Charlie's IP must be v4");
return;
}
uint8_t buf[80 + 18] = {0}; // 64 Alice's ipv4 and 80 Alice's ipv6
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = 4;
payload++; // size
htobe32buf (payload, to.address ().to_v4 ().to_ulong ()); // Charlie's IP
@ -619,7 +622,7 @@ namespace transport
LogPrint (eLogWarning, "SSU: Alice's IP must be v4");
return;
}
uint8_t buf[48 + 18];
uint8_t buf[48 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = 4;
payload++; // size
@ -1040,7 +1043,7 @@ namespace transport
// toAddress is true for Alice<->Chalie communications only
// sendAddress is false if message comes from Alice
{
uint8_t buf[80 + 18];
uint8_t buf[80 + 18] = {0};
uint8_t iv[16];
uint8_t * payload = buf + sizeof (SSUHeader);
htobe32buf (payload, nonce);
@ -1096,7 +1099,7 @@ namespace transport
// encrypt message with session key
FillHeaderAndEncrypt (PAYLOAD_TYPE_PEER_TEST, buf, 80);
Send (buf, 80);
}
}
}
void SSUSession::SendPeerTest ()
@ -1121,7 +1124,7 @@ namespace transport
{
if (m_State == eSessionStateEstablished)
{
uint8_t buf[48 + 18];
uint8_t buf[48 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = 0; // flags
payload++;
@ -1138,7 +1141,7 @@ namespace transport
{
if (m_IsSessionKey)
{
uint8_t buf[48 + 18];
uint8_t buf[48 + 18] = {0};
// encrypt message with session key
FillHeaderAndEncrypt (PAYLOAD_TYPE_SESSION_DESTROYED, buf, 48);
try
@ -1155,7 +1158,7 @@ namespace transport
void SSUSession::Send (uint8_t type, const uint8_t * payload, size_t len)
{
uint8_t buf[SSU_MTU_V4 + 18];
uint8_t buf[SSU_MTU_V4 + 18] = {0};
size_t msgSize = len + sizeof (SSUHeader);
size_t paddingSize = msgSize & 0x0F; // %16
if (paddingSize > 0) msgSize += (16 - paddingSize);

View File

@ -467,17 +467,27 @@ namespace crypto
return GetEd25519 ()->Verify (m_PublicKey, digest, signature);
}
EDDSA25519Signer::EDDSA25519Signer (const uint8_t * signingPrivateKey)
EDDSA25519Signer::EDDSA25519Signer (const uint8_t * signingPrivateKey, const uint8_t * signingPublicKey)
{
// expand key
SHA512 (signingPrivateKey, EDDSA25519_PRIVATE_KEY_LENGTH, m_ExpandedPrivateKey);
m_ExpandedPrivateKey[0] &= 0xF8; // drop last 3 bits
m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0x1F; // drop first 3 bits
m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0x3F; // drop first 2 bits
m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] |= 0x40; // set second bit
// generate and encode public key
BN_CTX * ctx = BN_CTX_new ();
auto publicKey = GetEd25519 ()->GeneratePublicKey (m_ExpandedPrivateKey, ctx);
GetEd25519 ()->EncodePublicKey (publicKey, m_PublicKeyEncoded, ctx);
if (signingPublicKey && memcmp (m_PublicKeyEncoded, signingPublicKey, EDDSA25519_PUBLIC_KEY_LENGTH))
{
// keys don't match, it means older key with 0x1F
LogPrint (eLogWarning, "Older EdDSA key detected");
m_ExpandedPrivateKey[EDDSA25519_PRIVATE_KEY_LENGTH - 1] &= 0xDF; // drop third bit
publicKey = GetEd25519 ()->GeneratePublicKey (m_ExpandedPrivateKey, ctx);
GetEd25519 ()->EncodePublicKey (publicKey, m_PublicKeyEncoded, ctx);
}
BN_CTX_free (ctx);
}

View File

@ -424,7 +424,8 @@ namespace crypto
{
public:
EDDSA25519Signer (const uint8_t * signingPrivateKey);
EDDSA25519Signer (const uint8_t * signingPrivateKey, const uint8_t * signingPublicKey = nullptr);
// we pass signingPublicKey to check if it matches private key
void Sign (const uint8_t * buf, int len, uint8_t * signature) const;
const uint8_t * GetPublicKey () const { return m_PublicKeyEncoded; };

View File

@ -40,15 +40,15 @@ namespace stream
{
auto packet = m_ReceiveQueue.front ();
m_ReceiveQueue.pop ();
delete packet;
m_LocalDestination.DeletePacket (packet);
}
for (auto it: m_SentPackets)
delete it;
m_LocalDestination.DeletePacket (it);
m_SentPackets.clear ();
for (auto it: m_SavedPackets)
delete it;
m_LocalDestination.DeletePacket (it);
m_SavedPackets.clear ();
LogPrint (eLogDebug, "Streaming: Stream deleted");
@ -83,7 +83,7 @@ namespace stream
{
// plain ack
LogPrint (eLogDebug, "Streaming: Plain ACK received");
delete packet;
m_LocalDestination.DeletePacket (packet);
return;
}
@ -131,7 +131,7 @@ namespace stream
// we have received duplicate
LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID);
SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped
m_LocalDestination.DeletePacket (packet); // packet dropped
}
else
{
@ -163,7 +163,7 @@ namespace stream
void Stream::SavePacket (Packet * packet)
{
if (!m_SavedPackets.insert (packet).second)
delete packet;
m_LocalDestination.DeletePacket (packet);
}
void Stream::ProcessPacket (Packet * packet)
@ -216,7 +216,7 @@ namespace stream
m_ReceiveTimer.cancel ();
}
else
delete packet;
m_LocalDestination.DeletePacket (packet);
m_LastReceivedSequenceNumber = receivedSeqn;
@ -278,7 +278,7 @@ namespace stream
m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
m_SentPackets.erase (it++);
delete sentPacket;
m_LocalDestination.DeletePacket (sentPacket);
acknowledged = true;
if (m_WindowSize < WINDOW_SIZE)
m_WindowSize++; // slow start
@ -345,7 +345,7 @@ namespace stream
std::unique_lock<std::mutex> l(m_SendBufferMutex);
while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0))
{
Packet * p = new Packet ();
Packet * p = m_LocalDestination.NewPacket ();
uint8_t * packet = p->GetBuffer ();
// TODO: implement setters
size_t size = 0;
@ -532,7 +532,7 @@ namespace stream
void Stream::SendClose ()
{
Packet * p = new Packet ();
Packet * p = m_LocalDestination.NewPacket ();
uint8_t * packet = p->GetBuffer ();
size_t size = 0;
htobe32buf (packet + size, m_SendStreamID);
@ -574,7 +574,7 @@ namespace stream
if (!packet->GetLength ())
{
m_ReceiveQueue.pop ();
delete packet;
m_LocalDestination.DeletePacket (packet);
}
}
return pos;
@ -852,7 +852,7 @@ namespace stream
{
for (auto& it: m_SavedPackets)
{
for (auto it1: it.second) delete it1;
for (auto it1: it.second) DeletePacket (it1);
it.second.clear ();
}
m_SavedPackets.clear ();
@ -867,6 +867,7 @@ namespace stream
{
ResetAcceptor ();
m_PendingIncomingTimer.cancel ();
m_PendingIncomingStreams.clear ();
m_ConnTrackTimer.cancel();
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
@ -889,7 +890,7 @@ namespace stream
else
{
LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID);
delete packet;
DeletePacket (packet);
}
}
else
@ -901,7 +902,7 @@ namespace stream
{
// already pending
LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists");
delete packet; // drop it, because previous should be connected
DeletePacket (packet); // drop it, because previous should be connected
return;
}
auto incomingStream = CreateNewIncomingStream ();
@ -980,7 +981,7 @@ namespace stream
auto it = s->m_SavedPackets.find (receiveStreamID);
if (it != s->m_SavedPackets.end ())
{
for (auto it1: it->second) delete it1;
for (auto it1: it->second) s->DeletePacket (it1);
it->second.clear ();
s->m_SavedPackets.erase (it);
}
@ -1020,14 +1021,16 @@ namespace stream
void StreamingDestination::SetAcceptor (const Acceptor& acceptor)
{
m_Owner->GetService ().post([acceptor, this](void)
m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet
auto s = shared_from_this ();
m_Owner->GetService ().post([s](void)
{
m_Acceptor = acceptor;
for (auto& it: m_PendingIncomingStreams)
// take care about incoming queue
for (auto& it: s->m_PendingIncomingStreams)
if (it->GetStatus () == eStreamStatusOpen) // still open?
m_Acceptor (it);
m_PendingIncomingStreams.clear ();
m_PendingIncomingTimer.cancel ();
s->m_Acceptor (it);
s->m_PendingIncomingStreams.clear ();
s->m_PendingIncomingTimer.cancel ();
});
}
@ -1074,13 +1077,13 @@ namespace stream
void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len)
{
// unzip it
Packet * uncompressed = new Packet;
Packet * uncompressed = NewPacket ();
uncompressed->offset = 0;
uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE);
if (uncompressed->len)
HandleNextPacket (uncompressed);
else
delete uncompressed;
DeletePacket (uncompressed);
}
std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort)

View File

@ -18,6 +18,7 @@
#include "I2NPProtocol.h"
#include "Garlic.h"
#include "Tunnel.h"
#include "util.h" // MemoryPool
namespace i2p
{
@ -234,6 +235,9 @@ namespace stream
/** set max connections per minute per destination */
void SetMaxConnsPerMinute(const uint32_t conns);
Packet * NewPacket () { return m_PacketsPool.Acquire (); };
void DeletePacket (Packet * p) { if (p) m_PacketsPool.Release (p); };
private:
@ -269,7 +273,9 @@ namespace stream
/** banned identities */
std::vector<i2p::data::IdentHash> m_Banned;
uint64_t m_LastBanClear;
i2p::util::MemoryPool<Packet> m_PacketsPool;
public:
i2p::data::GzipInflator m_Inflator;

View File

@ -264,7 +264,7 @@ namespace transport
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{
#ifdef WITH_EVENTS
EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}});
QueueIntEvent("transport.send", ident.ToBase64(), msgs.size());
#endif
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
}

View File

@ -587,6 +587,7 @@ namespace tunnel
for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();)
{
auto tunnel = it->second;
auto pool = tunnel->GetTunnelPool();
switch (tunnel->GetState ())
{
case eTunnelStatePending:
@ -612,6 +613,8 @@ namespace tunnel
#ifdef WITH_EVENTS
EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed);
#endif
// for i2lua
if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout);
// delete
it = pendingTunnels.erase (it);
m_NumFailedTunnelCreations++;
@ -624,6 +627,9 @@ namespace tunnel
#ifdef WITH_EVENTS
EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed);
#endif
// for i2lua
if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected);
it = pendingTunnels.erase (it);
m_NumFailedTunnelCreations++;
break;

View File

@ -81,6 +81,8 @@ namespace tunnel
}
if (m_LocalDestination)
m_LocalDestination->SetLeaseSetUpdated ();
OnTunnelBuildResult(createdTunnel, eBuildResultOkay);
}
void TunnelPool::TunnelExpired (std::shared_ptr<InboundTunnel> expiredTunnel)
@ -109,6 +111,8 @@ namespace tunnel
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
m_OutboundTunnels.insert (createdTunnel);
}
OnTunnelBuildResult(createdTunnel, eBuildResultOkay);
//CreatePairedInboundTunnel (createdTunnel);
}
@ -579,5 +583,11 @@ namespace tunnel
}
return tun;
}
void TunnelPool::OnTunnelBuildResult(std::shared_ptr<Tunnel> tunnel, TunnelBuildResult result)
{
auto peers = tunnel->GetPeers();
if(m_CustomPeerSelector) m_CustomPeerSelector->OnBuildResult(peers, tunnel->IsInbound(), result);
}
}
}

View File

@ -23,12 +23,21 @@ namespace tunnel
class InboundTunnel;
class OutboundTunnel;
enum TunnelBuildResult {
eBuildResultOkay, // tunnel was built okay
eBuildResultRejected, // tunnel build was explicitly rejected
eBuildResultTimeout // tunnel build timed out
};
/** interface for custom tunnel peer selection algorithm */
struct ITunnelPeerSelector
{
typedef std::shared_ptr<const i2p::data::IdentityEx> Peer;
typedef std::vector<Peer> TunnelPath;
virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0;
virtual bool OnBuildResult(TunnelPath & peers, bool isInbound, TunnelBuildResult result) = 0;
};
typedef std::shared_ptr<ITunnelPeerSelector> TunnelPeerSelector;
@ -79,6 +88,8 @@ namespace tunnel
/** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */
std::shared_ptr<InboundTunnel> GetLowestLatencyInboundTunnel(std::shared_ptr<InboundTunnel> exclude=nullptr) const;
std::shared_ptr<OutboundTunnel> GetLowestLatencyOutboundTunnel(std::shared_ptr<OutboundTunnel> exclude=nullptr) const;
void OnTunnelBuildResult(std::shared_ptr<Tunnel> tunnel, TunnelBuildResult result);
private:

467
WebSocks.cpp Normal file
View File

@ -0,0 +1,467 @@
#include "WebSocks.h"
#include "Log.h"
#include <string>
#ifdef WITH_EVENTS
#include "ClientContext.h"
#include "Identity.h"
#include "Destination.h"
#include "Streaming.h"
#include <functional>
#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>
#include <boost/property_tree/ini_parser.hpp>
#define GCC47_BOOST149 ((BOOST_VERSION == 104900) && (__GNUC__ == 4) && (__GNUC_MINOR__ >= 7))
#if !GCC47_BOOST149
#include <boost/property_tree/json_parser.hpp>
#endif
namespace i2p
{
namespace client
{
typedef websocketpp::server<websocketpp::config::asio> WebSocksServerImpl;
typedef std::function<void(std::shared_ptr<i2p::stream::Stream>)> StreamConnectFunc;
struct IWebSocksConn : public I2PServiceHandler
{
IWebSocksConn(I2PService * parent) : I2PServiceHandler(parent) {}
virtual void Close() = 0;
virtual void GotMessage(const websocketpp::connection_hdl & conn, WebSocksServerImpl::message_ptr msg) = 0;
};
typedef std::shared_ptr<IWebSocksConn> WebSocksConn_ptr;
WebSocksConn_ptr CreateWebSocksConn(const websocketpp::connection_hdl & conn, WebSocksImpl * parent);
class WebSocksImpl
{
typedef std::mutex mutex_t;
typedef std::unique_lock<mutex_t> lock_t;
typedef std::shared_ptr<ClientDestination> Destination_t;
public:
typedef WebSocksServerImpl ServerImpl;
typedef ServerImpl::message_ptr MessagePtr;
WebSocksImpl(const std::string & addr, int port) :
Parent(nullptr),
m_Run(false),
m_Addr(addr),
m_Port(port),
m_Thread(nullptr)
{
m_Server.init_asio();
m_Server.set_open_handler(std::bind(&WebSocksImpl::ConnOpened, this, std::placeholders::_1));
}
void InitializeDestination(WebSocks * parent)
{
Parent = parent;
m_Dest = Parent->GetLocalDestination();
}
ServerImpl::connection_ptr GetConn(const websocketpp::connection_hdl & conn)
{
return m_Server.get_con_from_hdl(conn);
}
void CloseConn(const websocketpp::connection_hdl & conn)
{
auto c = GetConn(conn);
if(c) c->close(websocketpp::close::status::normal, "closed");
}
void CreateStreamTo(const std::string & addr, int port, StreamConnectFunc complete)
{
auto & addressbook = i2p::client::context.GetAddressBook();
i2p::data::IdentHash ident;
if(addressbook.GetIdentHash(addr, ident)) {
// address found
m_Dest->CreateStream(complete, ident, port);
} else {
// not found
complete(nullptr);
}
}
void ConnOpened(websocketpp::connection_hdl conn)
{
auto ptr = CreateWebSocksConn(conn, this);
Parent->AddHandler(ptr);
m_Conns.push_back(ptr);
}
void Start()
{
if(m_Run) return; // already started
m_Server.listen(boost::asio::ip::address::from_string(m_Addr), m_Port);
m_Server.start_accept();
m_Run = true;
m_Thread = new std::thread([&] (){
while(m_Run) {
try {
m_Server.run();
} catch( std::exception & ex) {
LogPrint(eLogError, "Websocks runtime exception: ", ex.what());
}
}
});
m_Dest->Start();
}
void Stop()
{
for(const auto & conn : m_Conns)
conn->Close();
m_Dest->Stop();
m_Run = false;
m_Server.stop();
if(m_Thread) {
m_Thread->join();
delete m_Thread;
}
m_Thread = nullptr;
}
boost::asio::ip::tcp::endpoint GetLocalEndpoint()
{
return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(m_Addr), m_Port);
}
WebSocks * Parent;
private:
std::vector<WebSocksConn_ptr> m_Conns;
bool m_Run;
ServerImpl m_Server;
std::string m_Addr;
int m_Port;
std::thread * m_Thread;
Destination_t m_Dest;
};
struct WebSocksConn : public IWebSocksConn , public std::enable_shared_from_this<WebSocksConn>
{
enum ConnState
{
eWSCInitial,
eWSCTryConnect,
eWSCFailConnect,
eWSCOkayConnect,
eWSCClose,
eWSCEnd
};
typedef WebSocksServerImpl ServerImpl;
typedef ServerImpl::message_ptr Message_t;
typedef websocketpp::connection_hdl ServerConn;
typedef std::shared_ptr<ClientDestination> Destination_t;
typedef std::shared_ptr<i2p::stream::StreamingDestination> StreamDest_t;
typedef std::shared_ptr<i2p::stream::Stream> Stream_t;
ServerConn m_Conn;
Stream_t m_Stream;
ConnState m_State;
WebSocksImpl * m_Parent;
std::string m_RemoteAddr;
int m_RemotePort;
uint8_t m_RecvBuf[2048];
WebSocksConn(const ServerConn & conn, WebSocksImpl * parent) :
IWebSocksConn(parent->Parent),
m_Conn(conn),
m_Stream(nullptr),
m_State(eWSCInitial),
m_Parent(parent)
{
}
~WebSocksConn()
{
Close();
}
void EnterState(ConnState state)
{
LogPrint(eLogDebug, "websocks: state ", m_State, " -> ", state);
switch(m_State)
{
case eWSCInitial:
if (state == eWSCClose) {
m_State = eWSCClose;
// connection was opened but never used
LogPrint(eLogInfo, "websocks: connection closed but never used");
Close();
return;
} else if (state == eWSCTryConnect) {
// we will try to connect
m_State = eWSCTryConnect;
m_Parent->CreateStreamTo(m_RemoteAddr, m_RemotePort, std::bind(&WebSocksConn::ConnectResult, this, std::placeholders::_1));
} else {
LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state);
}
return;
case eWSCTryConnect:
if(state == eWSCOkayConnect) {
// we connected okay
LogPrint(eLogDebug, "websocks: connected to ", m_RemoteAddr, ":", m_RemotePort);
SendResponse("");
m_State = eWSCOkayConnect;
} else if(state == eWSCFailConnect) {
// we did not connect okay
LogPrint(eLogDebug, "websocks: failed to connect to ", m_RemoteAddr, ":", m_RemotePort);
SendResponse("failed to connect");
m_State = eWSCFailConnect;
EnterState(eWSCInitial);
} else if(state == eWSCClose) {
// premature close
LogPrint(eLogWarning, "websocks: websocket connection closed prematurely");
m_State = eWSCClose;
} else {
LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state);
}
return;
case eWSCFailConnect:
if (state == eWSCInitial) {
// reset to initial state so we can try connecting again
m_RemoteAddr = "";
m_RemotePort = 0;
LogPrint(eLogDebug, "websocks: reset websocket conn to initial state");
m_State = eWSCInitial;
} else if (state == eWSCClose) {
// we are going to close the connection
m_State = eWSCClose;
Close();
} else {
LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state);
}
return;
case eWSCOkayConnect:
if(state == eWSCClose) {
// graceful close
m_State = eWSCClose;
Close();
} else {
LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state);
}
case eWSCClose:
if(state == eWSCEnd) {
LogPrint(eLogDebug, "websocks: socket ended");
Kill();
auto me = shared_from_this();
Done(me);
} else {
LogPrint(eLogWarning, "websocks: invalid state change ", m_State, " -> ", state);
}
return;
default:
LogPrint(eLogError, "websocks: bad state ", m_State);
}
}
void StartForwarding()
{
LogPrint(eLogDebug, "websocks: begin forwarding data");
uint8_t b[1];
m_Stream->Send(b, 0);
AsyncRecv();
}
void HandleAsyncRecv(const boost::system::error_code &ec, std::size_t n)
{
if(ec) {
// error
LogPrint(eLogWarning, "websocks: connection error ", ec.message());
EnterState(eWSCClose);
} else {
// forward data
LogPrint(eLogDebug, "websocks recv ", n);
std::string str((char*)m_RecvBuf, n);
auto conn = m_Parent->GetConn(m_Conn);
if(!conn) {
LogPrint(eLogWarning, "websocks: connection is gone");
EnterState(eWSCClose);
return;
}
conn->send(str);
AsyncRecv();
}
}
void AsyncRecv()
{
m_Stream->AsyncReceive(
boost::asio::buffer(m_RecvBuf, sizeof(m_RecvBuf)),
std::bind(&WebSocksConn::HandleAsyncRecv, this, std::placeholders::_1, std::placeholders::_2), 60);
}
/** @brief send error message or empty string for success */
void SendResponse(const std::string & errormsg)
{
boost::property_tree::ptree resp;
if(errormsg.size()) {
resp.put("error", errormsg);
resp.put("success", 0);
} else {
resp.put("success", 1);
}
std::ostringstream ss;
write_json(ss, resp);
auto conn = m_Parent->GetConn(m_Conn);
if(conn) conn->send(ss.str());
}
void ConnectResult(Stream_t stream)
{
m_Stream = stream;
if(m_State == eWSCClose) {
// premature close of websocket
Close();
return;
}
if(m_Stream) {
// connect good
EnterState(eWSCOkayConnect);
StartForwarding();
} else {
// connect failed
EnterState(eWSCFailConnect);
}
}
virtual void GotMessage(const websocketpp::connection_hdl & conn, WebSocksServerImpl::message_ptr msg)
{
(void) conn;
std::string payload = msg->get_payload();
if(m_State == eWSCOkayConnect)
{
// forward to server
LogPrint(eLogDebug, "websocks: forward ", payload.size());
m_Stream->Send((uint8_t*)payload.c_str(), payload.size());
} else if (m_State == eWSCInitial) {
// recv connect request
auto itr = payload.find(":");
if(itr == std::string::npos) {
// no port
m_RemotePort = 0;
m_RemoteAddr = payload;
} else {
// includes port
m_RemotePort = std::stoi(payload.substr(itr+1));
m_RemoteAddr = payload.substr(0, itr);
}
EnterState(eWSCTryConnect);
} else {
// wtf?
LogPrint(eLogWarning, "websocks: got message in invalid state ", m_State);
}
}
virtual void Close()
{
if(m_State == eWSCClose) {
LogPrint(eLogDebug, "websocks: closing connection");
if(m_Stream) m_Stream->Close();
m_Parent->CloseConn(m_Conn);
EnterState(eWSCEnd);
} else {
EnterState(eWSCClose);
}
}
};
WebSocksConn_ptr CreateWebSocksConn(const websocketpp::connection_hdl & conn, WebSocksImpl * parent)
{
auto ptr = std::make_shared<WebSocksConn>(conn, parent);
auto c = parent->GetConn(conn);
c->set_message_handler(std::bind(&WebSocksConn::GotMessage, ptr.get(), std::placeholders::_1, std::placeholders::_2));
return ptr;
}
}
}
#else
// no websocket support
namespace i2p
{
namespace client
{
class WebSocksImpl
{
public:
WebSocksImpl(const std::string & addr, int port) : m_Addr(addr), m_Port(port)
{
}
~WebSocksImpl()
{
}
void Start()
{
LogPrint(eLogInfo, "WebSockets not enabled on compile time");
}
void Stop()
{
}
void InitializeDestination(WebSocks * parent)
{
}
boost::asio::ip::tcp::endpoint GetLocalEndpoint()
{
return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(m_Addr), m_Port);
}
std::string m_Addr;
int m_Port;
};
}
}
#endif
namespace i2p
{
namespace client
{
WebSocks::WebSocks(const std::string & addr, int port, std::shared_ptr<ClientDestination> localDestination) : m_Impl(new WebSocksImpl(addr, port))
{
m_Impl->InitializeDestination(this);
}
WebSocks::~WebSocks() { delete m_Impl; }
void WebSocks::Start()
{
m_Impl->Start();
GetLocalDestination()->Start();
}
boost::asio::ip::tcp::endpoint WebSocks::GetLocalEndpoint() const
{
return m_Impl->GetLocalEndpoint();
}
void WebSocks::Stop()
{
m_Impl->Stop();
GetLocalDestination()->Stop();
}
}
}

34
WebSocks.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef WEBSOCKS_H_
#define WEBSOCKS_H_
#include <string>
#include <memory>
#include "I2PService.h"
#include "Destination.h"
namespace i2p
{
namespace client
{
class WebSocksImpl;
/** @brief websocket socks proxy server */
class WebSocks : public i2p::client::I2PService
{
public:
WebSocks(const std::string & addr, int port, std::shared_ptr<ClientDestination> localDestination);
~WebSocks();
void Start();
void Stop();
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
const char * GetName() { return "WebSOCKS Proxy"; }
private:
WebSocksImpl * m_Impl;
};
}
}
#endif

View File

@ -2,6 +2,7 @@
#include "Log.h"
#include <set>
#include <functional>
#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>
@ -27,7 +28,11 @@ namespace i2p
typedef ServerImpl::message_ptr MessagePtr;
public:
WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr)
WebsocketServerImpl(const std::string & addr, int port) :
m_run(false),
m_ws_thread(nullptr),
m_ev_thread(nullptr),
m_WebsocketTicker(m_Service)
{
m_server.init_asio();
m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1));
@ -44,7 +49,7 @@ namespace i2p
void Start() {
m_run = true;
m_server.start_accept();
m_thread = new std::thread([&] () {
m_ws_thread = new std::thread([&] () {
while(m_run) {
try {
m_server.run();
@ -53,16 +58,35 @@ namespace i2p
}
}
});
m_ev_thread = new std::thread([&] () {
while(m_run) {
try {
m_Service.run();
break;
} catch (std::exception & e ) {
LogPrint(eLogError, "Websocket service: ", e.what());
}
}
});
ScheduleTick();
}
void Stop() {
m_run = false;
m_Service.stop();
m_server.stop();
if(m_thread) {
m_thread->join();
delete m_thread;
if(m_ev_thread) {
m_ev_thread->join();
delete m_ev_thread;
}
m_thread = nullptr;
m_ev_thread = nullptr;
if(m_ws_thread) {
m_ws_thread->join();
delete m_ws_thread;
}
m_ws_thread = nullptr;
}
void ConnOpened(ServerConn c)
@ -82,11 +106,40 @@ namespace i2p
(void) conn;
(void) msg;
}
void HandleTick(const boost::system::error_code & ec)
{
if(ec != boost::asio::error::operation_aborted)
LogPrint(eLogError, "Websocket ticker: ", ec.message());
// pump collected events to us
i2p::event::core.PumpCollected(this);
ScheduleTick();
}
void ScheduleTick()
{
LogPrint(eLogDebug, "Websocket schedule tick");
boost::posix_time::seconds dlt(1);
m_WebsocketTicker.expires_from_now(dlt);
m_WebsocketTicker.async_wait(std::bind(&WebsocketServerImpl::HandleTick, this, std::placeholders::_1));
}
/** @brief called from m_ev_thread */
void HandlePumpEvent(const EventType & ev, const uint64_t & val)
{
EventType e;
for (const auto & i : ev)
e[i.first] = i.second;
e["number"] = std::to_string(val);
HandleEvent(e);
}
/** @brief called from m_ws_thread */
void HandleEvent(const EventType & ev)
{
std::lock_guard<std::mutex> lock(m_connsMutex);
LogPrint(eLogDebug, "websocket event");
boost::property_tree::ptree event;
for (const auto & item : ev) {
event.put(item.first, item.second);
@ -105,10 +158,13 @@ namespace i2p
private:
typedef std::set<ServerConn, std::owner_less<ServerConn> > ConnList;
bool m_run;
std::thread * m_thread;
std::thread * m_ws_thread;
std::thread * m_ev_thread;
std::mutex m_connsMutex;
ConnList m_conns;
ServerImpl m_server;
boost::asio::io_service m_Service;
boost::asio::deadline_timer m_WebsocketTicker;
};

View File

@ -59,7 +59,8 @@ LOCAL_SRC_FILES := DaemonAndroid.cpp i2pd_android.cpp \
../../TunnelPool.cpp \
../../Timestamp.cpp \
../../Event.cpp \
../../BloomFilter.cpp \
../../WebSocks.cpp \
../../BloomFilter.cpp \
../../util.cpp \
../../i2pd.cpp ../../UPnP.cpp

View File

@ -33,6 +33,10 @@ namespace api
#else
i2p::crypto::InitCrypto (true);
#endif
int netID; i2p::config::GetOption("netid", netID);
i2p::context.SetNetID (netID);
i2p::context.Init ();
}

View File

@ -98,6 +98,7 @@ set (CLIENT_SRC
"${CMAKE_SOURCE_DIR}/HTTP.cpp"
"${CMAKE_SOURCE_DIR}/HTTPProxy.cpp"
"${CMAKE_SOURCE_DIR}/I2CP.cpp"
"${CMAKE_SOURCE_DIR}/WebSocks.cpp"
)
if(WITH_WEBSOCKETS)

34
build/docker/README.md Normal file
View File

@ -0,0 +1,34 @@
Howto build & run
==================
**Build**
Assuming you're in the root directory of the anoncoin source code.
$ `cd build/docker`
$ `docker -t meeh/i2pd:latest .`
**Run**
To run either the local build, or if not found - fetched prebuild from hub.docker.io, run the following command.
$ `docker run --name anonnode -v /path/to/i2pd/datadir/on/host:/var/lib/i2pd -p 7070:7070 -p 4444:4444 -p 4447:4447 -p 7656:7656 -p 2827:2827 -p 7654:7654 -p 7650:7650 -d meeh/i2pd`
All the ports ( -p HOSTPORT:DOCKERPORT ) is optional. However the command above enable all features (Webconsole, HTTP Proxy, BOB, SAM, i2cp, etc)
The volume ( -v HOSTDIR:DOCKERDIR ) is also optional, but if you don't use it, your config, routerid and private keys will die along with the container.
**Options**
Options are set via docker environment variables. This can be set at run with -e parameters.
* **ENABLE_IPV6** - Enable IPv6 support. Any value can be used - it triggers as long as it's not empty.
* **LOGLEVEL** - Set the loglevel.
* **ENABLE_AUTH** - Enable auth for the webconsole. Username and password needs to be set manually in i2pd.conf cause security reasons.
**Logging**
Logging happens to STDOUT as the best practise with docker containers, since infrastructure systems like kubernetes with ELK integration can automaticly forward the log to say, kibana or greylog without manual setup. :)

View File

@ -35,8 +35,9 @@ Windows-specific options:
All options below still possible in cmdline, but better write it in config file:
* --http.enabled= - If webconsole is enabled. true by default
* --http.address= - The address to listen on (HTTP server)
* --http.port= - The port to listen on (HTTP server)
* --http.port= - The port to listen on (HTTP server) 7070 by default
* --http.auth - Enable basic HTTP auth for webconsole
* --http.user= - Username for basic auth (default: i2pd)
* --http.pass= - Password for basic auth (default: random, see logs)

24
entrypoint.sh Normal file
View File

@ -0,0 +1,24 @@
#!/bin/sh
ARGS=""
if [ "${ENABLE_IPV6}" != "" ]; then
ARGS="${ARGS} ipv6"
fi
if [ "${LOGLEVEL}" != "" ]; then
ARGS="${ARGS} loglevel=${LOGLEVEL}"
fi
if [ "${ENABLE_AUTH}" != "" ]; then
ARGS="${ARGS} http.auth"
fi
# To make ports exposeable
DEFAULT_ARGS=" http.address=0.0.0.0 httpproxy.address=0.0.0.0 -socksproxy.address=0.0.0.0 sam.address=0.0.0.0 bob.address=0.0.0.0 i2cp.address=0.0.0.0 i2pcontrol.port=0.0.0.0 upnp.enabled=false -service "
mkdir -p /var/lib/i2pd && chown -R i2pd:nobody /var/lib/i2pd && chmod u+rw /var/lib/i2pd
gosu i2pd i2pd $DEFAULT_ARGS $ARGS

View File

@ -9,7 +9,7 @@ LIB_SRC = \
LIB_CLIENT_SRC = \
AddressBook.cpp BOB.cpp ClientContext.cpp I2PTunnel.cpp I2PService.cpp \
SAM.cpp SOCKS.cpp HTTPProxy.cpp I2CP.cpp
SAM.cpp SOCKS.cpp HTTPProxy.cpp I2CP.cpp WebSocks.cpp
# also: Daemon{Linux,Win32}.cpp will be added later
DAEMON_SRC = \

87
util.h
View File

@ -1,9 +1,10 @@
#ifndef UTIL_H
#define UTIL_H
#include <map>
#include <string>
#include <iostream>
#include <functional>
#include <memory>
#include <mutex>
#include <boost/asio.hpp>
#ifdef ANDROID
@ -27,6 +28,88 @@ namespace i2p
{
namespace util
{
template<class T>
class MemoryPool
{
public:
MemoryPool (): m_Head (nullptr) {};
~MemoryPool ()
{
while (m_Head)
{
auto tmp = m_Head;
m_Head = static_cast<T*>(*(void * *)m_Head); // next
delete tmp;
}
}
template<typename... TArgs>
T * Acquire (TArgs&&... args)
{
if (!m_Head) return new T(args...);
else
{
auto tmp = m_Head;
m_Head = static_cast<T*>(*(void * *)m_Head); // next
return new (tmp)T(args...);
}
}
void Release (T * t)
{
if (!t) return;
t->~T ();
*(void * *)t = m_Head; // next
m_Head = t;
}
template<typename... TArgs>
std::unique_ptr<T, std::function<void(T*)> > AcquireUnique (TArgs&&... args)
{
return std::unique_ptr<T, std::function<void(T*)> >(Acquire (args...),
std::bind (&MemoryPool<T>::Release, this, std::placeholders::_1));
}
protected:
T * m_Head;
};
template<class T>
class MemoryPoolMt: public MemoryPool<T>
{
public:
MemoryPoolMt () {};
template<typename... TArgs>
T * AcquireMt (TArgs&&... args)
{
if (!this->m_Head) return new T(args...);
std::lock_guard<std::mutex> l(m_Mutex);
return this->Acquire (args...);
}
void ReleaseMt (T * t)
{
std::lock_guard<std::mutex> l(m_Mutex);
this->Release (t);
}
template<template<typename, typename...>class C, typename... R>
void ReleaseMt(const C<T *, R...>& c)
{
std::lock_guard<std::mutex> l(m_Mutex);
for (auto& it: c)
this->Release (it);
}
private:
std::mutex m_Mutex;
};
namespace net
{
int GetMTU (const boost::asio::ip::address& localAddress);