diff --git a/HTTPProxy.cpp b/HTTPProxy.cpp new file mode 100644 index 00000000..810e5a18 --- /dev/null +++ b/HTTPProxy.cpp @@ -0,0 +1,270 @@ +#include +#include + +#include "base64.h" +#include "Log.h" +#include "Tunnel.h" +#include "TransitTunnel.h" +#include "Transports.h" +#include "NetDb.h" +#include "Streaming.h" +#include "HTTPProxy.h" + +namespace i2p +{ +namespace proxy +{ + namespace misc_strings + { + + const char name_value_separator[] = { ':', ' ' }; + const char crlf[] = { '\r', '\n' }; + + } // namespace misc_strings + + std::vector HTTPConnection::reply::to_buffers() + { + std::vector buffers; + if (headers.size () > 0) + { + buffers.push_back (boost::asio::buffer ("HTTP/1.0 200 OK\r\n")); // always OK + for (std::size_t i = 0; i < headers.size(); ++i) + { + header& h = headers[i]; + buffers.push_back(boost::asio::buffer(h.name)); + buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator)); + buffers.push_back(boost::asio::buffer(h.value)); + buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } + buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } + buffers.push_back(boost::asio::buffer(content)); + return buffers; + } + + void HTTPConnection::Terminate () + { + m_Socket->close (); + delete this; + } + + void HTTPConnection::Receive () + { + m_Socket->async_read_some (boost::asio::buffer (m_Buffer, 8192), + boost::bind(&HTTPConnection::HandleReceive, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + + void HTTPConnection::HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) + { + if (!ecode) + { + m_Buffer[bytes_transferred] = 0; + + std::pair requestInfo = ExtractRequest (); + request m_Request; + parseHeaders (m_Buffer, m_Request.headers); + + LogPrint("Requesting ", requestInfo.first, " with path ", requestInfo.second); + HandleDestinationRequest (requestInfo.first, requestInfo.second); + + boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), + boost::bind (&HTTPConnection::HandleWrite, this, + boost::asio::placeholders::error)); + //Receive (); + } + else if (ecode != boost::asio::error::operation_aborted) + Terminate (); + } + + void HTTPConnection::parseHeaders(const std::string& h, std::vector
& hm) { + std::string str (h); + std::string::size_type idx; + std::string t; + int i = 0; + while( (idx=str.find ("\r\n")) != std::string::npos) { + t=str.substr (0,idx); + str.erase (0,idx+2); + if (t == "") + break; + idx=t.find(": "); + if (idx == std::string::npos) + { + std::cout << "Bad header line: " << t << std::endl; + break; + } + LogPrint ("Name: ", t.substr (0,idx), " Value: ", t.substr (idx+2)); + hm[i].name = t.substr (0,idx); + hm[i].value = t.substr (idx+2); + i++; + } + } + + // TODO: Support other requests than GET. + std::pair HTTPConnection::ExtractRequest () + { + char * get = strstr (m_Buffer, "GET"); + if (get) + { + char * http = strstr (get, "HTTP"); + if (http) + { + std::string url (get + 4, http - get - 5); + size_t sp = url.find_first_of ('/', 7 /* skip http:// part */ ); + if (sp != std::string::npos) + { + std::string base_url (url.begin()+7, url.begin()+sp); + LogPrint ("Base URL is: ", base_url, "\n"); + if ( sp != std::string::npos ) + { + std::string query (url.begin ()+sp+1, url.end ()); + LogPrint ("Query is: ", "/" + query); + + return std::make_pair (base_url, "/" + query); + } + return std::make_pair (base_url, "/"); + } + } + } + return std::make_pair ("",""); + } + + void HTTPConnection::HandleWrite (const boost::system::error_code& ecode) + { + Terminate (); + } + + void HTTPConnection::HandleDestinationRequest (const std::string& address, const std::string& uri) + { + i2p::data::IdentHash destination; + std::string fullAddress; + if (address.find (".b32.i2p") != std::string::npos) + { + int li = address.find_first_of ("."); + std::string newaddress = address.substr (0, li); + if (i2p::data::Base32ToByteStream (newaddress.c_str (), newaddress.length (), (uint8_t *)destination, 32) != 32) + { + LogPrint ("Invalid Base32 address ", newaddress); + return; + } + fullAddress = newaddress + ".b32.i2p"; + } + else + { + auto addr = i2p::data::netdb.FindAddress (address); + if (!addr) + { + LogPrint ("Unknown address ", address); + return; + } + destination = *addr; + fullAddress = address; + } + + auto leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) + { + i2p::data::netdb.Subscribe(destination); + std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds + leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet + { + m_Reply.content = leaseSet ? ""+ i2p::proxy::itoopieImage +"
Leases expired" : ""+ i2p::proxy::itoopieImage +"LeaseSet not found"; + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + return; + } + } + auto s = i2p::stream::CreateStream (*leaseSet); + if (s) + { + std::string request = "GET " + uri + " HTTP/1.1\n Host:" + fullAddress + "\n"; + s->Send ((uint8_t *)request.c_str (), request.length (), 10); + std::stringstream ss; + uint8_t buf[8192]; + size_t r = s->Receive (buf, 8192, 30); // 30 seconds + if (!r && s->IsEstablished ()) // nothing received but connection is established + r = s->Receive (buf, 8192, 30); // wait for another 30 secondd + if (r) // we recieved data + { + ss << std::string ((char *)buf, r); + while (s->IsOpen () && (r = s->Receive (buf, 8192, 30)) > 0) + ss << std::string ((char *)buf,r); + + m_Reply.content = ss.str (); // send "as is" + m_Reply.headers.resize(0); // no headers + return; + } + else // nothing received + ss << ""+ i2p::proxy::itoopieImage +"
Not responding"; + s->Close (); + DeleteStream (s); + + m_Reply.content = ss.str (); + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + } + } + + + HTTPProxy::HTTPProxy (int port): + m_Thread (nullptr), m_Work (m_Service), + m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), + m_NewSocket (nullptr) + { + + } + + HTTPProxy::~HTTPProxy () + { + Stop (); + } + + void HTTPProxy::Start () + { + m_Thread = new std::thread (std::bind (&HTTPProxy::Run, this)); + m_Acceptor.listen (); + Accept (); + } + + void HTTPProxy::Stop () + { + m_Acceptor.close(); + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = nullptr; + } + } + + void HTTPProxy::Run () + { + m_Service.run (); + } + + void HTTPProxy::Accept () + { + m_NewSocket = new boost::asio::ip::tcp::socket (m_Service); + m_Acceptor.async_accept (*m_NewSocket, boost::bind (&HTTPProxy::HandleAccept, this, + boost::asio::placeholders::error)); + } + + void HTTPProxy::HandleAccept(const boost::system::error_code& ecode) + { + if (!ecode) + { + new HTTPConnection (m_NewSocket); + Accept (); + } + } +} +} + diff --git a/HTTPProxy.h b/HTTPProxy.h new file mode 100644 index 00000000..d607d917 --- /dev/null +++ b/HTTPProxy.h @@ -0,0 +1,93 @@ +#ifndef HTTP_PROXY_H__ +#define HTTP_PROXY_H__ + +#include +#include +#include +#include + +namespace i2p +{ +namespace proxy +{ + const std::string itoopieImage = "\"\""; + + class HTTPConnection + { + struct header + { + std::string name; + std::string value; + }; + + struct request + { + std::string method; + std::string uri; + int http_version_major; + int http_version_minor; + std::vector
headers; + }; + + struct reply + { + std::vector
headers; + std::string content; + + std::vector to_buffers(); + }; + + public: + + HTTPConnection (boost::asio::ip::tcp::socket * socket): m_Socket (socket) { Receive (); }; + ~HTTPConnection () { delete m_Socket; } + + private: + + void Terminate (); + void Receive (); + void HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void HandleWrite(const boost::system::error_code& ecode); + + void HandleDestinationRequest (const std::string& address, const std::string& uri); + std::pair ExtractRequest (); + void parseHeaders(const std::string& h, std::vector
& hm); + + private: + + boost::asio::ip::tcp::socket * m_Socket; + char m_Buffer[8192]; + request m_Request; + reply m_Reply; + }; + + class HTTPProxy + { + public: + + HTTPProxy (int port); + ~HTTPProxy (); + + void Start (); + void Stop (); + + private: + + void Run (); + void Accept (); + void HandleAccept(const boost::system::error_code& ecode); + + private: + + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + boost::asio::ip::tcp::acceptor m_Acceptor; + boost::asio::ip::tcp::socket * m_NewSocket; + }; +} +} + +#endif + + diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 8990da3b..5b2c66f1 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -425,11 +425,6 @@ namespace i2p int size = be16toh (header->size); switch (header->typeID) { - case eI2NPDeliveryStatus: - LogPrint ("DeliveryStatus"); - // we assume DeliveryStatusMessage is sent with garlic only - i2p::garlic::routing.HandleDeliveryStatusMessage (buf, size); - break; case eI2NPVariableTunnelBuild: LogPrint ("VariableTunnelBuild"); HandleVariableTunnelBuildMsg (msgID, buf, size); @@ -461,6 +456,10 @@ namespace i2p LogPrint ("TunnelGateway"); HandleTunnelGatewayMsg (msg); break; + case eI2NPGarlic: + LogPrint ("Garlic"); + i2p::garlic::routing.HandleGarlicMessage (msg); + break; case eI2NPDatabaseStore: LogPrint ("DatabaseStore"); i2p::data::netdb.PostI2NPMsg (msg); @@ -468,11 +467,17 @@ namespace i2p case eI2NPDatabaseSearchReply: LogPrint ("DatabaseSearchReply"); i2p::data::netdb.PostI2NPMsg (msg); + break; + case eI2NPDeliveryStatus: + LogPrint ("DeliveryStatus"); + if (msg->from && msg->from->GetTunnelPool ()) + msg->from->GetTunnelPool ()->ProcessDeliveryStatus (msg); + else + { + i2p::garlic::routing.HandleDeliveryStatusMessage (msg->GetPayload (), msg->GetLength ()); + DeleteI2NPMessage (msg); + } break; - case eI2NPGarlic: - LogPrint ("Garlic"); - i2p::garlic::routing.HandleGarlicMessage (msg); - break; default: HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ()); DeleteI2NPMessage (msg); diff --git a/Makefile b/Makefile index 84a1acc0..d13d41b7 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ OBJECTS = obj/i2p.o obj/base64.o obj/NTCPSession.o obj/RouterInfo.o obj/Transpor obj/RouterContext.o obj/NetDb.o obj/LeaseSet.o obj/Tunnel.o obj/TunnelEndpoint.o \ obj/TunnelGateway.o obj/TransitTunnel.o obj/I2NPProtocol.o obj/Log.o obj/Garlic.o \ obj/HTTPServer.o obj/Streaming.o obj/Identity.o obj/SSU.o obj/util.o obj/Reseed.o \ - obj/UPnP.o obj/TunnelPool.o + obj/UPnP.o obj/TunnelPool.o obj/HTTPProxy.o INCFLAGS = LDFLAGS = -Wl,-rpath,/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem -lboost_regex -lboost_program_options -lpthread LIBS = diff --git a/README.md b/README.md index 20d84e5b..d19a666c 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,6 @@ Options * --httpport= - The http port to listen on * --log= - Enable or disable logging to file. 1 for yes, 0 for no. * --daemon= - Eanble or disable daemon mode. 1 for yes, 0 for no. - +* --httpproxyport= - The port to listen on (HTTP Proxy) diff --git a/Streaming.cpp b/Streaming.cpp index d365e57c..a537def8 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -15,8 +15,9 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), - m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), + m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -167,19 +168,8 @@ namespace stream memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); - - if (!m_OutboundTunnel) - m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (m_OutboundTunnel && !leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - } - else - DeleteI2NPMessage (msg); + + SendPacket (packet, size); } void Stream::SendQuickAck () @@ -202,25 +192,8 @@ namespace stream *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size - I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size)); - if (m_OutboundTunnel) - { - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (!leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - LogPrint ("Quick Ack sent"); - } - else - { - LogPrint ("All leases are expired"); - DeleteI2NPMessage (msg); - } - } - else - DeleteI2NPMessage (msg); + if (SendPacket (packet, size)) + LogPrint ("Quick Ack sent"); } void Stream::Close () @@ -250,17 +223,8 @@ namespace stream size += 40; // signature m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size)); - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (m_OutboundTunnel && !leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - LogPrint ("FIN sent"); - } - else - DeleteI2NPMessage (msg); + if (SendPacket (packet, size)) + LogPrint ("FIN sent"); m_ReceiveQueue.WakeUp (); } } @@ -297,6 +261,41 @@ namespace stream } return pos; } + + bool Stream::SendPacket (uint8_t * packet, size_t size) + { + I2NPMessage * leaseSet = nullptr; + if (m_LeaseSetUpdated) + { + leaseSet = m_LocalDestination->GetLeaseSet (); + m_LeaseSetUpdated = false; + } + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size), leaseSet); + if (!m_OutboundTunnel) + m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); + if (m_OutboundTunnel) + { + auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); + if (!leases.empty ()) + { + auto& lease = *leases.begin (); // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + return true; + } + else + { + LogPrint ("All leases are expired"); + DeleteI2NPMessage (msg); + } + } + else + { + LogPrint ("No outbound tunnels in the pool"); + DeleteI2NPMessage (msg); + } + return false; + } StreamingDestination * sharedLocalDestination = nullptr; @@ -356,6 +355,8 @@ namespace stream m_LeaseSet = newLeaseSet; if (oldLeaseSet) DeleteI2NPMessage (oldLeaseSet); + for (auto it: m_Streams) + it.second->SetLeaseSetUpdated (); } I2NPMessage * StreamingDestination::GetLeaseSet () @@ -450,7 +451,7 @@ namespace stream uncompressed->len = decompressor.MaxRetrievable (); if (uncompressed->len > MAX_PACKET_SIZE) { - LogPrint ("Recieved packet size exceeds mac packer size"); + LogPrint ("Recieved packet size exceeds mac packet size"); uncompressed->len = MAX_PACKET_SIZE; } decompressor.Get (uncompressed->buf, uncompressed->len); diff --git a/Streaming.h b/Streaming.h index b0f7fda9..973c2a55 100644 --- a/Streaming.h +++ b/Streaming.h @@ -78,11 +78,14 @@ namespace stream size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired void Close (); + + void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; }; private: void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); + bool SendPacket (uint8_t * packet, size_t size); void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); @@ -90,7 +93,7 @@ namespace stream private: uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; - bool m_IsOpen; + bool m_IsOpen, m_LeaseSetUpdated; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet& m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue; diff --git a/Tunnel.cpp b/Tunnel.cpp index 5d93c50a..0ebd6345 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -502,7 +502,10 @@ namespace tunnel void Tunnels::ManageTunnelPools () { for (auto& it: m_Pools) + { it->CreateTunnels (); + it->TestTunnels (); + } } void Tunnels::PostTunnelData (I2NPMessage * msg) diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 6d6516ec..f16b389d 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -1,9 +1,11 @@ #include +#include "I2PEndian.h" #include "CryptoConst.h" #include "Tunnel.h" #include "NetDb.h" #include "Timestamp.h" #include "RouterContext.h" +#include "Garlic.h" #include "TunnelPool.h" namespace i2p @@ -48,6 +50,8 @@ namespace tunnel void TunnelPool::TunnelExpired (OutboundTunnel * expiredTunnel) { m_OutboundTunnels.erase (expiredTunnel); + if (expiredTunnel == m_LastOutboundTunnel) + m_LastOutboundTunnel = nullptr; } std::vector TunnelPool::GetInboundTunnels (int num) const @@ -90,6 +94,43 @@ namespace tunnel CreateOutboundTunnel (); } + void TunnelPool::TestTunnels () + { + auto& rnd = i2p::context.GetRandomNumberGenerator (); + for (auto it: m_Tests) + { + LogPrint ("Tunnel test ", (int)it.first, " failed"); + // both outbound and inbound tunnels considered as invalid + TunnelExpired (it.second.first); + TunnelExpired (it.second.second); + } + m_Tests.clear (); + auto it1 = m_OutboundTunnels.begin (); + auto it2 = m_InboundTunnels.begin (); + while (it1 != m_OutboundTunnels.end () && it2 != m_InboundTunnels.end ()) + { + uint32_t msgID = rnd.GenerateWord32 (); + m_Tests[msgID] = std::make_pair (*it1, *it2); + (*it1)->SendTunnelDataMsg ((*it2)->GetNextIdentHash (), (*it2)->GetNextTunnelID (), + CreateDeliveryStatusMsg (msgID)); + it1++; it2++; + } + } + + void TunnelPool::ProcessDeliveryStatus (I2NPMessage * msg) + { + I2NPDeliveryStatusMsg * deliveryStatus = (I2NPDeliveryStatusMsg *)msg->GetPayload (); + auto it = m_Tests.find (be32toh (deliveryStatus->msgID)); + if (it != m_Tests.end ()) + { + LogPrint ("Tunnel test ", it->first, " successive. ", i2p::util::GetMillisecondsSinceEpoch () - be64toh (deliveryStatus->timestamp), " milliseconds"); + m_Tests.erase (it); + } + else + i2p::garlic::routing.HandleDeliveryStatusMessage (msg->GetPayload (), msg->GetLength ()); // TODO: + DeleteI2NPMessage (msg); + } + void TunnelPool::CreateInboundTunnel () { OutboundTunnel * outboundTunnel = m_OutboundTunnels.size () > 0 ? diff --git a/TunnelPool.h b/TunnelPool.h index f8b6aab5..8cef8375 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -1,8 +1,10 @@ #ifndef TUNNEL_POOL__ #define TUNNEL_POOL__ +#include #include #include +#include #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" @@ -34,6 +36,9 @@ namespace tunnel std::vector GetInboundTunnels (int num) const; OutboundTunnel * GetNextOutboundTunnel (); + void TestTunnels (); + void ProcessDeliveryStatus (I2NPMessage * msg); + private: void CreateInboundTunnel (); @@ -46,6 +51,7 @@ namespace tunnel int m_NumTunnels; std::set m_InboundTunnels; // recent tunnel appears first std::set m_OutboundTunnels; + std::map > m_Tests; OutboundTunnel * m_LastOutboundTunnel; }; } diff --git a/i2p.cpp b/i2p.cpp index b13b1791..cc2e5bcf 100644 --- a/i2p.cpp +++ b/i2p.cpp @@ -22,6 +22,7 @@ #include "Tunnel.h" #include "NetDb.h" #include "HTTPServer.h" +#include "HTTPProxy.h" #include "Garlic.h" #include "util.h" #include "Streaming.h" @@ -157,6 +158,9 @@ int main( int argc, char* argv[] ) i2p::garlic::routing.Start (); i2p::stream::StartStreaming (); + i2p::proxy::HTTPProxy httpProxy (i2p::util::config::GetArg("-httpproxyport", 4446)); + httpProxy.Start(); + while (running) { //TODO Meeh: Find something better to do here.