Skip to content

Commit

Permalink
[core] Added CUDT::isRcvReady() with mutex lock.
Browse files Browse the repository at this point in the history
Added CUDT::getAvailRcvBufferSize() function with and without mutex
lock.
  • Loading branch information
maxsharabayko committed Oct 12, 2021
1 parent 6886bfa commit 86d1eb2
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 42 deletions.
2 changes: 1 addition & 1 deletion srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ class CSeqNo

/// This behaves like seq1 - seq2, in comparison to numbers,
/// and with the statement that only the sign of the result matters.
/// That is, it returns a negative value if seq1 < seq2,
/// Returns a negative value if seq1 < seq2,
/// positive if seq1 > seq2, and zero if they are equal.
/// The only correct application of this function is when you
/// compare two values and it works faster than seqoff. However
Expand Down
66 changes: 41 additions & 25 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)
else
{
enterCS(m_RecvLock);
if (m_pRcvBuffer && m_pRcvBuffer->isRcvDataReady())
if (m_pRcvBuffer && isRcvBufferReady())
event |= SRT_EPOLL_IN;
leaveCS(m_RecvLock);
if (m_pSndBuffer && (m_config.iSndBufSize > m_pSndBuffer->getCurrBufSize()))
Expand Down Expand Up @@ -5211,7 +5211,6 @@ void * srt::CUDT::tsbpd(void *param)
{
int32_t skiptoseqno = SRT_SEQNO_NONE;
bool passack = true; // Get next packet to wait for even if not acked

rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq);

HLOGC(tslog.Debug,
Expand Down Expand Up @@ -6052,7 +6051,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

UniqueLock recvguard(m_RecvLock);

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (m_bShutdown)
{
Expand Down Expand Up @@ -6084,7 +6083,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

CSync rcond (m_RecvDataCond, recvguard);
CSync tscond (m_RcvTsbPdCond, recvguard);
if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
if (!m_config.bSynRecving)
{
Expand All @@ -6096,7 +6095,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
if (m_config.iRcvTimeOut < 0)
{
THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
{
// Do not block forever, check connection status each 1 sec.
rcond.wait_for(seconds_from(1));
Expand All @@ -6108,7 +6107,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
const steady_clock::time_point exptime =
steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);
THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
{
if (!rcond.wait_until(exptime)) // NOT means "not received a signal"
break; // timeout
Expand All @@ -6122,7 +6121,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
if (!m_bConnected)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
// See at the beginning
if (!m_config.bMessageAPI && m_bShutdown)
Expand Down Expand Up @@ -6152,7 +6151,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -6607,6 +6606,23 @@ int srt::CUDT::recvmsg2(char* data, int len, SRT_MSGCTRL& w_mctrl)
return receiveBuffer(data, len);
}

size_t srt::CUDT::getAvailRcvBufferSizeLock() const
{
ScopedLock lck(m_RcvBufferLock);
return m_pRcvBuffer->getAvailBufSize();
}

size_t srt::CUDT::getAvailRcvBufferSizeNoLock() const
{
return m_pRcvBuffer->getAvailBufSize();
}

bool srt::CUDT::isRcvBufferReady() const
{
ScopedLock lck(m_RcvBufferLock);
return m_pRcvBuffer->isRcvDataReady();
}

// int by_exception: accepts values of CUDTUnited::ErrorHandling:
// - 0 - by return value
// - 1 - by exception
Expand Down Expand Up @@ -6647,7 +6663,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
{
HLOGC(arlog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality");
enterCS(m_RcvBufferLock);
int res = m_pRcvBuffer->readMsg(data, len);
const int res = m_pRcvBuffer->readMsg(data, len);
leaveCS(m_RcvBufferLock);
w_mctrl.srctime = 0;

Expand All @@ -6662,7 +6678,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -6713,7 +6729,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// Kick TsbPd thread to schedule next wakeup (if running)
if (m_bTsbPd)
Expand Down Expand Up @@ -6792,12 +6808,12 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
{
HLOGP(tslog.Debug, "receiveMessage: DATA COND: KICKED.");
}
} while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
} while (stillConnected() && !timeout && (!isRcvBufferReady()));
THREAD_RESUMED();

HLOGC(tslog.Debug,
log << CONID() << "receiveMessage: lock-waiting loop exited: stillConntected=" << stillConnected()
<< " timeout=" << timeout << " data-ready=" << m_pRcvBuffer->isRcvDataReady());
<< " timeout=" << timeout << " data-ready=" << isRcvBufferReady());
}

/* XXX DEBUG STUFF - enable when required
Expand Down Expand Up @@ -6829,7 +6845,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
}
} while ((res == 0) && !timeout);

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// Falling here means usually that res == 0 && timeout == true.
// res == 0 would repeat the above loop, unless there was also a timeout.
Expand Down Expand Up @@ -6990,7 +7006,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
{
if (!m_bConnected || !m_CongCtl.ready())
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (!m_config.bMessageAPI && m_bShutdown)
return 0;
Expand Down Expand Up @@ -7072,14 +7088,14 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
CSync rcond (m_RecvDataCond, recvguard);

THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
rcond.wait();
THREAD_RESUMED();
}

if (!m_bConnected)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (!m_config.bMessageAPI && m_bShutdown)
return 0;
Expand All @@ -7098,7 +7114,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
}
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -7239,7 +7255,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

if (m_pRcvBuffer)
{
perf->byteAvailRcvBuf = m_pRcvBuffer->getAvailBufSize() * m_config.iMSS;
perf->byteAvailRcvBuf = getAvailRcvBufferSizeLock() * m_config.iMSS;
if (instantaneous) // no need for historical API for Rcv side
{
perf->pktRcvBuf = m_pRcvBuffer->getRcvDataSize(perf->byteRcvBuf, perf->msRcvBuf);
Expand Down Expand Up @@ -7489,7 +7505,7 @@ void srt::CUDT::releaseSynch()
// [[using locked(m_RcvBufferLock)]];
int32_t srt::CUDT::ackDataUpTo(int32_t ack)
{
int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);
const int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << ack << " vs. current %" << m_iRcvLastSkipAck
<< " (signing off " << acksize << " packets)");
Expand Down Expand Up @@ -7691,7 +7707,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp
int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
{
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
int32_t ack;
int32_t ack; // First unacknowledged packet seqnuence number (acknowledge up to ack).
int nbsent = 0;
int local_prevack = 0;

Expand Down Expand Up @@ -7881,7 +7897,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
data[ACKD_RCVLASTACK] = m_iRcvLastAck;
data[ACKD_RTT] = m_iSRTT;
data[ACKD_RTTVAR] = m_iRTTVar;
data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize();
data[ACKD_BUFFERLEFT] = getAvailRcvBufferSizeNoLock();
// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
if (data[ACKD_BUFFERLEFT] < 2)
data[ACKD_BUFFERLEFT] = 2;
Expand Down Expand Up @@ -9666,7 +9682,7 @@ int srt::CUDT::processData(CUnit* in_unit)
continue;
}

const int avail_bufsize = m_pRcvBuffer->getAvailBufSize();
const int avail_bufsize = getAvailRcvBufferSizeNoLock();
if (offset >= avail_bufsize)
{
// This is already a sequence discrepancy. Probably there could be found
Expand Down Expand Up @@ -9773,7 +9789,7 @@ int srt::CUDT::processData(CUnit* in_unit)
LOGC(qrlog.Debug, log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo
<< " offset=" << offset
<< " BUFr=" << avail_bufsize
<< " avail=" << m_pRcvBuffer->getAvailBufSize()
<< " avail=" << getAvailRcvBufferSizeNoLock()
<< " buffer=(" << m_iRcvLastSkipAck
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1)
Expand Down Expand Up @@ -11123,7 +11139,7 @@ void srt::CUDT::addEPoll(const int eid)
return;

enterCS(m_RecvLock);
if (m_pRcvBuffer->isRcvDataReady())
if (isRcvBufferReady())
{
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);
}
Expand Down
28 changes: 20 additions & 8 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ class CUDT
return m_stats.tsStartTime;
}

SRT_ATTR_EXCLUDES(m_RcvBufferLock)
bool isRcvBufferReady() const;

// TSBPD thread main function.
static void* tsbpd(void* param);

Expand Down Expand Up @@ -863,17 +866,17 @@ class CUDT
int32_t m_iReXmitCount; // Re-Transmit Count since last ACK

private: // Receiving related data
CRcvBuffer* m_pRcvBuffer; // Receiver buffer
CRcvLossList* m_pRcvLossList; // Receiver loss list
std::deque<CRcvFreshLoss> m_FreshLoss; // Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
int m_iReorderTolerance; // Current value of dynamic reorder tolerance
int m_iConsecEarlyDelivery; // Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
int m_iConsecOrderedDelivery; // Increases with every packet coming in order or retransmitted, resets with every out-of-order packet
CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
CRcvLossList* m_pRcvLossList; //< Receiver loss list
std::deque<CRcvFreshLoss> m_FreshLoss; //< Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
int m_iReorderTolerance; //< Current value of dynamic reorder tolerance
int m_iConsecEarlyDelivery; //< Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
int m_iConsecOrderedDelivery; //< Increases with every packet coming in order or retransmitted, resets with every out-of-order packet

CACKWindow<ACK_WND_SIZE> m_ACKWindow; // ACK history window
CPktTimeWindow<16, 64> m_RcvTimeWindow; // Packet arrival time window

int32_t m_iRcvLastAck; // Last sent ACK
int32_t m_iRcvLastAck; // First unacknowledged packet seqno sent in the latest ACK.
#ifdef ENABLE_LOGGING
int32_t m_iDebugPrevLastAck;
#endif
Expand Down Expand Up @@ -921,7 +924,7 @@ class CUDT
sync::Condition m_SendBlockCond; // used to block "send" call
sync::Mutex m_SendBlockLock; // lock associated to m_SendBlockCond

sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer
mutable sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer
// Protects access to m_iSndCurrSeqNo, m_iSndLastAck
sync::Mutex m_RecvAckLock; // Protects the state changes while processing incomming ACK (SRT_EPOLL_OUT)

Expand Down Expand Up @@ -1033,6 +1036,15 @@ class CUDT
int32_t ackDataUpTo(int32_t seq);
void handleKeepalive(const char* data, size_t lenghth);

/// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.
SRT_ATTR_EXCLUDES(m_RcvBufferLock)
size_t getAvailRcvBufferSizeLock() const;

/// Retrieves the available size of the receiver buffer.
/// Expects that m_RcvBufferLock is locked.
SRT_ATTR_REQUIRES(m_RcvBufferLock)
size_t getAvailRcvBufferSizeNoLock() const;

private: // Trace
struct CoreStats
{
Expand Down
14 changes: 6 additions & 8 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,27 +357,25 @@ class SRT_ATTR_SCOPED_CAPABILITY ScopedLock
class SRT_ATTR_SCOPED_CAPABILITY UniqueLock
{
friend class SyncEvent;
int m_iLocked;
Mutex& m_Mutex;

public:
SRT_ATTR_ACQUIRE(m_Mutex)
SRT_ATTR_ACQUIRE(m)
UniqueLock(Mutex &m);

SRT_ATTR_RELEASE(m_Mutex)
SRT_ATTR_RELEASE()
~UniqueLock();

public:
SRT_ATTR_ACQUIRE(m_Mutex)
SRT_ATTR_ACQUIRE()
void lock();

SRT_ATTR_RELEASE(m_Mutex)
SRT_ATTR_RELEASE()
void unlock();

SRT_ATTR_RETURN_CAPABILITY(m_Mutex)
Mutex* mutex(); // reflects C++11 unique_lock::mutex()

private:
int m_iLocked;
Mutex& m_Mutex;
};
#endif // ENABLE_STDCXX_SYNC

Expand Down

0 comments on commit 86d1eb2

Please sign in to comment.