Skip to content

Commit

Permalink
[core] refactor Group::recv() base on new rcv buffer to support messa…
Browse files Browse the repository at this point in the history
…ge mode
  • Loading branch information
gou4shi1 committed Jan 5, 2022
1 parent 3d26644 commit 4ff6c62
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 36 deletions.
147 changes: 127 additions & 20 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,10 @@ int CRcvBufferNew::insert(CUnit* unit)

int CRcvBufferNew::dropUpTo(int32_t seqno)
{
// Can drop only when nothing to read, and
// first unacknowledged packet is missing.
SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos);

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);

int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
SRT_ASSERT(len > 0);
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
Expand All @@ -180,34 +175,35 @@ int CRcvBufferNew::dropUpTo(int32_t seqno)
if (m_iMaxPosInc < 0)
m_iMaxPosInc = 0;

// Check that all packets being dropped are missing.
const int iDropCnt = len;
while (len > 0)
{
if (m_entries[m_iStartPos].pUnit != NULL)
{
if (!m_tsbpd.isEnabled() && m_bMessageAPI && !m_entries[m_iStartPos].pUnit->m_Packet.getMsgOrderFlag())
{
--m_numOutOfOrderPackets;
if (m_iStartPos == m_iFirstReadableOutOfOrder)
m_iFirstReadableOutOfOrder = -1;
}
releaseUnitInPos(m_iStartPos);
}

if (m_entries[m_iStartPos].status != EntryState_Empty)
{
SRT_ASSERT(m_entries[m_iStartPos].status == EntryState_Drop || m_entries[m_iStartPos].status == EntryState_Read);
m_entries[m_iStartPos].status = EntryState_Empty;
}

m_entries[m_iStartPos].status = EntryState_Empty;
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
m_iStartPos = incPos(m_iStartPos);
--len;
}

// Update positions
m_iStartSeqNo = seqno;
// Move forward if there are "read" entries.
// Move forward if there are "read/drop" entries.
releaseNextFillerEntries();
// Set nonread position to the starting position before updating,
// because start position was increased, and preceeding packets are invalid.
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
}

Expand All @@ -219,6 +215,7 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
const int end_pos = incPos(m_iStartPos, m_iMaxPosInc);
if (msgno != 0)
{
int minDroppedOffset = -1;
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
// TODO: Maybe check status?
Expand All @@ -228,11 +225,32 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
const int32_t msgseq = m_entries[i].pUnit->m_Packet.getMsgSeq(m_bPeerRexmitFlag);
if (msgseq == msgno)
{
if (!m_tsbpd.isEnabled() && m_bMessageAPI && !m_entries[i].pUnit->m_Packet.getMsgOrderFlag())
{
--m_numOutOfOrderPackets;
if (i == m_iFirstReadableOutOfOrder)
m_iFirstReadableOutOfOrder = -1;
}
releaseUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}
}

// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset < getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
{
if (!checkFirstReadableOutOfOrder())
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
return;
}

Expand All @@ -248,17 +266,41 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)

const int start_off = max(0, offset_a);
const int last_pos = incPos(m_iStartPos, offset_b);
int minDroppedOffset = -1;
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != last_pos; i = incPos(i))
{
if (m_entries[i].pUnit)
{
if (!m_tsbpd.isEnabled() && m_bMessageAPI && !m_entries[i].pUnit->m_Packet.getMsgOrderFlag())
{
--m_numOutOfOrderPackets;
if (i == m_iFirstReadableOutOfOrder)
m_iFirstReadableOutOfOrder = -1;
}
releaseUnitInPos(i);
}
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}

LOGC(rbuflog.Debug, log << "CRcvBufferNew.dropMessage(): [" << seqnolo << "; "
<< seqnohi << "].");

// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset < getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
{
if (!checkFirstReadableOutOfOrder())
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
}

int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
Expand All @@ -274,8 +316,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo);

const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder;
// Remember if we actually read out of order packet.
const bool readingOutOfOrderPacket = !canReadInOrder || m_iStartPos == m_iFirstReadableOutOfOrder;

size_t remain = len;
char* dst = data;
Expand Down Expand Up @@ -313,13 +353,14 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
const bool pbLast = packet.getMsgBoundary() & PB_LAST;
if (msgctrl && (packet.getMsgBoundary() & PB_FIRST))
{
msgctrl->pktseq = pktseqno;
msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag);
}
if (msgctrl && pbLast)
{
msgctrl->srctime = count_microseconds(getPktTsbPdTime(packet.getMsgTimeStamp()).time_since_epoch());
}
if (msgctrl)
msgctrl->pktseq = pktseqno;

releaseUnitInPos(i);
if (updateStartPos)
Expand All @@ -344,8 +385,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
}

countBytes(-pkts_read, -bytes_extracted);
if (!m_tsbpd.isEnabled() && readingOutOfOrderPacket)
updateFirstReadableOutOfOrder();

releaseNextFillerEntries();

Expand All @@ -355,6 +394,11 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
//updateNonreadPos();
}

if (!m_tsbpd.isEnabled())
// We need updateFirstReadableOutOfOrder() here even if we are reading inorder,
// incase readable inorder packets are all read out.
updateFirstReadableOutOfOrder();

const int bytes_read = dst - data;
if (bytes_read < bytes_extracted)
{
Expand Down Expand Up @@ -588,6 +632,41 @@ bool CRcvBufferNew::isRcvDataReady(time_point time_now) const
return info.tsbpd_time <= time_now;
}

CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstReadablePacketInfo(time_point time_now) const
{
const PacketInfo unreadableInfo = {SRT_SEQNO_NONE, false, time_point()};
const bool hasInorderPackets = hasReadableInorderPkts();

if (!m_tsbpd.isEnabled())
{
if (hasInorderPackets)
{
const CPacket& packet = m_entries[m_iStartPos].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), false, time_point()};
return info;
}
SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI);
if (m_iFirstReadableOutOfOrder >= 0)
{
SRT_ASSERT(m_numOutOfOrderPackets > 0);
const CPacket& packet = m_entries[m_iFirstReadableOutOfOrder].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), true, time_point()};
return info;
}
return unreadableInfo;
}

if (!hasInorderPackets)
return unreadableInfo;

const PacketInfo info = getFirstValidPacketInfo();

if (info.tsbpd_time <= time_now)
return info;
else
return unreadableInfo;
}

void CRcvBufferNew::countBytes(int pkts, int bytes)
{
ScopedLock lock(m_BytesCountLock);
Expand Down Expand Up @@ -775,6 +854,34 @@ void CRcvBufferNew::updateFirstReadableOutOfOrder()
return;
}

bool CRcvBufferNew::checkFirstReadableOutOfOrder()
{
if (m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder < 0 || m_iMaxPosInc == 0)
return false;

const int endPos = incPos(m_iStartPos, m_iMaxPosInc);
int msgno = -1;
for (int pos = m_iFirstReadableOutOfOrder; pos != endPos; pos = incPos(pos))
{
if (!m_entries[pos].pUnit)
return false;

const CPacket& pkt = m_entries[pos].pUnit->m_Packet;
if (pkt.getMsgOrderFlag())
return false;

if (msgno == -1)
msgno = pkt.getMsgSeq(m_bPeerRexmitFlag);
else if (msgno != pkt.getMsgSeq(m_bPeerRexmitFlag))
return false;

if (pkt.getMsgBoundary() & PB_LAST)
return true;
}

return false;
}

int CRcvBufferNew::scanNotInOrderMessageRight(const int startPos, int msgNo) const
{
// Search further packets to the right.
Expand Down
5 changes: 5 additions & 0 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class CRcvBufferNew
/// IF skipseqno == -1, no missing packet but 1st not ready to play.
PacketInfo getFirstValidPacketInfo() const;

PacketInfo getFirstReadablePacketInfo(time_point time_now) const;

/// Get information on packets available to be read.
/// @returns a pair of sequence numbers (first available; first unavailable).
///
Expand Down Expand Up @@ -215,6 +217,7 @@ class CRcvBufferNew
private:
inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; }
inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); }
inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : (m_szSize + pos2 - pos1); }

private:
void countBytes(int pkts, int bytes);
Expand All @@ -233,6 +236,8 @@ class CRcvBufferNew
/// Scan for availability of out of order packets.
void onInsertNotInOrderPacket(int insertpos);
void updateFirstReadableOutOfOrder();
// Return true if m_iFirstReadableOutOfOrder if still readable after dropMessage()
bool checkFirstReadableOutOfOrder();
int scanNotInOrderMessageRight(int startPos, int msgNo) const;
int scanNotInOrderMessageLeft(int startPos, int msgNo) const;

Expand Down
33 changes: 19 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5305,6 +5305,12 @@ void * srt::CUDT::tsbpd(void* param)

int srt::CUDT::dropTooLateUpTo(int seqno)
{
// seqno can be larger than m_iRcvCurrSeqNo if it's dropped by group receiver.
if (CSeqNo::seqcmp(CSeqNo::decseq(seqno), m_iRcvCurrSeqNo) > 0)
{
seqno = CSeqNo::incseq(m_iRcvCurrSeqNo);
}

const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno);

// seq_gap_len can be <= 0 if a packet has been dropped by the sender.
Expand Down Expand Up @@ -7675,7 +7681,7 @@ void srt::CUDT::releaseSynch()
}

// [[using locked(m_RcvBufferLock)]];
int32_t srt::CUDT::ackDataUpTo(int32_t ack)
void srt::CUDT::ackDataUpTo(int32_t ack)
{
const int acksize SRT_ATR_UNUSED = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

Expand All @@ -7685,24 +7691,14 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack)
m_iRcvLastAck = ack;
m_iRcvLastSkipAck = ack;

#if ENABLE_NEW_RCVBUFFER
const std::pair<int, int> range = m_pRcvBuffer->getAvailablePacketsRange();
// Some packets acknowledged are not available in the buffer.
if (CSeqNo::seqcmp(range.second, ack) < 0)
{
LOGC(xtlog.Error, log << "IPE: Acknowledged seqno %" << ack << " outruns the RCV buffer state %" << range.first
<< " - %" << range.second);
}
return CSeqNo::decseq(range.second);
#else
#if !ENABLE_NEW_RCVBUFFER
// NOTE: This is new towards UDT and prevents spurious
// wakeup of select/epoll functions when no new packets
// were signed off for extraction.
if (acksize > 0)
{
m_pRcvBuffer->ackData(acksize);
}
return ack;
#endif
}

Expand Down Expand Up @@ -7942,7 +7938,16 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// IF ack %> m_iRcvLastAck
if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
{
const int32_t group_read_seq SRT_ATR_UNUSED = ackDataUpTo(ack);
ackDataUpTo(ack);

#if ENABLE_EXPERIMENTAL_BONDING
#if ENABLE_NEW_RCVBUFFER
const int32_t group_read_seq = m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()).seqno;
#else
const int32_t group_read_seq = CSeqNo::decseq(ack);
#endif
#endif

InvertedLock un_bufflock (m_RcvBufferLock);

#if ENABLE_EXPERIMENTAL_BONDING
Expand Down Expand Up @@ -8027,7 +8032,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
}
}
#if ENABLE_EXPERIMENTAL_BONDING
if (m_parent->m_GroupOf)
if (group_read_seq != SRT_SEQNO_NONE && m_parent->m_GroupOf)
{
// See above explanation for double-checking
ScopedLock glock (uglobal().m_GlobControlLock);
Expand Down
3 changes: 1 addition & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,7 @@ class CUDT
/// @brief Acknowledge reading position up to the @p seq.
/// Updates m_iRcvLastAck and m_iRcvLastSkipAck to @p seq.
/// @param seq first unacknowledged packet sequence number.
/// @return
int32_t ackDataUpTo(int32_t seq);
void ackDataUpTo(int32_t seq);

void handleKeepalive(const char* data, size_t lenghth);

Expand Down
Loading

0 comments on commit 4ff6c62

Please sign in to comment.