From 297afeb07b0a92e666d1e604e70c0ce55bd190ab Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 11 Aug 2014 11:31:01 -0400 Subject: [PATCH] split stream to packets --- Streaming.cpp | 113 +++++++++++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 0df28a7e..d6a31886 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -202,59 +202,68 @@ namespace stream size_t Stream::Send (const uint8_t * buf, size_t len, int timeout) { bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet - Packet * p = new Packet (); - uint8_t * packet = p->GetBuffer (); - // TODO: implement setters - size_t size = 0; - *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); - size += 4; // sendStreamID - *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); - size += 4; // receiveStreamID - *(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber++); - size += 4; // sequenceNum - if (isNoAck) - *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); - else - *(uint32_t *)(packet + size) = 0; - size += 4; // ack Through - packet[size] = 0; - size++; // NACK count - size++; // resend delay - if (!m_IsOpen) - { - // initial packet - m_IsOpen = true; - uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | - PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; - if (isNoAck) flags |= PACKET_FLAG_NO_ACK; - *(uint16_t *)(packet + size) = htobe16 (flags); - size += 2; // flags - *(uint16_t *)(packet + size) = htobe16 (i2p::data::DEFAULT_IDENTITY_SIZE + 40 + 2); // identity + signature + packet size - size += 2; // options size - memcpy (packet + size, &m_LocalDestination->GetIdentity (), i2p::data::DEFAULT_IDENTITY_SIZE); - size += i2p::data::DEFAULT_IDENTITY_SIZE; // from - *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); - size += 2; // max packet size - uint8_t * signature = packet + size; // set it later - memset (signature, 0, 40); // zeroes for now - size += 40; // signature - memcpy (packet + size, buf, len); - size += len; // payload - m_LocalDestination->Sign (packet, size, signature); - } - else + while (len > 0) { - // follow on packet - *(uint16_t *)(packet + size) = 0; - size += 2; // flags - *(uint16_t *)(packet + size) = 0; // no options - size += 2; // options size - memcpy (packet + size, buf, len); - size += len; // payload - } - p->len = size; - m_Service.post (boost::bind (&Stream::SendPacket, this, p)); - + Packet * p = new Packet (); + uint8_t * packet = p->GetBuffer (); + // TODO: implement setters + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber++); + size += 4; // sequenceNum + if (isNoAck) + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + else + *(uint32_t *)(packet + size) = 0; + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + if (!m_IsOpen) + { + // initial packet + m_IsOpen = true; + uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | + PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; + if (isNoAck) flags |= PACKET_FLAG_NO_ACK; + *(uint16_t *)(packet + size) = htobe16 (flags); + size += 2; // flags + *(uint16_t *)(packet + size) = htobe16 (i2p::data::DEFAULT_IDENTITY_SIZE + 40 + 2); // identity + signature + packet size + size += 2; // options size + memcpy (packet + size, &m_LocalDestination->GetIdentity (), i2p::data::DEFAULT_IDENTITY_SIZE); + size += i2p::data::DEFAULT_IDENTITY_SIZE; // from + *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); + size += 2; // max packet size + uint8_t * signature = packet + size; // set it later + memset (signature, 0, 40); // zeroes for now + size += 40; // signature + size_t sentLen = STREAMING_MTU - size; + if (len < sentLen) sentLen = len; + memcpy (packet + size, buf, sentLen); + len -= sentLen; + size += sentLen; // payload + m_LocalDestination->Sign (packet, size, signature); + } + else + { + // follow on packet + *(uint16_t *)(packet + size) = 0; + size += 2; // flags + *(uint16_t *)(packet + size) = 0; // no options + size += 2; // options size + size_t sentLen = STREAMING_MTU - size; + if (len < sentLen) sentLen = len; + memcpy (packet + size, buf, sentLen); + len -= sentLen; + size += sentLen; // payload + } + p->len = size; + m_Service.post (boost::bind (&Stream::SendPacket, this, p)); + } + return len; }