diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 20f5da3d8..660f2b335 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -817,9 +817,6 @@ CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts) , m_iAckedPktsCount(0) , m_iAckedBytesCount(0) , m_uAvgPayloadSz(7 * 188) - , m_bTsbPdMode(false) - , m_tdTsbPdDelay(0) - , m_bTsbPdWrapCheck(false) { m_pUnit = new CUnit*[m_iSize]; for (int i = 0; i < m_iSize; ++i) @@ -909,7 +906,8 @@ int CRcvBuffer::readBuffer(char* data, int len) int rs = len; IF_HEAVY_LOGGING(char* begin = data); - const steady_clock::time_point now = (m_bTsbPdMode ? steady_clock::now() : steady_clock::time_point()); + const bool bTsbPdEnabled = m_tsbpd.isEnabled(); + const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point()); HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack); while ((p != lastack) && (rs > 0)) @@ -922,7 +920,7 @@ int CRcvBuffer::readBuffer(char* data, int len) const CPacket& pkt = m_pUnit[p]->m_Packet; - if (m_bTsbPdMode) + if (bTsbPdEnabled) { HLOGC(brlog.Debug, log << CONID() << "readBuffer: chk if time2play:" @@ -1427,7 +1425,7 @@ bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t& { w_tsbpdtime = steady_clock::time_point(); - if (m_bTsbPdMode) + if (m_tsbpd.isEnabled()) { const CPacket* pkt = getRcvReadyPacket(seqdistance); if (!pkt) @@ -1649,7 +1647,7 @@ void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now) int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan) { timespan = 0; - if (m_bTsbPdMode) + if (m_tsbpd.isEnabled()) { // Get a valid startpos. // Skip invalid entries in the beginning, if any. @@ -1731,306 +1729,45 @@ void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag) m_pUnit[i]->m_iFlag = CUnit::DROPPED; } -steady_clock::time_point CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us) -{ - /* - * Packet timestamps wrap around every 01h11m35s (32-bit in usec) - * When added to the peer start time (base time), - * wrapped around timestamps don't provide a valid local packet delevery time. - * - * A wrap check period starts 30 seconds before the wrap point. - * In this period, timestamps smaller than 30 seconds are considered to have wrapped around (then adjusted). - * The wrap check period ends 30 seconds after the wrap point, afterwhich time base has been adjusted. - */ - int64_t carryover = 0; - - // This function should generally return the timebase for the given timestamp_us. - // It's assumed that the timestamp_us, for which this function is being called, - // is received as monotonic clock. This function then traces the changes in the - // timestamps passed as argument and catches the moment when the 64-bit timebase - // should be increased by a "segment length" (MAX_TIMESTAMP+1). - - // The checks will be provided for the following split: - // [INITIAL30][FOLLOWING30]....[LAST30] <-- == CPacket::MAX_TIMESTAMP - // - // The following actions should be taken: - // 1. Check if this is [LAST30]. If so, ENTER TSBPD-wrap-check state - // 2. Then, it should turn into [INITIAL30] at some point. If so, use carryover MAX+1. - // 3. Then it should switch to [FOLLOWING30]. If this is detected, - // - EXIT TSBPD-wrap-check state - // - save the carryover as the current time base. - - if (m_bTsbPdWrapCheck) - { - // Wrap check period. - - if (timestamp_us < TSBPD_WRAP_PERIOD) - { - carryover = int64_t(CPacket::MAX_TIMESTAMP) + 1; - } - // timestamp_us >= TSBPD_WRAP_PERIOD - else if (timestamp_us <= (TSBPD_WRAP_PERIOD * 2)) - { - /* Exiting wrap check period (if for packet delivery head) */ - m_bTsbPdWrapCheck = false; - m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1); - LOGC(tslog.Debug, - log << "tsbpd wrap period ends with ts=" << timestamp_us << " - NEW TIME BASE: " - << FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us"); - } - } - // Check if timestamp_us is in the last 30 seconds before reaching the MAX_TIMESTAMP. - else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD)) - { - /* Approching wrap around point, start wrap check period (if for packet delivery head) */ - m_bTsbPdWrapCheck = true; - LOGC(tslog.Debug, - log << "tsbpd wrap period begins with ts=" << timestamp_us << " drift: " << m_DriftTracer.drift() - << "us."); - } - - return (m_tsTsbPdTimeBase + microseconds_from(carryover)); -} - void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase, bool wrp, uint32_t delay, const steady_clock::duration& udrift) { - // Same as setRcvTsbPdMode, but predicted to be used for group members. - // This synchronizes the time from the INTERNAL TIMEBASE of an existing - // socket's internal timebase. This is required because the initial time - // base stays always the same, whereas the internal timebase undergoes - // adjustment as the 32-bit timestamps in the sockets wrap. The socket - // newly added to the group must get EXACTLY the same internal timebase - // or otherwise the TsbPd time calculation will ship different results - // on different sockets. - - m_bTsbPdMode = true; - - m_tsTsbPdTimeBase = timebase; - m_bTsbPdWrapCheck = wrp; - m_tdTsbPdDelay = microseconds_from(delay); - m_DriftTracer.forceDrift(count_microseconds(udrift)); + m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift); } void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase, bool wrp, const steady_clock::duration& udrift) { - // This is only when a drift was updated on one of the group members. - HLOGC(brlog.Debug, - log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift) - << " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase)); - - m_tsTsbPdTimeBase = timebase; - m_bTsbPdWrapCheck = wrp; - - m_DriftTracer.forceDrift(count_microseconds(udrift)); + m_tsbpd.applyGroupDrift(timebase, wrp, udrift); } -bool CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, steady_clock::duration& w_udrift) +void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift) { - w_timebase = m_tsTsbPdTimeBase; - w_udrift = microseconds_from(m_DriftTracer.drift()); - return m_bTsbPdWrapCheck; -} - -steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t timestamp) -{ - const steady_clock::time_point time_base = getTsbPdTimeBase(timestamp); - - // Display only ingredients, not the result, as the result will - // be displayed anyway in the next logs. - HLOGC(brlog.Debug, - log << "getPktTsbPdTime: TIMEBASE=" << FormatTime(time_base) << " + dTS=" << timestamp - << "us + LATENCY=" << FormatDuration(m_tdTsbPdDelay) << " + uDRIFT=" << m_DriftTracer.drift()); - return (time_base + m_tdTsbPdDelay + microseconds_from(timestamp + m_DriftTracer.drift())); + return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift); } -int CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay) +steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp) { - m_bTsbPdMode = true; - m_bTsbPdWrapCheck = false; - - // Timebase passed here comes is calculated as: - // >>> CTimer::getTime() - ctrlpkt->m_iTimeStamp - // where ctrlpkt is the packet with SRT_CMD_HSREQ message. - // - // This function is called in the HSREQ reception handler only. - m_tsTsbPdTimeBase = timebase; - // XXX Seems like this may not work correctly. - // At least this solution this way won't work with application-supplied - // timestamps. For that case the timestamps should be taken exclusively - // from the data packets because in case of application-supplied timestamps - // they come from completely different server and undergo different rules - // of network latency and drift. - m_tdTsbPdDelay = delay; - return 0; -} - -#ifdef SRT_DEBUG_TSBPD_DRIFT -void CRcvBuffer::printDriftHistogram(int64_t iDrift) -{ - /* - * Build histogram of drift values - * First line (ms): <=-10.0 -9.0 ... -1.0 - 0.0 + 1.0 ... 9.0 >=10.0 - * Second line (ms): -0.9 ... -0.1 - 0.0 + 0.1 ... 0.9 - * 0 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 1 0 0 0 0 0 0 - * 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 0 0 0 0 0 0 - */ - iDrift /= 100; // uSec to 100 uSec (0.1ms) - if (-10 < iDrift && iDrift < 10) - { - /* Fill 100us histogram -900 .. 900 us 100 us increments */ - m_TsbPdDriftHisto100us[10 + iDrift]++; - } - else - { - /* Fill 1ms histogram <=-10.0, -9.0 .. 9.0, >=10.0 ms in 1 ms increments */ - iDrift /= 10; // 100uSec to 1ms - if (-10 < iDrift && iDrift < 10) - m_TsbPdDriftHisto1ms[10 + iDrift]++; - else if (iDrift <= -10) - m_TsbPdDriftHisto1ms[0]++; - else - m_TsbPdDriftHisto1ms[20]++; - } - ++m_iTsbPdDriftNbSamples; - if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0) - { - int* histo = m_TsbPdDriftHisto1ms; - - fprintf(stderr, - "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ", - histo[0], - histo[1], - histo[2], - histo[3], - histo[4], - histo[5], - histo[6], - histo[7], - histo[8], - histo[9], - histo[10]); - fprintf(stderr, - "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n", - histo[11], - histo[12], - histo[13], - histo[14], - histo[15], - histo[16], - histo[17], - histo[18], - histo[19], - histo[20]); - - histo = m_TsbPdDriftHisto100us; - fprintf(stderr, - " %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ", - histo[1], - histo[2], - histo[3], - histo[4], - histo[5], - histo[6], - histo[7], - histo[8], - histo[9], - histo[10]); - fprintf(stderr, - "%4d %4d %4d %4d %4d %4d %4d %4d %4d\n", - histo[11], - histo[12], - histo[13], - histo[14], - histo[15], - histo[16], - histo[17], - histo[18], - histo[19]); - - m_iTsbPdDriftNbSamples = 0; - } + // Updating TSBPD time here is not very accurate and prevents from making the function constant. + // For now preserving the existing behavior. + m_tsbpd.updateTsbPdTimeBase(usPktTimestamp); + return m_tsbpd.getPktTsbPdTime(usPktTimestamp); } -void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg) +void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay) { - fprintf(stderr, - "%s: tsbpd offset=%d drift=%d usec\n", - FormatTime(steady_clock::now()).c_str(), - tsbPdOffset, - tsbPdDriftAvg); - memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us)); - memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms)); + const bool no_wrap_check = false; + m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay); } -#endif /* SRT_DEBUG_TSBPD_DRIFT */ bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, - Mutex& mutex_to_lock, steady_clock::duration& w_udrift, steady_clock::time_point& w_newtimebase) { - if (!m_bTsbPdMode) // Not checked unless in TSBPD mode - return false; - /* - * TsbPD time drift correction - * TsbPD time slowly drift over long period depleting decoder buffer or raising latency - * Re-evaluate the time adjustment value using a receiver control packet (ACK-ACK). - * ACK-ACK timestamp is RTT/2 ago (in sender's time base) - * Data packet have origin time stamp which is older when retransmitted so not suitable for this. - * - * Every TSBPD_DRIFT_MAX_SAMPLES packets, the average drift is calculated - * if -TSBPD_DRIFT_MAX_VALUE < avgTsbPdDrift < TSBPD_DRIFT_MAX_VALUE uSec, pass drift value to RcvBuffer to adjust - * delevery time. if outside this range, adjust this->TsbPdTimeOffset and RcvBuffer->TsbPdTimeBase by - * +-TSBPD_DRIFT_MAX_VALUE uSec to maintain TsbPdDrift values in reasonable range (-5ms .. +5ms). - */ - - // Note important thing: this function is being called _EXCLUSIVELY_ in the handler - // of UMSG_ACKACK command reception. This means that the timestamp used here comes - // from the CONTROL domain, not DATA domain (timestamps from DATA domain may be - // either schedule time or a time supplied by the application). - - const steady_clock::duration iDrift = - steady_clock::now() - (getTsbPdTimeBase(timestamp_us) + microseconds_from(timestamp_us)); - - enterCS(mutex_to_lock); - - bool updated = m_DriftTracer.update(count_microseconds(iDrift)); - -#ifdef SRT_DEBUG_TSBPD_DRIFT - printDriftHistogram(count_microseconds(iDrift)); -#endif /* SRT_DEBUG_TSBPD_DRIFT */ - - if (updated) - { -#ifdef SRT_DEBUG_TSBPD_DRIFT - printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift()); -#endif /* SRT_DEBUG_TSBPD_DRIFT */ - -#if ENABLE_HEAVY_LOGGING - const steady_clock::time_point oldbase = m_tsTsbPdTimeBase; -#endif - steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift()); - m_tsTsbPdTimeBase += overdrift; - - HLOGC(brlog.Debug, - log << "DRIFT=" << FormatDuration(iDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0) - << "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift) - << " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase)); - } - else - { - HLOGC(brlog.Debug, - log << "DRIFT=" << FormatDuration(iDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase)); - } - - leaveCS(mutex_to_lock); - w_udrift = iDrift; - w_newtimebase = m_tsTsbPdTimeBase; - return updated; + return m_tsbpd.addDriftSample(timestamp_us, w_udrift, w_newtimebase); } int CRcvBuffer::readMsg(char* data, int len) @@ -2088,7 +1825,7 @@ bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playt bool empty = true; - if (m_bTsbPdMode) + if (m_tsbpd.isEnabled()) { w_passack = false; int seq = 0; diff --git a/srtcore/buffer.h b/srtcore/buffer.h index d998fd1b9..6a21df1ce 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -56,8 +56,8 @@ modified by #include "udt.h" #include "list.h" #include "queue.h" +#include "tsbpd_time.h" #include "utilities.h" -#include // The notation used for "circular numbers" in comments: // The "cicrular numbers" are numbers that when increased up to the @@ -392,14 +392,13 @@ class CRcvBuffer /// Set TimeStamp-Based Packet Delivery Rx Mode /// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay /// @param [in] delay aggreed TsbPD delay - /// @return 0 - int setRcvTsbPdMode(const time_point& timebase, const duration& delay); + void setRcvTsbPdMode(const time_point& timebase, const duration& delay); /// Add packet timestamp for drift caclculation and compensation /// @param [in] timestamp packet time stamp - /// @param [ref] lock Mutex that should be locked for the operation + /// @param [out] w_udrift current drift value + /// @param [out] w_newtimebase current TSBPD base time bool addRcvTsbPdDriftSample(uint32_t timestamp, - srt::sync::Mutex& mutex_to_lock, duration& w_udrift, time_point& w_newtimebase); @@ -463,19 +462,12 @@ class CRcvBuffer bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto); public: - /// Get packet delivery local time base (adjusted for wrap around) - /// (Exposed as used publicly in logs) - /// @param [in] timestamp packet timestamp (relative to peer StartTime), wrapping around every ~72 min - /// @return local delivery time (usec) - time_point getTsbPdTimeBase(uint32_t timestamp_us); - - int64_t getDrift() const { return m_DriftTracer.drift(); } + int64_t getDrift() const { return m_tsbpd.drift(); } public: int32_t getTopMsgno() const; - // @return Wrap check value - bool getInternalTimeBase(time_point& w_tb, duration& w_udrift); + void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift); void applyGroupTime(const time_point& timebase, bool wrapcheck, uint32_t delay, const duration& udrift); void applyGroupDrift(const time_point& timebase, bool wrapcheck, const duration& udrift); @@ -540,41 +532,9 @@ class CRcvBuffer int m_iAckedBytesCount; // Number of acknowledged payload bytes in the buffer unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation - bool m_bTsbPdMode; // true: apply TimeStamp-Based Rx Mode - duration m_tdTsbPdDelay; // aggreed delay - time_point m_tsTsbPdTimeBase; // localtime base for TsbPd mode - // Note: m_tsTsbPdTimeBase cumulates values from: - // 1. Initial SRT_CMD_HSREQ packet returned value diff to current time: - // == (NOW - PACKET_TIMESTAMP), at the time of HSREQ reception - // 2. Timestamp overflow (@c CRcvBuffer::getTsbPdTimeBase), when overflow on packet detected - // += CPacket::MAX_TIMESTAMP+1 (it's a hex round value, usually 0x1*e8). - // 3. Time drift (CRcvBuffer::addRcvTsbPdDriftSample, executed exclusively - // from UMSG_ACKACK handler). This is updated with (positive or negative) TSBPD_DRIFT_MAX_VALUE - // once the value of average drift exceeds this value in whatever direction. - // += (+/-)CRcvBuffer::TSBPD_DRIFT_MAX_VALUE - // - // XXX Application-supplied timestamps won't work therefore. This requires separate - // calculation of all these things above. - - bool m_bTsbPdWrapCheck; // true: check packet time stamp wrap around - static const uint32_t TSBPD_WRAP_PERIOD = (30 * 1000000); // 30 seconds (in usec) - - /// Max drift (usec) above which TsbPD Time Offset is adjusted - static const int TSBPD_DRIFT_MAX_VALUE = 5000; - /// Number of samples (UMSG_ACKACK packets) to perform drift caclulation and compensation - static const int TSBPD_DRIFT_MAX_SAMPLES = 1000; - DriftTracer m_DriftTracer; - AvgBufSize m_mavg; -#ifdef SRT_DEBUG_TSBPD_DRIFT - int m_TsbPdDriftHisto100us[22]; // Histogram of 100us TsbPD drift (-1.0 .. +1.0 ms in 0.1ms increment) - int m_TsbPdDriftHisto1ms[22]; // Histogram of TsbPD drift (-10.0 .. +10.0 ms, in 1.0 ms increment) - int m_iTsbPdDriftNbSamples = 0; // Number of samples in sum and histogram - static const int TSBPD_DRIFT_PRT_SAMPLES = 200; // Number of samples (UMSG_ACKACK packets) to print hostogram -#endif /* SRT_DEBUG_TSBPD_DRIFT */ + srt::CTsbpdTime m_tsbpd; -#ifdef SRT_DEBUG_TSBPD_OUTJITTER - unsigned long m_ulPdHisto[4][10]; -#endif /* SRT_DEBUG_TSBPD_OUTJITTER */ + AvgBufSize m_mavg; private: CRcvBuffer(); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index cefb716c5..86c491ee0 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8110,7 +8110,7 @@ void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival { steady_clock::duration udrift(0); steady_clock::time_point newtimebase; - const bool drift_updated ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), m_RecvLock, + const bool drift_updated ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), (udrift), (newtimebase)); #if ENABLE_EXPERIMENTAL_BONDING if (drift_updated && m_parent->m_GroupOf) diff --git a/srtcore/filelist.maf b/srtcore/filelist.maf index 4064c265f..400541499 100644 --- a/srtcore/filelist.maf +++ b/srtcore/filelist.maf @@ -19,12 +19,13 @@ packet.cpp packetfilter.cpp queue.cpp congctl.cpp -srt_c_api.cpp -window.cpp socketconfig.cpp +srt_c_api.cpp srt_compat.c strerror_defs.cpp sync.cpp +tsbpd_time.cpp +window.cpp SOURCES - ENABLE_EXPERIMENTAL_BONDING group.cpp @@ -69,6 +70,7 @@ congctl.h socketconfig.h srt_compat.h threadname.h +tsbpd_time.h utilities.h window.h diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 032ac1580..f15e6ac57 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -48,7 +48,7 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof, if (!master) return false; - w_wp = master->m_pRcvBuffer->getInternalTimeBase((w_tb), (w_dr)); + master->m_pRcvBuffer->getInternalTimeBase((w_tb), (w_wp), (w_dr)); // Sanity check if (is_zero(w_tb)) @@ -2734,7 +2734,8 @@ void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady steady_clock::time_point this_timebase; steady_clock::duration this_udrift(0); - bool wrp = gi->ps->m_pUDT->m_pRcvBuffer->getInternalTimeBase((this_timebase), (this_udrift)); + bool wrp = false; + gi->ps->m_pUDT->m_pRcvBuffer->getInternalTimeBase((this_timebase), (wrp), (this_udrift)); udrift = std::min(udrift, this_udrift); steady_clock::time_point new_newtimebase = std::min(newtimebase, this_timebase); diff --git a/srtcore/tsbpd_time.cpp b/srtcore/tsbpd_time.cpp new file mode 100644 index 000000000..0fe07fb7c --- /dev/null +++ b/srtcore/tsbpd_time.cpp @@ -0,0 +1,164 @@ +/* + * SRT - Secure, Reliable, Transport + * Copyright (c) 2021 Haivision Systems Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + */ +#include "tsbpd_time.h" + +#include "logging.h" +#include "logger_defs.h" +#include "packet.h" + +using namespace srt_logging; +using namespace srt::sync; + +namespace srt +{ + +bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, + steady_clock::duration& w_udrift, + steady_clock::time_point& w_newtimebase) +{ + if (!m_bTsbPdMode) + return false; + + const time_point tsNow = steady_clock::now(); + + ScopedLock lck(m_mtxRW); + const steady_clock::duration tdDrift = tsNow - getPktTsbPdBaseTime(usPktTimestamp); + + const bool updated = m_DriftTracer.update(count_microseconds(tdDrift)); + + if (updated) + { + IF_HEAVY_LOGGING(const steady_clock::time_point oldbase = m_tsTsbPdTimeBase); + steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift()); + m_tsTsbPdTimeBase += overdrift; + + HLOGC(brlog.Debug, + log << "DRIFT=" << FormatDuration(tdDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0) + << "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift) + << " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase)); + } + else + { + HLOGC(brlog.Debug, + log << "DRIFT=" << FormatDuration(tdDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase)); + } + + w_udrift = tdDrift; + w_newtimebase = m_tsTsbPdTimeBase; + + return updated; +} + +void CTsbpdTime::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay) +{ + m_bTsbPdMode = true; + m_bTsbPdWrapCheck = wrap; + + // Timebase passed here comes is calculated as: + // Tnow - hspkt.m_iTimeStamp + // where hspkt is the packet with SRT_CMD_HSREQ message. + // + // This function is called in the HSREQ reception handler only. + m_tsTsbPdTimeBase = timebase; + m_tdTsbPdDelay = delay; +} + +void CTsbpdTime::applyGroupTime(const steady_clock::time_point& timebase, + bool wrp, + uint32_t delay, + const steady_clock::duration& udrift) +{ + // Same as setTsbPdMode, but predicted to be used for group members. + // This synchronizes the time from the INTERNAL TIMEBASE of an existing + // socket's internal timebase. This is required because the initial time + // base stays always the same, whereas the internal timebase undergoes + // adjustment as the 32-bit timestamps in the sockets wrap. The socket + // newly added to the group must get EXACTLY the same internal timebase + // or otherwise the TsbPd time calculation will ship different results + // on different member sockets. + + m_bTsbPdMode = true; + + m_tsTsbPdTimeBase = timebase; + m_bTsbPdWrapCheck = wrp; + m_tdTsbPdDelay = microseconds_from(delay); + m_DriftTracer.forceDrift(count_microseconds(udrift)); +} + +void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase, + bool wrp, + const steady_clock::duration& udrift) +{ + // This is only when a drift was updated on one of the group members. + HLOGC(brlog.Debug, + log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift) + << " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase)); + + m_tsTsbPdTimeBase = timebase; + m_bTsbPdWrapCheck = wrp; + + m_DriftTracer.forceDrift(count_microseconds(udrift)); +} + +CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const +{ + const uint64_t carryover_us = + (m_bTsbPdWrapCheck && timestamp_us < TSBPD_WRAP_PERIOD) ? uint64_t(CPacket::MAX_TIMESTAMP) + 1 : 0; + + return (m_tsTsbPdTimeBase + microseconds_from(carryover_us)); +} + +CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const +{ + return getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); +} + +CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const +{ + return getTsbPdTimeBase(usPktTimestamp) + microseconds_from(usPktTimestamp); +} + +void CTsbpdTime::updateTsbPdTimeBase(uint32_t usPktTimestamp) +{ + if (m_bTsbPdWrapCheck) + { + // Wrap check period. + if ((usPktTimestamp >= TSBPD_WRAP_PERIOD) && (usPktTimestamp <= (TSBPD_WRAP_PERIOD * 2))) + { + /* Exiting wrap check period (if for packet delivery head) */ + m_bTsbPdWrapCheck = false; + m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1); + LOGC(tslog.Debug, + log << "tsbpd wrap period ends with ts=" << usPktTimestamp << " - NEW TIME BASE: " + << FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us"); + } + return; + } + + // Check if timestamp is in the last 30 seconds before reaching the MAX_TIMESTAMP. + if (usPktTimestamp > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD)) + { + // Approching wrap around point, start wrap check period (if for packet delivery head) + m_bTsbPdWrapCheck = true; + LOGC(tslog.Debug, + log << "tsbpd wrap period begins with ts=" << usPktTimestamp << " drift: " << m_DriftTracer.drift() + << "us."); + } +} + +void CTsbpdTime::getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const +{ + ScopedLock lck(m_mtxRW); + w_tb = m_tsTsbPdTimeBase; + w_udrift = microseconds_from(m_DriftTracer.drift()); + w_wrp = m_bTsbPdWrapCheck; +} + +} // namespace srt diff --git a/srtcore/tsbpd_time.h b/srtcore/tsbpd_time.h new file mode 100644 index 000000000..5dcc7ff3a --- /dev/null +++ b/srtcore/tsbpd_time.h @@ -0,0 +1,161 @@ +/* + * SRT - Secure, Reliable, Transport + * Copyright (c) 2021 Haivision Systems Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + */ + +#ifndef INC_SRT_TSBPD_TIME_H +#define INC_SRT_TSBPD_TIME_H + +#include "platform_sys.h" +#include "sync.h" +#include "utilities.h" + +namespace srt +{ + +/// @brief TimeStamp-Based Packet Delivery Mode (TSBPD) time conversion logic. +/// Used by the receiver to calculate delivery time of data packets. +/// See SRT RFC Section "Timestamp-Based Packet Delivery". +class CTsbpdTime +{ + typedef srt::sync::steady_clock steady_clock; + typedef steady_clock::time_point time_point; + typedef steady_clock::duration duration; + typedef srt::sync::Mutex Mutex; + +public: + CTsbpdTime() + : m_bTsbPdMode(false) + , m_tdTsbPdDelay(0) + , m_bTsbPdWrapCheck(false) + { + } + + /// Set TimeStamp-Based Packet Delivery Mode (receiver). + /// @param [in] timebase local time base (uSec) of packet time stamps including buffering delay. + /// @param [in] wrap wrapping period. + /// @param [in] delay negotiated TsbPD delay (buffering latency). + void setTsbPdMode(const time_point& timebase, bool wrap, duration delay); + + /// @brief Check if TSBPD logic is enabled. + /// @return true if TSBPD is enabled. + bool isEnabled() const { return m_bTsbPdMode; } + + /// @brief Apply new state derived from other members of a socket group. + /// @param timebase TSBPD base time. + /// @param wrp wrap period (enabled or not). + /// @param delay TSBPD delay. + /// @param udrift clock drift. + void applyGroupTime(const time_point& timebase, bool wrp, uint32_t delay, const duration& udrift); + + /// @brief Apply new clock state (TSBPD base and drift) derived from other members of a socket group. + /// @param timebase TSBPD base time. + /// @param wrp state of the wrapping period (enabled or disabled). + /// @param udrift clock drift. + void applyGroupDrift(const time_point& timebase, bool wrp, const duration& udrift); + + /// @brief Add new drift sample from an ACK-ACKACK pair. + /// ACKACK packets are sent immediately (except for UDP buffering). + /// + /// @param [in] pktTimestamp Timestamp of the arrived ACKACK packet. + /// @param [out] w_udrift Current clock drift value. + /// @param [out] w_newtimebase Current TSBPD base time. + /// + /// @return true if TSBPD base time has changed, false otherwise. + bool addDriftSample(uint32_t pktTimestamp, + steady_clock::duration& w_udrift, + steady_clock::time_point& w_newtimebase); + + /// @brief Handle timestamp of data packet when 32-bit integer carryover is about to happen. + /// When packet timestamp approaches CPacket::MAX_TIMESTAMP, the TSBPD base time should be + /// shifted accordingly to correctly handle new packets with timestamps starting from zero. + /// @param usPktTimestamp timestamp field value of a data packet. + void updateTsbPdTimeBase(uint32_t usPktTimestamp); + + /// @brief Get TSBPD base time adjusted for carryover, which occurs when + /// a packet's timestamp exceeds the UINT32_MAX and continues from zero. + /// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds). + /// + /// @return TSBPD base time for a provided packet timestamp. + time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const; + + /// @brief Get packet TSBPD time without buffering delay and clock drift, which is + /// the target time for delivering the packet to an upstream application. + /// Essentially: getTsbPdTimeBase(usPktTimestamp) + usPktTimestamp + /// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds). + /// + /// @return Packet TSBPD base time without buffering delay. + time_point getPktTsbPdBaseTime(uint32_t usPktTimestamp) const; + + /// @brief Get packet TSBPD time with buffering delay and clock drift, which is + /// the target time for delivering the packet to an upstream application + /// (including drift and carryover effects, if any). + /// Essentially: getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + drift() + /// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds). + /// + /// @return Packet TSBPD time with buffering delay. + time_point getPktTsbPdTime(uint32_t usPktTimestamp) const; + + /// @brief Get current drift value. + /// @return current drift value. + int64_t drift() const { return m_DriftTracer.drift(); } + + /// @brief Get current overdrift value. + /// @return current overdrift value. + int64_t overdrift() const { return m_DriftTracer.overdrift(); } + + /// @brief Get internal state to apply to another member of a socket group. + /// @param w_tb TsbPd base time. + /// @param w_udrift drift value. + /// @param w_wrp wrap check. + void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const; + +private: + bool m_bTsbPdMode; //< Rreceiver buffering and TSBPD is active when true. + duration m_tdTsbPdDelay; //< Negotiated buffering delay. + + /// @brief Local time base for TsbPd. + /// @note m_tsTsbPdTimeBase is changed in the following cases: + /// 1. Initialized upon SRT_CMD_HSREQ packet as the difference with the current time: + /// = (NOW - PACKET_TIMESTAMP), at the time of HSREQ reception. + /// 2. Shifted forward on timestamp overflow (@see CTsbpdTime::updateTsbPdTimeBase), when overflow + /// of the timestamp field value of a data packet is detected. + /// += CPacket::MAX_TIMESTAMP + 1 + /// 3. Clock drift (@see CTsbpdTime::addDriftSample, executed exclusively + /// from ACKACK handler). This is updated with (positive or negative) TSBPD_DRIFT_MAX_VALUE + /// once the value of average drift exceeds this value in either direction. + /// += (+/-)TSBPD_DRIFT_MAX_VALUE + /// + /// @note The TSBPD base time is expected to hold the following condition: + /// (PACKET_TIMESTAMP + m_tsTsbPdTimeBase + drift) == NOW. + /// Then it can be used to estimate the origin time of a data packet, and calculate its delivery time + /// with buffering delay applied. + time_point m_tsTsbPdTimeBase; + + /// @note Packet timestamps wrap around every 01h11m35s (32-bit in usec). + /// A wrap check period starts 30 seconds (TSBPD_WRAP_PERIOD) before the wrap point. + /// During the wrap check period, packet timestamps smaller than 30 seconds + /// are considered to have been wrapped around. + /// The wrap check period ends 30 seconds after the wrap point, + /// after which the TSBPD base time is adjusted. + bool m_bTsbPdWrapCheck; // true: check packet time stamp wraparound (overflow). + static const uint32_t TSBPD_WRAP_PERIOD = (30 * 1000000); // 30 seconds (in usec) for timestamp wrapping period. + + /// Maximum clock drift (microseconds) above which TsbPD base time is already adjusted. + static const int TSBPD_DRIFT_MAX_VALUE = 5000; + /// Number of samples (ACKACK packets) on which to perform drift calculation and compensation. + static const int TSBPD_DRIFT_MAX_SAMPLES = 1000; + DriftTracer m_DriftTracer; + + /// Protect simultaneous change of state (read/write). + mutable Mutex m_mtxRW; +}; + +} // namespace srt + +#endif // INC_SRT_TSBPD_TIME_H