Skip to content

Commit

Permalink
[core] Fix RCV drop count when dropping on SND DROP REQ.
Browse files Browse the repository at this point in the history
Extended RCVBUF trace logging.
  • Loading branch information
maxsharabayko committed Feb 21, 2022
1 parent 0c5bf7a commit 81a31da
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
26 changes: 19 additions & 7 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ int CRcvBufferNew::insert(CUnit* unit)
const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno);

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::insert: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::insert: seqno " << seqno);
IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << unit->m_Packet.getMsgSeq(m_bPeerRexmitFlag));
IF_RCVBUF_DEBUG(scoped_log.ss << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset);

if (offset < 0)
{
Expand Down Expand Up @@ -198,15 +200,17 @@ int CRcvBufferNew::dropUpTo(int32_t seqno)
return iDropCnt;
}

void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
int CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo);
// TODO: count bytes as removed?
const int end_pos = incPos(m_iStartPos, m_iMaxPosInc);
if (msgno != 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << msgno);
int minDroppedOffset = -1;
int iDropCnt = 0;
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
// TODO: Maybe check status?
Expand All @@ -216,12 +220,14 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
const int32_t msgseq = m_entries[i].pUnit->m_Packet.getMsgSeq(m_bPeerRexmitFlag);
if (msgseq == msgno)
{
++iDropCnt;
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}
}
IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt);
// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
releaseNextFillerEntries();
Expand All @@ -236,7 +242,7 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
return;
return iDropCnt;
}

// Drop by packet seqno range.
Expand All @@ -246,15 +252,17 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
{
LOGC(rbuflog.Debug, log << "CRcvBufferNew.dropMessage(): nothing to drop. Requested [" << seqnolo << "; "
<< seqnohi << "]. Buffer start " << m_iStartSeqNo << ".");
return;
return 0;
}

const int start_off = max(0, offset_a);
const int last_pos = incPos(m_iStartPos, offset_b);
int minDroppedOffset = -1;
int iDropCnt = 0;
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != last_pos; i = incPos(i))
{
dropUnitInPos(i);
++iDropCnt;
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
Expand All @@ -277,6 +285,8 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}

return iDropCnt;
}

int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
Expand All @@ -288,11 +298,11 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
return 0;
}

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo);

const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder;

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo << " m_iStartPos " << m_iStartPos << " readPos " << readPos);

size_t remain = len;
char* dst = data;
int pkts_read = 0;
Expand Down Expand Up @@ -381,6 +391,8 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
LOGC(rbuflog.Error, log << "readMessage: small dst buffer, copied only " << bytes_read << "/" << bytes_extracted << " bytes.");
}

IF_RCVBUF_DEBUG(scoped_log.ss << " pldi64 " << *reinterpret_cast<uint64_t*>(data));

return bytes_read;
}

Expand Down
3 changes: 2 additions & 1 deletion srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class CRcvBufferNew
/// @param seqnolo sequence number of the first packet in the dropping range.
/// @param seqnohi sequence number of the last packet in the dropping range.
/// @param msgno message number to drop (0 if unknown)
void dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno);
/// @return the number of packets actually dropped.
int dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno);

/// Read the whole message from one or several packets.
///
Expand Down
15 changes: 14 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8814,7 +8814,20 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
{
ScopedLock rblock(m_RcvBufferLock);
#if ENABLE_NEW_RCVBUFFER
m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag));
const int iDropCnt = m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag));

if (iDropCnt > 0)
{
LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %"
<< dropdata[0] << "-%" << dropdata[1] << ", msgno " << ctrlpkt.getMsgSeq(using_rexmit_flag)
<< " (SND DROP REQUEST).");

enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (size_t)iDropCnt));
leaveCS(m_StatsLock);
}
#else
m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag);
#endif
Expand Down

0 comments on commit 81a31da

Please sign in to comment.