Skip to content

Commit

Permalink
[core] use seq larger than m_RcvBaseSeqNo to update group readablity (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gou4shi1 authored Jun 1, 2021
1 parent 0f0caf9 commit 28a7006
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 23 deletions.
62 changes: 42 additions & 20 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,10 @@ size_t CRcvBuffer::dropData(int len)
bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
bool& w_passack,
int32_t& w_skipseqno,
int32_t& w_curpktseq)
int32_t& w_curpktseq,
int32_t base_seq)
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: base_seq=" << base_seq);
w_skipseqno = SRT_SEQNO_NONE;
w_passack = false;
// tsbpdtime will be retrieved by the below call
Expand All @@ -1137,8 +1139,8 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,

/* Check the acknowledged packets */
// getRcvReadyMsg returns true if the time to play for the first message
// (returned in w_tsbpdtime) is in the past.
if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1))
// that larger than base_seq is in the past.
if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1, base_seq))
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq);
return true;
Expand Down Expand Up @@ -1167,9 +1169,10 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
* No acked packets ready but caller want to know next packet to wait for
* Check the not yet acked packets that may be stuck by missing packet(s).
*/
bool haslost = false;
w_tsbpdtime = steady_clock::time_point(); // redundant, for clarity
w_passack = true;
bool haslost = false;
steady_clock::time_point tsbpdtime = steady_clock::time_point();
w_tsbpdtime = steady_clock::time_point();
w_passack = true;

// XXX SUSPECTED ISSUE with this algorithm:
// The above call to getRcvReadyMsg() should report as to whether:
Expand All @@ -1195,8 +1198,11 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// When done so, the below loop would be completely unnecessary.

// Logical description of the below algorithm:
// 1. Check if the VERY FIRST PACKET is valid; if so then:
// - check if it's ready to play, return boolean value that marks it.
// 1. update w_tsbpdtime and w_curpktseq if found one packet ready to play
// - keep check the next packet if still smaller than base_seq
// 2. set w_skipseqno if found packets before w_curpktseq lost
// if no packets larger than base_seq ready to play, return the largest RTP
// else return the first one that larger than base_seq and rady to play

for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
{
Expand All @@ -1208,19 +1214,21 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
}
else
{
/* We got the 1st valid packet */
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (w_tsbpdtime <= steady_clock::now())
tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (tsbpdtime <= steady_clock::now())
{
/* Packet ready to play */
w_tsbpdtime = tsbpdtime;
w_curpktseq = m_pUnit[i]->m_Packet.m_iSeqNo;
if (haslost)
w_skipseqno = w_curpktseq;

if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
{
/*
* Packet stuck on non-acked side because of missing packets.
* Tell 1st valid packet seqno so caller can skip (drop) the missing packets.
*/
w_skipseqno = m_pUnit[i]->m_Packet.m_iSeqNo;
w_curpktseq = w_skipseqno;
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found ready packet %" << w_curpktseq
<< " but not larger than base_seq, try next");
continue;
}

HLOGC(brlog.Debug,
Expand All @@ -1234,6 +1242,10 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// ...
return true;
}

if (!is_zero(w_tsbpdtime)) {
return true;
}
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
Expand All @@ -1246,6 +1258,9 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// the 'haslost' is set, which means that it continues only to find the first valid
// packet after stating that the very first packet isn't valid.
}
if (!is_zero(w_tsbpdtime)) {
return true;
}
HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
return false;
}
Expand Down Expand Up @@ -1276,7 +1291,7 @@ int32_t CRcvBuffer::getTopMsgno() const
return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq();
}

bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto)
bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq)
{
const bool havelimit = upto != -1;
int end = -1, past_end = -1;
Expand Down Expand Up @@ -1342,7 +1357,8 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&
// 1. Get the TSBPD time of the unit. Stop and return false if this unit
// is not yet ready to play.
// 2. If it's ready to play, check also if it's decrypted. If not, skip it.
// 3. If it's ready to play and decrypted, stop and return it.
// 3. Check also if it's larger than base_seq, if not, skip it.
// 4. If it's ready to play, decrypted and larger than base, stop and return it.
if (!havelimit)
{
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
Expand All @@ -1361,6 +1377,12 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&
IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
freeunit = true; /* packet not decrypted */
}
else if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
{
IF_HEAVY_LOGGING(reason = "smaller than base_seq");
w_tsbpdtime = steady_clock::time_point();
freeunit = true;
}
else
{
HLOGC(brlog.Debug,
Expand Down Expand Up @@ -1415,7 +1437,7 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&

if (freeunit)
{
HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED");
HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED: " << reason);
/* removed skipped, dropped, undecryptable bytes from rcv buffer */
const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
countBytes(-1, -rmbytes, true);
Expand Down
11 changes: 9 additions & 2 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,17 @@ class CRcvBuffer
/// @param [out] w_passack true if 1st ready packet is not yet acknowleged (allowed to be delivered to the app)
/// @param [out] w_skipseqno SRT_SEQNO_NONE or seq number of 1st unacknowledged pkt ready to play preceeded by
/// missing packets.
/// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base if exist packet ready-to-play
/// and larger than base
/// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true
/// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE:
/// IF skipseqno != SRT_SEQNO_NONE, packet ready to play preceeded by missing packets.;
/// IF skipseqno == SRT_SEQNO_NONE, no missing packet but 1st not ready to play.
bool getRcvFirstMsg(time_point& w_tsbpdtime, bool& w_passack, int32_t& w_skipseqno, int32_t& w_curpktseq);
bool getRcvFirstMsg(time_point& w_tsbpdtime,
bool& w_passack,
int32_t& w_skipseqno,
int32_t& w_curpktseq,
int32_t base_seq = SRT_SEQNO_NONE);

/// Update the ACK point of the buffer.
/// @param [in] len size of data to be skip & acknowledged.
Expand Down Expand Up @@ -473,9 +479,10 @@ class CRcvBuffer
/// Parameters (of the 1st packet queue, ready to play or not):
/// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if
/// none
/// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base
/// @retval true 1st packet ready to play without discontinuity (no hole)
/// @retval false tsbpdtime = 0: no packet ready to play
bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto);
bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq = SRT_SEQNO_NONE);

public:
/// @brief Get clock drift in microseconds.
Expand Down
11 changes: 10 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5161,8 +5161,17 @@ void * srt::CUDT::tsbpd(void *param)
int32_t current_pkt_seq = 0;
steady_clock::time_point tsbpdtime;
bool rxready = false;
int32_t rcv_base_seq = SRT_SEQNO_NONE;
#if ENABLE_EXPERIMENTAL_BONDING
bool shall_update_group = false;
if (gkeeper.group)
{
// Functions called below will lock m_GroupLock, which in hierarchy
// lies after m_RecvLock. Must unlock m_RecvLock to be able to lock
// m_GroupLock inside the calls.
InvertedLock unrecv(self->m_RecvLock);
rcv_base_seq = gkeeper.group->getRcvBaseSeqNo();
}
#endif

enterCS(self->m_RcvBufferLock);
Expand All @@ -5174,7 +5183,7 @@ void * srt::CUDT::tsbpd(void *param)
int32_t skiptoseqno = SRT_SEQNO_NONE;
bool passack = true; // Get next packet to wait for even if not acked

rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq));
rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq);

HLOGC(tslog.Debug,
log << boolalpha << "NEXT PKT CHECK: rdy=" << rxready << " passack=" << passack << " skipto=%"
Expand Down
6 changes: 6 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,12 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
}
}

int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
return m_RcvBaseSeqNo;
}

void CUDTGroup::updateWriteState()
{
ScopedLock lg(m_GroupLock);
Expand Down
1 change: 1 addition & 0 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ class CUDTGroup
void updateWriteState();
void updateFailedLink();
void activateUpdateEvent(bool still_have_items);
int32_t getRcvBaseSeqNo();

/// Update the in-group array of packet providers per sequence number.
/// Also basing on the information already provided by possibly other sockets,
Expand Down

0 comments on commit 28a7006

Please sign in to comment.