create streams through ClientDestination

This commit is contained in:
orignal 2014-10-22 14:01:23 -04:00
parent 4d97b0e206
commit b11877d002
8 changed files with 63 additions and 21 deletions

View File

@ -244,7 +244,7 @@ namespace client
uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4;
// we assume I2CP payload
if (buf[9] == 6) // streaming protocol
if (buf[9] == PROTOCOL_TYPE_STREAMING && m_StreamingDestination) // streaming protocol
{
// unzip it
CryptoPP::Gunzip decompressor;
@ -261,7 +261,6 @@ namespace client
else
{
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped");
decompressor.Skip ();
delete uncompressed;
}
}
@ -285,11 +284,37 @@ namespace client
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = 6; // streaming protocol
buf[9] = PROTOCOL_TYPE_STREAMING; // streaming protocol. TODO:
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
}
i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote)
{
if (m_StreamingDestination)
return m_StreamingDestination->CreateNewOutgoingStream (remote);
return nullptr;
}
void ClientDestination::AcceptStreams (const std::function<void (i2p::stream::Stream *)>& acceptor)
{
if (m_StreamingDestination)
m_StreamingDestination->SetAcceptor (acceptor);
}
void ClientDestination::StopAcceptingStreams ()
{
if (m_StreamingDestination)
m_StreamingDestination->ResetAcceptor ();
}
bool ClientDestination::IsAcceptingStreams () const
{
if (m_StreamingDestination)
return m_StreamingDestination->IsAcceptorSet ();
return false;
}
}
}

View File

@ -14,6 +14,10 @@ namespace i2p
{
namespace client
{
const uint8_t PROTOCOL_TYPE_STREAMING = 6;
const uint8_t PROTOCOL_TYPE_DATAGRAM = 17;
const uint8_t PROTOCOL_TYPE_RAW = 18;
class ClientDestination: public i2p::garlic::GarlicDestination
{
public:
@ -28,13 +32,19 @@ namespace client
bool IsRunning () const { return m_IsRunning; };
boost::asio::io_service * GetService () { return m_Service; };
i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; };
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); };
void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; };
const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident);
void SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs);
// streaming
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote);
void AcceptStreams (const std::function<void (i2p::stream::Stream *)>& acceptor);
void StopAcceptingStreams ();
bool IsAcceptingStreams () const;
// implements LocalDestination
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; };

View File

@ -517,7 +517,7 @@ namespace util
if (m_Stream)
{
m_Stream->Close ();
i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr;
}
m_Socket->close ();
@ -880,7 +880,7 @@ namespace util
void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len)
{
if (!m_Stream)
m_Stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*remote);
m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote);
if (m_Stream)
{
m_Stream->Send ((uint8_t *)buf, len);

View File

@ -14,7 +14,7 @@ namespace client
boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet):
m_Socket (socket), m_Owner (owner)
{
m_Stream = m_Owner->GetLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*leaseSet);
m_Stream = m_Owner->GetLocalDestination ()->CreateStream (*leaseSet);
m_Stream->Send (m_Buffer, 0); // connect
StreamReceive ();
Receive ();
@ -39,7 +39,7 @@ namespace client
if (m_Stream)
{
m_Stream->Close ();
m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr;
}
m_Socket->close ();
@ -115,7 +115,7 @@ namespace client
if (ecode != boost::asio::error::operation_aborted)
{
if (m_Stream) m_Stream->Close ();
m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr;
}
}
@ -270,7 +270,7 @@ namespace client
{
auto localDestination = GetLocalDestination ();
if (localDestination)
localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1));
localDestination->AcceptStreams (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1));
else
LogPrint ("Local destination not set for server tunnel");
}

17
SAM.cpp
View File

@ -25,8 +25,8 @@ namespace client
if (m_Stream)
{
m_Stream->Close ();
if (m_Session && m_Session->localDestination)
m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream);
i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr;
}
}
@ -35,8 +35,7 @@ namespace client
if (m_Stream)
{
m_Stream->Close ();
if (m_Session && m_Session->localDestination)
m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream);
i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr;
}
switch (m_SocketType)
@ -55,7 +54,7 @@ namespace client
if (m_Session)
{
m_Session->sockets.remove (this);
m_Session->localDestination->GetStreamingDestination ()->ResetAcceptor ();
m_Session->localDestination->StopAcceptingStreams ();
}
break;
}
@ -295,7 +294,7 @@ namespace client
{
m_SocketType = eSAMSocketTypeStream;
m_Session->sockets.push_back (this);
m_Stream = m_Session->localDestination->GetStreamingDestination ()->CreateNewOutgoingStream (remote);
m_Stream = m_Session->localDestination->CreateStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect
I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
@ -344,11 +343,11 @@ namespace client
m_Session = m_Owner.FindSession (id);
if (m_Session)
{
if (!m_Session->localDestination->GetStreamingDestination ()->IsAcceptorSet ())
if (!m_Session->localDestination->IsAcceptingStreams ())
{
m_SocketType = eSAMSocketTypeAcceptor;
m_Session->sockets.push_back (this);
m_Session->localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
else
@ -507,7 +506,7 @@ namespace client
m_Stream = stream;
auto session = m_Owner.FindSession (m_ID);
if (session)
session->localDestination->GetStreamingDestination ()->ResetAcceptor ();
session->localDestination->StopAcceptingStreams ();
if (!m_IsSilent)
{
// send remote peer address

View File

@ -224,7 +224,7 @@ namespace proxy
void SOCKS4AHandler::SentConnectionSuccess(const boost::system::error_code & ecode)
{
LogPrint("--- socks4a making connection");
m_stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream(*m_ls);
m_stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream(*m_ls);
m_state = OKAY;
LogPrint("--- socks4a state is ", m_state);
AsyncSockRead();

View File

@ -605,5 +605,11 @@ namespace stream
}
}
}
void DeleteStream (Stream * stream)
{
if (stream)
stream->GetLocalDestination ().DeleteStream (stream);
}
}
}

View File

@ -182,6 +182,8 @@ namespace stream
const decltype(m_Streams)& GetStreams () const { return m_Streams; };
};
void DeleteStream (Stream * stream);
//-------------------------------------------------
template<typename Buffer, typename ReceiveHandler>