Skip to content

Commit

Permalink
[core] Update TSBPD base time and clock drift on an idle connection. (#…
Browse files Browse the repository at this point in the history
…2408)

Only with the new receiver buffer.
  • Loading branch information
maxsharabayko authored Aug 17, 2022
1 parent 286b3aa commit eae2749
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 24 deletions.
4 changes: 2 additions & 2 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1886,9 +1886,9 @@ void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const
m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
}

bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt)
bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, const time_point& tsPktArrival, int rtt)
{
return m_tsbpd.addDriftSample(timestamp_us, rtt);
return m_tsbpd.addDriftSample(timestamp_us, tsPktArrival, rtt);
}

int CRcvBuffer::readMsg(char* data, int len)
Expand Down
3 changes: 2 additions & 1 deletion srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,9 @@ class CRcvBuffer

/// Add packet timestamp for drift caclculation and compensation
/// @param [in] timestamp packet time stamp
/// @param [in] tsPktArrival arrival time of the packet used to extract the drift sample.
/// @param [in] rtt RTT sample
bool addRcvTsbPdDriftSample(uint32_t timestamp, int rtt);
bool addRcvTsbPdDriftSample(uint32_t timestamp, const time_point& tsPktArrival, int rtt);

#ifdef SRT_DEBUG_TSBPD_DRIFT
void printDriftHistogram(int64_t iDrift);
Expand Down
4 changes: 2 additions & 2 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,9 @@ int CRcvBufferNew::scanNotInOrderMessageLeft(const int startPos, int msgNo) cons
return -1;
}

bool CRcvBufferNew::addRcvTsbPdDriftSample(uint32_t usTimestamp, int usRTTSample)
bool CRcvBufferNew::addRcvTsbPdDriftSample(uint32_t usTimestamp, const time_point& tsPktArrival, int usRTTSample)
{
return m_tsbpd.addDriftSample(usTimestamp, usRTTSample);
return m_tsbpd.addDriftSample(usTimestamp, tsPktArrival, usRTTSample);
}

void CRcvBufferNew::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class CRcvBufferNew

void applyGroupDrift(const time_point& timebase, bool wrp, const duration& udrift);

bool addRcvTsbPdDriftSample(uint32_t usTimestamp, int usRTTSample);
bool addRcvTsbPdDriftSample(uint32_t usTimestamp, const time_point& tsPktArrival, int usRTTSample);

time_point getPktTsbPdTime(uint32_t usPktTimestamp) const;

Expand Down
15 changes: 11 additions & 4 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8628,7 +8628,7 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), rtt);
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
Expand Down Expand Up @@ -9039,7 +9039,7 @@ void srt::CUDT::processCtrl(const CPacket &ctrlpkt)
break;

case UMSG_KEEPALIVE: // 001 - Keep-alive
handleKeepalive(ctrlpkt.m_pcData, ctrlpkt.getLength());
processKeepalive(ctrlpkt, currtime);
break;

case UMSG_HANDSHAKE: // 000 - Handshake
Expand Down Expand Up @@ -11730,7 +11730,7 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
return true;
}

void srt::CUDT::handleKeepalive(const char* /*data*/, size_t /*size*/)
void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival)
{
// Here can be handled some protocol definition
// for extra data sent through keepalive.
Expand All @@ -11750,8 +11750,15 @@ void srt::CUDT::handleKeepalive(const char* /*data*/, size_t /*size*/)
// Whether anything is to be done with this socket
// about the fact that keepalive arrived, let the
// group handle it
pg->handleKeepalive(m_parent->m_GroupMemberData);
pg->processKeepalive(m_parent->m_GroupMemberData);
}
}
#endif

#if ENABLE_NEW_RCVBUFFER
ScopedLock lck(m_RcvBufferLock);
m_pRcvBuffer->updateTsbPdTimeBase(ctrlpkt.getMsgTimeStamp());
if (m_config.bDriftTracer)
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, -1);
#endif
}
10 changes: 5 additions & 5 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -828,13 +828,13 @@ class CUDT
duration m_tdNAKInterval; // NAK interval
SRT_ATTR_GUARDED_BY(m_RecvAckLock)
atomic_time_point m_tsLastRspTime; // Timestamp of last response from the peer
time_point m_tsLastRspAckTime; // Timestamp of last ACK from the peer
time_point m_tsLastRspAckTime; // (SND) Timestamp of last ACK from the peer
atomic_time_point m_tsLastSndTime; // Timestamp of last data/ctrl sent (in system ticks)
time_point m_tsLastWarningTime; // Last time that a warning message is sent
atomic_time_point m_tsLastReqTime; // last time when a connection request is sent
time_point m_tsRcvPeerStartTime;
time_point m_tsLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer)
time_point m_tsLastAckTime; // Timestamp of last ACK
time_point m_tsLastAckTime; // (RCV) Timestamp of last ACK
duration m_tdMinNakInterval; // NAK timeout lower bound; too small value can cause unnecessary retransmission
duration m_tdMinExpInterval; // Timeout lower bound threshold: too small timeout can cause problem

Expand Down Expand Up @@ -914,9 +914,9 @@ class CUDT
int32_t m_iDebugPrevLastAck;
#endif
int32_t m_iRcvLastSkipAck; // Last dropped sequence ACK
int32_t m_iRcvLastAckAck; // Last sent ACK that has been acknowledged
int32_t m_iRcvLastAckAck; // (RCV) Latest packet seqno in a sent ACK acknowledged by ACKACK. RcvQTh (sendCtrlAck {r}, processCtrlAckAck {r}, processCtrlAck {r}, connection {w}).
int32_t m_iAckSeqNo; // Last ACK sequence number
sync::atomic<int32_t> m_iRcvCurrSeqNo; // Largest received sequence number
sync::atomic<int32_t> m_iRcvCurrSeqNo; // (RCV) Largest received sequence number. RcvQTh, TSBPDTh.
int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter)

int32_t m_iPeerISN; // Initial Sequence Number of the peer side
Expand Down Expand Up @@ -1086,7 +1086,7 @@ class CUDT
void dropToGroupRecvBase();
#endif

void handleKeepalive(const char* data, size_t lenghth);
void processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival);

/// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.
SRT_ATTR_EXCLUDES(m_RcvBufferLock)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4460,7 +4460,7 @@ void CUDTGroup::ackMessage(int32_t msgno)
m_iSndAckedMsgNo = msgno;
}

void CUDTGroup::handleKeepalive(CUDTGroup::SocketData* gli)
void CUDTGroup::processKeepalive(CUDTGroup::SocketData* gli)
{
// received keepalive for that group member
// In backup group it means that the link went IDLE.
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class CUDTGroup
#endif

void ackMessage(int32_t msgno);
void handleKeepalive(SocketData*);
void processKeepalive(SocketData*);
void internalKeepalive(SocketData*);

private:
Expand Down
8 changes: 3 additions & 5 deletions srtcore/tsbpd_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,11 @@ drift_logger g_drift_logger;

#endif // SRT_DEBUG_TRACE_DRIFT

bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, int usRTTSample)
bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPktArrival, int usRTTSample)
{
if (!m_bTsbPdMode)
return false;

const time_point tsNow = steady_clock::now();

ScopedLock lck(m_mtxRW);

// Remember the first RTT sample measured. Ideally we need RTT0 - the one from the handshaking phase,
Expand All @@ -123,9 +121,9 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, int usRTTSample)
// A change in network delay has to be taken into account. The only way to get some estimation of it
// is to estimate RTT change and assume that the change of the one way network delay is
// approximated by the half of the RTT change.
const duration tdRTTDelta = microseconds_from((usRTTSample - m_iFirstRTT) / 2);
const duration tdRTTDelta = usRTTSample >= 0 ? microseconds_from((usRTTSample - m_iFirstRTT) / 2) : duration(0);
const time_point tsPktBaseTime = getPktTsbPdBaseTime(usPktTimestamp);
const steady_clock::duration tdDrift = tsNow - tsPktBaseTime - tdRTTDelta;
const steady_clock::duration tdDrift = tsPktArrival - tsPktBaseTime - tdRTTDelta;

const bool updated = m_DriftTracer.update(count_microseconds(tdDrift));

Expand Down
5 changes: 3 additions & 2 deletions srtcore/tsbpd_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ class CTsbpdTime
/// and can be used to estimate clock drift.
///
/// @param [in] pktTimestamp Timestamp of the arrived ACKACK packet.
/// @param [in] usRTTSample RTT sample from an ACK-ACKACK pair.
/// @param [in] tsPktArrival packet arrival time.
/// @param [in] usRTTSample RTT sample from an ACK-ACKACK pair. If no sample, pass '-1'.
///
/// @return true if TSBPD base time has changed, false otherwise.
bool addDriftSample(uint32_t pktTimestamp, int usRTTSample);
bool addDriftSample(uint32_t pktTimestamp, const time_point& tsPktArrival, int usRTTSample);

/// @brief Handle timestamp of data packet when 32-bit integer carryover is about to happen.
/// When packet timestamp approaches CPacket::MAX_TIMESTAMP, the TSBPD base time should be
Expand Down

0 comments on commit eae2749

Please sign in to comment.