diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bebae3cd..77286fb7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -151,6 +151,7 @@ option(USE_OPENSSL_PC "Use pkg-config to find OpenSSL libraries" ON) option(USE_BUSY_WAITING "Enable more accurate sending times at a cost of potentially higher CPU load" OFF) option(USE_GNUSTL "Get c++ library/headers from the gnustl.pc" OFF) option(ENABLE_SOCK_CLOEXEC "Enable setting SOCK_CLOEXEC on a socket" ON) +option(ENABLE_NEW_RCVBUFFER "Enable new receiver buffer implementation" ON) option(ENABLE_CLANG_TSA "Enable Clang Thread Safety Analysis" OFF) @@ -204,7 +205,6 @@ if (NOT USE_ENCLIB) else() set (USE_ENCLIB openssl) endif() - endif() set(USE_ENCLIB "${USE_ENCLIB}" CACHE STRING "The crypto library that SRT uses") @@ -486,6 +486,14 @@ if (ENABLE_SOCK_CLOEXEC) add_definitions(-DENABLE_SOCK_CLOEXEC=1) endif() +if (ENABLE_NEW_RCVBUFFER) + add_definitions(-DENABLE_NEW_RCVBUFFER=1) + message(STATUS "RECEIVER_BUFFER: NEW") +else() + remove_definitions(-DENABLE_NEW_RCVBUFFER) + message(STATUS "RECEIVER_BUFFER: OLD") +endif() + if (CMAKE_MAJOR_VERSION LESS 3) set (FORCE_CXX_STANDARD_GNUONLY 1) endif() @@ -1291,6 +1299,7 @@ if (ENABLE_UNITTESTS AND ENABLE_CXX11) endif() MafReadDir(test filelist.maf + HEADERS SOURCES_unittests SOURCES SOURCES_unittests ) diff --git a/docs/build/build-options.md b/docs/build/build-options.md index d8c1d757a..3afce1cf3 100644 --- a/docs/build/build-options.md +++ b/docs/build/build-options.md @@ -192,6 +192,11 @@ will be removed when the problem is fixed globally. This option enables the standard C++ `thread` and `chrono` libraries (available since C++11) to be used by SRT instead of the `pthreads`. +**`--enable-new-rcvbuffer`** (default: ON) + +This option enables the new implementation of the receiver buffer with behavior and code improvements. +The new receiver buffer is to remain the only one. For the transition period there is a possibility to +fall back to the old one. **`--enable-profile`** (default: OFF) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 59399a4b3..8e18bd23e 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -151,8 +151,10 @@ void srt::CUDTSocket::setBrokenClosed() bool srt::CUDTSocket::readReady() { + // TODO: Use m_RcvBufferLock here (CUDT::isRcvReadReady())? if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady()) return true; + if (m_UDT.m_bListening) return !m_QueuedSockets.empty(); @@ -2622,7 +2624,11 @@ void srt::CUDTUnited::checkBrokenSockets() // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when // this function is called (isRcvDataReady also checks if the // available data is "ready to play"). +#if ENABLE_NEW_RCVBUFFER + && s->core().m_pRcvBuffer->hasAvailablePackets()) +#else && s->core().m_pRcvBuffer->isRcvDataAvailable()) +#endif { const int bc = s->core().m_iBrokenCounter.load(); if (bc > 0) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 794066702..e861654eb 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -792,6 +792,8 @@ void CSndBuffer::increase() //////////////////////////////////////////////////////////////////////////////// +#if (!ENABLE_NEW_RCVBUFFER) + /* * RcvBuffer (circular buffer): * @@ -2289,4 +2291,6 @@ bool CRcvBuffer::scanMsg(int& w_p, int& w_q, bool& w_passack) return found; } +#endif // !ENABLE_NEW_RCVBUFFER + } // namespace srt diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 8756a49d1..428db748d 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -272,6 +272,8 @@ class CSndBuffer //////////////////////////////////////////////////////////////////////////////// +#if (!ENABLE_NEW_RCVBUFFER) + class CRcvBuffer { typedef sync::steady_clock::time_point time_point; @@ -562,6 +564,8 @@ class CRcvBuffer CRcvBuffer& operator=(const CRcvBuffer&); }; +#endif // !ENABLE_NEW_RCVBUFFER + } // namespace srt #endif diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp new file mode 100644 index 000000000..12e935057 --- /dev/null +++ b/srtcore/buffer_rcv.cpp @@ -0,0 +1,969 @@ +#if ENABLE_NEW_RCVBUFFER +#include +#include "buffer_rcv.h" +#include "logging.h" + +using namespace std; + +using namespace srt::sync; +using namespace srt_logging; +namespace srt_logging +{ + extern Logger brlog; +} +#define rbuflog brlog + +namespace srt { + +namespace { + struct ScopedLog + { + ScopedLog() {}; + + ~ScopedLog() + { + LOGC(rbuflog.Warn, log << ss.str()); + } + + stringstream ss; + }; + +#define IF_RCVBUF_DEBUG(instr) (void)0 + + // Check if iFirstNonreadPos is in range [iStartPos, (iStartPos + iMaxPosInc) % iSize]. + // The right edge is included because we expect iFirstNonreadPos to be + // right after the last valid packet position if all packets are available. + bool isInRange(int iStartPos, int iMaxPosInc, size_t iSize, int iFirstNonreadPos) + { + if (iFirstNonreadPos == iStartPos) + return true; + + const int iLastPos = (iStartPos + iMaxPosInc) % iSize; + const bool isOverrun = iLastPos < iStartPos; + + if (isOverrun) + return iFirstNonreadPos > iStartPos || iFirstNonreadPos <= iLastPos; + + return iFirstNonreadPos > iStartPos && iFirstNonreadPos <= iLastPos; + } +} + + +/* + * RcvBufferNew (circular buffer): + * + * |<------------------- m_iSize ----------------------------->| + * | |<----------- m_iMaxPosInc ------------>| | + * | | | | + * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ + * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] + * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ + * | | + * | |__last pkt received + * |___ m_iStartPos: first message to read + * + * m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped + * + * thread safety: + * m_iStartPos: CUDT::m_RecvLock + * m_iLastAckPos: CUDT::m_AckLock + * m_iMaxPosInc: none? (modified on add and ack + */ + +CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit, bool bMessageAPI) + : m_entries(size) + , m_szSize(size) // TODO: maybe just use m_entries.size() + , m_pUnitQueue(unitqueue) + , m_iStartSeqNo(initSeqNo) + , m_iStartPos(0) + , m_iFirstNonreadPos(0) + , m_iMaxPosInc(0) + , m_iNotch(0) + , m_numOutOfOrderPackets(0) + , m_iFirstReadableOutOfOrder(-1) + , m_bPeerRexmitFlag(peerRexmit) + , m_bMessageAPI(bMessageAPI) + , m_iBytesCount(0) + , m_iPktsCount(0) + , m_uAvgPayloadSz(SRT_LIVE_DEF_PLSIZE) +{ + SRT_ASSERT(size < INT_MAX); // All position pointers are integers +} + +CRcvBufferNew::~CRcvBufferNew() +{ + for (size_t i = 0; i < m_szSize; ++i) + { + CUnit* unit = m_entries[i].pUnit; + if (unit != NULL) + { + m_pUnitQueue->makeUnitFree(unit); + m_entries[i].pUnit = NULL; + } + } +} + +int CRcvBufferNew::insert(CUnit* unit) +{ + SRT_ASSERT(unit != NULL); + const int32_t seqno = unit->m_Packet.getSeqNo(); + const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno); + + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::insert: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset); + + if (offset < 0) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -2"); + return -2; + } + + if (offset >= (int)capacity()) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -3"); + return -3; + } + + // TODO: Don't do assert here. Process this situation somehow. + // If >= 2, then probably there is a long gap, and buffer needs to be reset. + SRT_ASSERT((m_iStartPos + offset) / m_szSize < 2); + + const int pos = (m_iStartPos + offset) % m_szSize; + if (offset >= m_iMaxPosInc) + m_iMaxPosInc = offset + 1; + + // Packet already exists + SRT_ASSERT(pos >= 0 && pos < m_szSize); + if (m_entries[pos].status != EntryState_Empty) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -1"); + return -1; + } + SRT_ASSERT(m_entries[pos].pUnit == NULL); + + m_pUnitQueue->makeUnitGood(unit); + m_entries[pos].pUnit = unit; + m_entries[pos].status = EntryState_Avail; + countBytes(1, (int)unit->m_Packet.getLength()); + + // If packet "in order" flag is zero, it can be read out of order. + // With TSBPD enabled packets are always assumed in order (the flag is ignored). + if (!m_tsbpd.isEnabled() && m_bMessageAPI && !unit->m_Packet.getMsgOrderFlag()) + { + ++m_numOutOfOrderPackets; + onInsertNotInOrderPacket(pos); + } + + updateNonreadPos(); + IF_RCVBUF_DEBUG(scoped_log.ss << " returns 0 (OK)"); + return 0; +} + +void CRcvBufferNew::dropUpTo(int32_t seqno) +{ + // Can drop only when nothing to read, and + // first unacknowledged packet is missing. + SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos); + + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); + + int len = CSeqNo::seqoff(m_iStartSeqNo, seqno); + SRT_ASSERT(len > 0); + if (len <= 0) + { + IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); + return; + } + + /*LOGC(rbuflog.Warn, log << "CRcvBufferNew.dropUpTo(): seqno=" << seqno << ", pkts=" << len + << ". Buffer start " << m_iStartSeqNo << ".");*/ + + m_iMaxPosInc -= len; + if (m_iMaxPosInc < 0) + m_iMaxPosInc = 0; + + // Check that all packets being dropped are missing. + while (len > 0) + { + if (m_entries[m_iStartPos].pUnit != NULL) + { + releaseUnitInPos(m_iStartPos); + } + + if (m_entries[m_iStartPos].status != EntryState_Empty) + { + SRT_ASSERT(m_entries[m_iStartPos].status == EntryState_Drop || m_entries[m_iStartPos].status == EntryState_Read); + m_entries[m_iStartPos].status = EntryState_Empty; + } + + SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty); + m_iStartPos = incPos(m_iStartPos); + --len; + } + + // Update positions + m_iStartSeqNo = seqno; + // Move forward if there are "read" entries. + releaseNextFillerEntries(); + // Set nonread position to the starting position before updating, + // because start position was increased, and preceeding packets are invalid. + m_iFirstNonreadPos = m_iStartPos; + updateNonreadPos(); +} + +void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) +{ + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo); + // TODO: count bytes as removed? + const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); + if (msgno != 0) + { + for (int i = m_iStartPos; i != end_pos; i = incPos(i)) + { + // TODO: Maybe check status? + if (!m_entries[i].pUnit) + continue; + + const int32_t msgseq = m_entries[i].pUnit->m_Packet.getMsgSeq(m_bPeerRexmitFlag); + if (msgseq == msgno) + { + releaseUnitInPos(i); + m_entries[i].status = EntryState_Drop; + } + } + + return; + } + + // Drop by packet seqno range. + const int offset_a = CSeqNo::seqoff(m_iStartSeqNo, seqnolo); + const int offset_b = CSeqNo::seqoff(m_iStartSeqNo, seqnohi); + if (offset_b < 0) + { + LOGC(rbuflog.Warn, log << "CRcvBufferNew.dropMessage(): nothing to drop. Requested [" << seqnolo << "; " + << seqnohi << "]. Buffer start " << m_iStartSeqNo << "."); + return; + } + + const int start_off = max(0, offset_a); + const int last_pos = incPos(m_iStartPos, offset_b); + for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != last_pos; i = incPos(i)) + { + if (m_entries[i].pUnit) + { + releaseUnitInPos(i); + } + m_entries[i].status = EntryState_Drop; + } + + LOGC(rbuflog.Debug, log << "CRcvBufferNew.dropMessage(): [" << seqnolo << "; " + << seqnohi << "]."); +} + +int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) +{ + const bool canReadInOrder = hasReadableInorderPkts(); + if (!canReadInOrder && m_iFirstReadableOutOfOrder < 0) + { + LOGC(rbuflog.Warn, log << "CRcvBufferNew.readMessage(): nothing to read. Ignored isRcvDataReady() result?"); + return 0; + } + + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo); + + const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder; + // Remember if we actually read out of order packet. + const bool readingOutOfOrderPacket = !canReadInOrder || m_iStartPos == m_iFirstReadableOutOfOrder; + + size_t remain = len; + char* dst = data; + int pkts_read = 0; + int bytes_extracted = 0; // The total number of bytes extracted from the buffer. + const bool updateStartPos = (readPos == m_iStartPos); // Indicates if the m_iStartPos can be changed + for (int i = readPos;; i = incPos(i)) + { + SRT_ASSERT(m_entries[i].pUnit); + if (!m_entries[i].pUnit) + { + LOGC(rbuflog.Error, log << "CRcvBufferNew::readMessage(): null packet encountered."); + break; + } + + const CPacket& packet = m_entries[i].pUnit->m_Packet; + const size_t pktsize = packet.getLength(); + const int32_t pktseqno = packet.getSeqNo(); + + // unitsize can be zero + const size_t unitsize = std::min(remain, pktsize); + memcpy(dst, packet.m_pcData, unitsize); + remain -= unitsize; + dst += unitsize; + + ++pkts_read; + bytes_extracted += (int) pktsize; + + if (m_tsbpd.isEnabled()) + updateTsbPdTimeBase(packet.getMsgTimeStamp()); + + if (m_numOutOfOrderPackets && !packet.getMsgOrderFlag()) + --m_numOutOfOrderPackets; + + const bool pbLast = packet.getMsgBoundary() & PB_LAST; + if (msgctrl && (packet.getMsgBoundary() & PB_FIRST)) + { + msgctrl->pktseq = pktseqno; + msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag); + } + if (msgctrl && pbLast) + { + msgctrl->srctime = count_microseconds(getPktTsbPdTime(packet.getMsgTimeStamp()).time_since_epoch()); + } + + releaseUnitInPos(i); + if (updateStartPos) + { + m_iStartPos = incPos(i); + --m_iMaxPosInc; + SRT_ASSERT(m_iMaxPosInc >= 0); + m_iStartSeqNo = CSeqNo::incseq(pktseqno); + } + else + { + // If out of order, only mark it read. + m_entries[i].status = EntryState_Read; + } + + if (pbLast) + { + if (readPos == m_iFirstReadableOutOfOrder) + m_iFirstReadableOutOfOrder = -1; + break; + } + } + + countBytes(-pkts_read, -bytes_extracted); + if (!m_tsbpd.isEnabled() && readingOutOfOrderPacket) + updateFirstReadableOutOfOrder(); + + releaseNextFillerEntries(); + + if (!isInRange(m_iStartPos, m_iMaxPosInc, m_szSize, m_iFirstNonreadPos)) + { + m_iFirstNonreadPos = m_iStartPos; + //updateNonreadPos(); + } + + const int bytes_read = dst - data; + if (bytes_read < bytes_extracted) + { + LOGC(rbuflog.Error, log << "readMessage: small dst buffer, copied only " << bytes_read << "/" << bytes_extracted << " bytes."); + } + + return bytes_read; +} + +namespace { + /// @brief Writes bytes to file stream. + /// @param data pointer to data to write. + /// @param len the number of bytes to write + /// @param arg a void pointer to the fstream to write to. + /// @return true on success, false on failure + bool writeBytesToFile(char* data, int len, void* arg) + { + fstream* pofs = reinterpret_cast(arg); + pofs->write(data, len); + return !pofs->fail(); + } + + /// @brief Copies bytes to the destination buffer. + /// @param data pointer to data to copy. + /// @param len the number of bytes to copy + /// @param arg A pointer to the destination buffer + /// @return true on success, false on failure + bool copyBytesToBuf(char* data, int len, void* arg) + { + char* dst = reinterpret_cast(arg); + memcpy(dst, data, len); + return true; + } +} + +int CRcvBufferNew::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) +{ + int p = m_iStartPos; + const int end_pos = m_iFirstNonreadPos; + + const bool bTsbPdEnabled = m_tsbpd.isEnabled(); + const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point()); + + int rs = len; + while ((p != end_pos) && (rs > 0)) + { + if (!m_entries[p].pUnit) + { + p = incPos(p); + LOGC(rbuflog.Error, log << "readBufferTo: IPE: NULL unit found in file transmission"); + return -1; + } + + const srt::CPacket& pkt = m_entries[p].pUnit->m_Packet; + + if (bTsbPdEnabled) + { + const steady_clock::time_point tsPlay = getPktTsbPdTime(pkt.getMsgTimeStamp()); + HLOGC(rbuflog.Debug, + log << "readBuffer: check if time to play:" + << " NOW=" << FormatTime(now) + << " PKT TS=" << FormatTime(tsPlay)); + + if ((tsPlay > now)) + break; /* too early for this unit, return whatever was copied */ + } + + const int pktlen = (int)pkt.getLength(); + const int remain_pktlen = pktlen - m_iNotch; + const int unitsize = std::min(remain_pktlen, rs); + + if (!funcCopyToDst(pkt.m_pcData + m_iNotch, unitsize, arg)) + break; + + if (rs >= remain_pktlen) + { + releaseUnitInPos(p); + p = incPos(p); + m_iNotch = 0; + + m_iStartPos = p; + --m_iMaxPosInc; + SRT_ASSERT(m_iMaxPosInc >= 0); + m_iStartSeqNo = CSeqNo::incseq(m_iStartSeqNo); + } + else + m_iNotch += rs; + + rs -= unitsize; + } + + const int iBytesRead = len - rs; + /* we removed acked bytes form receive buffer */ + countBytes(-1, -iBytesRead); + + // Update positions + // Set nonread position to the starting position before updating, + // because start position was increased, and preceeding packets are invalid. + if (!isInRange(m_iStartPos, m_iMaxPosInc, m_szSize, m_iFirstNonreadPos)) + { + m_iFirstNonreadPos = m_iStartPos; + } + + if (iBytesRead == 0) + { + LOGC(rbuflog.Error, log << "readBufferTo: 0 bytes read. m_iStartPos=" << m_iStartPos << ", m_iFirstNonreadPos=" << m_iFirstNonreadPos); + } + + return iBytesRead; +} + +int CRcvBufferNew::readBuffer(char* dst, int len) +{ + return readBufferTo(len, copyBytesToBuf, reinterpret_cast(dst)); +} + +int CRcvBufferNew::readBufferToFile(fstream& ofs, int len) +{ + return readBufferTo(len, writeBytesToFile, reinterpret_cast(&ofs)); +} + +bool CRcvBufferNew::hasAvailablePackets() const +{ + return hasReadableInorderPkts() || (m_numOutOfOrderPackets > 0 && m_iFirstReadableOutOfOrder != -1); +} + +int CRcvBufferNew::getRcvDataSize() const +{ + if (m_iFirstNonreadPos >= m_iStartPos) + return m_iFirstNonreadPos - m_iStartPos; + + return m_szSize + m_iFirstNonreadPos - m_iStartPos; +} + +int CRcvBufferNew::getTimespan_ms() const +{ + if (!m_tsbpd.isEnabled()) + return 0; + + if (m_iMaxPosInc == 0) + return 0; + + const int lastpos = incPos(m_iStartPos, m_iMaxPosInc - 1); + int startpos = m_iStartPos; + + while (m_entries[startpos].pUnit == NULL) + { + if (startpos == lastpos) + break; + + ++startpos; + } + + if (m_entries[startpos].pUnit == NULL) + return 0; + + // Should not happen + SRT_ASSERT(m_entries[lastpos].pUnit != NULL); + if (m_entries[lastpos].pUnit == NULL) + return 0; + + const steady_clock::time_point startstamp = + getPktTsbPdTime(m_entries[startpos].pUnit->m_Packet.getMsgTimeStamp()); + const steady_clock::time_point endstamp = getPktTsbPdTime(m_entries[lastpos].pUnit->m_Packet.getMsgTimeStamp()); + if (endstamp < startstamp) + return 0; + + // One millisecond is added as a duration of a packet in the buffer. + // If there is only one packet in the buffer, one millisecond is returned. + return count_milliseconds(endstamp - startstamp) + 1; +} + +int CRcvBufferNew::getRcvDataSize(int& bytes, int& timespan) const +{ + ScopedLock lck(m_BytesCountLock); + bytes = m_iBytesCount; + timespan = getTimespan_ms(); + return m_iPktsCount; +} + +CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstValidPacketInfo() const +{ + const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); + for (int i = m_iStartPos; i != end_pos; i = incPos(i)) + { + // TODO: Maybe check status? + if (!m_entries[i].pUnit) + continue; + + const CPacket& packet = m_entries[i].pUnit->m_Packet; + const PacketInfo info = { packet.getSeqNo(), i != m_iStartPos, getPktTsbPdTime(packet.getMsgTimeStamp()) }; + return info; + } + + const PacketInfo info = { -1, false, time_point() }; + return info; +} + +std::pair CRcvBufferNew::getAvailablePacketsRange() const +{ + const int seqno_last = CSeqNo::incseq(m_iStartSeqNo, countReadable()); + return std::pair(m_iStartSeqNo, seqno_last); +} + +size_t CRcvBufferNew::countReadable() const +{ + if (m_iFirstNonreadPos >= m_iStartPos) + return m_iFirstNonreadPos - m_iStartPos; + return m_szSize + m_iFirstNonreadPos - m_iStartPos; +} + +bool CRcvBufferNew::isRcvDataReady(time_point time_now) const +{ + const bool haveInorderPackets = hasReadableInorderPkts(); + if (!m_tsbpd.isEnabled()) + { + if (haveInorderPackets) + return true; + + SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI); + return (m_numOutOfOrderPackets > 0 && m_iFirstReadableOutOfOrder != -1); + } + + if (!haveInorderPackets) + return false; + + const PacketInfo info = getFirstValidPacketInfo(); + + return info.tsbpd_time <= time_now; +} + +void CRcvBufferNew::countBytes(int pkts, int bytes) +{ + ScopedLock lock(m_BytesCountLock); + m_iBytesCount += bytes; // added or removed bytes from rcv buffer + m_iPktsCount += pkts; + if (bytes > 0) // Assuming one pkt when adding bytes + m_uAvgPayloadSz = avg_iir<100>(m_uAvgPayloadSz, (unsigned) bytes); +} + +void CRcvBufferNew::releaseUnitInPos(int pos) +{ + CUnit* tmp = m_entries[pos].pUnit; + m_entries[pos] = Entry(); // pUnit = NULL; status = Empty + if (tmp != NULL) + m_pUnitQueue->makeUnitFree(tmp); +} + +void CRcvBufferNew::releaseNextFillerEntries() +{ + int pos = m_iStartPos; + while (m_entries[pos].status == EntryState_Read || m_entries[pos].status == EntryState_Drop) + { + m_iStartSeqNo = CSeqNo::incseq(m_iStartSeqNo); + releaseUnitInPos(pos); + pos = incPos(pos); + m_iStartPos = pos; + } +} + +// TODO: Is this function complete? There are some comments left inside. +void CRcvBufferNew::updateNonreadPos() +{ + if (m_iMaxPosInc == 0) + return; + + // const PacketBoundary boundary = packet.getMsgBoundary(); + + //// The simplest case is when inserting a sequential PB_SOLO packet. + // if (boundary == PB_SOLO && (m_iFirstNonreadPos + 1) % m_szSize == pos) + //{ + // m_iFirstNonreadPos = pos; + // return; + //} + const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); // The empty position right after the last valid entry. + + int pos = m_iFirstNonreadPos; + while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST)) + { + // bool good = true; + + // look ahead for the whole message + + // We expect to see either of: + // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] + // [PB_SOLO] + // but not: + // [PB_FIRST] NULL ... + // [PB_FIRST] FREE/PASSACK/DROPPED... + // If the message didn't look as expected, interrupt this. + + // This begins with a message starting at m_iStartPos + // up to end_pos (excluding) OR until the PB_LAST message is found. + // If any of the units on this way isn't good, this OUTER loop + // will be interrupted. + for (int i = pos; i != end_pos; i = (i + 1) % m_szSize) + { + if (!m_entries[i].pUnit || m_entries[pos].status != EntryState_Avail) + { + // good = false; + break; + } + + // Likewise, boundary() & PB_LAST will be satisfied for last OR solo. + if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) + { + m_iFirstNonreadPos = incPos(i); + break; + } + } + + if (pos == m_iFirstNonreadPos || !m_entries[m_iFirstNonreadPos].pUnit) + break; + + pos = m_iFirstNonreadPos; + } + + // 1. If there is a gap between this packet and m_iLastReadablePos + // then no sense to update m_iLastReadablePos. + + // 2. The simplest case is when this is the first sequential packet +} + +int CRcvBufferNew::findLastMessagePkt() +{ + for (int i = m_iStartPos; i != m_iFirstNonreadPos; i = incPos(i)) + { + SRT_ASSERT(m_entries[i].pUnit); + + if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) + { + return i; + } + } + + return -1; +} + +void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos) +{ + if (m_numOutOfOrderPackets == 0) + return; + + // If the following condition is true, there is already a packet, + // that can be read out of order. We don't need to search for + // another one. The search should be done when that packet is read out from the buffer. + // + // There might happen that the packet being added precedes the previously found one. + // However, it is allowed to re bead out of order, so no need to update the position. + if (m_iFirstReadableOutOfOrder >= 0) + return; + + // Just a sanity check. This function is called when a new packet is added. + // So the should be unacknowledged packets. + SRT_ASSERT(m_iMaxPosInc > 0); + SRT_ASSERT(m_entries[insertPos].pUnit); + const CPacket& pkt = m_entries[insertPos].pUnit->m_Packet; + const PacketBoundary boundary = pkt.getMsgBoundary(); + + //if ((boundary & PB_FIRST) && (boundary & PB_LAST)) + //{ + // // This packet can be read out of order + // m_iFirstReadableOutOfOrder = insertPos; + // return; + //} + + const int msgNo = pkt.getMsgSeq(m_bPeerRexmitFlag); + // First check last packet, because it is expected to be received last. + const bool hasLast = (boundary & PB_LAST) || (-1 < scanNotInOrderMessageRight(insertPos, msgNo)); + if (!hasLast) + return; + + const int firstPktPos = (boundary & PB_FIRST) + ? insertPos + : scanNotInOrderMessageLeft(insertPos, msgNo); + if (firstPktPos < 0) + return; + + m_iFirstReadableOutOfOrder = firstPktPos; + return; +} + +void CRcvBufferNew::updateFirstReadableOutOfOrder() +{ + if (hasReadableInorderPkts() || m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder >= 0) + return; + + if (m_iMaxPosInc == 0) + return; + + // TODO: unused variable outOfOrderPktsRemain? + int outOfOrderPktsRemain = m_numOutOfOrderPackets; + + // Search further packets to the right. + // First check if there are packets to the right. + const int lastPos = (m_iStartPos + m_iMaxPosInc - 1) % m_szSize; + + int posFirst = -1; + int posLast = -1; + int msgNo = -1; + + for (int pos = m_iStartPos; outOfOrderPktsRemain; pos = incPos(pos)) + { + if (!m_entries[pos].pUnit) + { + posFirst = posLast = msgNo = -1; + continue; + } + + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + + if (pkt.getMsgOrderFlag()) // Skip in order packet + { + posFirst = posLast = msgNo = -1; + continue; + } + + --outOfOrderPktsRemain; + + const PacketBoundary boundary = pkt.getMsgBoundary(); + if (boundary & PB_FIRST) + { + posFirst = pos; + msgNo = pkt.getMsgSeq(m_bPeerRexmitFlag); + } + + if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) + { + posFirst = posLast = msgNo = -1; + continue; + } + + if (boundary & PB_LAST) + { + m_iFirstReadableOutOfOrder = posFirst; + return; + } + + if (pos == lastPos) + break; + } + + return; +} + +int CRcvBufferNew::scanNotInOrderMessageRight(const int startPos, int msgNo) const +{ + // Search further packets to the right. + // First check if there are packets to the right. + const int lastPos = (m_iStartPos + m_iMaxPosInc - 1) % m_szSize; + if (startPos == lastPos) + return -1; + + int pos = startPos; + do + { + pos = incPos(pos); + if (!m_entries[pos].pUnit) + break; + + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + + if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) + { + LOGC(rbuflog.Error, log << "Missing PB_LAST packet for msgNo " << msgNo); + return -1; + } + + const PacketBoundary boundary = pkt.getMsgBoundary(); + if (boundary & PB_LAST) + return pos; + } while (pos != lastPos); + + return -1; +} + +int CRcvBufferNew::scanNotInOrderMessageLeft(const int startPos, int msgNo) const +{ + // Search preceeding packets to the left. + // First check if there are packets to the left. + if (startPos == m_iStartPos) + return -1; + + int pos = startPos; + do + { + pos = decPos(pos); + + if (!m_entries[pos].pUnit) + return -1; + + const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + + if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) + { + LOGC(rbuflog.Error, log << "Missing PB_FIRST packet for msgNo " << msgNo); + return -1; + } + + const PacketBoundary boundary = pkt.getMsgBoundary(); + if (boundary & PB_FIRST) + return pos; + } while (pos != m_iStartPos); + + return -1; +} + +bool CRcvBufferNew::addRcvTsbPdDriftSample(uint32_t usTimestamp, int usRTTSample) +{ + return m_tsbpd.addDriftSample(usTimestamp, usRTTSample); +} + +void CRcvBufferNew::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay) +{ + m_tsbpd.setTsbPdMode(timebase, wrap, delay); +} + +void CRcvBufferNew::applyGroupTime(const steady_clock::time_point& timebase, + bool wrp, + uint32_t delay, + const steady_clock::duration& udrift) +{ + m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift); +} + +void CRcvBufferNew::applyGroupDrift(const steady_clock::time_point& timebase, + bool wrp, + const steady_clock::duration& udrift) +{ + m_tsbpd.applyGroupDrift(timebase, wrp, udrift); +} + +CRcvBufferNew::time_point CRcvBufferNew::getTsbPdTimeBase(uint32_t usPktTimestamp) const +{ + return m_tsbpd.getTsbPdTimeBase(usPktTimestamp); +} + +void CRcvBufferNew::updateTsbPdTimeBase(uint32_t usPktTimestamp) +{ + m_tsbpd.updateTsbPdTimeBase(usPktTimestamp); +} + +string CRcvBufferNew::strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const +{ + stringstream ss; + + ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize; + ss << " pkts. "; + if (m_tsbpd.isEnabled() && m_iMaxPosInc > 0) + { + ss << " (TSBPD ready in "; + if (m_entries[m_iStartPos].pUnit) + { + const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp(); + ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow); + } + else + { + ss << "n/a"; + } + + const int iLastPos = incPos(m_iStartPos, m_iMaxPosInc - 1); + if (m_entries[iLastPos].pUnit) + { + ss << ":"; + const uint32_t usPktTimestamp = m_entries[m_iStartPos].pUnit->m_Packet.getMsgTimeStamp(); + ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - tsNow); + ss << " ms"; + } + else + { + ss << ":n/a ms"; + } + } + + ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms."; + return ss.str(); +} + +CRcvBufferNew::time_point CRcvBufferNew::getPktTsbPdTime(uint32_t usPktTimestamp) const +{ + return m_tsbpd.getPktTsbPdTime(usPktTimestamp); +} + +/* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */ +int CRcvBufferNew::getRcvAvgDataSize(int& bytes, int& timespan) +{ + // Average number of packets and timespan could be small, + // so rounding is beneficial, while for the number of + // bytes in the buffer is a higher value, so rounding can be omitted, + // but probably better to round all three values. + timespan = static_cast(round((m_mavg.timespan_ms()))); + bytes = static_cast(round((m_mavg.bytes()))); + return static_cast(round(m_mavg.pkts())); +} + +/* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */ +void CRcvBufferNew::updRcvAvgDataSize(const steady_clock::time_point& now) +{ + if (!m_mavg.isTimeToUpdate(now)) + return; + + int bytes = 0; + int timespan_ms = 0; + const int pkts = getRcvDataSize(bytes, timespan_ms); + m_mavg.update(now, pkts, bytes, timespan_ms); +} + +} // namespace srt + +#endif // ENABLE_NEW_RCVBUFFER diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h new file mode 100644 index 000000000..dfc63d3f6 --- /dev/null +++ b/srtcore/buffer_rcv.h @@ -0,0 +1,343 @@ +/* + * SRT - Secure, Reliable, Transport + * Copyright (c) 2020 Haivision Systems Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + */ + +#ifndef INC_SRT_BUFFER_RCV_H +#define INC_SRT_BUFFER_RCV_H + +#if ENABLE_NEW_RCVBUFFER + +#include "buffer.h" // AvgBufSize +#include "common.h" +#include "queue.h" +#include "sync.h" +#include "tsbpd_time.h" + +namespace srt +{ + +/* + * Circular receiver buffer. + * + * |<------------------- m_szSize ---------------------------->| + * | |<------------ m_iMaxPosInc ----------->| | + * | | | | + * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ + * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] + * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ + * | | + * | \__last pkt received + * | + * \___ m_iStartPos: first message to read + * + * m_pUnit[i]->status_: 0: free, 1: good, 2: read, 3: dropped (can be combined with read?) + * + * thread safety: + * start_pos_: CUDT::m_RecvLock + * first_unack_pos_: CUDT::m_AckLock + * max_pos_inc_: none? (modified on add and ack + * first_nonread_pos_: + */ + +class CRcvBufferNew +{ + typedef sync::steady_clock::time_point time_point; + typedef sync::steady_clock::duration duration; + +public: + CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit, bool bMessageAPI); + + ~CRcvBufferNew(); + +public: + /// Insert a unit into the buffer. + /// Similar to CRcvBuffer::addData(CUnit* unit, int offset) + /// + /// @param [in] unit pointer to a data unit containing new packet + /// @param [in] offset offset from last ACK point. + /// + /// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo. + /// -3 if a packet is offset is ahead the buffer capacity. + // TODO: Previously '-2' also meant 'already acknowledged'. Check usage of this value. + int insert(CUnit* unit); + + /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno). + /// @param [in] seqno drop units up to this sequence number + /// + void dropUpTo(int32_t seqno); + + /// @brief Drop the whole message from the buffer. + /// If message number is 0, then use sequence numbers to locate sequence range to drop [seqnolo, seqnohi]. + /// When one packet of the message is in the range of dropping, the whole message is to be dropped. + /// @param seqnolo sequence number of the first packet in the dropping range. + /// @param seqnohi sequence number of the last packet in the dropping range. + /// @param msgno message number to drop (0 if unknown) + void dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno); + + /// Read the whole message from one or several packets. + /// + /// @param [in,out] data buffer to write the message into. + /// @param [in] len size of the buffer. + /// @param [in,out] message control data + /// + /// @return actual number of bytes extracted from the buffer. + /// 0 if nothing to read. + /// -1 on failure. + int readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl = NULL); + + /// Read acknowledged data into a user buffer. + /// @param [in, out] dst pointer to the target user buffer. + /// @param [in] len length of user buffer. + /// @return size of data read. -1 on error. + int readBuffer(char* dst, int len); + + /// Read acknowledged data directly into file. + /// @param [in] ofs C++ file stream. + /// @param [in] len expected length of data to write into the file. + /// @return size of data read. -1 on error. + int readBufferToFile(std::fstream& ofs, int len); + +public: + /// Get the starting position of the buffer as a packet sequence number. + int getStartSeqNo() const { return m_iStartSeqNo; } + + /// Given the sequence number of the first unacknowledged packet + /// tells the size of the buffer available for packets. + /// Effective returns capacity of the buffer minus acknowledged packet still kept in it. + // TODO: Maybe does not need to return minus one slot now to distinguish full and empty buffer. + size_t getAvailSize(int iFirstUnackSeqNo) const + { + // Receiver buffer allows reading unacknowledged packets. + // Therefore if the first packet in the buffer is ahead of the iFirstUnackSeqNo + // then it does not have acknowledged packets and its full capacity is available. + // Otherwise subtract the number of acknowledged but not yet read packets from its capacity. + const int iRBufSeqNo = getStartSeqNo(); + if (CSeqNo::seqcmp(iRBufSeqNo, iFirstUnackSeqNo) >= 0) // iRBufSeqNo >= iFirstUnackSeqNo + { + // Full capacity is available, still don't want to encourage extra packets to come. + // Note: CSeqNo::seqlen(n, n) returns 1. + return capacity() - CSeqNo::seqlen(iFirstUnackSeqNo, iRBufSeqNo) + 1; + } + + // Note: CSeqNo::seqlen(n, n) returns 1. + return capacity() - CSeqNo::seqlen(iRBufSeqNo, iFirstUnackSeqNo) + 1; + } + + /// @brief Checks if the buffer has packets available for reading regardless of the TSBPD. + /// @return true if there are packets available for reading, false otherwise. + bool hasAvailablePackets() const; + + /// Query how many data has been continuously received (for reading) and available for reading out + /// regardless of the TSBPD. + /// TODO: Rename to countAvailablePackets(). + /// @return size of valid (continous) data for reading. + int getRcvDataSize() const; + + /// Get the number of packets, bytes and buffer timespan. + /// Differs from getRcvDataSize() that it counts all packets in the buffer, not only continious. + int getRcvDataSize(int& bytes, int& timespan) const; + + struct PacketInfo + { + int seqno; + bool seq_gap; //< true if there are missing packets in the buffer, preceding current packet + time_point tsbpd_time; + }; + + /// Get information on the 1st message in queue. + /// Similar to CRcvBuffer::getRcvFirstMsg + /// Parameters (of the 1st packet queue, ready to play or not): + /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if + /// none + /// @param [out] passack true if 1st ready packet is not yet acknowleged (allowed to be delivered to the app) + /// @param [out] skipseqno -1 or seq number of 1st unacknowledged pkt ready to play preceeded by missing packets. + /// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true + /// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE: + /// IF skipseqno != -1, packet ready to play preceeded by missing packets.; + /// IF skipseqno == -1, no missing packet but 1st not ready to play. + PacketInfo getFirstValidPacketInfo() const; + + /// Get information on the packets available to be read + /// @returns a pair of sequence numbers + std::pair getAvailablePacketsRange() const; + + size_t countReadable() const; + + bool empty() const + { + return (m_iMaxPosInc == 0); + } + + /// Return buffer capacity. + /// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer". + /// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously. + /// TODO: Old receiver buffer capacity returns the actual size. Check for conflicts. + size_t capacity() const + { + return m_szSize - 1; + } + + int64_t getDrift() const { return m_tsbpd.drift(); } + + // TODO: make thread safe? + int debugGetSize() const + { + return getRcvDataSize(); + } + + /// Zero time to include all available packets. + /// TODO: Rename to 'canRead`. + bool isRcvDataReady(time_point time_now = time_point()) const; + + int getRcvAvgDataSize(int& bytes, int& timespan); + void updRcvAvgDataSize(const time_point& now); + + unsigned getRcvAvgPayloadSize() const { return m_uAvgPayloadSz; } + + void getInternalTimeBase(time_point& w_timebase, bool& w_wrp, duration& w_udrift) + { + return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift); + } + +public: // Used for testing + /// Peek unit in position of seqno + const CUnit* peek(int32_t seqno); + +private: + inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; } + inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); } + +private: + void countBytes(int pkts, int bytes); + void updateNonreadPos(); + void releaseUnitInPos(int pos); + + /// Release entries following the current buffer position if they were already + /// read out of order (EntryState_Read) or dropped (EntryState_Drop). + void releaseNextFillerEntries(); + + bool hasReadableInorderPkts() const { return (m_iFirstNonreadPos != m_iStartPos); } + + /// Find position of the last packet of the message. + int findLastMessagePkt(); + + /// Scan for availability of out of order packets. + void onInsertNotInOrderPacket(int insertpos); + void updateFirstReadableOutOfOrder(); + int scanNotInOrderMessageRight(int startPos, int msgNo) const; + int scanNotInOrderMessageLeft(int startPos, int msgNo) const; + + typedef bool copy_to_dst_f(char* data, int len, void* arg); + + /// Read acknowledged data directly into file. + /// @param [in] ofs C++ file stream. + /// @param [in] len expected length of data to write into the file. + /// @return size of data read. + int readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg); + + /// @brief Estimate timespan of the stored packets (acknowledged and unacknowledged). + /// @return timespan in milliseconds + int getTimespan_ms() const; + +private: + // TODO: Call makeUnitGood upon assignment, and makeUnitFree upon clearing. + // TODO: CUnitPtr is not in use at the moment, but may be a smart pointer. + // class CUnitPtr + // { + // public: + // void operator=(CUnit* pUnit) + // { + // if (m_pUnit != NULL) + // { + // // m_pUnitQueue->makeUnitFree(m_entries[i].pUnit); + // } + // m_pUnit = pUnit; + // } + // private: + // CUnit* m_pUnit; + // }; + + enum EntryStatus + { + EntryState_Empty, //< No CUnit record. + EntryState_Avail, //< Entry is available for reading. + EntryState_Read, //< Entry has already been read (out of order). + EntryState_Drop //< Entry has been dropped. + }; + struct Entry + { + Entry() + : pUnit(NULL) + , status(EntryState_Empty) + {} + + CUnit* pUnit; + EntryStatus status; + }; + + //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } + + FixedArray m_entries; + + const size_t m_szSize; // size of the array of units (buffer) + CUnitQueue* m_pUnitQueue; // the shared unit queue + + int m_iStartSeqNo; + int m_iStartPos; // the head position for I/O (inclusive) + int m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos) + int m_iMaxPosInc; // the furthest data position + int m_iNotch; // the starting read point of the first unit + + size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false + int m_iFirstReadableOutOfOrder; // In case of out ouf order packet, points to a position of the first such packet to + // read + const bool m_bPeerRexmitFlag; // Needed to read message number correctly + const bool m_bMessageAPI; // Operation mode flag: message or stream. + +public: // TSBPD public functions + /// Set TimeStamp-Based Packet Delivery Rx Mode + /// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay + /// @param [in] wrap Is in wrapping period + /// @param [in] delay aggreed TsbPD delay + /// + /// @return 0 + void setTsbPdMode(const time_point& timebase, bool wrap, duration delay); + + void applyGroupTime(const time_point& timebase, bool wrp, uint32_t delay, const duration& udrift); + + void applyGroupDrift(const time_point& timebase, bool wrp, const duration& udrift); + + bool addRcvTsbPdDriftSample(uint32_t usTimestamp, int usRTTSample); + + time_point getPktTsbPdTime(uint32_t usPktTimestamp) const; + + time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const; + void updateTsbPdTimeBase(uint32_t usPktTimestamp); + + /// Form a string of the current buffer fullness state. + /// number of packets acknowledged, TSBPD readiness, etc. + std::string strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const; + +private: + CTsbpdTime m_tsbpd; + +private: // Statistics + AvgBufSize m_mavg; + + // TODO: m_BytesCountLock is probably not needed as the buffer has to be protected from simultaneous access. + mutable sync::Mutex m_BytesCountLock; // used to protect counters operations + int m_iBytesCount; // Number of payload bytes in the buffer + int m_iPktsCount; // Number of payload bytes in the buffer + unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation +}; + +} // namespace srt + +#endif // ENABLE_NEW_RCVBUFFER +#endif // INC_SRT_BUFFER_RCV_H diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 787533cea..67c2e6b08 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5166,6 +5166,199 @@ void srt::CUDT::rendezvousSwitchState(UDTRequestType& w_rsptype, bool& w_needs_e * This thread runs only if TsbPd mode is enabled * Hold received packets until its time to 'play' them, at PktTimeStamp + TsbPdDelay. */ +#if ENABLE_NEW_RCVBUFFER +void * srt::CUDT::tsbpd(void* param) +{ + CUDT* self = (CUDT*)param; + + THREAD_STATE_INIT("SRT:TsbPd"); + +#if ENABLE_EXPERIMENTAL_BONDING + // Make the TSBPD thread a "client" of the group, + // which will ensure that the group will not be physically + // deleted until this thread exits. + // NOTE: DO NOT LEAD TO EVER CANCEL THE THREAD!!! + CUDTUnited::GroupKeeper gkeeper(self->uglobal(), self->m_parent); +#endif + + UniqueLock recv_lock(self->m_RecvLock); + CSync recvdata_cc(self->m_RecvDataCond, recv_lock); + CSync tsbpd_cc(self->m_RcvTsbPdCond, recv_lock); + + self->m_bTsbPdAckWakeup = true; + while (!self->m_bClosing) + { + steady_clock::time_point tsNextDelivery; // Next packet delivery time + bool rxready = false; +#if ENABLE_EXPERIMENTAL_BONDING + bool shall_update_group = false; +#endif + + enterCS(self->m_RcvBufferLock); + const steady_clock::time_point tnow = steady_clock::now(); + + self->m_pRcvBuffer->updRcvAvgDataSize(tnow); + const srt::CRcvBufferNew::PacketInfo info = self->m_pRcvBuffer->getFirstValidPacketInfo(); + + const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time); + tsNextDelivery = info.tsbpd_time; + + if (!self->m_bTLPktDrop) + { + rxready = !info.seq_gap && is_time_to_deliver; + } + else if (is_time_to_deliver) + { + rxready = true; + if (info.seq_gap) + { + const int seq_gap_len = CSeqNo::seqoff(self->m_iRcvLastSkipAck, info.seqno); + SRT_ASSERT(seq_gap_len > 0); + /*if (!info.seq_gap) + { + LOGC(brlog.Warn, log << "TSBPD worker: no gap. pktseqno=" << info.seqno + << ", m_iRcvLastSkipAck=" << self->m_iRcvLastSkipAck + << ", RBuffer start seqno=" << self->m_pRcvBuffer->getStartSeqNo() + << ", m_iRcvLastAck=" << self->m_iRcvLastAck + << ", init seqnoo=" << self->m_iISN); + }*/ + + // Drop too late packets + self->updateForgotten(seq_gap_len, self->m_iRcvLastSkipAck, info.seqno); + //if (info.seq_gap) // If there is no sequence gap, we are reading ahead of ACK. + //{ + self->m_pRcvBuffer->dropUpTo(info.seqno); + //} + + self->m_iRcvLastSkipAck = info.seqno; +#if ENABLE_EXPERIMENTAL_BONDING + shall_update_group = true; +#endif + +#if ENABLE_LOGGING + const int64_t timediff_us = count_microseconds(tnow - info.tsbpd_time); + // TODO: seq_gap_len is not the actual number of packets dropped. +#if ENABLE_HEAVY_LOGGING + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: DROPSEQ: up to seqno %" << CSeqNo::decseq(info.seqno) << " (" + << seq_gap_len << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed " + << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms"); +#endif + LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << seq_gap_len << " packet(s), packet seqno %" << info.seqno + << " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') + << (timediff_us % 1000) << " ms"); +#endif + + tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for. + } + } + leaveCS(self->m_RcvBufferLock); + + if (rxready) + { + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " + << (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)"); + /* + * There are packets ready to be delivered + * signal a waiting "recv" call if there is any data available + */ + if (self->m_config.bSynRecving) + { + recvdata_cc.signal_locked(recv_lock); + } + /* + * Set EPOLL_IN to wakeup any thread waiting on epoll + */ + self->uglobal().m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true); +#if ENABLE_EXPERIMENTAL_BONDING + // If this is NULL, it means: + // - the socket never was a group member + // - the socket was a group member, but: + // - was just removed as a part of closure + // - and will never be member of the group anymore + + // If this is not NULL, it means: + // - This socket is currently member of the group + // - This socket WAS a member of the group, though possibly removed from it already, BUT: + // - the group that this socket IS OR WAS member of is in the GroupKeeper + // - the GroupKeeper prevents the group from being deleted + // - it is then completely safe to access the group here, + // EVEN IF THE SOCKET THAT WAS ITS MEMBER IS BEING DELETED. + + // It is ensured that the group object exists here because GroupKeeper + // keeps it busy, even if you just closed the socket, remove it as a member + // or even the group is empty and was explicitly closed. + if (gkeeper.group) + { + // Functions called below will lock m_GroupLock, which in hierarchy + // lies after m_RecvLock. Must unlock m_RecvLock to be able to lock + // m_GroupLock inside the calls. + InvertedLock unrecv(self->m_RecvLock); + // The current "APP reader" needs to simply decide as to whether + // the next CUDTGroup::recv() call should return with no blocking or not. + // When the group is read-ready, it should update its pollers as it sees fit. + + // NOTE: this call will set lock to m_IncludedGroup->m_GroupLock + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: GROUP: checking if %" << info.seqno << " makes group readable"); + gkeeper.group->updateReadState(self->m_SocketID, info.seqno); + + if (shall_update_group) + { + // A group may need to update the parallelly used idle links, + // should it have any. Pass the current socket position in order + // to skip it from the group loop. + // NOTE: SELF LOCKING. + gkeeper.group->updateLatestRcv(self->m_parent); + } + } +#endif + CGlobEvent::triggerEvent(); + tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for. + } + + if (!is_zero(tsNextDelivery)) + { + IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow); + /* + * Buffer at head of queue is not ready to play. + * Schedule wakeup when it will be. + */ + self->m_bTsbPdAckWakeup = false; + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno + << " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms"); + THREAD_PAUSED(); + tsbpd_cc.wait_until(tsNextDelivery); + THREAD_RESUMED(); + } + else + { + /* + * We have just signaled epoll; or + * receive queue is empty; or + * next buffer to deliver is not in receive queue (missing packet in sequence). + * + * Block until woken up by one of the following event: + * - All ready-to-play packets have been pulled and EPOLL_IN cleared (then loop to block until next pkt time + * if any) + * - New buffers ACKed + * - Closing the connection + */ + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); + self->m_bTsbPdAckWakeup = true; + THREAD_PAUSED(); + tsbpd_cc.wait(); + THREAD_RESUMED(); + } + + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); + } + THREAD_EXIT(); + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); + return NULL; +} +#else void * srt::CUDT::tsbpd(void *param) { CUDT *self = (CUDT *)param; @@ -5382,6 +5575,7 @@ void * srt::CUDT::tsbpd(void *param) HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; } +#endif // ENABLE_NEW_RCVBUFFER void srt::CUDT::updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno) { @@ -5435,7 +5629,12 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd try { m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize); +#if ENABLE_NEW_RCVBUFFER + SRT_ASSERT(m_iISN != -1); + m_pRcvBuffer = new srt::CRcvBufferNew(m_iISN, m_config.iRcvBufSize, &(m_pRcvQueue->m_UnitQueue), m_bPeerRexmitFlag, m_config.bMessageAPI); +#else m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_config.iRcvBufSize); +#endif // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_config.iFlightFlagSize); @@ -6609,18 +6808,26 @@ int srt::CUDT::recvmsg2(char* data, int len, SRT_MSGCTRL& w_mctrl) size_t srt::CUDT::getAvailRcvBufferSizeLock() const { ScopedLock lck(m_RcvBufferLock); - return m_pRcvBuffer->getAvailBufSize(); + return getAvailRcvBufferSizeNoLock(); } size_t srt::CUDT::getAvailRcvBufferSizeNoLock() const { +#if ENABLE_NEW_RCVBUFFER + return m_pRcvBuffer->getAvailSize(m_iRcvLastAck); +#else return m_pRcvBuffer->getAvailBufSize(); +#endif } bool srt::CUDT::isRcvBufferReady() const { ScopedLock lck(m_RcvBufferLock); +#if ENABLE_NEW_RCVBUFFER + return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); +#else return m_pRcvBuffer->isRcvDataReady(); +#endif } // int by_exception: accepts values of CUDTUnited::ErrorHandling: @@ -6663,7 +6870,13 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ { HLOGC(arlog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality"); enterCS(m_RcvBufferLock); +#if ENABLE_NEW_RCVBUFFER + const int res = (m_pRcvBuffer->isRcvDataReady(steady_clock::now())) + ? m_pRcvBuffer->readMessage(data, len, &w_mctrl) + : 0; +#else const int res = m_pRcvBuffer->readMsg(data, len); +#endif leaveCS(m_RcvBufferLock); w_mctrl.srctime = 0; @@ -6697,13 +6910,21 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ return res; } +#if !ENABLE_NEW_RCVBUFFER const int seqdistance = -1; +#endif if (!m_config.bSynRecving) { HLOGC(arlog.Debug, log << CONID() << "receiveMessage: BEGIN ASYNC MODE. Going to extract payload size=" << len); enterCS(m_RcvBufferLock); +#if ENABLE_NEW_RCVBUFFER + const int res = (m_pRcvBuffer->isRcvDataReady(steady_clock::now())) + ? m_pRcvBuffer->readMessage(data, len, &w_mctrl) + : 0; +#else const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance); +#endif leaveCS(m_RcvBufferLock); HLOGC(arlog.Debug, log << CONID() << "AFTER readMsg: (NON-BLOCKING) result=" << res); @@ -6765,9 +6986,13 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ do { +#if ENABLE_NEW_RCVBUFFER + if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now())) +#else steady_clock::time_point tstime SRT_ATR_UNUSED; int32_t seqno; if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady((tstime), (seqno), seqdistance)) +#endif { /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) @@ -6780,7 +7005,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ // of kicking TSBPD. // bool spurious = (tstime != 0); - HLOGC(tslog.Debug, log << CONID() << "receiveMessage: KICK tsbpd" << (is_zero(tstime) ? " (SPURIOUS!)" : "")); + HLOGC(tslog.Debug, log << CONID() << "receiveMessage: KICK tsbpd"); tscond.signal_locked(recvguard); } @@ -6823,7 +7048,11 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ */ enterCS(m_RcvBufferLock); +#if ENABLE_NEW_RCVBUFFER + res = m_pRcvBuffer->readMessage((data), len, &w_mctrl); +#else res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance); +#endif leaveCS(m_RcvBufferLock); HLOGC(arlog.Debug, log << CONID() << "AFTER readMsg: (BLOCKING) result=" << res); @@ -7509,12 +7738,22 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack) { const int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack); - HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << ack << " vs. current %" << m_iRcvLastSkipAck - << " (signing off " << acksize << " packets)"); + HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << m_iRcvLastSkipAck << " -> %" << ack + << " (" << acksize << " packets)"); m_iRcvLastAck = ack; m_iRcvLastSkipAck = ack; +#if ENABLE_NEW_RCVBUFFER + const std::pair range = m_pRcvBuffer->getAvailablePacketsRange(); + // Some packets acknowledged are not available in the buffer. + if (CSeqNo::seqcmp(range.second, ack) < 0) + { + LOGC(xtlog.Error, log << "IPE: Acknowledged seqno %" << ack << " outruns the RCV buffer state %" << range.first + << " - %" << range.second); + } + return acksize; +#else // NOTE: This is new towards UDT and prevents spurious // wakeup of select/epoll functions when no new packets // were signed off for extraction. @@ -7529,6 +7768,7 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack) if (distance > 0) return CSeqNo::decseq(ack, distance); return ack; +#endif } namespace srt { @@ -8644,10 +8884,16 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt) void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) { + const int32_t* dropdata = (const int32_t*) ctrlpkt.m_pcData; + { - const bool using_rexmit_flag = m_bPeerRexmitFlag; UniqueLock rlock(m_RecvLock); + const bool using_rexmit_flag = m_bPeerRexmitFlag; +#if ENABLE_NEW_RCVBUFFER + m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag)); +#else m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag); +#endif // When the drop request was received, it means that there are // packets for which there will never be ACK sent; if the TSBPD thread // is currently in the ACK-waiting state, it may never exit. @@ -8659,8 +8905,6 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) } } - const int32_t* dropdata = (const int32_t*) ctrlpkt.m_pcData; - dropFromLossLists(dropdata[0], dropdata[1]); // move forward with current recv seq no. @@ -8809,7 +9053,11 @@ void srt::CUDT::updateSrtRcvSettings() { /* We are TsbPd receiver */ enterCS(m_RecvLock); +#if ENABLE_NEW_RCVBUFFER + m_pRcvBuffer->setTsbPdMode(m_tsRcvPeerStartTime, false, milliseconds_from(m_iTsbPdDelay_ms)); +#else m_pRcvBuffer->setRcvTsbPdMode(m_tsRcvPeerStartTime, milliseconds_from(m_iTsbPdDelay_ms)); +#endif leaveCS(m_RecvLock); HLOGF(cnlog.Debug, @@ -9718,17 +9966,28 @@ int srt::CUDT::processData(CUnit* in_unit) } else { +#if ENABLE_NEW_RCVBUFFER + LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo + << ", insert offset " << offset << ". " + << m_pRcvBuffer->strFullnessState(m_iRcvLastAck, steady_clock::now()) + ); +#else LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo << ", insert offset " << offset << ". " << m_pRcvBuffer->strFullnessState(steady_clock::now()) ); +#endif return -1; } } bool adding_successful = true; +#if ENABLE_NEW_RCVBUFFER + if (m_pRcvBuffer->insert(*i) < 0) +#else if (m_pRcvBuffer->addData(*i, offset) < 0) +#endif { // addData returns -1 if at the m_iLastAckPos+offset position there already is a packet. // So this packet is "redundant". @@ -9774,14 +10033,6 @@ int srt::CUDT::processData(CUnit* in_unit) } #if ENABLE_HEAVY_LOGGING - std::ostringstream timebufspec; - if (m_bTsbPd) - { - int dsize = m_pRcvBuffer->getRcvDataSize(); - timebufspec << "(" << FormatTime(m_pRcvBuffer->debugGetDeliveryTime(0)) - << "-" << FormatTime(m_pRcvBuffer->debugGetDeliveryTime(dsize-1)) << ")"; - } - std::ostringstream expectspec; if (excessive) expectspec << "EXCESSIVE(" << exc_type << rexmit_reason << ")"; @@ -9798,7 +10049,6 @@ int srt::CUDT::processData(CUnit* in_unit) << ") " << " RSL=" << expectspec.str() << " SN=" << rexmitstat[pktrexmitflag] - << " DLVTM=" << timebufspec.str() << " FLAGS: " << rpkt.MessageFlagStr()); #endif diff --git a/srtcore/core.h b/srtcore/core.h index 523118fce..9f957d040 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -60,6 +60,7 @@ modified by #include "common.h" #include "list.h" #include "buffer.h" +#include "buffer_rcv.h" #include "window.h" #include "packet.h" #include "channel.h" @@ -417,7 +418,11 @@ class CUDT SRTU_PROPERTY_RO(SRTSOCKET, id, m_SocketID); SRTU_PROPERTY_RO(bool, isClosing, m_bClosing); +#if ENABLE_NEW_RCVBUFFER + SRTU_PROPERTY_RO(srt::CRcvBufferNew*, rcvBuffer, m_pRcvBuffer); +#else SRTU_PROPERTY_RO(CRcvBuffer*, rcvBuffer, m_pRcvBuffer); +#endif SRTU_PROPERTY_RO(bool, isTLPktDrop, m_bTLPktDrop); SRTU_PROPERTY_RO(bool, isSynReceiving, m_config.bSynRecving); SRTU_PROPERTY_RR(sync::Condition*, recvDataCond, &m_RecvDataCond); @@ -867,7 +872,11 @@ class CUDT int32_t m_iReXmitCount; // Re-Transmit Count since last ACK private: // Receiving related data +#if ENABLE_NEW_RCVBUFFER + CRcvBufferNew* m_pRcvBuffer; //< Receiver buffer +#else CRcvBuffer* m_pRcvBuffer; //< Receiver buffer +#endif CRcvLossList* m_pRcvLossList; //< Receiver loss list std::deque m_FreshLoss; //< Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for. int m_iReorderTolerance; //< Current value of dynamic reorder tolerance diff --git a/srtcore/filelist.maf b/srtcore/filelist.maf index 54e15df62..16fb48d45 100644 --- a/srtcore/filelist.maf +++ b/srtcore/filelist.maf @@ -3,6 +3,7 @@ SOURCES api.cpp buffer.cpp +buffer_rcv.cpp cache.cpp channel.cpp common.cpp @@ -53,6 +54,7 @@ udt.h PRIVATE HEADERS api.h buffer.h +buffer_rcv.h cache.h channel.h common.h diff --git a/srtcore/queue.h b/srtcore/queue.h index 2b7408747..21983d005 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -79,6 +79,7 @@ struct CUnit DROPPED = 3 }; Flag m_iFlag; // 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped + // TODO: Transition to the new RcvBuffer allows to use bool here. }; class CUnitQueue diff --git a/srtcore/utilities.h b/srtcore/utilities.h index f787a579e..1610505b8 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -410,6 +410,70 @@ struct DynamicStruct }; +/// Fixed-size array template class. +namespace srt { + +template +class FixedArray +{ +public: + FixedArray(size_t size) + : m_size(size) + , m_entries(new T[size]) + { + } + + ~FixedArray() + { + delete [] m_entries; + } + +public: + const T& operator[](size_t index) const + { + if (index >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + T& operator[](size_t index) + { + if (index >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + const T& operator[](int index) const + { + if (index < 0 || static_cast(index) >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + T& operator[](int index) + { + if (index < 0 || static_cast(index) >= m_size) + throw std::runtime_error("Invalid index"); + + return m_entries[index]; + } + + size_t size() const { return m_size; } + +private: + FixedArray(const FixedArray& ); + FixedArray& operator=(const FixedArray&); + +private: + size_t m_size; + T* const m_entries; +}; + +} // namespace srt + // ------------------------------------------------------------