From a31e618e93ffa80afe57dfef0ea7d590ac380c3a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 21 Jan 2022 16:45:17 +0700 Subject: [PATCH] [core] Refactoring: added packUniqueData(..) func --- srtcore/buffer.cpp | 37 +++---- srtcore/core.cpp | 256 +++++++++++++++++++++---------------------- srtcore/core.h | 16 ++- srtcore/queue.cpp | 4 +- test/test_buffer.cpp | 2 +- 5 files changed, 154 insertions(+), 161 deletions(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index d07080981..4d95fe6ca 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -410,30 +410,19 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, w_packet.setLength(readlen); w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo; - // XXX This is probably done because the encryption should happen - // just once, and so this sets the encryption flags to both msgno bitset - // IN THE PACKET and IN THE BLOCK. This is probably to make the encryption - // happen at the time when scheduling a new packet to send, but the packet - // must remain in the send buffer until it's ACKed. For the case of rexmit - // the packet will be taken "as is" (that is, already encrypted). - // - // The problem is in the order of things: - // 0. When the application stores the data, some of the flags for PH_MSGNO are set. - // 1. The readData() is called to get the original data sent by the application. - // 2. The data are original and must be encrypted. They WILL BE encrypted, later. - // 3. So far we are in readData() so the encryption flags must be updated NOW because - // later we won't have access to the block's data. - // 4. After exiting from readData(), the packet is being encrypted. It's immediately - // sent, however the data must remain in the sending buffer until they are ACKed. - // 5. In case when rexmission is needed, the second overloaded version of readData - // is being called, and the buffer + PH_MSGNO value is extracted. All interesting - // flags must be present and correct at that time. - // - // The only sensible way to fix this problem is to encrypt the packet not after - // extracting from here, but when the packet is stored into CSndBuffer. The appropriate - // flags for PH_MSGNO will be applied directly there. Then here the value for setting - // PH_MSGNO will be set as is. - + // 1. On submission (addBuffer), the KK flag is set to EK_NOENC (0). + // 2. The readData() is called to get the original (unique) payload not ever sent yet. + // The payload must be encrypted for the first time if the encryption + // is enabled (arg kflgs != EK_NOENC). The KK encryption flag of the data packet + // header must be set and remembered accordingly (see EncryptionKeySpec). + // 3. The next time this packet is read (only for retransmission), the payload is already + // encrypted, and the proper flag value is already stored. + + // TODO: Alternatively, encryption could happen before the packet is submitted to the buffer + // (before the addBuffer() call), and corresponding flags could be set accordingly. + // This may also put an encryption burden on the application thread, rather than the sending thread, + // which could be more efficient. Note that packet sequence number must be properly set in that case, + // as it is used as a counter for the AES encryption. if (kflgs == -1) { HLOGC(bslog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING."); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 4a5486b82..483836599 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9189,7 +9189,7 @@ int srt::CUDT::packLostData(CPacket& w_packet, steady_clock::time_point& w_origi return 0; } -std::pair srt::CUDT::packData(CPacket& w_packet) +std::pair srt::CUDT::packData(CPacket& w_packet) { int payload = 0; bool probe = false; @@ -9197,8 +9197,6 @@ std::pair srt::CUDT::packData(CPacket& w_packet) bool new_packet_packed = false; bool filter_ctl_pkt = false; - int kflg = EK_NOENC; - const steady_clock::time_point enter_time = steady_clock::now(); if (!is_zero(m_tsNextSendTime) && enter_time > m_tsNextSendTime) @@ -9217,7 +9215,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) // start the dissolving process, this process will // not be started until this function is finished. if (!m_bOpened) - return std::make_pair(0, enter_time); + return std::make_pair(false, enter_time); payload = packLostData((w_packet), (origintime)); if (payload > 0) @@ -9233,122 +9231,24 @@ std::pair srt::CUDT::packData(CPacket& w_packet) filter_ctl_pkt = true; // Mark that this packet ALREADY HAS timestamp field and it should not be set // Stats - { - ScopedLock lg(m_StatsLock); - m_stats.sndr.sentFilterExtra.count(1); - } + ScopedLock lg(m_StatsLock); + m_stats.sndr.sentFilterExtra.count(1); } else { - // If no loss, and no packetfilter control packet, pack a new packet. - - // Check the congestion/flow window limit - const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); - const int flightspan = getFlightSpan(); - if (cwnd > flightspan) - { - // XXX Here it's needed to set kflg to msgno_bitset in the block stored in the - // send buffer. This should be somehow avoided, the crypto flags should be set - // together with encrypting, and the packet should be sent as is, when rexmitting. - // It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field - // isn't a useless redundant state copy. If it is, then taking the flags here can be removed. - kflg = m_pCryptoControl->getSndCryptoFlags(); - int pktskipseqno = 0; - payload = m_pSndBuffer->readData((w_packet), (origintime), kflg, (pktskipseqno)); - if (pktskipseqno) - { - // Some packets were skipped due to TTL expiry. - m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno); - } - - if (payload) - { - // A CHANGE. The sequence number is currently added to the packet - // when scheduling, not when extracting. This is a inter-migration form, - // so still override the value, but trace it. - m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); - - // Do this checking only for groups and only at the very first moment, - // when there's still nothing in the buffer. Otherwise there will be - // a serious data discrepancy between the agent and the peer. - // After increasing by 1, but being previously set as ISN-1, this should be == ISN, - // if this is the very first packet to send. -#if ENABLE_EXPERIMENTAL_BONDING - // Fortunately here is only the procedure that verifies if the extraction - // sequence is moved due to the difference between ISN caught during the existing - // transmission and the first sequence possible to be used at the first sending - // instruction. The group itself isn't being accessed. - if (m_parent->m_GroupOf && m_iSndCurrSeqNo != w_packet.m_iSeqNo && m_iSndCurrSeqNo == m_iISN) - { - const int packetspan = CSeqNo::seqcmp(w_packet.m_iSeqNo, m_iSndCurrSeqNo); - - HLOGC(qslog.Debug, log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo - << " from SCHEDULING sequence " << w_packet.m_iSeqNo - << " DIFF: " << packetspan << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); - - // This is the very first packet to be sent; so there's nothing in - // the sending buffer yet, and therefore we are in a situation as just - // after connection. No packets in the buffer, no packets are sent, - // no ACK to be awaited. We can screw up all the variables that are - // initialized from ISN just after connection. - // - // Additionally send the drop request to the peer so that it - // won't stupidly request the packets to be retransmitted. - // Don't do it if the difference isn't positive or exceeds the threshold. - if (packetspan > 0) - { - int32_t seqpair[2]; - seqpair[0] = m_iSndCurrSeqNo; - seqpair[1] = w_packet.m_iSeqNo; - HLOGC(qslog.Debug, log << "... sending INITIAL DROP (ISN FIX): " - << "msg=" << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " SEQ:" - << seqpair[0] << " - " << seqpair[1] << "(" << packetspan << " packets)"); - sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair)); - - // In case when this message is lost, the peer will still get the - // UMSG_DROPREQ message when the agent realizes that the requested - // packet are not present in the buffer (preadte the send buffer). - } - } - else -#endif - { - HLOGC(qslog.Debug, log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo - << " over SCHEDULING sequence " << w_packet.m_iSeqNo - << " DIFF: " << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo) - << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); - -#if ENABLE_EXPERIMENTAL_BONDING - HLOGC(qslog.Debug, log << "... CONDITION: IN GROUP: " << (m_parent->m_GroupOf ? "yes":"no") - << " extraction-seq=" << m_iSndCurrSeqNo << " scheduling-seq=" << w_packet.m_iSeqNo << " ISN=" << m_iISN); -#endif - - // Do this always when not in a group, - w_packet.m_iSeqNo = m_iSndCurrSeqNo; - } - - // every 16 (0xF) packets, a packet pair is sent - if ((w_packet.m_iSeqNo & PUMASK_SEQNO_PROBE) == 0) - probe = true; - - new_packet_packed = true; - } - else - { - m_tsNextSendTime = steady_clock::time_point(); - m_tdSendTimeDiff = steady_clock::duration(); - return std::make_pair(0, enter_time); - } - } - else + if (!packUniqueData(w_packet, origintime)) { - HLOGC(qslog.Debug, log << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow - << ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan); m_tsNextSendTime = steady_clock::time_point(); m_tdSendTimeDiff = steady_clock::duration(); - return std::make_pair(0, enter_time); + return std::make_pair(false, enter_time); } + new_packet_packed = true; + + // every 16 (0xF) packets, a packet pair is sent + if ((w_packet.m_iSeqNo & PUMASK_SEQNO_PROBE) == 0) + probe = true; + payload = (int) w_packet.getLength(); reason = "normal"; } @@ -9389,23 +9289,6 @@ std::pair srt::CUDT::packData(CPacket& w_packet) w_packet.m_iID = m_PeerID; - /* Encrypt if 1st time this packet is sent and crypto is enabled */ - if (kflg) - { - // XXX Encryption flags are already set on the packet before calling this. - // See readData() above. - if (m_pCryptoControl->encrypt((w_packet))) - { - // Encryption failed - //>>Add stats for crypto failure - LOGC(qslog.Warn, log << "ENCRYPT FAILED - packet won't be sent, size=" << payload); - // Encryption failed - return std::make_pair(-1, enter_time); - } - payload = (int) w_packet.getLength(); /* Cipher may change length */ - reason += " (encrypted)"; - } - if (new_packet_packed && m_PacketFilter) { HLOGC(qslog.Debug, log << "filter: Feeding packet for source clip"); @@ -9472,7 +9355,120 @@ std::pair srt::CUDT::packData(CPacket& w_packet) #endif } - return std::make_pair(payload, m_tsNextSendTime); + return std::make_pair(payload >= 0, m_tsNextSendTime); +} + +bool srt::CUDT::packUniqueData(CPacket& w_packet, time_point& w_origintime) +{ + // Check the congestion/flow window limit + const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); + const int flightspan = getFlightSpan(); + if (cwnd <= flightspan) + { + HLOGC(qslog.Debug, log << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow + << ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan); + return false; + } + + // XXX Here it's needed to set kflg to msgno_bitset in the block stored in the + // send buffer. This should be somehow avoided, the crypto flags should be set + // together with encrypting, and the packet should be sent as is, when rexmitting. + // It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field + // isn't a useless redundant state copy. If it is, then taking the flags here can be removed. + const int kflg = m_pCryptoControl->getSndCryptoFlags(); + int pktskipseqno = 0; + const int pld_size = m_pSndBuffer->readData((w_packet), (w_origintime), kflg, (pktskipseqno)); + if (pktskipseqno) + { + // Some packets were skipped due to TTL expiry. + m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno); + } + + if (pld_size == 0) + { + return false; + } + + // A CHANGE. The sequence number is currently added to the packet + // when scheduling, not when extracting. This is a inter-migration form, + // so still override the value, but trace it. + m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); + + // Do this checking only for groups and only at the very first moment, + // when there's still nothing in the buffer. Otherwise there will be + // a serious data discrepancy between the agent and the peer. + // After increasing by 1, but being previously set as ISN-1, this should be == ISN, + // if this is the very first packet to send. +#if ENABLE_EXPERIMENTAL_BONDING + // Fortunately here is only the procedure that verifies if the extraction + // sequence is moved due to the difference between ISN caught during the existing + // transmission and the first sequence possible to be used at the first sending + // instruction. The group itself isn't being accessed. + if (m_parent->m_GroupOf && m_iSndCurrSeqNo != w_packet.m_iSeqNo && m_iSndCurrSeqNo == m_iISN) + { + const int packetspan = CSeqNo::seqcmp(w_packet.m_iSeqNo, m_iSndCurrSeqNo); + + HLOGC(qslog.Debug, log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo + << " from SCHEDULING sequence " << w_packet.m_iSeqNo + << " DIFF: " << packetspan << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); + + // This is the very first packet to be sent; so there's nothing in + // the sending buffer yet, and therefore we are in a situation as just + // after connection. No packets in the buffer, no packets are sent, + // no ACK to be awaited. We can screw up all the variables that are + // initialized from ISN just after connection. + // + // Additionally send the drop request to the peer so that it + // won't stupidly request the packets to be retransmitted. + // Don't do it if the difference isn't positive or exceeds the threshold. + if (packetspan > 0) + { + int32_t seqpair[2]; + seqpair[0] = m_iSndCurrSeqNo; + seqpair[1] = w_packet.m_iSeqNo; + HLOGC(qslog.Debug, log << "... sending INITIAL DROP (ISN FIX): " + << "msg=" << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " SEQ:" + << seqpair[0] << " - " << seqpair[1] << "(" << packetspan << " packets)"); + sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair)); + + // In case when this message is lost, the peer will still get the + // UMSG_DROPREQ message when the agent realizes that the requested + // packet are not present in the buffer (preadte the send buffer). + } + } + else +#endif + { + HLOGC(qslog.Debug, log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo + << " over SCHEDULING sequence " << w_packet.m_iSeqNo + << " DIFF: " << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo) + << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); + +#if ENABLE_EXPERIMENTAL_BONDING + HLOGC(qslog.Debug, log << "... CONDITION: IN GROUP: " << (m_parent->m_GroupOf ? "yes":"no") + << " extraction-seq=" << m_iSndCurrSeqNo << " scheduling-seq=" << w_packet.m_iSeqNo << " ISN=" << m_iISN); +#endif + + // Do this always when not in a group, + w_packet.m_iSeqNo = m_iSndCurrSeqNo; + } + + // Encrypt if 1st time this packet is sent and crypto is enabled + if (kflg != EK_NOENC) + { + // Note that the packet header must have a valid seqno set, as it is used as a counter for encryption. + // Other fields of the data packet header (e.g. timestamp, destination socket ID) are not used for the counter. + // Cypher may change packet length! + if (m_pCryptoControl->encrypt((w_packet))) + { + // Encryption failed + //>>Add stats for crypto failure + LOGC(qslog.Warn, log << "ENCRYPT FAILED - packet won't be sent, size=" << pld_size); + return -1; + } + } + + return true; } // This is a close request, but called from the diff --git a/srtcore/core.h b/srtcore/core.h index adf35b35b..c25e9c8c1 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -1030,15 +1030,23 @@ class CUDT /// @return payload size on success, <=0 on failure int packLostData(CPacket &packet, time_point &origintime); + /// Pack a unique data packet (never sent so far) in CPacket for sending. + /// + /// @param packet [in, out] a CPacket structure to fill. + /// @param origintime [in, out] origin timestamp of the packet. + /// + /// @return true if a packet has been packets; false otherwise. + bool packUniqueData(CPacket& packet, time_point& origintime); + /// Pack in CPacket the next data to be send. /// /// @param packet [in, out] a CPacket structure to fill /// - /// @return A pair of values is returned (payload, timestamp). - /// The payload tells the size of the payload, packed in CPacket. + /// @return A pair of values is returned (is_payload_valid, timestamp). + /// If is_payload_valid is false, there was nothing packed for sending, + /// and the timestamp value should be ignored. /// The timestamp is the full source/origin timestamp of the data. - /// If payload is <= 0, consider the timestamp value invalid. - std::pair packData(CPacket& packet); + std::pair packData(CPacket& packet); int processData(CUnit* unit); void processClose(); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 62d158af7..1e57b3021 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -635,10 +635,10 @@ void* srt::CSndQueue::worker(void* param) // pack a packet from the socket CPacket pkt; - const std::pair res_time = u->packData((pkt)); + const std::pair res_time = u->packData((pkt)); // Check if payload size is invalid. - if (res_time.first <= 0) + if (res_time.first == false) { #if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lNotReadyPop++; diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp index 2ba636525..edef007b6 100644 --- a/test/test_buffer.cpp +++ b/test/test_buffer.cpp @@ -465,7 +465,7 @@ TEST_F(CRcvBufferReadMsg, SmallReadBuffer) } // BUG!!! -// Checks signalling of read-readiness of a half-acknowledged message. +// Checks signaling of read-readiness of a half-acknowledged message. // The RCV buffer implementation has an issue here: when only half of the message is // acknowledged, the RCV buffer signals read-readiness, even though // the message can't be read, and reading returns 0.