Merge branch 'master' of github-meeh420:orignal/i2pd

* 'master' of github-meeh420:orignal/i2pd:
  delete unreachable SSU sessions
  show number sent/received bytes through the status page
  resend
  fixed memory leak
  close connection to first hop of declined tunnel
  process extended data
  handle individaul bitfields ack
  Moving file list to a common one. Still in makefile format, but now with CPP/H as input instead of OBJECTS. Issue #82
  Removing Qt build file. Issue #82
  create inbound tunnel though outbound
  Makefile now detects which file to use (OSX or Linux) Issue #82
  Prepare support for POST/PUT
  close SSU session if not established
  send ack per fragment. temporary disble check for duplicated through IV
  fixed memory leak
  save out-of-sequence fragments
This commit is contained in:
Meeh 2014-07-21 20:09:14 +02:00
commit 45289891d6
18 changed files with 479 additions and 298 deletions

View File

@ -13,7 +13,7 @@ namespace i2p
namespace util
{
const std::string HTTPConnection::itoopieImage =
const std::string HTTPConnection::itoopieImage =
"<img alt=\"\" src=\"data:image/png;base64,"
"iVBORw0KGgoAAAANSUhEUgAAADYAAABECAYAAAG9qPaBAAAACXBIWXMAAA7DAAAOwwHHb6h"
"kAAAStElEQVR4nMU7CXRUVbL1ek0nTUI6BLNAICEIhESWQAAFgsH8D4g5IwSHg6PfEQ/E0T"
@ -108,8 +108,8 @@ namespace util
"13ZO0Q0SjbAu6DCkQs4FYg4EwKcOv4WsAFLePCb04Y/ZFCY2MjQ8uGE4cTx7A9f7/dk8r/B"
"+U0vclvzH+PAAAAAElFTkSuQmCC"
"\" />";
namespace misc_strings
namespace misc_strings
{
const char name_value_separator[] = { ':', ' ' };
@ -146,7 +146,7 @@ namespace util
buffers.push_back(boost::asio::buffer(misc_strings::crlf));
}
buffers.push_back(boost::asio::buffer(misc_strings::crlf));
}
}
buffers.push_back(boost::asio::buffer(content));
return buffers;
}
@ -154,9 +154,9 @@ namespace util
void HTTPConnection::Terminate ()
{
if (m_Stream)
{
{
m_Stream->Close ();
DeleteStream (m_Stream);
DeleteStream (m_Stream);
}
m_Socket->close ();
delete this;
@ -193,11 +193,11 @@ namespace util
{
b32 = address.substr (1, pos - 1); // excluding leading '/' to next '/'
uri = address.substr (pos); // rest of line
}
}
HandleDestinationRequest (b32, uri);
}
else
HandleDestinationRequest (b32, uri);
}
else
HandleRequest ();
}
@ -209,10 +209,10 @@ namespace util
char * http = strstr (get, "HTTP");
if (http)
return std::string (get + 4, http - get - 5);
}
}
return "";
}
}
void HTTPConnection::HandleWriteReply (const boost::system::error_code& ecode)
{
Terminate ();
@ -233,7 +233,7 @@ namespace util
FillContent (s);
s << "</html>";
SendReply (s.str ());
}
}
void HTTPConnection::FillContent (std::stringstream& s)
{
@ -241,7 +241,7 @@ namespace util
s << "Our external address:" << "<BR>";
for (auto& address : i2p::context.GetRouterInfo().GetAddresses())
{
switch (address.transportStyle)
switch (address.transportStyle)
{
case i2p::data::RouterInfo::eTransportNTCP:
s << "NTCP&nbsp;&nbsp;";
@ -257,31 +257,31 @@ namespace util
s << "<BR>Routers: " << i2p::data::netdb.GetNumRouters () << " ";
s << "Floodfills: " << i2p::data::netdb.GetNumFloodfills () << " ";
s << "LeaseSets: " << i2p::data::netdb.GetNumLeaseSets () << "<BR>";
s << "<P>Tunnels</P>";
for (auto it: i2p::tunnel::tunnels.GetOutboundTunnels ())
{
{
it->GetTunnelConfig ()->Print (s);
if (it->GetTunnelPool () && !it->GetTunnelPool ()->IsExploratory ())
s << " " << "Pool";
if (it->IsFailed ())
s << " " << "Failed";
s << " " << (int)it->GetNumSentBytes () << "<BR>";
}
}
for (auto it: i2p::tunnel::tunnels.GetInboundTunnels ())
{
{
it.second->GetTunnelConfig ()->Print (s);
if (it.second->GetTunnelPool () && !it.second->GetTunnelPool ()->IsExploratory ())
s << " " << "Pool";
if (it.second->IsFailed ())
s << " " << "Failed";
s << " " << (int)it.second->GetNumReceivedBytes () << "<BR>";
}
}
s << "<P>Transit tunnels</P>";
for (auto it: i2p::tunnel::tunnels.GetTransitTunnels ())
{
{
if (dynamic_cast<i2p::tunnel::TransitTunnelGateway *>(it.second))
s << it.second->GetTunnelID () << "-->";
else if (dynamic_cast<i2p::tunnel::TransitTunnelEndpoint *>(it.second))
@ -289,23 +289,24 @@ namespace util
else
s << "-->" << it.second->GetTunnelID () << "-->";
s << " " << it.second->GetNumTransmittedBytes () << "<BR>";
}
}
s << "<P>Transports</P>";
s << "NTCP<BR>";
for (auto it: i2p::transports.GetNTCPSessions ())
{
{
// RouterInfo of incoming connection doesn't have address
bool outgoing = it.second->GetRemoteRouterInfo ().GetNTCPAddress ();
if (it.second->IsEstablished ())
{
if (outgoing) s << "-->";
s << it.second->GetRemoteRouterInfo ().GetIdentHashAbbreviation () << ": "
s << it.second->GetRemoteRouterInfo ().GetIdentHashAbbreviation () << ": "
<< it.second->GetSocket ().remote_endpoint().address ().to_string ();
if (!outgoing) s << "-->";
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
s << "<BR>";
}
}
}
}
auto ssuServer = i2p::transports.GetSSUServer ();
if (ssuServer)
{
@ -318,13 +319,18 @@ namespace util
if (outgoing) s << "-->";
s << endpoint.address ().to_string () << ":" << endpoint.port ();
if (!outgoing) s << "-->";
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
s << "<BR>";
}
}
}
}
s << "<p><a href=\"zmw2cyw2vj7f6obx3msmdvdepdhnw2ctc4okza2zjxlukkdfckhq\">Flibusta</a></p>";
}
}
void HTTPConnection::HandleDestinationRequest (const std::string& address, const std::string& uri)
{
HandleDestinationRequest(address, "GET", "", uri);
}
void HTTPConnection::HandleDestinationRequest (const std::string& address, const std::string& method, const std::string& data, const std::string& uri)
{
i2p::data::IdentHash destination;
std::string fullAddress;
@ -363,7 +369,7 @@ namespace util
fullAddress = address + ".b32.i2p";
}
}
auto leaseSet = i2p::data::netdb.FindLeaseSet (destination);
if (!leaseSet || !leaseSet->HasNonExpiredLeases ())
{
@ -374,23 +380,29 @@ namespace util
{
SendReply (leaseSet ? "<html>" + itoopieImage + "<br>Leases expired</html>" : "<html>" + itoopieImage + "LeaseSet not found</html>", 504);
return;
}
}
}
if (!m_Stream)
if (!m_Stream)
m_Stream = i2p::stream::CreateStream (*leaseSet);
if (m_Stream)
{
std::string request = "GET " + uri + " HTTP/1.1\n Host:" + fullAddress + "\n";
m_Stream->Send ((uint8_t *)request.c_str (), request.length (), 10);
std::string request = method+" " + uri + " HTTP/1.1\n Host:" + fullAddress + "\r\n";
if (!strcmp(method.c_str(), "GET"))
{
// POST/PUT, apply body
request += "\r\n"+ data;
}
LogPrint("HTTP Client Request: ", request);
m_Stream->Send ((uint8_t *)request.c_str (), request.length (), 10);
AsyncStreamReceive ();
}
}
}
}
void HTTPConnection::AsyncStreamReceive ()
{
if (m_Stream)
m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, 8192),
boost::bind (&HTTPConnection::HandleStreamReceive, this,
boost::bind (&HTTPConnection::HandleStreamReceive, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
45); // 45 seconds timeout
}
@ -408,7 +420,7 @@ namespace util
SendReply ("<html>" + itoopieImage + "<br>Not responding</html>", 504);
else
Terminate ();
}
}
}
void HTTPConnection::SendReply (const std::string& content, int status)
@ -421,16 +433,16 @@ namespace util
m_Reply.headers[1].value = "text/html";
boost::asio::async_write (*m_Socket, m_Reply.to_buffers(status),
boost::bind (&HTTPConnection::HandleWriteReply, this,
boost::bind (&HTTPConnection::HandleWriteReply, this,
boost::asio::placeholders::error));
}
HTTPServer::HTTPServer (int port):
m_Thread (nullptr), m_Work (m_Service),
HTTPServer::HTTPServer (int port):
m_Thread (nullptr), m_Work (m_Service),
m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)),
m_NewSocket (nullptr)
{
}
HTTPServer::~HTTPServer ()
@ -450,17 +462,17 @@ namespace util
m_Acceptor.close();
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
}
void HTTPServer::Run ()
{
m_Service.run ();
}
}
void HTTPServer::Accept ()
{
@ -476,7 +488,7 @@ namespace util
CreateConnection(m_NewSocket); // new HTTPConnection(m_NewSocket);
Accept ();
}
}
}
void HTTPServer::CreateConnection(boost::asio::ip::tcp::socket * m_NewSocket)
{

View File

@ -14,13 +14,13 @@ namespace util
class HTTPConnection
{
protected:
struct header
{
std::string name;
std::string value;
};
struct request
{
std::string method;
@ -38,7 +38,7 @@ namespace util
std::vector<boost::asio::const_buffer> to_buffers (int status);
};
public:
HTTPConnection (boost::asio::ip::tcp::socket * socket): m_Socket (socket), m_Stream (nullptr) { Receive (); };
@ -48,9 +48,9 @@ namespace util
void Terminate ();
void Receive ();
void HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void AsyncStreamReceive ();
void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleWriteReply(const boost::system::error_code& ecode);
void HandleWrite (const boost::system::error_code& ecode);
void SendReply (const std::string& content, int status = 200);
@ -58,9 +58,9 @@ namespace util
void HandleRequest ();
void FillContent (std::stringstream& s);
std::string ExtractAddress ();
protected:
boost::asio::ip::tcp::socket * m_Socket;
i2p::stream::Stream * m_Stream;
char m_Buffer[8192], m_StreamBuffer[8192];
@ -68,14 +68,16 @@ namespace util
reply m_Reply;
protected:
virtual void HandleDestinationRequest(const std::string& address, const std::string& uri);
virtual void HandleDestinationRequest(const std::string& address, const std::string& method, const std::string& data, const std::string& uri);
virtual void RunRequest ();
private:
static const std::string itoopieImage;
};
};
class HTTPServer
{
@ -89,9 +91,9 @@ namespace util
private:
void Run ();
void Run ();
void Accept ();
void HandleAccept(const boost::system::error_code& ecode);
void HandleAccept(const boost::system::error_code& ecode);
private:
@ -103,7 +105,7 @@ namespace util
protected:
virtual void CreateConnection(boost::asio::ip::tcp::socket * m_NewSocket);
};
};
}
}

View File

@ -359,6 +359,7 @@ namespace i2p
else
{
LogPrint ("Outbound tunnel ", tunnel->GetTunnelID (), " has been declined");
i2p::transports.CloseSession (tunnel->GetTunnelConfig ()->GetFirstHop ()->router);
delete tunnel;
}
}

View File

@ -1,38 +1,9 @@
CC = g++
CFLAGS = -g -Wall -std=c++0x
OBJECTS = obj/CryptoConst.o obj/base64.o obj/NTCPSession.o obj/RouterInfo.o obj/Transports.o \
obj/RouterContext.o obj/NetDb.o obj/LeaseSet.o obj/Tunnel.o obj/TunnelEndpoint.o \
obj/TunnelGateway.o obj/TransitTunnel.o obj/I2NPProtocol.o obj/Log.o obj/Garlic.o \
obj/HTTPServer.o obj/Streaming.o obj/Identity.o obj/SSU.o obj/util.o obj/Reseed.o \
obj/UPnP.o obj/TunnelPool.o obj/HTTPProxy.o obj/AddressBook.o obj/Daemon.o \
obj/DaemonLinux.o obj/SSUData.o obj/i2p.o obj/aes.o obj/SOCKS.o
INCFLAGS =
LDFLAGS = -Wl,-rpath,/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem -lboost_regex -lboost_program_options -lpthread
LIBS =
UNAME := $(shell uname -s)
#check if AES-NI is supported by CPU
ifneq ($(shell grep -c aes /proc/cpuinfo),0)
CPU_FLAGS = -DAESNI
ifeq ($(UNAME),Darwin)
include Makefile.osx
else
include Makefile.linux
endif
all: obj i2p
i2p: $(OBJECTS:obj/%=obj/%)
$(CC) -o $@ $^ $(LDFLAGS) $(LIBS)
.SUFFIXES:
.SUFFIXES: .c .cc .C .cpp .o
obj/%.o : %.cpp
$(CC) -o $@ $< -c $(CFLAGS) $(INCFLAGS) $(CPU_FLAGS)
obj:
mkdir -p obj
clean:
rm -fr obj i2p
.PHONY: all
.PHONY: clean

33
Makefile.linux Normal file
View File

@ -0,0 +1,33 @@
CC = g++
CFLAGS = -g -Wall -std=c++0x
include filelist.mk
INCFLAGS =
LDFLAGS = -Wl,-rpath,/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem -lboost_regex -lboost_program_options -lpthread
LIBS =
#check if AES-NI is supported by CPU
ifneq ($(shell grep -c aes /proc/cpuinfo),0)
CPU_FLAGS = -DAESNI
endif
all: obj i2p
i2p: $(OBJECTS:obj/%=obj/%)
$(CC) -o $@ $^ $(LDFLAGS) $(LIBS)
.SUFFIXES:
.SUFFIXES: .c .cc .C .cpp .o
obj/%.o : %.cpp
$(CC) -o $@ $< -c $(CFLAGS) $(INCFLAGS) $(CPU_FLAGS)
obj:
mkdir -p obj
clean:
rm -fr obj i2p
.PHONY: all
.PHONY: clean

View File

@ -1,11 +1,6 @@
CC = clang++
CFLAGS = -g -Wall -std=c++11 -lstdc++ -I/usr/local/include
OBJECTS = obj/CryptoConst.o obj/base64.o obj/NTCPSession.o obj/RouterInfo.o obj/Transports.o \
obj/RouterContext.o obj/NetDb.o obj/LeaseSet.o obj/Tunnel.o obj/TunnelEndpoint.o \
obj/TunnelGateway.o obj/TransitTunnel.o obj/I2NPProtocol.o obj/Log.o obj/Garlic.o \
obj/HTTPServer.o obj/Streaming.o obj/Identity.o obj/SSU.o obj/util.o obj/Reseed.o \
obj/UPnP.o obj/TunnelPool.o obj/HTTPProxy.o obj/AddressBook.o obj/Daemon.o \
obj/DaemonLinux.o obj/SSUData.o obj/i2p.o obj/aes.o obj/SOCKS.o
include filelist.mk
INCFLAGS = -DCRYPTOPP_DISABLE_ASM
LDFLAGS = -Wl,-rpath,/usr/local/lib -L/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem -lboost_regex -lboost_program_options -lpthread
LIBS =

View File

@ -21,15 +21,15 @@ namespace ntcp
{
NTCPSession::NTCPSession (boost::asio::io_service& service, i2p::data::RouterInfo& in_RemoteRouterInfo):
m_Socket (service), m_TerminationTimer (service), m_IsEstablished (false),
m_RemoteRouterInfo (in_RemoteRouterInfo), m_ReceiveBufferOffset (0), m_NextMessage (nullptr)
m_RemoteRouterInfo (in_RemoteRouterInfo), m_ReceiveBufferOffset (0), m_NextMessage (nullptr),
m_NumSentBytes (0), m_NumReceivedBytes (0)
{
m_DHKeysPair = i2p::transports.GetNextDHKeysPair ();
m_DHKeysPair = i2p::transports.GetNextDHKeysPair ();
}
NTCPSession::~NTCPSession ()
{
delete m_DHKeysPair;
delete m_NextMessage;
}
void NTCPSession::CreateAESKey (uint8_t * pubKey, uint8_t * aesKey)
@ -403,7 +403,7 @@ namespace ntcp
}
else
{
LogPrint ("Received: ", bytes_transferred);
m_NumReceivedBytes += bytes_transferred;
m_ReceiveBufferOffset += bytes_transferred;
if (m_ReceiveBufferOffset >= 16)
@ -514,7 +514,7 @@ namespace ntcp
}
else
{
LogPrint ("Msg sent: ", bytes_transferred);
m_NumSentBytes += bytes_transferred;
ScheduleTermination (); // reset termination timer
}
}

View File

@ -78,6 +78,9 @@ namespace ntcp
void ClientLogin ();
void ServerLogin ();
void SendI2NPMessage (I2NPMessage * msg);
size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
protected:
@ -142,6 +145,8 @@ namespace ntcp
i2p::I2NPMessage * m_NextMessage;
std::list<i2p::I2NPMessage *> m_DelayedMessages;
size_t m_NextMessageOffset;
size_t m_NumSentBytes, m_NumReceivedBytes;
};
class NTCPClient: public NTCPSession

21
SSU.cpp
View File

@ -19,7 +19,8 @@ namespace ssu
const i2p::data::RouterInfo * router, bool peerTest ):
m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_RemoteRouter (router),
m_Timer (m_Server.GetService ()), m_PeerTest (peerTest), m_State (eSessionStateUnknown),
m_IsSessionKey (false), m_RelayTag (0), m_Data (*this)
m_IsSessionKey (false), m_RelayTag (0), m_Data (*this),
m_NumSentBytes (0), m_NumReceivedBytes (0)
{
m_DHKeysPair = i2p::transports.GetNextDHKeysPair ();
}
@ -74,6 +75,7 @@ namespace ssu
void SSUSession::ProcessNextMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint)
{
m_NumReceivedBytes += len;
if (m_State == eSessionStateIntroduced)
{
// HolePunch received
@ -83,11 +85,12 @@ namespace ssu
}
else
{
ScheduleTermination ();
// check for duplicate
if (m_State == eSessionStateEstablished)
ScheduleTermination ();
/* // check for duplicate
const uint8_t * iv = ((SSUHeader *)buf)->iv;
if (m_ReceivedIVs.count (iv)) return; // duplicate detected
m_ReceivedIVs.insert (iv);
m_ReceivedIVs.insert (iv);*/
if (m_IsSessionKey && Validate (buf, len, m_MacKey)) // try session key first
DecryptSessionKey (buf, len);
@ -652,7 +655,6 @@ namespace ssu
if (m_State != eSessionStateFailed)
{
m_State = eSessionStateFailed;
Close ();
m_Server.DeleteSession (this); // delete this
}
}
@ -822,6 +824,7 @@ namespace ssu
// encrypt message with session key
FillHeaderAndEncrypt (PAYLOAD_TYPE_SESSION_DESTROYED, buf, 48);
Send (buf, 48);
LogPrint ("SSU session destoryed sent");
}
}
@ -842,6 +845,7 @@ namespace ssu
void SSUSession::Send (const uint8_t * buf, size_t size)
{
m_NumSentBytes += size;
m_Server.Send (buf, size, m_RemoteEndpoint);
}
@ -910,7 +914,6 @@ namespace ssu
void SSUServer::Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to)
{
m_Socket.send_to (boost::asio::buffer (buf, len), to);
LogPrint ("SSU sent ", len, " bytes");
}
void SSUServer::Receive ()
@ -923,7 +926,6 @@ namespace ssu
{
if (!ecode)
{
LogPrint ("SSU received ", bytes_transferred, " bytes");
SSUSession * session = nullptr;
auto it = m_Sessions.find (m_SenderEndpoint);
if (it != m_Sessions.end ())
@ -1020,7 +1022,12 @@ namespace ssu
introducerSession->Introduce (introducer->iTag, introducer->iKey);
}
else
{
LogPrint ("Router is unreachable, but no introducers presented. Ignored");
m_Sessions.erase (remoteEndpoint);
delete session;
session = nullptr;
}
}
}
}

7
SSU.h
View File

@ -31,7 +31,6 @@ namespace ssu
};
#pragma pack()
const size_t SSU_MTU = 1484;
const int SSU_CONNECT_TIMEOUT = 5; // 5 seconds
const int SSU_TERMINATION_TIMEOUT = 330; // 5.5 minutes
@ -74,7 +73,10 @@ namespace ssu
void SendPeerTest (); // Alice
SessionState GetState () const { return m_State; };
size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
private:
void CreateAESandMacKey (const uint8_t * pubKey);
@ -132,6 +134,7 @@ namespace ssu
std::list<i2p::I2NPMessage *> m_DelayedMessages;
std::set<IV> m_ReceivedIVs;
SSUData m_Data;
size_t m_NumSentBytes, m_NumReceivedBytes;
};
class SSUServer

View File

@ -1,4 +1,7 @@
#include <stdlib.h>
#include <boost/bind.hpp>
#include "Log.h"
#include "Timestamp.h"
#include "SSU.h"
#include "SSUData.h"
@ -7,7 +10,7 @@ namespace i2p
namespace ssu
{
SSUData::SSUData (SSUSession& session):
m_Session (session)
m_Session (session), m_ResendTimer (session.m_Server.GetService ())
{
}
@ -20,10 +23,7 @@ namespace ssu
delete it.second;
}
for (auto it: m_SentMessages)
{
for (auto f: it.second)
delete[] f;
}
delete it.second;
}
void SSUData::ProcessSentMessageAck (uint32_t msgID)
@ -31,19 +31,15 @@ namespace ssu
auto it = m_SentMessages.find (msgID);
if (it != m_SentMessages.end ())
{
// delete all ack-ed message's fragments
for (auto f: it->second)
delete[] f;
delete it->second;
m_SentMessages.erase (it);
if (m_SentMessages.empty ())
m_ResendTimer.cancel ();
}
}
void SSUData::ProcessMessage (uint8_t * buf, size_t len)
void SSUData::ProcessAcks (uint8_t *& buf, uint8_t flag)
{
//uint8_t * start = buf;
uint8_t flag = *buf;
buf++;
LogPrint ("Process SSU data flags=", (int)flag);
if (flag & DATA_FLAG_EXPLICIT_ACKS_INCLUDED)
{
// explicit ACKs
@ -60,13 +56,45 @@ namespace ssu
buf++;
for (int i = 0; i < numBitfields; i++)
{
uint32_t msgID = be32toh (*(uint32_t *)buf);
buf += 4; // msgID
// TODO: process individual Ack bitfields
while (*buf & 0x80) // not last
auto it = m_SentMessages.find (msgID);
// process individual Ack bitfields
bool isNonLast = false;
int fragment = 0;
do
{
uint8_t bitfield = *buf;
isNonLast = bitfield & 0x80;
bitfield &= 0x7F; // clear MSB
if (bitfield && it != m_SentMessages.end ())
{
int numSentFragments = it->second->fragments.size ();
// process bits
uint8_t mask = 0x40;
for (int j = 0; j < 7; j++)
{
if (bitfield & mask)
{
if (fragment < numSentFragments)
{
delete it->second->fragments[fragment];
it->second->fragments[fragment] = nullptr;
}
}
fragment++;
mask >>= 1;
}
}
buf++;
buf++; // last byte
}
while (isNonLast);
}
}
}
}
void SSUData::ProcessFragments (uint8_t * buf)
{
uint8_t numFragments = *buf; // number of fragments
buf++;
for (int i = 0; i < numFragments; i++)
@ -82,77 +110,124 @@ namespace ssu
bool isLast = fragmentInfo & 0x010000; // bit 16
uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17
LogPrint ("SSU data fragment ", (int)fragmentNum, " of message ", msgID, " size=", (int)fragmentSize, isLast ? " last" : " non-last");
I2NPMessage * msg = nullptr;
if (fragmentNum > 0) // follow-up fragment
{
auto it = m_IncomleteMessages.find (msgID);
if (it != m_IncomleteMessages.end ())
{
if (fragmentNum == it->second->nextFragmentNum)
{
// expected fragment
msg = it->second->msg;
memcpy (msg->buf + msg->len, buf, fragmentSize);
msg->len += fragmentSize;
it->second->nextFragmentNum++;
}
else if (fragmentNum < it->second->nextFragmentNum)
// duplicate fragment
LogPrint ("Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ". Ignored");
else
{
// missing fragment
LogPrint ("Missing fragments from ", it->second->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID);
//TODO
}
if (isLast)
{
if (!msg)
DeleteI2NPMessage (it->second->msg);
delete it->second;
m_IncomleteMessages.erase (it);
}
}
else
// TODO:
LogPrint ("Unexpected follow-on fragment ", (int)fragmentNum, " of message ", msgID);
}
else // first fragment
{
msg = NewI2NPMessage ();
memcpy (msg->GetSSUHeader (), buf, fragmentSize);
msg->len += fragmentSize - sizeof (I2NPHeaderShort);
}
if (msg)
{
if (!fragmentNum && !isLast)
m_IncomleteMessages[msgID] = new IncompleteMessage (msg);
if (isLast)
// find message with msgID
I2NPMessage * msg = nullptr;
IncompleteMessage * incompleteMessage = nullptr;
auto it = m_IncomleteMessages.find (msgID);
if (it != m_IncomleteMessages.end ())
{
// message exists
incompleteMessage = it->second;
msg = incompleteMessage->msg;
}
else
{
// create new message
msg = NewI2NPMessage ();
msg->len -= sizeof (I2NPHeaderShort);
incompleteMessage = new IncompleteMessage (msg);
m_IncomleteMessages[msgID] = incompleteMessage;
}
// handle current fragment
if (fragmentNum == incompleteMessage->nextFragmentNum)
{
// expected fragment
memcpy (msg->buf + msg->len, buf, fragmentSize);
msg->len += fragmentSize;
incompleteMessage->nextFragmentNum++;
if (!isLast && !incompleteMessage->savedFragments.empty ())
{
SendMsgAck (msgID);
msg->FromSSU (msgID);
if (m_Session.GetState () == eSessionStateEstablished)
i2p::HandleI2NPMessage (msg);
else
// try saved fragments
for (auto it1 = incompleteMessage->savedFragments.begin (); it1 != incompleteMessage->savedFragments.end ();)
{
// we expect DeliveryStatus
if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus)
auto savedFragment = *it1;
if (savedFragment->fragmentNum == incompleteMessage->nextFragmentNum)
{
LogPrint ("SSU session established");
m_Session.Established ();
}
memcpy (msg->buf + msg->len, savedFragment->buf, savedFragment->len);
msg->len += savedFragment->len;
isLast = savedFragment->isLast;
incompleteMessage->nextFragmentNum++;
incompleteMessage->savedFragments.erase (it1++);
delete savedFragment;
}
else
LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID);
DeleteI2NPMessage (msg);
break;
}
if (isLast)
LogPrint ("Message ", msgID, " complete");
}
}
else
{
if (fragmentNum < incompleteMessage->nextFragmentNum)
// duplicate fragment
LogPrint ("Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ". Ignored");
else
{
// missing fragment
LogPrint ("Missing fragments from ", (int)incompleteMessage->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID);
auto savedFragment = new Fragment (fragmentNum, buf, fragmentSize, isLast);
if (!incompleteMessage->savedFragments.insert (savedFragment).second)
{
LogPrint ("Fragment ", (int)fragmentNum, " of message ", msgID, " already saved");
delete savedFragment;
}
}
}
isLast = false;
}
if (isLast)
{
// delete incomplete message
delete incompleteMessage;
m_IncomleteMessages.erase (msgID);
// process message
SendMsgAck (msgID);
msg->FromSSU (msgID);
if (m_Session.GetState () == eSessionStateEstablished)
i2p::HandleI2NPMessage (msg);
else
{
// we expect DeliveryStatus
if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus)
{
LogPrint ("SSU session established");
m_Session.Established ();
}
else
LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID);
DeleteI2NPMessage (msg);
}
}
else
SendFragmentAck (msgID, fragmentNum);
buf += fragmentSize;
}
}
void SSUData::ProcessMessage (uint8_t * buf, size_t len)
{
//uint8_t * start = buf;
uint8_t flag = *buf;
buf++;
LogPrint ("Process SSU data flags=", (int)flag);
// process acks if presented
if (flag & (DATA_FLAG_ACK_BITFIELDS_INCLUDED | DATA_FLAG_EXPLICIT_ACKS_INCLUDED))
ProcessAcks (buf, flag);
// extended data if presented
if (flag & DATA_FLAG_EXTENDED_DATA_INCLUDED)
{
uint8_t extendedDataSize = *buf;
buf++; // size
LogPrint ("SSU extended data of ", extendedDataSize, " bytes presented");
buf += extendedDataSize;
}
// process data
ProcessFragments (buf);
}
void SSUData::Send (i2p::I2NPMessage * msg)
{
uint32_t msgID = msg->ToSSU ();
@ -161,8 +236,14 @@ namespace ssu
LogPrint ("SSU message ", msgID, " already sent");
DeleteI2NPMessage (msg);
return;
}
auto fragments = m_SentMessages[msgID];
}
if (m_SentMessages.empty ()) // schedule resend at first message only
ScheduleResend ();
SentMessage * sentMessage = new SentMessage;
m_SentMessages[msgID] = sentMessage;
sentMessage->nextResendTime = i2p::util::GetSecondsSinceEpoch () + RESEND_INTERVAL;
sentMessage->numResends = 0;
auto& fragments = sentMessage->fragments;
msgID = htobe32 (msgID);
size_t payloadSize = SSU_MTU - sizeof (SSUHeader) - 9; // 9 = flag + #frg(1) + messageID(4) + frag info (3)
size_t len = msg->GetLength ();
@ -171,8 +252,9 @@ namespace ssu
uint32_t fragmentNum = 0;
while (len > 0)
{
uint8_t * buf = new uint8_t[SSU_MTU + 18];
fragments.push_back (buf);
Fragment * fragment = new Fragment;
uint8_t * buf = fragment->buf;
fragments.push_back (fragment);
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_WANT_REPLY; // for compatibility
payload++;
@ -195,6 +277,7 @@ namespace ssu
size += payload - buf;
if (size & 0x0F) // make sure 16 bytes boundary
size = ((size >> 4) + 1) << 4; // (/16 + 1)*16
fragment->len = size;
// encrypt message with session key
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, size);
@ -228,6 +311,77 @@ namespace ssu
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48);
m_Session.Send (buf, 48);
}
void SSUData::SendFragmentAck (uint32_t msgID, int fragmentNum)
{
if (fragmentNum > 64)
{
LogPrint ("Fragment number ", fragmentNum, " exceeds 64");
return;
}
uint8_t buf[64 + 18];
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_ACK_BITFIELDS_INCLUDED; // flag
payload++;
*payload = 1; // number of ACK bitfields
payload++;
// one ack
*(uint32_t *)(payload) = htobe32 (msgID); // msgID
payload += 4;
div_t d = div (fragmentNum, 7);
memset (payload, 0x80, d.quot); // 0x80 means non-last
payload += d.quot;
*payload = 0x40 >> d.rem; // set corresponding bit
payload++;
*payload = 0; // number of fragments
size_t len = d.quot < 4 ? 48 : 64; // 48 = 37 + 7 + 4 (3+1)
// encrypt message with session key
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, len);
m_Session.Send (buf, len);
}
void SSUData::ScheduleResend()
{
m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL));
m_ResendTimer.async_wait (boost::bind (&SSUData::HandleResendTimer,
this, boost::asio::placeholders::error));
}
void SSUData::HandleResendTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_SentMessages.begin (); it != m_SentMessages.end ();)
{
if (ts >= it->second->nextResendTime)
{
bool isEmpty = true;
for (auto f: it->second->fragments)
if (f)
{
isEmpty = false;
m_Session.Send (f->buf, f->len); // resend
}
it->second->numResends++;
if (isEmpty || it->second->numResends >= MAX_NUM_RESENDS)
{
delete it->second;
it = m_SentMessages.erase (it);
}
else
it++;
}
else
it++;
}
if (!m_SentMessages.empty ())
ScheduleResend ();
}
}
}
}

View File

@ -2,8 +2,11 @@
#define SSU_DATA_H__
#include <inttypes.h>
#include <string.h>
#include <map>
#include <vector>
#include <set>
#include <boost/asio.hpp>
#include "I2NPProtocol.h"
namespace i2p
@ -11,6 +14,9 @@ namespace i2p
namespace ssu
{
const size_t SSU_MTU = 1484;
const int RESEND_INTERVAL = 3; // in seconds
const int MAX_NUM_RESENDS = 5;
// data flags
const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02;
const uint8_t DATA_FLAG_WANT_REPLY = 0x04;
@ -19,6 +25,45 @@ namespace ssu
const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40;
const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80;
struct Fragment
{
int fragmentNum;
size_t len;
bool isLast;
uint8_t buf[SSU_MTU + 18];
Fragment () = default;
Fragment (int n, const uint8_t * b, int l, bool last):
fragmentNum (n), len (l), isLast (last) { memcpy (buf, b, len); };
};
struct FragmentCmp
{
bool operator() (const Fragment * f1, const Fragment * f2) const
{
return f1->fragmentNum < f2->fragmentNum;
};
};
struct IncompleteMessage
{
I2NPMessage * msg;
int nextFragmentNum;
std::set<Fragment *, FragmentCmp> savedFragments;
IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (0) {};
~IncompleteMessage () { for (auto it: savedFragments) { delete it; }; };
};
struct SentMessage
{
std::vector<Fragment *> fragments;
uint32_t nextResendTime; // in seconds
int numResends;
~SentMessage () { for (auto it: fragments) { delete it; }; };
};
class SSUSession;
class SSUData
{
@ -33,21 +78,20 @@ namespace ssu
private:
void SendMsgAck (uint32_t msgID);
void ProcessSentMessageAck (uint32_t msgID);
void SendFragmentAck (uint32_t msgID, int fragmentNum);
void ProcessAcks (uint8_t *& buf, uint8_t flag);
void ProcessFragments (uint8_t * buf);
void ProcessSentMessageAck (uint32_t msgID);
private:
struct IncompleteMessage
{
I2NPMessage * msg;
uint8_t nextFragmentNum;
IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (1) {};
};
void ScheduleResend ();
void HandleResendTimer (const boost::system::error_code& ecode);
private:
SSUSession& m_Session;
std::map<uint32_t, IncompleteMessage *> m_IncomleteMessages;
std::map<uint32_t, std::vector<uint8_t *> > m_SentMessages; // msgID -> fragments
std::map<uint32_t, SentMessage *> m_SentMessages;
boost::asio::deadline_timer m_ResendTimer;
};
}
}

View File

@ -263,6 +263,23 @@ namespace i2p
}
}
void Transports::CloseSession (const i2p::data::RouterInfo * router)
{
if (!router) return;
m_Service.post (boost::bind (&Transports::PostCloseSession, this, router));
}
void Transports::PostCloseSession (const i2p::data::RouterInfo * router)
{
auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (router) : nullptr;
if (ssuSession) // try SSU first
{
m_SSUServer->DeleteSession (ssuSession);
LogPrint ("SSU session closed");
}
// TODO: delete NTCP
}
void Transports::DetectExternalIP ()
{
for (int i = 0; i < 5; i ++)

View File

@ -63,13 +63,15 @@ namespace i2p
i2p::ntcp::NTCPSession * FindNTCPSession (const i2p::data::IdentHash& ident);
void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void CloseSession (const i2p::data::RouterInfo * router);
private:
void Run ();
void HandleAccept (i2p::ntcp::NTCPServerConnection * conn, const boost::system::error_code& error);
void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void PostCloseSession (const i2p::data::RouterInfo * router);
void DetectExternalIP ();
private:

View File

@ -361,10 +361,12 @@ namespace tunnel
void Tunnels::ManageTunnels ()
{
// check pending tunnel. if something is still there, wipe it out
// because it wouldn't be reponded anyway
// because it wouldn't be responded anyway
for (auto& it : m_PendingTunnels)
{
LogPrint ("Pending tunnel build request ", it.first, " has not been responded. Deleted");
if (it.second->GetTunnelConfig ()->GetFirstHop ()->isGateway) // outbound
i2p::transports.CloseSession (it.second->GetTunnelConfig ()->GetFirstHop ()->router);
delete it.second;
}
m_PendingTunnels.clear ();

View File

@ -188,7 +188,7 @@ namespace tunnel
hops.push_back (hop);
}
std::reverse (hops.begin (), hops.end ());
auto * tunnel = tunnels.CreateTunnel<InboundTunnel> (new TunnelConfig (hops));
auto * tunnel = tunnels.CreateTunnel<InboundTunnel> (new TunnelConfig (hops), outboundTunnel);
tunnel->SetTunnelPool (this);
}

View File

@ -1,85 +0,0 @@
TEMPLATE = app
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qt
TARGET = ./../../i2pd_qt
QMAKE_CXXFLAGS += -std=c++0x
LIBS += -lcrypto++
LIBS += \
-lboost_system\
-lboost_filesystem\
-lboost_regex\
-lboost_program_options\
-lpthread
SOURCES += \
../LeaseSet.cpp \
../i2p.cpp \
../HTTPServer.cpp \
../HTTPProxy.cpp \
../Garlic.cpp \
../base64.cpp \
../AddressBook.cpp \
../util.cpp \
../UPnP.cpp \
../TunnelPool.cpp \
../TunnelGateway.cpp \
../TunnelEndpoint.cpp \
../Tunnel.cpp \
../Transports.cpp \
../TransitTunnel.cpp \
../Streaming.cpp \
../SSU.cpp \
../RouterInfo.cpp \
../RouterContext.cpp \
../Reseed.cpp \
../NTCPSession.cpp \
../NetDb.cpp \
../Log.cpp \
../Identity.cpp \
../I2NPProtocol.cpp \
../SOCKS.cpp
HEADERS += \
../LeaseSet.h \
../Identity.h \
../HTTPServer.h \
../HTTPProxy.h \
../hmac.h \
../Garlic.h \
../ElGamal.h \
../CryptoConst.h \
../base64.h \
../AddressBook.h \
../util.h \
../UPnP.h \
../TunnelPool.h \
../TunnelGateway.h \
../TunnelEndpoint.h \
../TunnelConfig.h \
../TunnelBase.h \
../Tunnel.h \
../Transports.h \
../TransitTunnel.h \
../Timestamp.h \
../Streaming.h \
../SSU.h \
../RouterInfo.h \
../RouterContext.h \
../Reseed.h \
../Queue.h \
../NTCPSession.h \
../NetDb.h \
../Log.h \
../LittleBigEndian.h \
../I2PEndian.h \
../I2NPProtocol.h \
../SOCKS.h
OTHER_FILES += \
../README.md \
../Makefile \
../LICENSE

18
filelist.mk Normal file
View File

@ -0,0 +1,18 @@
CPP_FILES := CryptoConst.cpp base64.cpp NTCPSession.cpp RouterInfo.cpp Transports.cpp \
RouterContext.cpp NetDb.cpp LeaseSet.cpp Tunnel.cpp TunnelEndpoint.cpp TunnelGateway.cpp \
TransitTunnel.cpp I2NPProtocol.cpp Log.cpp Garlic.cpp HTTPServer.cpp Streaming.cpp Identity.cpp \
SSU.cpp util.cpp Reseed.cpp DaemonLinux.cpp SSUData.cpp i2p.cpp aes.cpp SOCKS.cpp UPnP.cpp \
TunnelPool.cpp HTTPProxy.cpp AddressBook.cpp Daemon.cpp
H_FILES := CryptoConst.h base64.h NTCPSession.h RouterInfo.h Transports.h \
RouterContext.h NetDb.h LeaseSet.h Tunnel.h TunnelEndpoint.h TunnelGateway.h \
TransitTunnel.h I2NPProtocol.h Log.h Garlic.h HTTPServer.h Streaming.h Identity.h \
SSU.h util.h Reseed.h DaemonLinux.h SSUData.h i2p.h aes.h SOCKS.h UPnP.h TunnelPool.h \
HTTPProxy.h AddressBook.h Daemon.h
OBJECTS = $(addprefix obj/, $(notdir $(CPP_FILES:.cpp=.o)))