Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSBPD logic extracted from CRcvBuffer. #1968

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 19 additions & 282 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,6 @@ CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
, m_iAckedPktsCount(0)
, m_iAckedBytesCount(0)
, m_uAvgPayloadSz(7 * 188)
, m_bTsbPdMode(false)
, m_tdTsbPdDelay(0)
, m_bTsbPdWrapCheck(false)
{
m_pUnit = new CUnit*[m_iSize];
for (int i = 0; i < m_iSize; ++i)
Expand Down Expand Up @@ -909,7 +906,8 @@ int CRcvBuffer::readBuffer(char* data, int len)
int rs = len;
IF_HEAVY_LOGGING(char* begin = data);

const steady_clock::time_point now = (m_bTsbPdMode ? steady_clock::now() : steady_clock::time_point());
const bool bTsbPdEnabled = m_tsbpd.isEnabled();
const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point());

HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
Expand All @@ -922,7 +920,7 @@ int CRcvBuffer::readBuffer(char* data, int len)

const CPacket& pkt = m_pUnit[p]->m_Packet;

if (m_bTsbPdMode)
if (bTsbPdEnabled)
{
HLOGC(brlog.Debug,
log << CONID() << "readBuffer: chk if time2play:"
Expand Down Expand Up @@ -1427,7 +1425,7 @@ bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t&
{
w_tsbpdtime = steady_clock::time_point();

if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
const CPacket* pkt = getRcvReadyPacket(seqdistance);
if (!pkt)
Expand Down Expand Up @@ -1649,7 +1647,7 @@ void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now)
int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan)
{
timespan = 0;
if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
// Get a valid startpos.
// Skip invalid entries in the beginning, if any.
Expand Down Expand Up @@ -1731,306 +1729,45 @@ void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
m_pUnit[i]->m_iFlag = CUnit::DROPPED;
}

steady_clock::time_point CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us)
{
/*
* Packet timestamps wrap around every 01h11m35s (32-bit in usec)
* When added to the peer start time (base time),
* wrapped around timestamps don't provide a valid local packet delevery time.
*
* A wrap check period starts 30 seconds before the wrap point.
* In this period, timestamps smaller than 30 seconds are considered to have wrapped around (then adjusted).
* The wrap check period ends 30 seconds after the wrap point, afterwhich time base has been adjusted.
*/
int64_t carryover = 0;

// This function should generally return the timebase for the given timestamp_us.
// It's assumed that the timestamp_us, for which this function is being called,
// is received as monotonic clock. This function then traces the changes in the
// timestamps passed as argument and catches the moment when the 64-bit timebase
// should be increased by a "segment length" (MAX_TIMESTAMP+1).

// The checks will be provided for the following split:
// [INITIAL30][FOLLOWING30]....[LAST30] <-- == CPacket::MAX_TIMESTAMP
//
// The following actions should be taken:
// 1. Check if this is [LAST30]. If so, ENTER TSBPD-wrap-check state
// 2. Then, it should turn into [INITIAL30] at some point. If so, use carryover MAX+1.
// 3. Then it should switch to [FOLLOWING30]. If this is detected,
// - EXIT TSBPD-wrap-check state
// - save the carryover as the current time base.

if (m_bTsbPdWrapCheck)
{
// Wrap check period.

if (timestamp_us < TSBPD_WRAP_PERIOD)
{
carryover = int64_t(CPacket::MAX_TIMESTAMP) + 1;
}
// timestamp_us >= TSBPD_WRAP_PERIOD
else if (timestamp_us <= (TSBPD_WRAP_PERIOD * 2))
{
/* Exiting wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = false;
m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1);
LOGC(tslog.Debug,
log << "tsbpd wrap period ends with ts=" << timestamp_us << " - NEW TIME BASE: "
<< FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us");
}
}
// Check if timestamp_us is in the last 30 seconds before reaching the MAX_TIMESTAMP.
else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
{
/* Approching wrap around point, start wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = true;
LOGC(tslog.Debug,
log << "tsbpd wrap period begins with ts=" << timestamp_us << " drift: " << m_DriftTracer.drift()
<< "us.");
}

return (m_tsTsbPdTimeBase + microseconds_from(carryover));
}

void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase,
bool wrp,
uint32_t delay,
const steady_clock::duration& udrift)
{
// Same as setRcvTsbPdMode, but predicted to be used for group members.
// This synchronizes the time from the INTERNAL TIMEBASE of an existing
// socket's internal timebase. This is required because the initial time
// base stays always the same, whereas the internal timebase undergoes
// adjustment as the 32-bit timestamps in the sockets wrap. The socket
// newly added to the group must get EXACTLY the same internal timebase
// or otherwise the TsbPd time calculation will ship different results
// on different sockets.

m_bTsbPdMode = true;

m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;
m_tdTsbPdDelay = microseconds_from(delay);
m_DriftTracer.forceDrift(count_microseconds(udrift));
m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift);
}

void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase,
bool wrp,
const steady_clock::duration& udrift)
{
// This is only when a drift was updated on one of the group members.
HLOGC(brlog.Debug,
log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift)
<< " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase));

m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;

m_DriftTracer.forceDrift(count_microseconds(udrift));
m_tsbpd.applyGroupDrift(timebase, wrp, udrift);
}

bool CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, steady_clock::duration& w_udrift)
void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift)
{
w_timebase = m_tsTsbPdTimeBase;
w_udrift = microseconds_from(m_DriftTracer.drift());
return m_bTsbPdWrapCheck;
}

steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t timestamp)
{
const steady_clock::time_point time_base = getTsbPdTimeBase(timestamp);

// Display only ingredients, not the result, as the result will
// be displayed anyway in the next logs.
HLOGC(brlog.Debug,
log << "getPktTsbPdTime: TIMEBASE=" << FormatTime(time_base) << " + dTS=" << timestamp
<< "us + LATENCY=" << FormatDuration<DUNIT_MS>(m_tdTsbPdDelay) << " + uDRIFT=" << m_DriftTracer.drift());
return (time_base + m_tdTsbPdDelay + microseconds_from(timestamp + m_DriftTracer.drift()));
return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift);
}

int CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp)
{
m_bTsbPdMode = true;
m_bTsbPdWrapCheck = false;

// Timebase passed here comes is calculated as:
// >>> CTimer::getTime() - ctrlpkt->m_iTimeStamp
// where ctrlpkt is the packet with SRT_CMD_HSREQ message.
//
// This function is called in the HSREQ reception handler only.
m_tsTsbPdTimeBase = timebase;
// XXX Seems like this may not work correctly.
// At least this solution this way won't work with application-supplied
// timestamps. For that case the timestamps should be taken exclusively
// from the data packets because in case of application-supplied timestamps
// they come from completely different server and undergo different rules
// of network latency and drift.
m_tdTsbPdDelay = delay;
return 0;
}

#ifdef SRT_DEBUG_TSBPD_DRIFT
void CRcvBuffer::printDriftHistogram(int64_t iDrift)
{
/*
* Build histogram of drift values
* First line (ms): <=-10.0 -9.0 ... -1.0 - 0.0 + 1.0 ... 9.0 >=10.0
* Second line (ms): -0.9 ... -0.1 - 0.0 + 0.1 ... 0.9
* 0 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 1 0 0 0 0 0 0
* 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 0 0 0 0 0 0
*/
iDrift /= 100; // uSec to 100 uSec (0.1ms)
if (-10 < iDrift && iDrift < 10)
{
/* Fill 100us histogram -900 .. 900 us 100 us increments */
m_TsbPdDriftHisto100us[10 + iDrift]++;
}
else
{
/* Fill 1ms histogram <=-10.0, -9.0 .. 9.0, >=10.0 ms in 1 ms increments */
iDrift /= 10; // 100uSec to 1ms
if (-10 < iDrift && iDrift < 10)
m_TsbPdDriftHisto1ms[10 + iDrift]++;
else if (iDrift <= -10)
m_TsbPdDriftHisto1ms[0]++;
else
m_TsbPdDriftHisto1ms[20]++;
}
++m_iTsbPdDriftNbSamples;
if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0)
{
int* histo = m_TsbPdDriftHisto1ms;

fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[0],
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19],
histo[20]);

histo = m_TsbPdDriftHisto100us;
fprintf(stderr,
" %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19]);

m_iTsbPdDriftNbSamples = 0;
}
// Updating TSBPD time here is not very accurate and prevents from making the function constant.
// For now preserving the existing behavior.
m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
return m_tsbpd.getPktTsbPdTime(usPktTimestamp);
}

void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
{
fprintf(stderr,
"%s: tsbpd offset=%d drift=%d usec\n",
FormatTime(steady_clock::now()).c_str(),
tsbPdOffset,
tsbPdDriftAvg);
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
const bool no_wrap_check = false;
m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
}
#endif /* SRT_DEBUG_TSBPD_DRIFT */

bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us,
Mutex& mutex_to_lock,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
{
if (!m_bTsbPdMode) // Not checked unless in TSBPD mode
return false;
/*
* TsbPD time drift correction
* TsbPD time slowly drift over long period depleting decoder buffer or raising latency
* Re-evaluate the time adjustment value using a receiver control packet (ACK-ACK).
* ACK-ACK timestamp is RTT/2 ago (in sender's time base)
* Data packet have origin time stamp which is older when retransmitted so not suitable for this.
*
* Every TSBPD_DRIFT_MAX_SAMPLES packets, the average drift is calculated
* if -TSBPD_DRIFT_MAX_VALUE < avgTsbPdDrift < TSBPD_DRIFT_MAX_VALUE uSec, pass drift value to RcvBuffer to adjust
* delevery time. if outside this range, adjust this->TsbPdTimeOffset and RcvBuffer->TsbPdTimeBase by
* +-TSBPD_DRIFT_MAX_VALUE uSec to maintain TsbPdDrift values in reasonable range (-5ms .. +5ms).
*/

// Note important thing: this function is being called _EXCLUSIVELY_ in the handler
// of UMSG_ACKACK command reception. This means that the timestamp used here comes
// from the CONTROL domain, not DATA domain (timestamps from DATA domain may be
// either schedule time or a time supplied by the application).

const steady_clock::duration iDrift =
steady_clock::now() - (getTsbPdTimeBase(timestamp_us) + microseconds_from(timestamp_us));

enterCS(mutex_to_lock);

bool updated = m_DriftTracer.update(count_microseconds(iDrift));

#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftHistogram(count_microseconds(iDrift));
#endif /* SRT_DEBUG_TSBPD_DRIFT */

if (updated)
{
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif /* SRT_DEBUG_TSBPD_DRIFT */

#if ENABLE_HEAVY_LOGGING
const steady_clock::time_point oldbase = m_tsTsbPdTimeBase;
#endif
steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift());
m_tsTsbPdTimeBase += overdrift;

HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0)
<< "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift)
<< " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase));
}
else
{
HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
}

leaveCS(mutex_to_lock);
w_udrift = iDrift;
w_newtimebase = m_tsTsbPdTimeBase;
return updated;
return m_tsbpd.addDriftSample(timestamp_us, w_udrift, w_newtimebase);
}

int CRcvBuffer::readMsg(char* data, int len)
Expand Down Expand Up @@ -2088,7 +1825,7 @@ bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playt

bool empty = true;

if (m_bTsbPdMode)
if (m_tsbpd.isEnabled())
{
w_passack = false;
int seq = 0;
Expand Down
Loading