From a8f4551a97ac63df85b104db25f6b4278a6de4a6 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 13 Jul 2021 18:32:12 +0200 Subject: [PATCH] Adding FixedArray --- srtcore/buffer_rcv.cpp | 98 ++++++++++++++++++----------------- srtcore/buffer_rcv.h | 14 +++-- srtcore/core.cpp | 4 +- srtcore/utilities.h | 64 +++++++++++++++++++++++ test/test_rcvbuffer2_read.cpp | 2 +- 5 files changed, 129 insertions(+), 53 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 2ac1dc1999..74975460e0 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -1,3 +1,4 @@ +#define ENABLE_NEW_RCVBUFFER 1 #if ENABLE_NEW_RCVBUFFER #include #include "buffer_rcv.h" @@ -18,7 +19,7 @@ namespace srt { /* - * RcvBuffer2 (circular buffer): + * RcvBufferNew (circular buffer): * * |<------------------- m_iSize ----------------------------->| * | |<--- acked pkts -->|<--- m_iMaxPosInc --->| | @@ -40,8 +41,8 @@ namespace srt { */ CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit) - : m_entries(NULL) - , m_szSize(size) + : m_entries(size) + , m_szSize(size) // TODO: maybe just use m_entries.size() , m_pUnitQueue(unitqueue) , m_iStartSeqNo(initSeqNo) , m_iStartPos(0) @@ -57,17 +58,18 @@ CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, , m_uAvgPayloadSz(7 * 188) { SRT_ASSERT(size < INT_MAX); // All position pointers are integers - - } CRcvBufferNew::~CRcvBufferNew() { for (size_t i = 0; i < m_szSize; ++i) { - if (m_entries[i].pUnit != NULL) + Entry& entry = m_entries[i]; + CUnit* unit = entry.pUnit; + if (unit != NULL) { - m_pUnitQueue->makeUnitFree(m_entries[i].pUnit); + m_pUnitQueue->makeUnitFree(unit); + m_entries[i].pUnit = NULL; } } } @@ -141,9 +143,10 @@ void CRcvBufferNew::dropUpTo(int32_t seqno) updateNonreadPos(); } -void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) +void CRcvBufferNew::dropMessage(int32_t seqnolo SRT_ATR_UNUSED, int32_t seqnohi SRT_ATR_UNUSED, int32_t msgno SRT_ATR_UNUSED) { // TODO: Add implementation. + } int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) @@ -164,15 +167,16 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) const bool updateStartPos = (readPos == m_iStartPos); // Indicates if the m_iStartPos can be changed for (int i = readPos;; i = incPos(i)) { - SRT_ASSERT(m_pUnit[i]); - if (!m_pUnit[i]) + SRT_ASSERT(m_entries[i].pUnit); + if (!m_entries[i].pUnit) { LOGC(rbuflog.Error, log << "CRcvBufferNew::readMessage(): null packet encountered."); break; } - const CPacket& packet = m_pUnit[i]->m_Packet; + const CPacket& packet = m_entries[i].pUnit->m_Packet; const size_t pktsize = packet.getLength(); + const int32_t pktseqno = packet.getSeqNo(); const size_t unitsize = std::min(remain, pktsize); memcpy(dst, packet.m_pcData, unitsize); @@ -190,7 +194,7 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) const bool pbLast = packet.getMsgBoundary() & PB_LAST; if (msgctrl && (packet.getMsgBoundary() & PB_FIRST)) { - msgctrl->pktseq = packet.getSeqNo(); + msgctrl->pktseq = pktseqno; msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag); } if (msgctrl && pbLast) @@ -200,18 +204,17 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) if (updateStartPos) { - CUnit* tmp = m_pUnit[i]; + releaseUnitInPos(i); + m_iStartPos = incPos(i); --m_iMaxPosInc; SRT_ASSERT(m_iMaxPosInc >= 0); - m_iStartSeqNo = CSeqNo::incseq(tmp->m_Packet.getSeqNo()); - - m_pUnit[i] = NULL; - m_pUnitQueue->makeUnitFree(tmp); + m_iStartSeqNo = CSeqNo::incseq(pktseqno); } else { - m_pUnit[i]->m_iFlag = CUnit::PASSACK; + m_entries[i].status = EntryState_Read; + //m_pUnit[i]->m_iFlag = CUnit::PASSACK; } if (pbLast) @@ -260,10 +263,11 @@ CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstValidPacketInfo() const const int end_pos = (m_iStartPos + m_iMaxPosInc) % m_szSize; for (int i = m_iStartPos; i != end_pos; i = incPos(i)) { - if (!m_pUnit[i]) + // TODO: Maybe check status? + if (!m_entries[i].pUnit) continue; - const CPacket& packet = m_pUnit[i]->m_Packet; + const CPacket& packet = m_entries[i].pUnit->m_Packet; const PacketInfo info = { packet.getSeqNo(), i != m_iStartPos, getPktTsbPdTime(packet.getMsgTimeStamp()) }; return info; } @@ -334,19 +338,18 @@ void CRcvBufferNew::countBytes(int pkts, int bytes, bool acked) void CRcvBufferNew::releaseUnitInPos(int pos) { - SRT_ASSERT(m_pUnit[pos]); - - CUnit* tmp = m_pUnit[pos]; - m_pUnit[pos] = NULL; + CUnit* tmp = m_entries[pos].pUnit; + SRT_ASSERT(tmp !+ NULL); + m_entries[pos] = Entry(); m_pUnitQueue->makeUnitFree(tmp); } void CRcvBufferNew::releasePassackUnits() { int pos = m_iStartPos; - while (m_pUnit[pos] && m_pUnit[pos]->m_iFlag == CUnit::PASSACK) + while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Read) { - m_iStartSeqNo = CSeqNo::incseq(m_pUnit[pos]->m_Packet.getSeqNo()); + m_iStartSeqNo = CSeqNo::incseq(m_entries[pos].pUnit->m_Packet.getSeqNo()); releaseUnitInPos(pos); pos = incPos(pos); m_iStartPos = pos; @@ -368,12 +371,12 @@ void CRcvBufferNew::updateNonreadPos() //} // Check if the gap is filled. - SRT_ASSERT(m_pUnit[m_iFirstNonreadPos]); + SRT_ASSERT(m_entries[m_iFirstNonreadPos].pUnit); const int last_pos = incPos(m_iStartPos, m_iMaxPosInc); int pos = m_iFirstNonreadPos; - while (m_pUnit[pos] && m_pUnit[pos]->m_iFlag == CUnit::GOOD && (m_pUnit[pos]->m_Packet.getMsgBoundary() & PB_FIRST)) + while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST)) { // bool good = true; @@ -393,21 +396,21 @@ void CRcvBufferNew::updateNonreadPos() // will be interrupted. for (int i = pos; i != last_pos; i = (i + 1) % m_szSize) { - if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD) + if (!m_entries[i].pUnit || m_entries[pos].status != EntryState_Avail) { // good = false; break; } // Likewise, boundary() & PB_LAST will be satisfied for last OR solo. - if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST) + if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) { m_iFirstNonreadPos = incPos(i); break; } } - if (pos == m_iFirstNonreadPos || !m_pUnit[m_iFirstNonreadPos]) + if (pos == m_iFirstNonreadPos || !m_entries[m_iFirstNonreadPos].pUnit) break; pos = m_iFirstNonreadPos; @@ -416,16 +419,16 @@ void CRcvBufferNew::updateNonreadPos() // 1. If there is a gap between this packet and m_iLastReadablePos // then no sense to update m_iLastReadablePos. - // 2. The simplest case is when this is the first sequntial packet + // 2. The simplest case is when this is the first sequential packet } int CRcvBufferNew::findLastMessagePkt() { for (int i = m_iStartPos; i != m_iFirstNonreadPos; i = incPos(i)) { - SRT_ASSERT(m_pUnit[i]); + SRT_ASSERT(m_entries[i].pUnit); - if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST) + if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) { return i; } @@ -440,7 +443,7 @@ void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos) if (m_numOutOfOrderPackets == 0) return; - // If the following condition is true, there is alreadt a packet, + // If the following condition is true, there is already a packet, // that can be read out of order. We don't need to search for // another one. The search should be done when that packet is read out from the buffer. // @@ -452,8 +455,8 @@ void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos) // Just a sanity check. This function is called when a new packet is added. // So the should be unacknowledged packets. SRT_ASSERT(m_iMaxPosInc > 0); - SRT_ASSERT(m_pUnit[insertPos]); - const CPacket& pkt = m_pUnit[insertPos]->m_Packet; + SRT_ASSERT(m_entries[insertPos].pUnit); + const CPacket& pkt = m_entries[insertPos].pUnit->m_Packet; const PacketBoundary boundary = pkt.getMsgBoundary(); //if ((boundary & PB_FIRST) && (boundary & PB_LAST)) @@ -500,13 +503,13 @@ void CRcvBufferNew::updateFirstReadableOutOfOrder() for (int pos = m_iStartPos; outOfOrderPktsRemain; pos = incPos(pos)) { - if (!m_pUnit[pos]) + if (!m_entries[pos].pUnit) { posFirst = posLast = msgNo = -1; continue; } - const CPacket& pkt = m_pUnit[pos]->m_Packet; + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; if (pkt.getMsgOrderFlag()) // Skip in order packet { @@ -554,11 +557,10 @@ int CRcvBufferNew::scanNotInOrderMessageRight(const int startPos, int msgNo) con do { pos = incPos(pos); - - if (!m_pUnit[pos]) + if (!m_entries[pos].pUnit) break; - const CPacket& pkt = m_pUnit[pos]->m_Packet; + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { @@ -586,10 +588,10 @@ int CRcvBufferNew::scanNotInOrderMessageLeft(const int startPos, int msgNo) cons { pos = decPos(pos); - if (!m_pUnit[pos]) + if (!m_entries[pos].pUnit) return -1; - const CPacket& pkt = m_pUnit[pos]->m_Packet; + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { @@ -652,9 +654,9 @@ string CRcvBufferNew::strFullnessState(const time_point& tsNow) const if (m_tsbpd.isEnabled()) { ss << " (TSBPD ready in "; - if (m_pUnit[m_iStartPos]) + if (m_entries[m_iStartPos].pUnit) { - const uint32_t usPktTimestamp = m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp(); + const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp(); ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow); } else @@ -663,10 +665,10 @@ string CRcvBufferNew::strFullnessState(const time_point& tsNow) const } const int iLastPos = incPos(m_iStartPos, m_iMaxPosInc); - if (m_pUnit[iLastPos]) + if (m_entries[iLastPos].pUnit) { ss << ":"; - const uint32_t usPktTimestamp = m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp(); + const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp(); ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow); ss << " ms"; } diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 5e716ef6f2..ed8dfb0b4f 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -11,6 +11,7 @@ #ifndef INC_SRT_BUFFER_RCV_H #define INC_SRT_BUFFER_RCV_H +#define ENABLE_NEW_RCVBUFFER 1 #if ENABLE_NEW_RCVBUFFER #include "buffer.h" // AvgBufSize @@ -196,6 +197,7 @@ class CRcvBufferNew { // m_pUnitQueue->makeUnitFree(m_entries[i].pUnit); } + m_pUnit = pUnit; } private: CUnit* m_pUnit; @@ -210,16 +212,22 @@ class CRcvBufferNew }; struct Entry { + Entry() + : pUnit(NULL) + , status(EntryState_Empty) + {} + CUnit* pUnit; EntryStatus status; }; - static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } + //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } - const Entry* m_entries; + FixedArray m_entries; + //const Entry* m_entries; // TODO: maybe use std::vector? - CUnit** m_pUnit; // pointer to the array of units (buffer) + //CUnit** m_pUnit; // pointer to the array of units (buffer) const size_t m_szSize; // size of the array of units (buffer) CUnitQueue* m_pUnitQueue; // the shared unit queue diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 841f209b44..d64cd56315 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6844,7 +6844,9 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ return res; } +#if !ENABLE_NEW_RCVBUFFER const int seqdistance = -1; +#endif if (!m_config.bSynRecving) { @@ -8819,13 +8821,13 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt) void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) { { - const bool using_rexmit_flag = m_bPeerRexmitFlag; UniqueLock rlock(m_RecvLock); #if ENABLE_NEW_RCVBUFFER // TODO // If message number is present, try to drop the whole message. // If message number is 0, then drop by sequence range. #else + const bool using_rexmit_flag = m_bPeerRexmitFlag; m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag); #endif // When the drop request was received, it means that there are diff --git a/srtcore/utilities.h b/srtcore/utilities.h index ed96306f56..3fd593131b 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -440,6 +440,70 @@ struct DynamicStruct }; +/// Fixed-size array template class. +namespace srt { + +template +class FixedArray +{ +public: + FixedArray(size_t size) + : m_size(size) + , m_entries(new T[size]) + { + } + + ~FixedArray() + { + delete [] m_entries; + } + +public: + const T& operator[](size_t index) const + { + if (index >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + T& operator[](size_t index) + { + if (index >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + const T& operator[](int index) const + { + if (index < 0 || static_cast(index) >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + T& operator[](int index) + { + if (index < 0 || static_cast(index) >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + size_t size() const { return m_size; } + +private: + FixedArray(const FixedArray& ); + FixedArray& operator=(const FixedArray&); + +private: + size_t m_size; + T* const m_entries; +}; + +} // namespace srt + // ------------------------------------------------------------ diff --git a/test/test_rcvbuffer2_read.cpp b/test/test_rcvbuffer2_read.cpp index e663e0548c..1bcbaf667f 100644 --- a/test/test_rcvbuffer2_read.cpp +++ b/test/test_rcvbuffer2_read.cpp @@ -113,7 +113,7 @@ class TestRcvBuffer2Read : public ::testing::Test return true; } - void checkPacketPos(CUnit* unit) + void checkPacketPos(CUnit* unit SRT_ATR_UNUSED) { // TODO: check that a certain packet was placed into the right // position with right offset.