re-send loacl LeaseSet if updated

This commit is contained in:
orignal 2014-03-17 22:55:02 -04:00
parent bd89d444b1
commit 4b90e8112e
2 changed files with 51 additions and 47 deletions

View File

@ -15,8 +15,9 @@ namespace i2p
namespace stream
{
Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote):
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false),
m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr)
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
}
@ -167,19 +168,8 @@ namespace stream
memcpy (packet + size, buf, len);
size += len; // payload
m_LocalDestination->Sign (packet, size, signature);
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ());
if (!m_OutboundTunnel)
m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases ();
if (m_OutboundTunnel && !leases.empty ())
{
auto& lease = *leases.begin (); // TODO:
m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg);
}
else
DeleteI2NPMessage (msg);
SendPacket (packet, size);
}
void Stream::SendQuickAck ()
@ -202,25 +192,8 @@ namespace stream
*(uint16_t *)(packet + size) = 0; // no options
size += 2; // options size
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size));
if (m_OutboundTunnel)
{
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases ();
if (!leases.empty ())
{
auto& lease = *leases.begin (); // TODO:
m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg);
LogPrint ("Quick Ack sent");
}
else
{
LogPrint ("All leases are expired");
DeleteI2NPMessage (msg);
}
}
else
DeleteI2NPMessage (msg);
if (SendPacket (packet, size))
LogPrint ("Quick Ack sent");
}
void Stream::Close ()
@ -250,17 +223,8 @@ namespace stream
size += 40; // signature
m_LocalDestination->Sign (packet, size, signature);
I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size));
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases ();
if (m_OutboundTunnel && !leases.empty ())
{
auto& lease = *leases.begin (); // TODO:
m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg);
LogPrint ("FIN sent");
}
else
DeleteI2NPMessage (msg);
if (SendPacket (packet, size))
LogPrint ("FIN sent");
m_ReceiveQueue.WakeUp ();
}
}
@ -297,6 +261,41 @@ namespace stream
}
return pos;
}
bool Stream::SendPacket (uint8_t * packet, size_t size)
{
I2NPMessage * leaseSet = nullptr;
if (m_LeaseSetUpdated)
{
leaseSet = m_LocalDestination->GetLeaseSet ();
m_LeaseSetUpdated = false;
}
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size), leaseSet);
if (!m_OutboundTunnel)
m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
if (m_OutboundTunnel)
{
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases ();
if (!leases.empty ())
{
auto& lease = *leases.begin (); // TODO:
m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg);
return true;
}
else
{
LogPrint ("All leases are expired");
DeleteI2NPMessage (msg);
}
}
else
{
LogPrint ("No outbound tunnels in the pool");
DeleteI2NPMessage (msg);
}
return false;
}
StreamingDestination * sharedLocalDestination = nullptr;
@ -356,6 +355,8 @@ namespace stream
m_LeaseSet = newLeaseSet;
if (oldLeaseSet)
DeleteI2NPMessage (oldLeaseSet);
for (auto it: m_Streams)
it.second->SetLeaseSetUpdated ();
}
I2NPMessage * StreamingDestination::GetLeaseSet ()
@ -450,7 +451,7 @@ namespace stream
uncompressed->len = decompressor.MaxRetrievable ();
if (uncompressed->len > MAX_PACKET_SIZE)
{
LogPrint ("Recieved packet size exceeds mac packer size");
LogPrint ("Recieved packet size exceeds mac packet size");
uncompressed->len = MAX_PACKET_SIZE;
}
decompressor.Get (uncompressed->buf, uncompressed->len);

View File

@ -78,11 +78,14 @@ namespace stream
size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds
size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired
void Close ();
void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; };
private:
void ConnectAndSend (uint8_t * buf, size_t len);
void SendQuickAck ();
bool SendPacket (uint8_t * packet, size_t size);
void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet);
@ -90,7 +93,7 @@ namespace stream
private:
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber;
bool m_IsOpen;
bool m_IsOpen, m_LeaseSetUpdated;
StreamingDestination * m_LocalDestination;
const i2p::data::LeaseSet& m_RemoteLeaseSet;
i2p::util::Queue<Packet> m_ReceiveQueue;