Merge pull request #731 from PurpleI2P/openssl

recent changes
This commit is contained in:
orignal 2016-12-03 09:38:28 -05:00 committed by GitHub
commit 0ea5fbfe0a
9 changed files with 145 additions and 75 deletions

View File

@ -87,6 +87,8 @@ namespace client
Stop (); Stop ();
if (m_Pool) if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
for (auto& it: m_LeaseSetRequests)
it.second->Complete (nullptr);
} }
void LeaseSetDestination::Run () void LeaseSetDestination::Run ()
@ -130,13 +132,6 @@ namespace client
m_PublishConfirmationTimer.cancel (); m_PublishConfirmationTimer.cancel ();
m_PublishVerificationTimer.cancel (); m_PublishVerificationTimer.cancel ();
for (auto& it: m_LeaseSetRequests)
{
it.second->Complete (nullptr);
it.second->requestTimeoutTimer.cancel ();
}
m_LeaseSetRequests.clear ();
m_IsRunning = false; m_IsRunning = false;
if (m_Pool) if (m_Pool)
{ {
@ -532,6 +527,7 @@ namespace client
if (floodfill) if (floodfill)
{ {
auto request = std::make_shared<LeaseSetRequest> (m_Service); auto request = std::make_shared<LeaseSetRequest> (m_Service);
if (requestComplete)
request->requestComplete.push_back (requestComplete); request->requestComplete.push_back (requestComplete);
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
auto ret = m_LeaseSetRequests.insert (std::pair<i2p::data::IdentHash, std::shared_ptr<LeaseSetRequest> >(dest,request)); auto ret = m_LeaseSetRequests.insert (std::pair<i2p::data::IdentHash, std::shared_ptr<LeaseSetRequest> >(dest,request));
@ -542,7 +538,7 @@ namespace client
{ {
// request failed // request failed
m_LeaseSetRequests.erase (ret.first); m_LeaseSetRequests.erase (ret.first);
requestComplete (nullptr); if (requestComplete) requestComplete (nullptr);
} }
} }
else // duplicate else // duplicate
@ -552,13 +548,13 @@ namespace client
//ret.first->second->requestComplete.push_back (requestComplete); //ret.first->second->requestComplete.push_back (requestComplete);
if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT)
m_LeaseSetRequests.erase (ret.first); m_LeaseSetRequests.erase (ret.first);
requestComplete (nullptr); if (requestComplete) requestComplete (nullptr);
} }
} }
else else
{ {
LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found");
requestComplete (nullptr); if (requestComplete) requestComplete (nullptr);
} }
} }
@ -706,12 +702,12 @@ namespace client
{ {
m_ReadyChecker.cancel(); m_ReadyChecker.cancel();
m_StreamingDestination->Stop (); m_StreamingDestination->Stop ();
m_StreamingDestination->SetOwner (nullptr); //m_StreamingDestination->SetOwner (nullptr);
m_StreamingDestination = nullptr; m_StreamingDestination = nullptr;
for (auto& it: m_StreamingDestinationsByPorts) for (auto& it: m_StreamingDestinationsByPorts)
{ {
it.second->Stop (); it.second->Stop ();
it.second->SetOwner (nullptr); //it.second->SetOwner (nullptr);
} }
m_StreamingDestinationsByPorts.clear (); m_StreamingDestinationsByPorts.clear ();
if (m_DatagramDestination) if (m_DatagramDestination)

View File

@ -45,10 +45,14 @@ else
LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread
endif endif
# UPNP Support (miniupnpc 1.5 or 1.6) # UPNP Support (miniupnpc 1.5 and higher)
ifeq ($(USE_UPNP),yes) ifeq ($(USE_UPNP),yes)
LDFLAGS += -lminiupnpc
CXXFLAGS += -DUSE_UPNP CXXFLAGS += -DUSE_UPNP
ifeq ($(USE_STATIC),yes)
LDLIBS += $(LIBDIR)/libminiupnpc.a
else
LDLIBS += -lminiupnpc
endif
endif endif
IS_64 := $(shell $(CXX) -dumpmachine 2>&1 | $(GREP) -c "64") IS_64 := $(shell $(CXX) -dumpmachine 2>&1 | $(GREP) -c "64")

View File

@ -537,11 +537,11 @@ namespace transport
{ {
boost::system::error_code ec; boost::system::error_code ec;
size_t moreBytes = m_Socket.available(ec); size_t moreBytes = m_Socket.available(ec);
if (moreBytes) if (moreBytes && !ec)
{ {
if (moreBytes > NTCP_BUFFER_SIZE - m_ReceiveBufferOffset) if (moreBytes > NTCP_BUFFER_SIZE - m_ReceiveBufferOffset)
moreBytes = NTCP_BUFFER_SIZE - m_ReceiveBufferOffset; moreBytes = NTCP_BUFFER_SIZE - m_ReceiveBufferOffset;
moreBytes = m_Socket.read_some (boost::asio::buffer (m_ReceiveBuffer + m_ReceiveBufferOffset, moreBytes)); moreBytes = m_Socket.read_some (boost::asio::buffer (m_ReceiveBuffer + m_ReceiveBufferOffset, moreBytes), ec);
if (ec) if (ec)
{ {
LogPrint (eLogInfo, "NTCP: Read more bytes error: ", ec.message ()); LogPrint (eLogInfo, "NTCP: Read more bytes error: ", ec.message ());
@ -589,7 +589,8 @@ namespace transport
else else
{ {
// timestamp // timestamp
LogPrint (eLogDebug, "NTCP: Timestamp"); int diff = (int)bufbe32toh (buf + 2) - (int)i2p::util::GetSecondsSinceEpoch ();
LogPrint (eLogInfo, "NTCP: Timestamp. Time difference ", diff, " seconds");
return true; return true;
} }
} }
@ -650,7 +651,7 @@ namespace transport
sendBuffer = m_TimeSyncBuffer; sendBuffer = m_TimeSyncBuffer;
len = 4; len = 4;
htobuf16(sendBuffer, 0); htobuf16(sendBuffer, 0);
htobe32buf (sendBuffer + 2, time (0)); htobe32buf (sendBuffer + 2, i2p::util::GetSecondsSinceEpoch ());
} }
int rem = (len + 6) & 0x0F; // %16 int rem = (len + 6) & 0x0F; // %16
int padding = 0; int padding = 0;
@ -803,6 +804,12 @@ namespace transport
void NTCPServer::Stop () void NTCPServer::Stop ()
{ {
{
// we have to copy it because Terminate changes m_NTCPSessions
auto ntcpSessions = m_NTCPSessions;
for (auto& it: ntcpSessions)
it.second->Terminate ();
}
m_NTCPSessions.clear (); m_NTCPSessions.clear ();
if (m_IsRunning) if (m_IsRunning)
@ -953,16 +960,29 @@ namespace transport
m_Service.post([=]() m_Service.post([=]()
{ {
if (this->AddNTCPSession (conn)) if (this->AddNTCPSession (conn))
{
auto timer = std::make_shared<boost::asio::deadline_timer>(m_Service);
timer->expires_from_now (boost::posix_time::seconds(NTCP_CONNECT_TIMEOUT));
timer->async_wait ([conn](const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogInfo, "NTCP: Not connected in ", NTCP_CONNECT_TIMEOUT, " seconds");
conn->Terminate ();
}
});
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port),
std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn)); std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer));
}
}); });
} }
void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn) void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer)
{ {
timer->cancel ();
if (ecode) if (ecode)
{ {
LogPrint (eLogError, "NTCP: Connect error ", ecode.message ()); LogPrint (eLogInfo, "NTCP: Connect error ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true); i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true);
conn->Terminate (); conn->Terminate ();

View File

@ -36,6 +36,7 @@ namespace transport
const size_t NTCP_MAX_MESSAGE_SIZE = 16384; const size_t NTCP_MAX_MESSAGE_SIZE = 16384;
const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028) const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028)
const int NTCP_CONNECT_TIMEOUT = 5; // 5 seconds
const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes
const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds
const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448
@ -153,7 +154,7 @@ namespace transport
void HandleAccept (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error); void HandleAccept (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error);
void HandleAcceptV6 (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error); void HandleAcceptV6 (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error);
void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn); void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer);
// timer // timer
void ScheduleTermination (); void ScheduleTermination ();

View File

@ -1008,12 +1008,12 @@ namespace data
}); });
} }
std::shared_ptr<const RouterInfo> NetDb::GetRandomPeerTestRouter () const std::shared_ptr<const RouterInfo> NetDb::GetRandomPeerTestRouter (bool v4only) const
{ {
return GetRandomRouter ( return GetRandomRouter (
[](std::shared_ptr<const RouterInfo> router)->bool [v4only](std::shared_ptr<const RouterInfo> router)->bool
{ {
return !router->IsHidden () && router->IsPeerTesting (); return !router->IsHidden () && router->IsPeerTesting () && router->IsSSU (v4only);
}); });
} }

View File

@ -69,7 +69,7 @@ namespace data
std::shared_ptr<const RouterInfo> GetRandomRouter () const; std::shared_ptr<const RouterInfo> GetRandomRouter () const;
std::shared_ptr<const RouterInfo> GetRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const; std::shared_ptr<const RouterInfo> GetRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const;
std::shared_ptr<const RouterInfo> GetHighBandwidthRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const; std::shared_ptr<const RouterInfo> GetHighBandwidthRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const;
std::shared_ptr<const RouterInfo> GetRandomPeerTestRouter () const; std::shared_ptr<const RouterInfo> GetRandomPeerTestRouter (bool v4only = true) const;
std::shared_ptr<const RouterInfo> GetRandomIntroducer () const; std::shared_ptr<const RouterInfo> GetRandomIntroducer () const;
std::shared_ptr<const RouterInfo> GetClosestFloodfill (const IdentHash& destination, const std::set<IdentHash>& excluded, bool closeThanUsOnly = false) const; std::shared_ptr<const RouterInfo> GetClosestFloodfill (const IdentHash& destination, const std::set<IdentHash>& excluded, bool closeThanUsOnly = false) const;
std::vector<IdentHash> GetClosestFloodfills (const IdentHash& destination, size_t num, std::vector<IdentHash> GetClosestFloodfills (const IdentHash& destination, size_t num,

86
SSU.cpp
View File

@ -20,11 +20,7 @@ namespace transport
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service),
m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6)
{ {
m_SocketV6.open (boost::asio::ip::udp::v6()); OpenSocketV6 ();
m_SocketV6.set_option (boost::asio::ip::v6_only (true));
m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535));
m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535));
m_SocketV6.bind (m_EndpointV6);
} }
SSUServer::SSUServer (int port): SSUServer::SSUServer (int port):
@ -32,14 +28,28 @@ namespace transport
m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr),
m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService),
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversService),
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service),
m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6)
{ {
OpenSocket ();
if (context.SupportsV6 ())
OpenSocketV6 ();
}
SSUServer::~SSUServer ()
{
}
void SSUServer::OpenSocket ()
{
m_Socket.open (boost::asio::ip::udp::v4());
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535));
m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535));
if (context.SupportsV6 ()) m_Socket.bind (m_Endpoint);
}
void SSUServer::OpenSocketV6 ()
{ {
m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.open (boost::asio::ip::udp::v6());
m_SocketV6.set_option (boost::asio::ip::v6_only (true)); m_SocketV6.set_option (boost::asio::ip::v6_only (true));
@ -47,11 +57,6 @@ namespace transport
m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535)); m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535));
m_SocketV6.bind (m_EndpointV6); m_SocketV6.bind (m_EndpointV6);
} }
}
SSUServer::~SSUServer ()
{
}
void SSUServer::Start () void SSUServer::Start ()
{ {
@ -194,12 +199,25 @@ namespace transport
boost::system::error_code ec; boost::system::error_code ec;
size_t moreBytes = m_Socket.available(ec); size_t moreBytes = m_Socket.available(ec);
if (!ec)
{
while (moreBytes && packets.size () < 25) while (moreBytes && packets.size () < 25)
{ {
packet = new SSUPacket (); packet = new SSUPacket ();
packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from); packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, 0, ec);
if (!ec)
{
packets.push_back (packet); packets.push_back (packet);
moreBytes = m_Socket.available(); moreBytes = m_Socket.available(ec);
if (ec) break;
}
else
{
LogPrint (eLogError, "SSU: receive_from error: ", ec.message ());
delete packet;
break;
}
}
} }
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions)); m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions));
@ -207,8 +225,14 @@ namespace transport
} }
else else
{ {
LogPrint (eLogError, "SSU: receive error: ", ecode.message ());
delete packet; delete packet;
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogError, "SSU: receive error: ", ecode.message ());
m_Socket.close ();
OpenSocket ();
Receive ();
}
} }
} }
@ -220,13 +244,27 @@ namespace transport
std::vector<SSUPacket *> packets; std::vector<SSUPacket *> packets;
packets.push_back (packet); packets.push_back (packet);
size_t moreBytes = m_SocketV6.available (); boost::system::error_code ec;
size_t moreBytes = m_SocketV6.available (ec);
if (!ec)
{
while (moreBytes && packets.size () < 25) while (moreBytes && packets.size () < 25)
{ {
packet = new SSUPacket (); packet = new SSUPacket ();
packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from); packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, 0, ec);
if (!ec)
{
packets.push_back (packet); packets.push_back (packet);
moreBytes = m_SocketV6.available(); moreBytes = m_SocketV6.available(ec);
if (ec) break;
}
else
{
LogPrint (eLogError, "SSU: v6 receive_from error: ", ec.message ());
delete packet;
break;
}
}
} }
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6)); m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6));
@ -234,8 +272,14 @@ namespace transport
} }
else else
{ {
LogPrint (eLogError, "SSU: v6 receive error: ", ecode.message ());
delete packet; delete packet;
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogError, "SSU: v6 receive error: ", ecode.message ());
m_SocketV6.close ();
OpenSocketV6 ();
ReceiveV6 ();
}
} }
} }
@ -298,9 +342,9 @@ namespace transport
return nullptr; return nullptr;
} }
void SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest) void SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest, bool v4only)
{ {
auto address = router->GetSSUAddress (!context.SupportsV6 ()); auto address = router->GetSSUAddress (v4only || !context.SupportsV6 ());
if (address) if (address)
CreateSession (router, address->host, address->port, peerTest); CreateSession (router, address->host, address->port, peerTest);
else else

4
SSU.h
View File

@ -42,7 +42,7 @@ namespace transport
~SSUServer (); ~SSUServer ();
void Start (); void Start ();
void Stop (); void Stop ();
void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false); void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false, bool v4only = false);
void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
const boost::asio::ip::address& addr, int port, bool peerTest = false); const boost::asio::ip::address& addr, int port, bool peerTest = false);
void CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest); void CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest);
@ -68,6 +68,8 @@ namespace transport
private: private:
void OpenSocket ();
void OpenSocketV6 ();
void Run (); void Run ();
void RunV6 (); void RunV6 ();
void RunReceivers (); void RunReceivers ();

View File

@ -527,13 +527,14 @@ namespace transport
if (m_SSUServer) if (m_SSUServer)
{ {
bool nat; i2p::config::GetOption("nat", nat); bool nat; i2p::config::GetOption("nat", nat);
if (nat) bool isv4 = i2p::context.SupportsV4 ();
if (nat && isv4)
i2p::context.SetStatus (eRouterStatusTesting); i2p::context.SetStatus (eRouterStatusTesting);
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
auto router = i2p::data::netdb.GetRandomPeerTestRouter (); auto router = i2p::data::netdb.GetRandomPeerTestRouter (isv4); // v4 only if v4
if (router && router->IsSSU (!context.SupportsV6 ())) if (router)
m_SSUServer->CreateSession (router, true); // peer test m_SSUServer->CreateSession (router, true, isv4); // peer test
else else
{ {
// if not peer test capable routers found pick any // if not peer test capable routers found pick any
@ -549,23 +550,25 @@ namespace transport
void Transports::PeerTest () void Transports::PeerTest ()
{ {
if (RoutesRestricted()) return; if (RoutesRestricted() || !i2p::context.SupportsV4 ()) return;
if (m_SSUServer) if (m_SSUServer)
{ {
bool statusChanged = false; bool statusChanged = false;
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
auto router = i2p::data::netdb.GetRandomPeerTestRouter (); auto router = i2p::data::netdb.GetRandomPeerTestRouter (true); // v4 only
if (router && router->IsSSU (!context.SupportsV6 ())) if (router)
{ {
if (!statusChanged) if (!statusChanged)
{ {
statusChanged = true; statusChanged = true;
i2p::context.SetStatus (eRouterStatusTesting); // first time only i2p::context.SetStatus (eRouterStatusTesting); // first time only
} }
m_SSUServer->CreateSession (router, true); // peer test m_SSUServer->CreateSession (router, true, true); // peer test v4
} }
} }
if (!statusChanged)
LogPrint (eLogWarning, "Can't find routers for peer test");
} }
} }