acks and resends

This commit is contained in:
orignal 2014-08-10 18:27:23 -04:00
parent e7126908f9
commit f8ced20d1c
2 changed files with 72 additions and 8 deletions

View File

@ -21,7 +21,7 @@ namespace stream
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service)
m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease ();
@ -30,7 +30,7 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination * local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service)
m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
}
@ -38,6 +38,7 @@ namespace stream
Stream::~Stream ()
{
m_ReceiveTimer.cancel ();
m_ResendTimer.cancel ();
while (!m_ReceiveQueue.empty ())
{
auto packet = m_ReceiveQueue.front ();
@ -46,6 +47,8 @@ namespace stream
}
for (auto it: m_SavedPackets)
delete it;
for (auto it: m_SentPackets)
delete it;
}
void Stream::HandleNextPacket (Packet * packet)
@ -53,6 +56,9 @@ namespace stream
if (!m_SendStreamID)
m_SendStreamID = packet->GetReceiveStreamID ();
if (!packet->IsNoAck ()) // ack received
ProcessAck (packet);
int32_t receivedSeqn = packet->GetSeqn ();
bool isSyn = packet->IsSYN ();
if (!receivedSeqn && !isSyn)
@ -173,8 +179,29 @@ namespace stream
}
}
void Stream::ProcessAck (Packet * packet)
{
uint32_t ackThrough = packet->GetAckThrough ();
// TODO: handle NACKs
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();)
{
if ((*it)->GetSeqn () <= ackThrough)
{
auto sentPacket = *it;
LogPrint ("Packet ", sentPacket->GetSeqn (), " acknowledged");
m_SentPackets.erase (it++);
delete sentPacket;
}
else
break;
}
if (m_SentPackets.empty ())
m_ResendTimer.cancel ();
}
size_t Stream::Send (const uint8_t * buf, size_t len, int timeout)
{
bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet
Packet * p = new Packet ();
uint8_t * packet = p->GetBuffer ();
// TODO: implement setters
@ -185,7 +212,10 @@ namespace stream
size += 4; // receiveStreamID
*(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber++);
size += 4; // sequenceNum
*(uint32_t *)(packet + size) = 0; // TODO
if (isNoAck)
*(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber);
else
*(uint32_t *)(packet + size) = 0;
size += 4; // ack Through
packet[size] = 0;
size++; // NACK count
@ -194,9 +224,10 @@ namespace stream
{
// initial packet
m_IsOpen = true;
*(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE |
PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED |
PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK);
uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED |
PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED;
if (isNoAck) flags |= PACKET_FLAG_NO_ACK;
*(uint16_t *)(packet + size) = htobe16 (flags);
size += 2; // flags
*(uint16_t *)(packet + size) = htobe16 (i2p::data::DEFAULT_IDENTITY_SIZE + 40 + 2); // identity + signature + packet size
size += 2; // options size
@ -310,7 +341,15 @@ namespace stream
if (packet)
{
bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ());
delete packet;
if (ret)
{
bool isEmpty = m_SentPackets.empty ();
m_SentPackets.insert (packet);
if (isEmpty)
ScheduleResend ();
}
else
delete packet;
return ret;
}
else
@ -364,6 +403,24 @@ namespace stream
return false;
}
void Stream::ScheduleResend ()
{
m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT));
m_ResendTimer.async_wait (boost::bind (&Stream::HandleResendTimer,
this, boost::asio::placeholders::error));
}
void Stream::HandleResendTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
for (auto it : m_SentPackets)
SendPacket (it->GetBuffer (), it->GetLength ());
ScheduleResend ();
}
}
void Stream::UpdateCurrentRemoteLease ()
{
if (!m_RemoteLeaseSet)

View File

@ -36,6 +36,7 @@ namespace stream
const size_t STREAMING_MTU = 1730;
const size_t MAX_PACKET_SIZE = 4096;
const size_t COMPRESSION_THRESHOLD_SIZE = 66;
const int RESEND_TIMEOUT = 10; // in seconds
struct Packet
{
@ -58,6 +59,7 @@ namespace stream
const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); };
bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; };
bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; };
};
struct PacketCmp
@ -102,6 +104,7 @@ namespace stream
void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet);
void ProcessAck (Packet * packet);
size_t ConcatenatePackets (uint8_t * buf, size_t len);
void UpdateCurrentRemoteLease ();
@ -109,6 +112,9 @@ namespace stream
template<typename Buffer, typename ReceiveHandler>
void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler);
void ScheduleResend ();
void HandleResendTimer (const boost::system::error_code& ecode);
private:
boost::asio::io_service& m_Service;
@ -121,7 +127,8 @@ namespace stream
i2p::data::Lease m_CurrentRemoteLease;
std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets;
boost::asio::deadline_timer m_ReceiveTimer;
std::set<Packet *, PacketCmp> m_SentPackets;
boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer;
};
class StreamingDestination: public i2p::data::LocalDestination