Skip to content

Commit

Permalink
[core] TSBPD logic extracted from CRcvBuffer. (#1968)
Browse files Browse the repository at this point in the history
Refactoring, no functional changes are expected.
New CTsbpdTime class to operate with TSBPD timing.
  • Loading branch information
maxsharabayko authored Apr 29, 2021
1 parent 61dda69 commit 9848d68
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 335 deletions.
301 changes: 19 additions & 282 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,6 @@ CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
, m_iAckedPktsCount(0)
, m_iAckedBytesCount(0)
, m_uAvgPayloadSz(7 * 188)
, m_bTsbPdMode(false)
, m_tdTsbPdDelay(0)
, m_bTsbPdWrapCheck(false)
{
m_pUnit = new CUnit*[m_iSize];
for (int i = 0; i < m_iSize; ++i)
Expand Down Expand Up @@ -909,7 +906,8 @@ int CRcvBuffer::readBuffer(char* data, int len)
int rs = len;
IF_HEAVY_LOGGING(char* begin = data);

const steady_clock::time_point now = (m_bTsbPdMode ? steady_clock::now() : steady_clock::time_point());
const bool bTsbPdEnabled = m_tsbpd.isEnabled();
const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point());

HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
Expand All @@ -922,7 +920,7 @@ int CRcvBuffer::readBuffer(char* data, int len)

const CPacket& pkt = m_pUnit[p]->m_Packet;

if (m_bTsbPdMode)
if (bTsbPdEnabled)
{
HLOGC(brlog.Debug,
log << CONID() << "readBuffer: chk if time2play:"
Expand Down Expand Up @@ -1427,7 +1425,7 @@ bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t&
{
w_tsbpdtime = steady_clock::time_point();

if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
const CPacket* pkt = getRcvReadyPacket(seqdistance);
if (!pkt)
Expand Down Expand Up @@ -1649,7 +1647,7 @@ void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now)
int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan)
{
timespan = 0;
if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
// Get a valid startpos.
// Skip invalid entries in the beginning, if any.
Expand Down Expand Up @@ -1731,306 +1729,45 @@ void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
m_pUnit[i]->m_iFlag = CUnit::DROPPED;
}

steady_clock::time_point CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us)
{
/*
* Packet timestamps wrap around every 01h11m35s (32-bit in usec)
* When added to the peer start time (base time),
* wrapped around timestamps don't provide a valid local packet delevery time.
*
* A wrap check period starts 30 seconds before the wrap point.
* In this period, timestamps smaller than 30 seconds are considered to have wrapped around (then adjusted).
* The wrap check period ends 30 seconds after the wrap point, afterwhich time base has been adjusted.
*/
int64_t carryover = 0;

// This function should generally return the timebase for the given timestamp_us.
// It's assumed that the timestamp_us, for which this function is being called,
// is received as monotonic clock. This function then traces the changes in the
// timestamps passed as argument and catches the moment when the 64-bit timebase
// should be increased by a "segment length" (MAX_TIMESTAMP+1).

// The checks will be provided for the following split:
// [INITIAL30][FOLLOWING30]....[LAST30] <-- == CPacket::MAX_TIMESTAMP
//
// The following actions should be taken:
// 1. Check if this is [LAST30]. If so, ENTER TSBPD-wrap-check state
// 2. Then, it should turn into [INITIAL30] at some point. If so, use carryover MAX+1.
// 3. Then it should switch to [FOLLOWING30]. If this is detected,
// - EXIT TSBPD-wrap-check state
// - save the carryover as the current time base.

if (m_bTsbPdWrapCheck)
{
// Wrap check period.

if (timestamp_us < TSBPD_WRAP_PERIOD)
{
carryover = int64_t(CPacket::MAX_TIMESTAMP) + 1;
}
// timestamp_us >= TSBPD_WRAP_PERIOD
else if (timestamp_us <= (TSBPD_WRAP_PERIOD * 2))
{
/* Exiting wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = false;
m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1);
LOGC(tslog.Debug,
log << "tsbpd wrap period ends with ts=" << timestamp_us << " - NEW TIME BASE: "
<< FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us");
}
}
// Check if timestamp_us is in the last 30 seconds before reaching the MAX_TIMESTAMP.
else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
{
/* Approching wrap around point, start wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = true;
LOGC(tslog.Debug,
log << "tsbpd wrap period begins with ts=" << timestamp_us << " drift: " << m_DriftTracer.drift()
<< "us.");
}

return (m_tsTsbPdTimeBase + microseconds_from(carryover));
}

void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase,
bool wrp,
uint32_t delay,
const steady_clock::duration& udrift)
{
// Same as setRcvTsbPdMode, but predicted to be used for group members.
// This synchronizes the time from the INTERNAL TIMEBASE of an existing
// socket's internal timebase. This is required because the initial time
// base stays always the same, whereas the internal timebase undergoes
// adjustment as the 32-bit timestamps in the sockets wrap. The socket
// newly added to the group must get EXACTLY the same internal timebase
// or otherwise the TsbPd time calculation will ship different results
// on different sockets.

m_bTsbPdMode = true;

m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;
m_tdTsbPdDelay = microseconds_from(delay);
m_DriftTracer.forceDrift(count_microseconds(udrift));
m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift);
}

void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase,
bool wrp,
const steady_clock::duration& udrift)
{
// This is only when a drift was updated on one of the group members.
HLOGC(brlog.Debug,
log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift)
<< " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase));

m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;

m_DriftTracer.forceDrift(count_microseconds(udrift));
m_tsbpd.applyGroupDrift(timebase, wrp, udrift);
}

bool CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, steady_clock::duration& w_udrift)
void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift)
{
w_timebase = m_tsTsbPdTimeBase;
w_udrift = microseconds_from(m_DriftTracer.drift());
return m_bTsbPdWrapCheck;
}

steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t timestamp)
{
const steady_clock::time_point time_base = getTsbPdTimeBase(timestamp);

// Display only ingredients, not the result, as the result will
// be displayed anyway in the next logs.
HLOGC(brlog.Debug,
log << "getPktTsbPdTime: TIMEBASE=" << FormatTime(time_base) << " + dTS=" << timestamp
<< "us + LATENCY=" << FormatDuration<DUNIT_MS>(m_tdTsbPdDelay) << " + uDRIFT=" << m_DriftTracer.drift());
return (time_base + m_tdTsbPdDelay + microseconds_from(timestamp + m_DriftTracer.drift()));
return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift);
}

int CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp)
{
m_bTsbPdMode = true;
m_bTsbPdWrapCheck = false;

// Timebase passed here comes is calculated as:
// >>> CTimer::getTime() - ctrlpkt->m_iTimeStamp
// where ctrlpkt is the packet with SRT_CMD_HSREQ message.
//
// This function is called in the HSREQ reception handler only.
m_tsTsbPdTimeBase = timebase;
// XXX Seems like this may not work correctly.
// At least this solution this way won't work with application-supplied
// timestamps. For that case the timestamps should be taken exclusively
// from the data packets because in case of application-supplied timestamps
// they come from completely different server and undergo different rules
// of network latency and drift.
m_tdTsbPdDelay = delay;
return 0;
}

#ifdef SRT_DEBUG_TSBPD_DRIFT
void CRcvBuffer::printDriftHistogram(int64_t iDrift)
{
/*
* Build histogram of drift values
* First line (ms): <=-10.0 -9.0 ... -1.0 - 0.0 + 1.0 ... 9.0 >=10.0
* Second line (ms): -0.9 ... -0.1 - 0.0 + 0.1 ... 0.9
* 0 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 1 0 0 0 0 0 0
* 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 0 0 0 0 0 0
*/
iDrift /= 100; // uSec to 100 uSec (0.1ms)
if (-10 < iDrift && iDrift < 10)
{
/* Fill 100us histogram -900 .. 900 us 100 us increments */
m_TsbPdDriftHisto100us[10 + iDrift]++;
}
else
{
/* Fill 1ms histogram <=-10.0, -9.0 .. 9.0, >=10.0 ms in 1 ms increments */
iDrift /= 10; // 100uSec to 1ms
if (-10 < iDrift && iDrift < 10)
m_TsbPdDriftHisto1ms[10 + iDrift]++;
else if (iDrift <= -10)
m_TsbPdDriftHisto1ms[0]++;
else
m_TsbPdDriftHisto1ms[20]++;
}
++m_iTsbPdDriftNbSamples;
if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0)
{
int* histo = m_TsbPdDriftHisto1ms;

fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[0],
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19],
histo[20]);

histo = m_TsbPdDriftHisto100us;
fprintf(stderr,
" %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19]);

m_iTsbPdDriftNbSamples = 0;
}
// Updating TSBPD time here is not very accurate and prevents from making the function constant.
// For now preserving the existing behavior.
m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
return m_tsbpd.getPktTsbPdTime(usPktTimestamp);
}

void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
{
fprintf(stderr,
"%s: tsbpd offset=%d drift=%d usec\n",
FormatTime(steady_clock::now()).c_str(),
tsbPdOffset,
tsbPdDriftAvg);
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
const bool no_wrap_check = false;
m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
}
#endif /* SRT_DEBUG_TSBPD_DRIFT */

bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us,
Mutex& mutex_to_lock,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
{
if (!m_bTsbPdMode) // Not checked unless in TSBPD mode
return false;
/*
* TsbPD time drift correction
* TsbPD time slowly drift over long period depleting decoder buffer or raising latency
* Re-evaluate the time adjustment value using a receiver control packet (ACK-ACK).
* ACK-ACK timestamp is RTT/2 ago (in sender's time base)
* Data packet have origin time stamp which is older when retransmitted so not suitable for this.
*
* Every TSBPD_DRIFT_MAX_SAMPLES packets, the average drift is calculated
* if -TSBPD_DRIFT_MAX_VALUE < avgTsbPdDrift < TSBPD_DRIFT_MAX_VALUE uSec, pass drift value to RcvBuffer to adjust
* delevery time. if outside this range, adjust this->TsbPdTimeOffset and RcvBuffer->TsbPdTimeBase by
* +-TSBPD_DRIFT_MAX_VALUE uSec to maintain TsbPdDrift values in reasonable range (-5ms .. +5ms).
*/

// Note important thing: this function is being called _EXCLUSIVELY_ in the handler
// of UMSG_ACKACK command reception. This means that the timestamp used here comes
// from the CONTROL domain, not DATA domain (timestamps from DATA domain may be
// either schedule time or a time supplied by the application).

const steady_clock::duration iDrift =
steady_clock::now() - (getTsbPdTimeBase(timestamp_us) + microseconds_from(timestamp_us));

enterCS(mutex_to_lock);

bool updated = m_DriftTracer.update(count_microseconds(iDrift));

#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftHistogram(count_microseconds(iDrift));
#endif /* SRT_DEBUG_TSBPD_DRIFT */

if (updated)
{
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif /* SRT_DEBUG_TSBPD_DRIFT */

#if ENABLE_HEAVY_LOGGING
const steady_clock::time_point oldbase = m_tsTsbPdTimeBase;
#endif
steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift());
m_tsTsbPdTimeBase += overdrift;

HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0)
<< "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift)
<< " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase));
}
else
{
HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
}

leaveCS(mutex_to_lock);
w_udrift = iDrift;
w_newtimebase = m_tsTsbPdTimeBase;
return updated;
return m_tsbpd.addDriftSample(timestamp_us, w_udrift, w_newtimebase);
}

int CRcvBuffer::readMsg(char* data, int len)
Expand Down Expand Up @@ -2088,7 +1825,7 @@ bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playt

bool empty = true;

if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
w_passack = false;
int seq = 0;
Expand Down
Loading

0 comments on commit 9848d68

Please sign in to comment.