Skip to content

Commit

Permalink
Adding FixedArray
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jul 14, 2021
1 parent bd1ee2c commit a8f4551
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 53 deletions.
98 changes: 50 additions & 48 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#define ENABLE_NEW_RCVBUFFER 1
#if ENABLE_NEW_RCVBUFFER
#include <cmath>
#include "buffer_rcv.h"
Expand All @@ -18,7 +19,7 @@ namespace srt {


/*
* RcvBuffer2 (circular buffer):
* RcvBufferNew (circular buffer):
*
* |<------------------- m_iSize ----------------------------->|
* | |<--- acked pkts -->|<--- m_iMaxPosInc --->| |
Expand All @@ -40,8 +41,8 @@ namespace srt {
*/

CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit)
: m_entries(NULL)
, m_szSize(size)
: m_entries(size)
, m_szSize(size) // TODO: maybe just use m_entries.size()
, m_pUnitQueue(unitqueue)
, m_iStartSeqNo(initSeqNo)
, m_iStartPos(0)
Expand All @@ -57,17 +58,18 @@ CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue,
, m_uAvgPayloadSz(7 * 188)
{
SRT_ASSERT(size < INT_MAX); // All position pointers are integers


}

CRcvBufferNew::~CRcvBufferNew()
{
for (size_t i = 0; i < m_szSize; ++i)
{
if (m_entries[i].pUnit != NULL)
Entry& entry = m_entries[i];
CUnit* unit = entry.pUnit;
if (unit != NULL)
{
m_pUnitQueue->makeUnitFree(m_entries[i].pUnit);
m_pUnitQueue->makeUnitFree(unit);
m_entries[i].pUnit = NULL;
}
}
}
Expand Down Expand Up @@ -141,9 +143,10 @@ void CRcvBufferNew::dropUpTo(int32_t seqno)
updateNonreadPos();
}

void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
void CRcvBufferNew::dropMessage(int32_t seqnolo SRT_ATR_UNUSED, int32_t seqnohi SRT_ATR_UNUSED, int32_t msgno SRT_ATR_UNUSED)
{
// TODO: Add implementation.

}

int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
Expand All @@ -164,15 +167,16 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
const bool updateStartPos = (readPos == m_iStartPos); // Indicates if the m_iStartPos can be changed
for (int i = readPos;; i = incPos(i))
{
SRT_ASSERT(m_pUnit[i]);
if (!m_pUnit[i])
SRT_ASSERT(m_entries[i].pUnit);
if (!m_entries[i].pUnit)
{
LOGC(rbuflog.Error, log << "CRcvBufferNew::readMessage(): null packet encountered.");
break;
}

const CPacket& packet = m_pUnit[i]->m_Packet;
const CPacket& packet = m_entries[i].pUnit->m_Packet;
const size_t pktsize = packet.getLength();
const int32_t pktseqno = packet.getSeqNo();

const size_t unitsize = std::min(remain, pktsize);
memcpy(dst, packet.m_pcData, unitsize);
Expand All @@ -190,7 +194,7 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
const bool pbLast = packet.getMsgBoundary() & PB_LAST;
if (msgctrl && (packet.getMsgBoundary() & PB_FIRST))
{
msgctrl->pktseq = packet.getSeqNo();
msgctrl->pktseq = pktseqno;
msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag);
}
if (msgctrl && pbLast)
Expand All @@ -200,18 +204,17 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)

if (updateStartPos)
{
CUnit* tmp = m_pUnit[i];
releaseUnitInPos(i);

m_iStartPos = incPos(i);
--m_iMaxPosInc;
SRT_ASSERT(m_iMaxPosInc >= 0);
m_iStartSeqNo = CSeqNo::incseq(tmp->m_Packet.getSeqNo());

m_pUnit[i] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
m_iStartSeqNo = CSeqNo::incseq(pktseqno);
}
else
{
m_pUnit[i]->m_iFlag = CUnit::PASSACK;
m_entries[i].status = EntryState_Read;
//m_pUnit[i]->m_iFlag = CUnit::PASSACK;
}

if (pbLast)
Expand Down Expand Up @@ -260,10 +263,11 @@ CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstValidPacketInfo() const
const int end_pos = (m_iStartPos + m_iMaxPosInc) % m_szSize;
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
if (!m_pUnit[i])
// TODO: Maybe check status?
if (!m_entries[i].pUnit)
continue;

const CPacket& packet = m_pUnit[i]->m_Packet;
const CPacket& packet = m_entries[i].pUnit->m_Packet;
const PacketInfo info = { packet.getSeqNo(), i != m_iStartPos, getPktTsbPdTime(packet.getMsgTimeStamp()) };
return info;
}
Expand Down Expand Up @@ -334,19 +338,18 @@ void CRcvBufferNew::countBytes(int pkts, int bytes, bool acked)

void CRcvBufferNew::releaseUnitInPos(int pos)
{
SRT_ASSERT(m_pUnit[pos]);

CUnit* tmp = m_pUnit[pos];
m_pUnit[pos] = NULL;
CUnit* tmp = m_entries[pos].pUnit;
SRT_ASSERT(tmp !+ NULL);
m_entries[pos] = Entry();
m_pUnitQueue->makeUnitFree(tmp);
}

void CRcvBufferNew::releasePassackUnits()
{
int pos = m_iStartPos;
while (m_pUnit[pos] && m_pUnit[pos]->m_iFlag == CUnit::PASSACK)
while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Read)
{
m_iStartSeqNo = CSeqNo::incseq(m_pUnit[pos]->m_Packet.getSeqNo());
m_iStartSeqNo = CSeqNo::incseq(m_entries[pos].pUnit->m_Packet.getSeqNo());
releaseUnitInPos(pos);
pos = incPos(pos);
m_iStartPos = pos;
Expand All @@ -368,12 +371,12 @@ void CRcvBufferNew::updateNonreadPos()
//}

// Check if the gap is filled.
SRT_ASSERT(m_pUnit[m_iFirstNonreadPos]);
SRT_ASSERT(m_entries[m_iFirstNonreadPos].pUnit);

const int last_pos = incPos(m_iStartPos, m_iMaxPosInc);

int pos = m_iFirstNonreadPos;
while (m_pUnit[pos] && m_pUnit[pos]->m_iFlag == CUnit::GOOD && (m_pUnit[pos]->m_Packet.getMsgBoundary() & PB_FIRST))
while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST))
{
// bool good = true;

Expand All @@ -393,21 +396,21 @@ void CRcvBufferNew::updateNonreadPos()
// will be interrupted.
for (int i = pos; i != last_pos; i = (i + 1) % m_szSize)
{
if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
if (!m_entries[i].pUnit || m_entries[pos].status != EntryState_Avail)
{
// good = false;
break;
}

// Likewise, boundary() & PB_LAST will be satisfied for last OR solo.
if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST)
if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST)
{
m_iFirstNonreadPos = incPos(i);
break;
}
}

if (pos == m_iFirstNonreadPos || !m_pUnit[m_iFirstNonreadPos])
if (pos == m_iFirstNonreadPos || !m_entries[m_iFirstNonreadPos].pUnit)
break;

pos = m_iFirstNonreadPos;
Expand All @@ -416,16 +419,16 @@ void CRcvBufferNew::updateNonreadPos()
// 1. If there is a gap between this packet and m_iLastReadablePos
// then no sense to update m_iLastReadablePos.

// 2. The simplest case is when this is the first sequntial packet
// 2. The simplest case is when this is the first sequential packet
}

int CRcvBufferNew::findLastMessagePkt()
{
for (int i = m_iStartPos; i != m_iFirstNonreadPos; i = incPos(i))
{
SRT_ASSERT(m_pUnit[i]);
SRT_ASSERT(m_entries[i].pUnit);

if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST)
if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST)
{
return i;
}
Expand All @@ -440,7 +443,7 @@ void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos)
if (m_numOutOfOrderPackets == 0)
return;

// If the following condition is true, there is alreadt a packet,
// If the following condition is true, there is already a packet,
// that can be read out of order. We don't need to search for
// another one. The search should be done when that packet is read out from the buffer.
//
Expand All @@ -452,8 +455,8 @@ void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos)
// Just a sanity check. This function is called when a new packet is added.
// So the should be unacknowledged packets.
SRT_ASSERT(m_iMaxPosInc > 0);
SRT_ASSERT(m_pUnit[insertPos]);
const CPacket& pkt = m_pUnit[insertPos]->m_Packet;
SRT_ASSERT(m_entries[insertPos].pUnit);
const CPacket& pkt = m_entries[insertPos].pUnit->m_Packet;
const PacketBoundary boundary = pkt.getMsgBoundary();

//if ((boundary & PB_FIRST) && (boundary & PB_LAST))
Expand Down Expand Up @@ -500,13 +503,13 @@ void CRcvBufferNew::updateFirstReadableOutOfOrder()

for (int pos = m_iStartPos; outOfOrderPktsRemain; pos = incPos(pos))
{
if (!m_pUnit[pos])
if (!m_entries[pos].pUnit)
{
posFirst = posLast = msgNo = -1;
continue;
}

const CPacket& pkt = m_pUnit[pos]->m_Packet;
const CPacket& pkt = m_entries[pos].pUnit->m_Packet;

if (pkt.getMsgOrderFlag()) // Skip in order packet
{
Expand Down Expand Up @@ -554,11 +557,10 @@ int CRcvBufferNew::scanNotInOrderMessageRight(const int startPos, int msgNo) con
do
{
pos = incPos(pos);

if (!m_pUnit[pos])
if (!m_entries[pos].pUnit)
break;

const CPacket& pkt = m_pUnit[pos]->m_Packet;
const CPacket& pkt = m_entries[pos].pUnit->m_Packet;

if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo)
{
Expand Down Expand Up @@ -586,10 +588,10 @@ int CRcvBufferNew::scanNotInOrderMessageLeft(const int startPos, int msgNo) cons
{
pos = decPos(pos);

if (!m_pUnit[pos])
if (!m_entries[pos].pUnit)
return -1;

const CPacket& pkt = m_pUnit[pos]->m_Packet;
const CPacket& pkt = m_entries[pos].pUnit->m_Packet;

if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo)
{
Expand Down Expand Up @@ -652,9 +654,9 @@ string CRcvBufferNew::strFullnessState(const time_point& tsNow) const
if (m_tsbpd.isEnabled())
{
ss << " (TSBPD ready in ";
if (m_pUnit[m_iStartPos])
if (m_entries[m_iStartPos].pUnit)
{
const uint32_t usPktTimestamp = m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp();
const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp();
ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow);
}
else
Expand All @@ -663,10 +665,10 @@ string CRcvBufferNew::strFullnessState(const time_point& tsNow) const
}

const int iLastPos = incPos(m_iStartPos, m_iMaxPosInc);
if (m_pUnit[iLastPos])
if (m_entries[iLastPos].pUnit)
{
ss << ":";
const uint32_t usPktTimestamp = m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp();
const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp();
ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow);
ss << " ms";
}
Expand Down
14 changes: 11 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef INC_SRT_BUFFER_RCV_H
#define INC_SRT_BUFFER_RCV_H

#define ENABLE_NEW_RCVBUFFER 1
#if ENABLE_NEW_RCVBUFFER

#include "buffer.h" // AvgBufSize
Expand Down Expand Up @@ -196,6 +197,7 @@ class CRcvBufferNew
{
// m_pUnitQueue->makeUnitFree(m_entries[i].pUnit);
}
m_pUnit = pUnit;
}
private:
CUnit* m_pUnit;
Expand All @@ -210,16 +212,22 @@ class CRcvBufferNew
};
struct Entry
{
Entry()
: pUnit(NULL)
, status(EntryState_Empty)
{}

CUnit* pUnit;
EntryStatus status;
};

static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; }
//static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; }

const Entry* m_entries;
FixedArray<Entry> m_entries;
//const Entry* m_entries;

// TODO: maybe use std::vector?
CUnit** m_pUnit; // pointer to the array of units (buffer)
//CUnit** m_pUnit; // pointer to the array of units (buffer)
const size_t m_szSize; // size of the array of units (buffer)
CUnitQueue* m_pUnitQueue; // the shared unit queue

Expand Down
4 changes: 3 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6844,7 +6844,9 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
return res;
}

#if !ENABLE_NEW_RCVBUFFER
const int seqdistance = -1;
#endif

if (!m_config.bSynRecving)
{
Expand Down Expand Up @@ -8819,13 +8821,13 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)
void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
{
{
const bool using_rexmit_flag = m_bPeerRexmitFlag;
UniqueLock rlock(m_RecvLock);
#if ENABLE_NEW_RCVBUFFER
// TODO
// If message number is present, try to drop the whole message.
// If message number is 0, then drop by sequence range.
#else
const bool using_rexmit_flag = m_bPeerRexmitFlag;
m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag);
#endif
// When the drop request was received, it means that there are
Expand Down
Loading

0 comments on commit a8f4551

Please sign in to comment.