Skip to content

Commit

Permalink
[core] Stats: do not count discarded packets as dropped (#2932).
Browse files Browse the repository at this point in the history
Valid for a broadcast group member dropping existing packets from the RCV buffer because they were read from another member.

Co-authored-by: Sektor van Skijlen <ethouris@gmail.com>
  • Loading branch information
maxsharabayko and ethouris authored Apr 22, 2024
1 parent f6c2315 commit 4f925fb
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 15 deletions.
18 changes: 13 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit)
return 0;
}

int CRcvBuffer::dropUpTo(int32_t seqno)
std::pair<int, int> CRcvBuffer::dropUpTo(int32_t seqno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);
Expand All @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
return 0;
return std::make_pair(0, 0);
}

m_iMaxPosOff -= len;
if (m_iMaxPosOff < 0)
m_iMaxPosOff = 0;

const int iDropCnt = len;
int iNumDropped = 0; // Number of dropped packets that were missing.
int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer.
while (len > 0)
{
// Note! Dropping a EntryState_Read must not be counted as a drop because it was read.
// Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier.
if (m_entries[m_iStartPos].status == EntryState_Avail)
++iNumDiscarded;
else if (m_entries[m_iStartPos].status == EntryState_Empty)
++iNumDropped;
dropUnitInPos(m_iStartPos);
m_entries[m_iStartPos].status = EntryState_Empty;
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
Expand All @@ -246,7 +253,7 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
return std::make_pair(iNumDropped, iNumDiscarded);
}

int CRcvBuffer::dropAll()
Expand All @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll()
return 0;

const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff);
return dropUpTo(end_seqno);
const std::pair<int, int> numDropped = dropUpTo(end_seqno);
return numDropped.first + numDropped.second;
}

int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting)
Expand Down
4 changes: 2 additions & 2 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class CRcvBuffer

/// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
/// @param [in] seqno drop units up to this sequence number
/// @return number of dropped packets.
int dropUpTo(int32_t seqno);
/// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded).
std::pair<int, int> dropUpTo(int32_t seqno);

/// @brief Drop all the packets in the receiver buffer.
/// The starting position and seqno are shifted right after the last packet in the buffer.
Expand Down
19 changes: 13 additions & 6 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5576,24 +5576,30 @@ void * srt::CUDT::tsbpd(void* param)
return NULL;
}

int srt::CUDT::rcvDropTooLateUpTo(int seqno)
int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason)
{
// Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders.
if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
seqno = CSeqNo::incseq(m_iRcvCurrSeqNo);

dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno));

const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno);
if (iDropCnt > 0)
const std::pair<int, int> iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno);
const int iDropCnt = iDropDiscardedPkts.first;
const int iDiscardedCnt = iDropDiscardedPkts.second;
const int iDropCntTotal = iDropCnt + iDiscardedCnt;

// In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket.
const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal;
if (iDropStatCnt > 0)
{
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt));
leaveCS(m_StatsLock);
}
return iDropCnt;
return iDropCntTotal;
}

void srt::CUDT::setInitialRcvSeq(int32_t isn)
Expand Down Expand Up @@ -7835,7 +7841,7 @@ void srt::CUDT::dropToGroupRecvBase()
return;

ScopedLock lck(m_RcvBufferLock);
int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base));
const int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base), DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
Expand Down Expand Up @@ -8063,6 +8069,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
string reason; // just for "a reason" of giving particular % for ACK

#if ENABLE_BONDING
// TODO: The group drops other members upon reading, maybe no longer needed here?
dropToGroupRecvBase();
#endif

Expand Down
9 changes: 8 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -755,11 +755,18 @@ class CUDT
// TSBPD thread main function.
static void* tsbpd(void* param);

enum DropReason
{
DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP).
DROP_DISCARD //< Drop because another group member already provided these packets.
};

/// Drop too late packets (receiver side). Update loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @param reason A reason for dropping (see @a DropReason).
/// @return The number of packets dropped.
int rcvDropTooLateUpTo(int seqno);
int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE);

static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2338,7 +2338,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
Expand Down

0 comments on commit 4f925fb

Please sign in to comment.