Skip to content

Commit

Permalink
[core] Minor CSndBuffer edits. (#2430)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored Aug 15, 2022
1 parent 89e11eb commit f0b2003
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 55 deletions.
104 changes: 52 additions & 52 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes
}
}

CSndBuffer::CSndBuffer(int size, int mss)
CSndBuffer::CSndBuffer(int size, int maxpld)
: m_BufLock()
, m_pBlock(NULL)
, m_pFirstBlock(NULL)
Expand All @@ -171,35 +171,34 @@ CSndBuffer::CSndBuffer(int size, int mss)
, m_pBuffer(NULL)
, m_iNextMsgNo(1)
, m_iSize(size)
, m_iMSS(mss)
, m_iBlockLen(maxpld)
, m_iCount(0)
, m_iBytesCount(0)
{
// initial physical buffer of "size"
m_pBuffer = new Buffer;
m_pBuffer->m_pcData = new char[m_iSize * m_iMSS];
m_pBuffer->m_pcData = new char[m_iSize * m_iBlockLen];
m_pBuffer->m_iSize = m_iSize;
m_pBuffer->m_pNext = NULL;

// circular linked list for out bound packets
m_pBlock = new Block;
Block* pb = m_pBlock;
for (int i = 1; i < m_iSize; ++i)
{
pb->m_pNext = new Block;
pb->m_iMsgNoBitset = 0;
pb = pb->m_pNext;
}
pb->m_pNext = m_pBlock;
char* pc = m_pBuffer->m_pcData;

pb = m_pBlock;
char* pc = m_pBuffer->m_pcData;
for (int i = 0; i < m_iSize; ++i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
pb->m_iMsgNoBitset = 0;
pb->m_pcData = pc;
pc += m_iBlockLen;

if (i < m_iSize - 1)
{
pb->m_pNext = new Block;
pb = pb->m_pNext;
}
}
pb->m_pNext = m_pBlock;

m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;

Expand Down Expand Up @@ -230,31 +229,31 @@ CSndBuffer::~CSndBuffer()

void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
{
int32_t& w_msgno = w_mctrl.msgno;
int32_t& w_seqno = w_mctrl.pktseq;
int32_t& w_msgno = w_mctrl.msgno;
int32_t& w_seqno = w_mctrl.pktseq;
int64_t& w_srctime = w_mctrl.srctime;
const int& ttl = w_mctrl.msgttl;
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
const int iPktLen = m_iBlockLen; // Payload length per packet.
int iNumBlocks = len / iPktLen;
if ((len % m_iBlockLen) != 0)
++iNumBlocks;

HLOGC(bslog.Debug,
log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
<< len << " bytes");
log << "addBuffer: needs=" << iNumBlocks << " buffers for " << len << " bytes. Taken=" << m_iCount << "/" << m_iSize);
// Retrieve current time before locking the mutex to be closer to packet submission event.
const steady_clock::time_point tnow = steady_clock::now();

ScopedLock bufferguard(m_BufLock);
// Dynamically increase sender buffer if there is not enough room.
while (size + m_iCount >= m_iSize)
while (iNumBlocks + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (iNumBlocks + m_iCount - m_iSize) << " buffers...");
increase();
}

const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;
HLOGC(bslog.Debug,
log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
log << CONID() << "addBuffer: adding " << iNumBlocks << " packets (" << len << " bytes) to send, msgno="
<< (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");

// Calculate origin time (same for all blocks of the message).
Expand All @@ -281,16 +280,16 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
m_iNextMsgNo = w_msgno;
}

for (int i = 0; i < size; ++i)
for (int i = 0; i < iNumBlocks; ++i)
{
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
int pktlen = len - i * iPktLen;
if (pktlen > iPktLen)
pktlen = iPktLen;

HLOGC(bslog.Debug,
log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS)
log << "addBuffer: %" << w_seqno << " #" << w_msgno << " offset=" << (i * iPktLen)
<< " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
memcpy((s->m_pcData), data + i * m_iMSS, pktlen);
memcpy((s->m_pcData), data + i * iPktLen, pktlen);
s->m_iLength = pktlen;

s->m_iSeqNo = w_seqno;
Expand All @@ -299,7 +298,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
if (i == iNumBlocks - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
// if i == 0 == size-1, it results with PB_SOLO.
Expand All @@ -318,10 +317,10 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
}
m_pLastBlock = s;

m_iCount += size;
m_iCount += iNumBlocks;
m_iBytesCount += len;

m_rateEstimator.updateInputRate(m_tsLastOriginTime, size, len);
m_rateEstimator.updateInputRate(m_tsLastOriginTime, iNumBlocks, len);
updAvgBufSize(m_tsLastOriginTime);

// MSGNO_SEQ::mask has a form: 00000011111111...
Expand All @@ -337,39 +336,40 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)

int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
const int iPktLen = m_iBlockLen; // Payload length per packet.
int iNumBlocks = len / iPktLen;
if ((len % m_iBlockLen) != 0)
++iNumBlocks;

HLOGC(bslog.Debug,
log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size
log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << iPktLen
<< " buffers for " << len << " bytes");

// dynamically increase sender buffer
while (size + m_iCount >= m_iSize)
while (iPktLen + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug,
log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
log << "addBufferFromFile: ... still lacking " << (iPktLen + m_iCount - m_iSize) << " buffers...");
increase();
}

HLOGC(bslog.Debug,
log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len
log << CONID() << "addBufferFromFile: adding " << iPktLen << " packets (" << len
<< " bytes) to send, msgno=" << m_iNextMsgNo);

Block* s = m_pLastBlock;
int total = 0;
for (int i = 0; i < size; ++i)
for (int i = 0; i < iPktLen; ++i)
{
if (ifs.bad() || ifs.fail() || ifs.eof())
break;

int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
int pktlen = len - i * iPktLen;
if (pktlen > iPktLen)
pktlen = iPktLen;

HLOGC(bslog.Debug,
log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen
log << "addBufferFromFile: reading from=" << (i * iPktLen) << " size=" << pktlen
<< " TO BUFFER:" << (void*)s->m_pcData);
ifs.read(s->m_pcData, pktlen);
if ((pktlen = int(ifs.gcount())) <= 0)
Expand All @@ -379,7 +379,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
if (i == iPktLen - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: PB_FIRST | PB_LAST == PB_SOLO.
// none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.
Expand All @@ -393,7 +393,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
m_pLastBlock = s;

enterCS(m_BufLock);
m_iCount += size;
m_iCount += iPktLen;
m_iBytesCount += total;

leaveCS(m_BufLock);
Expand Down Expand Up @@ -588,7 +588,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time
}

w_packet.m_pcData = p->m_pcData;
int readlen = p->m_iLength;
const int readlen = p->m_iLength;
w_packet.setLength(readlen);

// XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
Expand Down Expand Up @@ -750,7 +750,7 @@ void CSndBuffer::increase()
try
{
nbuf = new Buffer;
nbuf->m_pcData = new char[unitsize * m_iMSS];
nbuf->m_pcData = new char[unitsize * m_iBlockLen];
}
catch (...)
{
Expand Down Expand Up @@ -794,13 +794,13 @@ void CSndBuffer::increase()
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
pc += m_iBlockLen;
}

m_iSize += unitsize;

HLOGC(bslog.Debug,
log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize
log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iBlockLen) << " bytes spread to " << unitsize
<< " blocks"
<< " (total size: " << m_iSize << " bytes)");
}
Expand Down
9 changes: 6 additions & 3 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ class CSndBuffer
// Currently just "unimplemented".
std::string CONID() const { return ""; }

CSndBuffer(int size = 32, int mss = 1500);
/// @brief CSndBuffer constructor.
/// @param size initial number of blocks (each block to store one packet payload).
/// @param maxpld maximum packet payload.
CSndBuffer(int size = 32, int maxpld = 1500);
~CSndBuffer();

public:
Expand Down Expand Up @@ -256,7 +259,7 @@ class CSndBuffer
struct Block
{
char* m_pcData; // pointer to the data block
int m_iLength; // length of the block
int m_iLength; // payload length of the block.

int32_t m_iMsgNoBitset; // message number
int32_t m_iSeqNo; // sequence number for scheduling
Expand Down Expand Up @@ -292,7 +295,7 @@ class CSndBuffer
int32_t m_iNextMsgNo; // next message number

int m_iSize; // buffer size (number of packets)
int m_iMSS; // maximum seqment/packet size
const int m_iBlockLen; // maximum length of a block holding packet payload (excluding packet header).
int m_iCount; // number of used blocks

int m_iBytesCount; // number of payload bytes in queue
Expand Down

0 comments on commit f0b2003

Please sign in to comment.