From 207212557ebb5a2b63f1037d0b3a61d89b4f2b5f Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 15 Jan 2018 08:19:57 -0500 Subject: [PATCH 1/3] fix sam race conditions --- libi2pd/Streaming.h | 21 +-- libi2pd_client/SAM.cpp | 320 ++++++++++++++++++++++++++--------------- libi2pd_client/SAM.h | 22 +-- 3 files changed, 227 insertions(+), 136 deletions(-) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 93cf2a9f..a9e5dcdd 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -69,7 +69,7 @@ namespace stream */ const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000; - struct Packet + struct Packet { size_t len, offset; uint8_t buf[MAX_PACKET_SIZE]; @@ -276,8 +276,8 @@ namespace stream /** set max connections per minute per destination */ void SetMaxConnsPerMinute(const uint32_t conns); - Packet * NewPacket () { return m_PacketsPool.Acquire (); } - void DeletePacket (Packet * p) { m_PacketsPool.Release (p); } + Packet * NewPacket () { return new Packet; } + void DeletePacket (Packet * p) { delete p; } private: @@ -316,7 +316,7 @@ namespace stream std::vector m_Banned; uint64_t m_LastBanClear; - i2p::util::MemoryPool m_PacketsPool; + //i2p::util::MemoryPool m_PacketsPool; bool m_EnableDrop; public: @@ -334,16 +334,21 @@ namespace stream void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) { auto s = shared_from_this(); - m_Service.post ([=](void) + m_Service.post ([s, buffer, handler, timeout](void) { if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset) s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0); else { - int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; + int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); - s->m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode) - { s->HandleReceiveTimer (ecode, buffer, handler, timeout - t); }); + int left = timeout - t; + auto self = s->shared_from_this(); + self->m_ReceiveTimer.async_wait ( + [self, buffer, handler, left](const boost::system::error_code & ec) + { + self->HandleReceiveTimer(ec, buffer, handler, left); + }); } }); } diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 140140fc..4ea6b61b 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -15,31 +15,22 @@ namespace i2p { namespace client { - SAMSocket::SAMSocket (SAMBridge& owner): - m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Timer (m_Owner.GetService ()), - m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), - m_IsAccepting (false), m_Stream (nullptr), m_Session (nullptr) + SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr socket): + m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()), + m_BufferOffset (0), + m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), + m_IsAccepting (false), m_Stream (nullptr) { } SAMSocket::~SAMSocket () { - Terminate ("~SAMSocket()"); - } - - void SAMSocket::CloseStream (const char* reason) - { - LogPrint (eLogDebug, "SAMSocket::CloseStream, reason: ", reason); - if (m_Stream) + if(m_Stream) { m_Stream->Close (); m_Stream.reset (); - } - } - - void SAMSocket::Terminate (const char* reason) - { - CloseStream (reason); + } + auto Session = m_Owner.FindSession(m_ID); switch (m_SocketType) { @@ -48,17 +39,17 @@ namespace client break; case eSAMSocketTypeStream: { - if (m_Session) - m_Session->DelSocket (shared_from_this ()); + if (Session) + Session->DelSocket (this); break; } case eSAMSocketTypeAcceptor: { - if (m_Session) + if (Session) { - m_Session->DelSocket (shared_from_this ()); - if (m_IsAccepting && m_Session->localDestination) - m_Session->localDestination->StopAcceptingStreams (); + Session->DelSocket (this); + if (m_IsAccepting && Session->localDestination) + Session->localDestination->StopAcceptingStreams (); } break; } @@ -66,15 +57,54 @@ namespace client ; } m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket.is_open()) m_Socket.close (); - m_Session = nullptr; + if (m_Socket && m_Socket->is_open()) m_Socket->close (); + m_Socket.reset (); + } + + void SAMSocket::Terminate (const char* reason) + { + if(m_Stream) + { + m_Stream->Close (); + m_Stream.reset (); + } + auto Session = m_Owner.FindSession(m_ID); + + switch (m_SocketType) + { + case eSAMSocketTypeSession: + m_Owner.CloseSession (m_ID); + break; + case eSAMSocketTypeStream: + { + if (Session) + Session->DelSocket (this); + break; + } + case eSAMSocketTypeAcceptor: + { + if (Session) + { + Session->DelSocket (this); + if (m_IsAccepting && Session->localDestination) + Session->localDestination->StopAcceptingStreams (); + } + break; + } + default: + ; + } + m_SocketType = eSAMSocketTypeTerminated; + if (m_Socket && m_Socket->is_open()) m_Socket->close (); + m_Socket.reset (); } void SAMSocket::ReceiveHandshake () { - m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), - std::placeholders::_1, std::placeholders::_2)); + if(m_Socket) + m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), + std::placeholders::_1, std::placeholders::_2)); } void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -121,7 +151,7 @@ namespace client #else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); #endif - boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), + boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -144,9 +174,9 @@ namespace client if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: handshake reply send error"); } - else + else if(m_Socket) { - m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -157,7 +187,7 @@ namespace client LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); if (!m_IsSilent) - boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), + boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, close)); else @@ -306,19 +336,19 @@ namespace client } // create destination - m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms); - if (m_Session) + auto session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms); + if (session) { m_SocketType = eSAMSocketTypeSession; if (style == SAM_VALUE_DATAGRAM) { - m_Session->UDPEndpoint = forward; - auto dest = m_Session->localDestination->CreateDatagramDestination (); + session->UDPEndpoint = forward; + auto dest = session->localDestination->CreateDatagramDestination (); dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } - if (m_Session->localDestination->IsReady ()) + if (session->localDestination->IsReady ()) SendSessionCreateReplyOk (); else { @@ -335,30 +365,38 @@ namespace client { if (ecode != boost::asio::error::operation_aborted) { - if (m_Session->localDestination->IsReady ()) - SendSessionCreateReplyOk (); - else + auto session = m_Owner.FindSession(m_ID); + if(session) { - m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); - m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, - shared_from_this (), std::placeholders::_1)); + if (session->localDestination->IsReady ()) + SendSessionCreateReplyOk (); + else + { + m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); + m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, + shared_from_this (), std::placeholders::_1)); + } } } } void SAMSocket::SendSessionCreateReplyOk () { - uint8_t buf[1024]; - char priv[1024]; - size_t l = m_Session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024); - size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024); - priv[l1] = 0; + auto session = m_Owner.FindSession(m_ID); + if (session) + { + uint8_t buf[1024]; + char priv[1024]; + size_t l = session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024); + size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024); + priv[l1] = 0; #ifdef _MSC_VER - size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); + size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); #else - size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); + size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); #endif - SendMessageReply (m_Buffer, l2, false); + SendMessageReply (m_Buffer, l2, false); + } } void SAMSocket::ProcessStreamConnect (char * buf, size_t len, size_t rem) @@ -371,8 +409,8 @@ namespace client std::string& silent = params[SAM_PARAM_SILENT]; if (silent == SAM_VALUE_TRUE) m_IsSilent = true; m_ID = id; - m_Session = m_Owner.FindSession (id); - if (m_Session) + auto session = m_Owner.FindSession (id); + if (session) { if (rem > 0) // handle follow on data { @@ -387,12 +425,12 @@ namespace client if (l > 0) { context.GetAddressBook().InsertAddress(dest); - auto leaseSet = m_Session->localDestination->FindLeaseSet(dest->GetIdentHash()); + auto leaseSet = session->localDestination->FindLeaseSet(dest->GetIdentHash()); if (leaseSet) Connect(leaseSet); else { - m_Session->localDestination->RequestDestination(dest->GetIdentHash(), + session->localDestination->RequestDestination(dest->GetIdentHash(), std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete, shared_from_this(), std::placeholders::_1)); } @@ -406,13 +444,17 @@ namespace client void SAMSocket::Connect (std::shared_ptr remote) { - m_SocketType = eSAMSocketTypeStream; - m_Session->AddSocket (shared_from_this ()); - m_Stream = m_Session->localDestination->CreateStream (remote); - m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send - m_BufferOffset = 0; - I2PReceive (); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + auto session = m_Owner.FindSession(m_ID); + if(session) + { + m_SocketType = eSAMSocketTypeStream; + session->AddSocket (shared_from_this ()); + m_Stream = session->localDestination->CreateStream (remote); + m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send + m_BufferOffset = 0; + I2PReceive (); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + } } void SAMSocket::HandleConnectLeaseSetRequestComplete (std::shared_ptr leaseSet) @@ -435,15 +477,15 @@ namespace client std::string& silent = params[SAM_PARAM_SILENT]; if (silent == SAM_VALUE_TRUE) m_IsSilent = true; m_ID = id; - m_Session = m_Owner.FindSession (id); - if (m_Session) + auto session = m_Owner.FindSession (id); + if (session) { m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - if (!m_Session->localDestination->IsAcceptingStreams ()) + session->AddSocket (shared_from_this ()); + if (!session->localDestination->IsAcceptingStreams ()) { m_IsAccepting = true; - m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); } SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } @@ -459,9 +501,10 @@ namespace client size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf; if (offset + size <= len) { - if (m_Session) + auto session = m_Owner.FindSession(m_ID); + if (session) { - auto d = m_Session->localDestination->GetDatagramDestination (); + auto d = session->localDestination->GetDatagramDestination (); if (d) { i2p::data::IdentityEx dest; @@ -516,7 +559,8 @@ namespace client std::string& name = params[SAM_PARAM_NAME]; std::shared_ptr identity; i2p::data::IdentHash ident; - auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination; + auto session = m_Owner.FindSession(m_ID); + auto dest = session == nullptr ? context.GetSharedLocalDestination() : session->localDestination; if (name == "ME") SendNamingLookupReply (dest->GetIdentity ()); else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr) @@ -612,16 +656,18 @@ namespace client LogPrint (eLogError, "SAM: Buffer is full, terminate"); Terminate ("Buffer is full"); return; - } - m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), - std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, - shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } else if (m_Socket) + m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), + std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + else + LogPrint(eLogError, "SAM: receive with no native socket"); } void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("read error"); @@ -637,7 +683,7 @@ namespace client [s](const boost::system::error_code& ecode) { if (!ecode) - s->Receive (); + s->m_Owner.GetService ().post ([s] { s->Receive (); }); else s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); }); }); @@ -650,21 +696,21 @@ namespace client if (m_Stream) { if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || - m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular + m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular { m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), - std::bind (&SAMSocket::HandleI2PReceive, shared_from_this (), + std::bind (&SAMSocket::HandleI2PReceive, shared_from_this(), std::placeholders::_1, std::placeholders::_2), - SAM_SOCKET_CONNECTION_MAX_IDLE); + SAM_SOCKET_CONNECTION_MAX_IDLE); } else // closed by peer { + uint8_t * buff = new uint8_t[SAM_SOCKET_BUFFER_SIZE]; // get remaning data - auto len = m_Stream->ReadSome (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE); + auto len = m_Stream->ReadSome (buff, SAM_SOCKET_BUFFER_SIZE); if (len > 0) // still some data { - boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len), - std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); + WriteI2PDataImmediate(buff, len); } else // no more data Terminate ("no more data"); @@ -672,6 +718,30 @@ namespace client } } + void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz) + { + if(m_Socket) + boost::asio::async_write ( + *m_Socket, + boost::asio::buffer (buff, sz), + boost::asio::transfer_all(), + std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination + else + LogPrint(eLogError, "SAM: no native socket"); + } + + void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff) + { + delete [] buff; + } + + void SAMSocket::WriteI2PData(size_t sz) + { + uint8_t * sendbuff = new uint8_t[sz]; + memcpy(sendbuff, m_StreamBuffer, sz); + WriteI2PDataImmediate(sendbuff, sz); + } + void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) @@ -680,8 +750,9 @@ namespace client if (ecode != boost::asio::error::operation_aborted) { if (bytes_transferred > 0) - boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), - std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); // postpone termination + { + WriteI2PData(bytes_transferred); + } else { auto s = shared_from_this (); @@ -696,13 +767,18 @@ namespace client } else { - if (m_SocketType != eSAMSocketTypeTerminated) // check for possible race condition with Terminate() - boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), - std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); + if (m_SocketType != eSAMSocketTypeTerminated) + { + if (bytes_transferred > 0) + { + WriteI2PData(bytes_transferred); + } + I2PReceive(); + } } } - void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode) + void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode, size_t bytes_transferred) { if (ecode) { @@ -711,7 +787,9 @@ namespace client Terminate ("socket write error at HandleWriteI2PData"); } else + { I2PReceive (); + } } void SAMSocket::HandleI2PAccept (std::shared_ptr stream) @@ -760,39 +838,42 @@ namespace client { LogPrint (eLogDebug, "SAM: datagram received ", len); auto base64 = from.ToBase64 (); - auto ep = m_Session->UDPEndpoint; - if (ep) + auto session = m_Owner.FindSession(m_ID); + if(session) { - // udp forward enabled - size_t bsz = base64.size(); - size_t sz = bsz + 1 + len; - // build datagram body - uint8_t * data = new uint8_t[sz]; - // Destination - memcpy(data, base64.c_str(), bsz); - // linefeed - data[bsz] = '\n'; - // Payload - memcpy(data+bsz+1, buf, len); - // send to remote endpoint - m_Owner.SendTo(data, sz, ep); - delete [] data; - } - else - { -#ifdef _MSC_VER - size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); -#else - size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); -#endif - if (len < SAM_SOCKET_BUFFER_SIZE - l) + auto ep = session->UDPEndpoint; + if (ep) { - memcpy (m_StreamBuffer + l, buf, len); - boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l), - std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); + // udp forward enabled + size_t bsz = base64.size(); + size_t sz = bsz + 1 + len; + // build datagram body + uint8_t * data = new uint8_t[sz]; + // Destination + memcpy(data, base64.c_str(), bsz); + // linefeed + data[bsz] = '\n'; + // Payload + memcpy(data+bsz+1, buf, len); + // send to remote endpoint + m_Owner.SendTo(data, sz, ep); + delete [] data; } else - LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer"); + { +#ifdef _MSC_VER + size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); +#else + size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); +#endif + if (len < SAM_SOCKET_BUFFER_SIZE - l) + { + memcpy (m_StreamBuffer + l, buf, len); + WriteI2PData(len + l); + } + else + LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer"); + } } } @@ -875,8 +956,9 @@ namespace client void SAMBridge::Accept () { - auto newSocket = std::make_shared (*this); - m_Acceptor.async_accept (newSocket->GetSocket (), std::bind (&SAMBridge::HandleAccept, this, + auto native = std::make_shared(m_Service); + auto newSocket = std::make_shared (*this, native); + m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this, std::placeholders::_1, newSocket)); } diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index f2c84e3c..9282fe1d 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -79,11 +79,11 @@ namespace client { public: - SAMSocket (SAMBridge& owner); + typedef boost::asio::ip::tcp::socket Socket_t; + SAMSocket (SAMBridge& owner, std::shared_ptr socket); ~SAMSocket (); - void CloseStream (const char* reason); // TODO: implement it better - boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; + boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; }; void ReceiveHandshake (); void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; @@ -103,7 +103,7 @@ namespace client void I2PReceive (); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleI2PAccept (std::shared_ptr stream); - void HandleWriteI2PData (const boost::system::error_code& ecode); + void HandleWriteI2PData (const boost::system::error_code& ecode, size_t sz); void HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void ProcessSessionCreate (char * buf, size_t len); @@ -122,10 +122,15 @@ namespace client void HandleSessionReadinessCheckTimer (const boost::system::error_code& ecode); void SendSessionCreateReplyOk (); + void WriteI2PData(size_t sz); + void WriteI2PDataImmediate(uint8_t * ptr, size_t sz); + + void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff); + private: SAMBridge& m_Owner; - boost::asio::ip::tcp::socket m_Socket; + std::shared_ptr m_Socket; boost::asio::deadline_timer m_Timer; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; size_t m_BufferOffset; @@ -135,7 +140,6 @@ namespace client bool m_IsSilent; bool m_IsAccepting; // for eSAMSocketTypeAcceptor only std::shared_ptr m_Stream; - std::shared_ptr m_Session; }; struct SAMSession @@ -146,15 +150,15 @@ namespace client std::mutex m_SocketsMutex; /** safely add a socket to this session */ - void AddSocket(const std::shared_ptr & sock) { + void AddSocket(std::shared_ptr sock) { std::lock_guard lock(m_SocketsMutex); m_Sockets.push_back(sock); } /** safely remove a socket from this session */ - void DelSocket(const std::shared_ptr & sock) { + void DelSocket(SAMSocket * sock) { std::lock_guard lock(m_SocketsMutex); - m_Sockets.remove(sock); + m_Sockets.remove_if([sock](const std::shared_ptr s) -> bool { return s.get() == sock; }); } /** get a list holding a copy of all sam sockets from this session */ From b3b38015c28a3b7e052023f116c0ca99c170ab9c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 20 Jan 2018 07:31:05 -0500 Subject: [PATCH 2/3] check max buffer size in Stream::Send --- libi2pd/Streaming.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 37d9d2c4..9942d8e2 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -378,9 +378,15 @@ namespace stream size_t Stream::Send (const uint8_t * buf, size_t len) { - // TODO: check max buffer size + size_t sent = len; + while(len > MAX_PACKET_SIZE) + { + AsyncSend (buf, MAX_PACKET_SIZE, nullptr); + buf += MAX_PACKET_SIZE; + len -= MAX_PACKET_SIZE; + } AsyncSend (buf, len, nullptr); - return len; + return sent; } void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) From 06020b8f544ae09734992a47c95597e9998629c2 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 20 Jan 2018 13:06:08 -0500 Subject: [PATCH 3/3] re-enable packet pool --- libi2pd/Streaming.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 9410df9d..a114844d 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -276,8 +276,8 @@ namespace stream /** set max connections per minute per destination */ void SetMaxConnsPerMinute(const uint32_t conns); - Packet * NewPacket () { return new Packet; } - void DeletePacket (Packet * p) { delete p; } + Packet * NewPacket () { return m_PacketsPool.Acquire(); } + void DeletePacket (Packet * p) { return m_PacketsPool.Release(p); } void AcceptOnceAcceptor (std::shared_ptr stream, Acceptor acceptor, Acceptor prev); @@ -315,7 +315,7 @@ namespace stream std::vector m_Banned; uint64_t m_LastBanClear; - //i2p::util::MemoryPool m_PacketsPool; + i2p::util::MemoryPool m_PacketsPool; bool m_EnableDrop; public: