From 616b96d8018346c2c1754291e58c456cfcc45df8 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 27 Jan 2022 16:06:19 +0700 Subject: [PATCH] [core] SND prioritize original packets in live configuration --- srtcore/buffer.cpp | 9 +++ srtcore/buffer.h | 5 ++ srtcore/core.cpp | 136 ++++++++++++++++++++++++++++++++++++++++++++- srtcore/core.h | 6 ++ 4 files changed, 155 insertions(+), 1 deletion(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 1cd983be2..febd12f58 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -455,6 +455,15 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, return readlen; } +CSndBuffer::time_point CSndBuffer::peekNextOriginal() const +{ + ScopedLock bufferguard(m_BufLock); + if (m_pCurrBlock == m_pLastBlock) + return time_point(); + + return m_pCurrBlock->m_tsOriginTime; +} + int32_t CSndBuffer::getMsgNoAt(const int offset) { ScopedLock bufferguard(m_BufLock); diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 693858fe3..80a2354ff 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -152,6 +152,11 @@ class CSndBuffer SRT_ATTR_EXCLUDES(m_BufLock) int readData(CPacket& w_packet, time_point& w_origintime, int kflgs, int& w_seqnoinc); + /// Peek an information on the next original data packet to send. + /// @return origin time stamp of the next packet; epoch start time otherwise. + SRT_ATTR_EXCLUDES(m_BufLock) + time_point peekNextOriginal() const; + /// Find data position to pack a DATA packet for a retransmission. /// @param [in] offset offset from the last ACK point (backward sequence number difference) /// @param [out] packet the packet to read. diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d893c671d..6f228c138 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9178,6 +9178,131 @@ int srt::CUDT::packLostData(CPacket& w_packet, steady_clock::time_point& w_origi return 0; } +#if SRT_DEBUG_TRACE_SND +class snd_logger +{ + typedef srt::sync::steady_clock steady_clock; + +public: + snd_logger() {} + + ~snd_logger() + { + ScopedLock lck(m_mtx); + m_fout.close(); + } + + struct + { + typedef srt::sync::steady_clock steady_clock; + long long usElapsed; + steady_clock::time_point tsNow; + int usSRTT; + int usRTTVar; + int msSndBuffSpan; + int msTimespanTh; + int msNextUniqueToSend; + long long usElapsedLastDrop; + bool canRexmit; + int iPktSeqno; + bool isRetransmitted; + } state; + + void trace() + { + using namespace srt::sync; + ScopedLock lck(m_mtx); + create_file(); + + m_fout << state.usElapsed << ","; + m_fout << state.usSRTT << ","; + m_fout << state.usRTTVar << ","; + m_fout << state.msSndBuffSpan << ","; + m_fout << state.msTimespanTh << ","; + m_fout << state.msNextUniqueToSend << ","; + m_fout << state.usElapsedLastDrop << ","; + m_fout << state.canRexmit << ","; + m_fout << state.iPktSeqno << ','; + m_fout << state.isRetransmitted << '\n'; + + m_fout.flush(); + } + +private: + void print_header() + { + m_fout << "usElapsed,usSRTT,usRTTVar,msSndBuffTimespan,msTimespanTh,msNextUniqueToSend,usDLastDrop,canRexmit,sndPktSeqno,isRexmit"; + m_fout << "\n"; + } + + void create_file() + { + if (m_fout.is_open()) + return; + + m_start_time = srt::sync::steady_clock::now(); + std::string str_tnow = srt::sync::FormatTimeSys(m_start_time); + str_tnow.resize(str_tnow.size() - 7); // remove trailing ' [SYST]' part + while (str_tnow.find(':') != std::string::npos) + { + str_tnow.replace(str_tnow.find(':'), 1, 1, '_'); + } + const std::string fname = "snd_trace_" + str_tnow + ".csv"; + m_fout.open(fname, std::ofstream::out); + if (!m_fout) + std::cerr << "IPE: Failed to open " << fname << "!!!\n"; + + print_header(); + } + +private: + srt::sync::Mutex m_mtx; + std::ofstream m_fout; + srt::sync::steady_clock::time_point m_start_time; +}; + +snd_logger g_snd_logger; +#endif // SRT_DEBUG_TRACE_SND + +bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED) +{ + // Prioritization of original packets only applies to Live CC. + if (!m_bPeerTLPktDrop || !m_config.bMessageAPI) + return true; + + // TODO: lock sender buffer? + const time_point tsNextPacket = m_pSndBuffer->peekNextOriginal(); + +#if SRT_DEBUG_TRACE_SND + const int buffdelay_ms = count_milliseconds(m_pSndBuffer->getBufferingDelay(tnow)); + // If there is a small loss, still better to retransmit. If timespan is already big, + // then consider sending original packets. + const int threshold_ms_min = (2 * m_iSRTT + 4 * m_iRTTVar + COMM_SYN_INTERVAL_US) / 1000; + const int msNextUniqueToSend = count_milliseconds(tnow - tsNextPacket) + m_iPeerTsbPdDelay_ms; + + g_snd_logger.state.tsNow = tnow; + g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime); + g_snd_logger.state.usSRTT = m_iSRTT; + g_snd_logger.state.usRTTVar = m_iRTTVar; + g_snd_logger.state.msSndBuffSpan = buffdelay_ms; + g_snd_logger.state.msTimespanTh = threshold_ms_min; + g_snd_logger.state.msNextUniqueToSend = msNextUniqueToSend; + g_snd_logger.state.usElapsedLastDrop = count_microseconds(tnow - m_tsLastTLDrop); + g_snd_logger.state.canRexmit = false; +#endif + + if (tsNextPacket != time_point()) + { + // Can send original packet, so just send it + return false; + } + +#if SRT_DEBUG_TRACE_SND + g_snd_logger.state.canRexmit = true; +#endif + return true; +} + std::pair srt::CUDT::packData(CPacket& w_packet) { int payload = 0; @@ -9206,7 +9331,10 @@ std::pair srt::CUDT::packData(CPacket& w_packet) if (!m_bOpened) return std::make_pair(false, enter_time); - payload = packLostData((w_packet), (origintime)); + payload = isRetransmissionAllowed(enter_time) + ? packLostData((w_packet), (origintime)) + : 0; + if (payload > 0) { reason = "reXmit"; @@ -9459,6 +9587,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet, time_point& w_origintime) } } +#if SRT_DEBUG_TRACE_SND + g_snd_logger.state.iPktSeqno = w_packet.m_iSeqNo; + g_snd_logger.state.isRetransmitted = w_packet.getRexmitFlag(); + g_snd_logger.trace(); +#endif + return true; } diff --git a/srtcore/core.h b/srtcore/core.h index caf1f9837..c660f6732 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -548,6 +548,12 @@ class CUDT SRT_ATTR_REQUIRES(m_RecvAckLock, m_StatsLock) int sndDropTooLate(); + /// @bried Allow packet retransmission. + /// Depending on the configuration mode (live / file), retransmission + /// can be blocked if e.g. there are original packets pending to be sent. + /// @return true if retransmission is allowed; false otherwise. + bool isRetransmissionAllowed(const time_point& tnow); + /// Connect to a UDT entity as per hs request. This will update /// required data in the entity, then update them also in the hs structure, /// and then send the response back to the caller.