diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index a3a48f6b..17c90f31 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -69,13 +69,14 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), - m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsWinDropped (true), + m_IsTimeOutResend (false), m_LocalDestination (local), + m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), - m_RTT (INITIAL_RTT), m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO), - m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), - m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) + m_RTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_RTO (INITIAL_RTO), + m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrewRTTSample (INITIAL_RTT), m_PrewRTT (INITIAL_RTT), m_Jitter (0), + m_LastWindowSizeIncreaseTime (0), m_PacingTime (INITIAL_PACING_TIME), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); m_RemoteIdentity = remote->GetIdentity (); @@ -83,12 +84,14 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), - m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsWinDropped (true), + m_IsTimeOutResend (false), m_LocalDestination (local), + m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), - m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), - m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) + m_WindowSize (INITIAL_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), + m_PrewRTTSample (INITIAL_RTT), m_PrewRTT (INITIAL_RTT), m_Jitter (0), + m_LastWindowSizeIncreaseTime (0), m_PacingTime (INITIAL_PACING_TIME), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); } @@ -203,6 +206,12 @@ namespace stream { // we have received duplicate LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); + if (receivedSeqn <= m_PreviousReceivedSequenceNumber || receivedSeqn == m_LastReceivedSequenceNumber) + { + m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + UpdateCurrentRemoteLease (); + } + m_PreviousReceivedSequenceNumber = receivedSeqn; SendQuickAck (); // resend ack for previous message again m_LocalDestination.DeletePacket (packet); // packet dropped } @@ -410,6 +419,7 @@ namespace stream } int rttSample = INT_MAX; bool firstRttSample = false; + m_IsNAcked = false; int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -422,6 +432,7 @@ namespace stream for (int i = 0; i < nackCount; i++) if (seqn == packet->GetNACK (i)) { + m_IsNAcked = true; nacked = true; break; } @@ -447,8 +458,8 @@ namespace stream m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; - if (m_WindowSize < WINDOW_SIZE) - m_WindowSize++; // slow start + if (m_WindowSize < MAX_WINDOW_SIZE) + m_WindowSize++; } else break; @@ -456,30 +467,58 @@ namespace stream if (rttSample != INT_MAX) { if (firstRttSample) + { m_RTT = rttSample; + m_PrewRTTSample = rttSample; + } else m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT; + // calculate jitter + int jitter = 0; + if (rttSample > m_PrewRTTSample) + jitter = rttSample - m_PrewRTTSample; + else if (rttSample < m_PrewRTTSample) + jitter = m_PrewRTTSample - rttSample; + else + jitter = std::round (rttSample / 10); // 10% + m_Jitter = std::round (RTT_EWMA_ALPHA * m_Jitter + (1.0 - RTT_EWMA_ALPHA) * jitter); + m_PrewRTTSample = rttSample; + // + // delay-based CC + if ((m_RTT > m_PrewRTT) && !m_IsWinDropped) // Drop window if RTT grows too fast, late detection + { + m_WindowSize >>= 1; // /2 + m_IsWinDropped = true; // don't drop window twice + } + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + int pacTime = std::round (m_RTT*1000/m_WindowSize); + m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + m_PrewRTT = m_RTT * 1.1 + m_Jitter; + // bool wasInitial = m_RTO == INITIAL_RTO; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter)); // TODO: implement it better + if (wasInitial) ScheduleResend (); } - if (acknowledged && m_WindowSize >= WINDOW_SIZE) + if (m_WindowSize > int(m_SentPackets.size ())) + m_IsWinDropped = false; + if (acknowledged || m_IsNAcked) { - // linear growth - if (ts > m_LastWindowSizeIncreaseTime + m_RTT) - { - m_WindowSize++; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_LastWindowSizeIncreaseTime = ts; - } + ScheduleResend (); } + if ((m_SendBuffer.IsEmpty () && m_SentPackets.size () > 0) // tail loss + || int(m_SentPackets.size ()) > m_WindowSize) // or we drop window + m_IsNAcked = true; if (firstRttSample && m_RoutingSession) m_RoutingSession->SetSharedRoutingPath ( std::make_shared ( i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); if (m_SentPackets.empty ()) + { m_ResendTimer.cancel (); + m_SendTimer.cancel (); + } if (acknowledged) { m_NumResendAttempts = 0; @@ -557,9 +596,10 @@ namespace stream void Stream::SendBuffer () { + ScheduleSend (); int numMsgs = m_WindowSize - m_SentPackets.size (); if (numMsgs <= 0) return; // window is full - + else numMsgs = 1; bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) @@ -873,6 +913,7 @@ namespace stream m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); } + if (!packet->sendTime) packet->sendTime = i2p::util::GetMillisecondsSinceEpoch (); SendPackets (std::vector { packet }); bool isEmpty = m_SentPackets.empty (); m_SentPackets.insert (packet); @@ -906,7 +947,7 @@ namespace stream m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentRemoteLease = routingPath->remoteLease; m_RTT = routingPath->rtt; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter)); // TODO: implement it better } } @@ -935,7 +976,7 @@ namespace stream if (freshTunnel) { m_RTO = INITIAL_RTO; - m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely +// m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely } std::vector msgs; @@ -988,6 +1029,40 @@ namespace stream SendQuickAck (); } + void Stream::ScheduleSend () + { + if (m_Status != eStreamStatusTerminated) + { + m_SendTimer.cancel (); + m_SendTimer.expires_from_now (boost::posix_time::microseconds(m_PacingTime)); + m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer, + shared_from_this (), std::placeholders::_1)); + } + } + + void Stream::HandleSendTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + if (m_IsNAcked) // || m_WindowSize < int(m_SentPackets.size ())) // resend one packet + ResendPacket (); + // delay-based CC + else if (!m_IsWinDropped && int(m_SentPackets.size ()) == m_WindowSize) // we sending packets too fast, early detection + { + m_WindowSize >>= 1; // /2 + m_IsWinDropped = true; // don't drop window twice + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + int pacTime = std::round (m_RTT*1000/m_WindowSize); + m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + } + // + else if (m_WindowSize > int(m_SentPackets.size ())) // send one packet + SendBuffer (); + else // pass + ScheduleSend (); + } + } + void Stream::ScheduleResend () { if (m_Status != eStreamStatusTerminated) @@ -1005,6 +1080,16 @@ namespace stream { if (ecode != boost::asio::error::operation_aborted) { + if (m_RTO > INITIAL_RTO) m_RTO = INITIAL_RTO; + m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit + m_IsTimeOutResend = true; + m_IsNAcked = false; + ResendPacket (); // send one packet per RTO, waiting for ack + } + } + + void Stream::ResendPacket () + { // check for resend attempts if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) { @@ -1021,27 +1106,42 @@ namespace stream { if (ts >= it->sendTime + m_RTO) { - it->resent = true; + if (ts < it->sendTime + m_RTO*2) + it->resent = true; + else + it->resent = false; it->sendTime = ts; packets.push_back (it); + if (packets.size () >= 1) break; } } - + // select tunnels if necessary and send if (packets.size () > 0) { - m_NumResendAttempts++; + if (m_IsNAcked) m_NumResendAttempts = 1; + else if (m_IsTimeOutResend) m_NumResendAttempts++; if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO) { - // congestion avoidance - m_RTO *= 2; - m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1 - if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + // loss-based CC + if (!m_IsWinDropped) + { + m_WindowSize >>= 1; // /2 + m_IsWinDropped = true; // don't drop window twice + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + int pacTime = std::round (m_RTT*1000/m_WindowSize); + m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + } + // } - else + else if (m_IsTimeOutResend) { - m_TunnelsChangeSequenceNumber = m_SequenceNumber; + m_IsTimeOutResend = false; m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change + m_WindowSize = INITIAL_WINDOW_SIZE; + m_IsWinDropped = true; + int pacTime = std::round (m_RTT*1000/m_WindowSize); + m_PacingTime = std::max (MIN_PACING_TIME, pacTime); if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); if (m_NumResendAttempts & 1) { @@ -1058,9 +1158,11 @@ namespace stream } } SendPackets (packets); + if (m_IsNAcked) ScheduleSend (); } - ScheduleResend (); - } + else + ScheduleSend (); + if (!m_IsNAcked) ScheduleResend (); } void Stream::ScheduleAck (int timeout) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 94a21240..16b7c087 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -52,15 +52,17 @@ namespace stream const size_t STREAMING_MTU_RATCHETS = 1812; const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; - const int MAX_NUM_RESEND_ATTEMPTS = 9; - const int WINDOW_SIZE = 6; // in messages + const int MAX_NUM_RESEND_ATTEMPTS = 10; + const int MAX_STREAM_SPEED = 1730000000; // 1 - 1730000000 // in bytes/sec // no more than 1.73 Gbytes/s + const int MIN_PACING_TIME = 1000000 * STREAMING_MTU / MAX_STREAM_SPEED; // in microseconds + const int INITIAL_WINDOW_SIZE = 10; const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; - const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 - const double RTT_EWMA_ALPHA = 0.125; + const double RTT_EWMA_ALPHA = 0.9; const int MIN_RTO = 20; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds + const int INITIAL_PACING_TIME = 1000 * INITIAL_RTT / INITIAL_WINDOW_SIZE; // in microseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds const int SYN_TIMEOUT = 200; // how long we wait for SYN after follow-on, in milliseconds const size_t MAX_PENDING_INCOMING_BACKLOG = 128; @@ -231,8 +233,11 @@ namespace stream template void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout); + void ScheduleSend (); + void HandleSendTimer (const boost::system::error_code& ecode); void ScheduleResend (); void HandleResendTimer (const boost::system::error_code& ecode); + void ResendPacket (); void ScheduleAck (int timeout); void HandleAckSendTimer (const boost::system::error_code& ecode); @@ -242,8 +247,12 @@ namespace stream uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; uint32_t m_TunnelsChangeSequenceNumber; int32_t m_LastReceivedSequenceNumber; + int32_t m_PreviousReceivedSequenceNumber; StreamStatus m_Status; bool m_IsAckSendScheduled; + bool m_IsNAcked; + bool m_IsWinDropped; + bool m_IsTimeOutResend; StreamingDestination& m_LocalDestination; std::shared_ptr m_RemoteIdentity; std::shared_ptr m_TransientVerifier; // in case of offline key @@ -254,14 +263,14 @@ namespace stream std::queue m_ReceiveQueue; std::set m_SavedPackets; std::set m_SentPackets; - boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer; + boost::asio::deadline_timer m_ReceiveTimer, m_SendTimer, m_ResendTimer, m_AckSendTimer; size_t m_NumSentBytes, m_NumReceivedBytes; uint16_t m_Port; SendBufferQueue m_SendBuffer; double m_RTT; - int m_WindowSize, m_RTO, m_AckDelay; - uint64_t m_LastWindowSizeIncreaseTime; + int m_WindowSize, m_RTO, m_AckDelay, m_PrewRTTSample, m_PrewRTT, m_Jitter; + uint64_t m_LastWindowSizeIncreaseTime, m_PacingTime; int m_NumResendAttempts; size_t m_MTU; };