diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 6f95525e..9285b05d 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -980,6 +980,7 @@ namespace client LeaseSetDestination (service, isPublic, params), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY), m_StreamingOutboundSpeed (DEFAULT_MAX_OUTBOUND_SPEED), + m_StreamingInboundSpeed (DEFAULT_MAX_INBOUND_SPEED), m_IsStreamingAnswerPings (DEFAULT_ANSWER_PINGS), m_LastPort (0), m_DatagramDestination (nullptr), m_RefCounter (0), m_LastPublishedTimestamp (0), m_ReadyChecker(service) @@ -1051,6 +1052,9 @@ namespace client it = params->find (I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED); if (it != params->end ()) m_StreamingOutboundSpeed = std::stoi(it->second); + it = params->find (I2CP_PARAM_STREAMING_MAX_INBOUND_SPEED); + if (it != params->end ()) + m_StreamingInboundSpeed = std::stoi(it->second); it = params->find (I2CP_PARAM_STREAMING_ANSWER_PINGS); if (it != params->end ()) m_IsStreamingAnswerPings = std::stoi (it->second); // 1 for true diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 43c9ed81..9dcc64c6 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -86,6 +86,8 @@ namespace client const int DEFAULT_INITIAL_ACK_DELAY = 200; // milliseconds const char I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED[] = "i2p.streaming.maxOutboundSpeed"; // bytes/sec const int DEFAULT_MAX_OUTBOUND_SPEED = 1730000000; // no more than 1.73 Gbytes/s + const char I2CP_PARAM_STREAMING_MAX_INBOUND_SPEED[] = "i2p.streaming.maxInboundSpeed"; // bytes/sec + const int DEFAULT_MAX_INBOUND_SPEED = 1730000000; // no more than 1.73 Gbytes/s const char I2CP_PARAM_STREAMING_ANSWER_PINGS[] = "i2p.streaming.answerPings"; const int DEFAULT_ANSWER_PINGS = true; @@ -262,6 +264,7 @@ namespace client void AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor); int GetStreamingAckDelay () const { return m_StreamingAckDelay; } int GetStreamingOutboundSpeed () const { return m_StreamingOutboundSpeed; } + int GetStreamingInboundSpeed () const { return m_StreamingInboundSpeed; } bool IsStreamingAnswerPings () const { return m_IsStreamingAnswerPings; } // datagram @@ -300,6 +303,7 @@ namespace client int m_StreamingAckDelay; int m_StreamingOutboundSpeed; + int m_StreamingInboundSpeed; bool m_IsStreamingAnswerPings; std::shared_ptr m_StreamingDestination; // default std::map > m_StreamingDestinationsByPorts; diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index e010ac83..bbb9d511 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -70,6 +70,7 @@ namespace stream std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), + m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsTimeOutResend (false), m_LocalDestination (local), @@ -78,18 +79,25 @@ namespace stream m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_PrevRTT (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_DropWindowDelayTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_DropWindowDelayTime (0), + m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed + m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); m_RemoteIdentity = remote->GetIdentity (); auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed (); if (outboundSpeed) m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed; + + auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed + if (inboundSpeed) + m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed; } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), + m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsTimeOutResend (false), m_LocalDestination (local), @@ -98,12 +106,18 @@ namespace stream m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_PrevRTT (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_DropWindowDelayTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_DropWindowDelayTime (0), + m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed + m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed (); if (outboundSpeed) - m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed; + m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed; + + auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed + if (inboundSpeed) + m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed; } Stream::~Stream () @@ -754,10 +768,26 @@ namespace stream void Stream::SendQuickAck () { int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber; + // for limit inbound speed + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + int numPackets = 0; + int64_t passedTime = m_PacketACKInterval * INITIAL_WINDOW_SIZE; // in microseconds // while m_LastACKSendTime == 0 + if (m_LastACKSendTime) + passedTime = (ts - m_LastACKSendTime)*1000; // in microseconds + numPackets = (passedTime + m_PacketACKIntervalRem) / m_PacketACKInterval; + m_PacketACKIntervalRem = (passedTime + m_PacketACKIntervalRem) - (numPackets * m_PacketACKInterval); + if (m_LastConfirmedReceivedSequenceNumber + numPackets < m_LastReceivedSequenceNumber) + lastReceivedSeqn = m_LastConfirmedReceivedSequenceNumber + numPackets; + if (numPackets == 0) return; + // for limit inbound speed if (!m_SavedPackets.empty ()) { - int32_t seqn = (*m_SavedPackets.rbegin ())->GetSeqn (); - if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn; + for (auto it: m_SavedPackets) + { + auto seqn = it->GetSeqn (); + if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn)) break; // for limit inbound speed + if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn; + } } if (lastReceivedSeqn < 0) { @@ -786,6 +816,11 @@ namespace stream for (auto it: m_SavedPackets) { auto seqn = it->GetSeqn (); + if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn)) // for limit inbound speed + { + htobe32buf (packet + 12, nextSeqn - 1); + break; + } if (numNacks + (seqn - nextSeqn) >= 256) { LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); @@ -827,6 +862,8 @@ namespace stream p.len = size; SendPackets (std::vector { &p }); + m_LastACKSendTime = ts; // for limit inbound speed + m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 22dad0e3..b1efcb60 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -249,6 +249,7 @@ namespace stream uint32_t m_TunnelsChangeSequenceNumber; int32_t m_LastReceivedSequenceNumber; int32_t m_PreviousReceivedSequenceNumber; + int32_t m_LastConfirmedReceivedSequenceNumber; // for limit inbound speed StreamStatus m_Status; bool m_IsAckSendScheduled; bool m_IsNAcked; @@ -277,6 +278,7 @@ namespace stream float m_WindowSize, m_LastWindowDropSize; int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample, m_PrevRTT, m_Jitter; uint64_t m_MinPacingTime, m_PacingTime, m_PacingTimeRem, m_DropWindowDelayTime, m_LastSendTime; // microseconds + uint64_t m_LastACKSendTime, m_PacketACKInterval, m_PacketACKIntervalRem; // for limit inbound speed int m_NumResendAttempts, m_NumPacketsToSend; size_t m_MTU; }; diff --git a/libi2pd_client/ClientContext.cpp b/libi2pd_client/ClientContext.cpp index 3dfc6040..4f91c564 100644 --- a/libi2pd_client/ClientContext.cpp +++ b/libi2pd_client/ClientContext.cpp @@ -472,6 +472,7 @@ namespace client options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MAX_TUNNEL_LATENCY, DEFAULT_MAX_TUNNEL_LATENCY); options[I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY] = GetI2CPOption(section, I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY); options[I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED] = GetI2CPOption(section, I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED, DEFAULT_MAX_OUTBOUND_SPEED); + options[I2CP_PARAM_STREAMING_MAX_INBOUND_SPEED] = GetI2CPOption(section, I2CP_PARAM_STREAMING_MAX_INBOUND_SPEED, DEFAULT_MAX_INBOUND_SPEED); options[I2CP_PARAM_STREAMING_ANSWER_PINGS] = GetI2CPOption(section, I2CP_PARAM_STREAMING_ANSWER_PINGS, isServer ? DEFAULT_ANSWER_PINGS : false); options[I2CP_PARAM_LEASESET_TYPE] = GetI2CPOption(section, I2CP_PARAM_LEASESET_TYPE, DEFAULT_LEASESET_TYPE); std::string encType = GetI2CPStringOption(section, I2CP_PARAM_LEASESET_ENCRYPTION_TYPE, "0,4");