handle STREAM ACCEPT

This commit is contained in:
orignal 2014-09-26 15:40:57 -04:00
parent 9cd62d8873
commit 6af5fa3d27
4 changed files with 89 additions and 26 deletions

87
SAM.cpp
View File

@ -33,13 +33,30 @@ namespace stream
DeleteStream (m_Stream); DeleteStream (m_Stream);
m_Stream = nullptr; m_Stream = nullptr;
} }
if (m_SocketType == eSAMSocketTypeSession) switch (m_SocketType)
m_Owner.CloseSession (m_ID);
else if (m_SocketType == eSAMSocketTypeStream)
{ {
auto session = m_Owner.FindSession (m_ID); case eSAMSocketTypeSession:
if (session) m_Owner.CloseSession (m_ID);
session->sockets.remove (this); break;
case eSAMSocketTypeStream:
{
auto session = m_Owner.FindSession (m_ID);
if (session)
session->sockets.remove (this);
break;
}
case eSAMSocketTypeAcceptor:
{
auto session = m_Owner.FindSession (m_ID);
if (session)
{
session->sockets.remove (this);
session->localDestination->ResetAcceptor ();
}
break;
}
default:
;
} }
delete this; delete this;
} }
@ -137,6 +154,8 @@ namespace stream
ProcessSessionCreate (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); ProcessSessionCreate (eol + 1, bytes_transferred - (eol - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT)) else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT))
ProcessStreamConnect (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); ProcessStreamConnect (eol + 1, bytes_transferred - (eol - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT))
ProcessStreamAccept (eol + 1, bytes_transferred - (eol - m_Buffer) - 1);
else else
{ {
LogPrint ("SAM unexpected message ", m_Buffer); LogPrint ("SAM unexpected message ", m_Buffer);
@ -191,20 +210,43 @@ namespace stream
if (leaseSet) if (leaseSet)
{ {
m_SocketType = eSAMSocketTypeStream; m_SocketType = eSAMSocketTypeStream;
m_Stream = i2p::stream::CreateStream (*leaseSet);
m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect
StreamReceive ();
session->sockets.push_back (this); session->sockets.push_back (this);
SendMessageReply (SAM_STREAM_CONNECT_REPLY_OK, sizeof(SAM_STREAM_CONNECT_REPLY_OK), false); m_Stream = session->localDestination->CreateNewOutgoingStream (*leaseSet);
m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect
I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false);
} }
else else
{ {
i2p::data::netdb.Subscribe (dest.GetIdentHash ()); i2p::data::netdb.Subscribe (dest.GetIdentHash ());
SendMessageReply (SAM_STREAM_CONNECT_CANT_REACH_PEER, sizeof(SAM_STREAM_CONNECT_CANT_REACH_PEER), true); SendMessageReply (SAM_STREAM_STATUS_CANT_REACH_PEER, sizeof(SAM_STREAM_STATUS_CANT_REACH_PEER), true);
} }
} }
else else
SendMessageReply (SAM_STREAM_CONNECT_INVALID_ID, sizeof(SAM_STREAM_CONNECT_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true);
}
void SAMSocket::ProcessStreamAccept (char * buf, size_t len)
{
std::map<std::string, std::string> params;
ExtractParams (buf, len, params);
std::string& id = params[SAM_PARAM_ID];
m_ID = id;
auto session = m_Owner.FindSession (id);
if (session)
{
if (!session->localDestination->IsAcceptorSet ())
{
m_SocketType = eSAMSocketTypeAcceptor;
session->sockets.push_back (this);
session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false);
}
else
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, sizeof(SAM_STREAM_STATUS_I2P_ERROR), true);
}
else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true);
} }
void SAMSocket::ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params) void SAMSocket::ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params)
@ -246,16 +288,16 @@ namespace stream
} }
} }
void SAMSocket::StreamReceive () void SAMSocket::I2PReceive ()
{ {
if (m_Stream) if (m_Stream)
m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE),
boost::bind (&SAMSocket::HandleStreamReceive, this, boost::bind (&SAMSocket::HandleI2PReceive, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
SAM_SOCKET_CONNECTION_MAX_IDLE); SAM_SOCKET_CONNECTION_MAX_IDLE);
} }
void SAMSocket::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
if (ecode) if (ecode)
{ {
@ -265,11 +307,11 @@ namespace stream
else else
{ {
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred),
boost::bind (&SAMSocket::HandleWriteStreamData, this, boost::asio::placeholders::error)); boost::bind (&SAMSocket::HandleWriteI2PData, this, boost::asio::placeholders::error));
} }
} }
void SAMSocket::HandleWriteStreamData (const boost::system::error_code& ecode) void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode)
{ {
if (ecode) if (ecode)
{ {
@ -278,9 +320,18 @@ namespace stream
Terminate (); Terminate ();
} }
else else
StreamReceive (); I2PReceive ();
} }
void SAMSocket::HandleI2PAccept (i2p::stream::Stream * stream)
{
m_Stream = stream;
auto session = m_Owner.FindSession (m_ID);
if (session)
session->localDestination->ResetAcceptor ();
I2PReceive ();
}
SAMBridge::SAMBridge (int port): SAMBridge::SAMBridge (int port):
m_IsRunning (false), m_Thread (nullptr), m_IsRunning (false), m_Thread (nullptr),
m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),

19
SAM.h
View File

@ -21,9 +21,11 @@ namespace stream
const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION="; const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION=";
const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID"; const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID";
const char SAM_STREAM_CONNECT[] = "STREAM CONNECT"; const char SAM_STREAM_CONNECT[] = "STREAM CONNECT";
const char SAM_STREAM_CONNECT_REPLY_OK[] = "STREAM STATUS RESULT=OK"; const char SAM_STREAM_STATUS_OK[] = "STREAM STATUS RESULT=OK";
const char SAM_STREAM_CONNECT_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID"; const char SAM_STREAM_STATUS_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID";
const char SAM_STREAM_CONNECT_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER"; const char SAM_STREAM_STATUS_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER";
const char SAM_STREAM_STATUS_I2P_ERROR[] = "STREAM STATUS RESULT=I2P_ERROR";
const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT";
const char SAM_PARAM_STYLE[] = "STYLE"; const char SAM_PARAM_STYLE[] = "STYLE";
const char SAM_PARAM_ID[] = "ID"; const char SAM_PARAM_ID[] = "ID";
const char SAM_PARAM_DESTINATION[] = "DESTINATION"; const char SAM_PARAM_DESTINATION[] = "DESTINATION";
@ -33,7 +35,8 @@ namespace stream
{ {
eSAMSocketTypeUnknown, eSAMSocketTypeUnknown,
eSAMSocketTypeSession, eSAMSocketTypeSession,
eSAMSocketTypeStream eSAMSocketTypeStream,
eSAMSocketTypeAcceptor
}; };
class SAMBridge; class SAMBridge;
@ -58,12 +61,14 @@ namespace stream
void Receive (); void Receive ();
void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void StreamReceive (); void I2PReceive ();
void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleWriteStreamData (const boost::system::error_code& ecode); void HandleI2PAccept (i2p::stream::Stream * stream);
void HandleWriteI2PData (const boost::system::error_code& ecode);
void ProcessSessionCreate (char * buf, size_t len); void ProcessSessionCreate (char * buf, size_t len);
void ProcessStreamConnect (char * buf, size_t len); void ProcessStreamConnect (char * buf, size_t len);
void ProcessStreamAccept (char * buf, size_t len);
void ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params); void ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params);
private: private:

View File

@ -568,7 +568,7 @@ namespace stream
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet; delete m_LeaseSet;
} }
void StreamingDestination::HandleNextPacket (Packet * packet) void StreamingDestination::HandleNextPacket (Packet * packet)
{ {
uint32_t sendStreamID = packet->GetSendStreamID (); uint32_t sendStreamID = packet->GetSendStreamID ();
@ -589,6 +589,11 @@ namespace stream
incomingStream->HandleNextPacket (packet); incomingStream->HandleNextPacket (packet);
if (m_Acceptor != nullptr) if (m_Acceptor != nullptr)
m_Acceptor (incomingStream); m_Acceptor (incomingStream);
else
{
LogPrint ("Acceptor for incoming stream is not set");
DeleteStream (incomingStream);
}
} }
} }

View File

@ -152,6 +152,8 @@ namespace stream
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; }; void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
// implements LocalDestination // implements LocalDestination