Skip to content

Commit

Permalink
[core] Moved mutex from CSndQueue to CSndUList
Browse files Browse the repository at this point in the history
Co-authored-by: Sektor van Skijlen <ethouris@gmail.com>
  • Loading branch information
maxsharabayko and ethouris committed Jul 13, 2021
1 parent 3c1c490 commit e8d890c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 42 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ option(ENABLE_CXX_DEPS "Extra library dependencies in srt.pc for the CXX librari
option(USE_STATIC_LIBSTDCXX "Should use static rather than shared libstdc++" OFF)
option(ENABLE_INET_PTON "Set to OFF to prevent usage of inet_pton when building against modern SDKs while still requiring compatibility with older Windows versions, such as Windows XP, Windows Server 2003 etc." ON)
option(ENABLE_CODE_COVERAGE "Enable code coverage reporting" OFF)
option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV /temporary fix for #729/" OFF)
option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV" OFF)
option(ENABLE_STDCXX_SYNC "Use C++11 chrono and threads for timing instead of pthreads" OFF)
option(USE_OPENSSL_PC "Use pkg-config to find OpenSSL libraries" ON)
option(USE_BUSY_WAITING "Enable more accurate sending times at a cost of potentially higher CPU load" OFF)
Expand Down
51 changes: 29 additions & 22 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,20 +255,20 @@ void srt::CUnitQueue::makeUnitGood(CUnit* unit)
unit->m_iFlag = CUnit::GOOD;
}

srt::CSndUList::CSndUList()
srt::CSndUList::CSndUList(sync::CTimer* pTimer)
: m_pHeap(NULL)
, m_iArrayLength(512)
, m_iLastEntry(-1)
, m_ListLock()
, m_pWindowLock(NULL)
, m_pWindowCond(NULL)
, m_pTimer(NULL)
, m_pTimer(pTimer)
{
setupCond(m_ListCond, "CSndUListCond");
m_pHeap = new CSNode*[m_iArrayLength];
}

srt::CSndUList::~CSndUList()
{
releaseCond(m_ListCond);
delete[] m_pHeap;
}

Expand Down Expand Up @@ -305,7 +305,7 @@ int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt)
if (-1 == m_iLastEntry)
return -1;

// no pop until the next schedulled time
// no pop until the next scheduled time
if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
return -1;

Expand Down Expand Up @@ -342,7 +342,6 @@ int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt)
void srt::CSndUList::remove(const CUDT* u)
{
ScopedLock listguard(m_ListLock);

remove_(u);
}

Expand All @@ -356,6 +355,21 @@ steady_clock::time_point srt::CSndUList::getNextProcTime()
return m_pHeap[0]->m_tsTimeStamp;
}

void srt::CSndUList::waitNonEmpty() const
{
UniqueLock listguard(m_ListLock);
if (m_iLastEntry >= 0)
return;

m_ListCond.wait(listguard);
}

void srt::CSndUList::signalInterrupt() const
{
ScopedLock listguard(m_ListLock);
m_ListCond.notify_all();
}

void srt::CSndUList::realloc_()
{
CSNode** temp = NULL;
Expand Down Expand Up @@ -420,7 +434,8 @@ void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const
// first entry, activate the sending queue
if (0 == m_iLastEntry)
{
CSync::lock_signal(*m_pWindowCond, *m_pWindowLock);
// m_ListLock is assumed to be locked.
m_ListCond.notify_all();
}
}

Expand Down Expand Up @@ -468,10 +483,8 @@ srt::CSndQueue::CSndQueue()
: m_pSndUList(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_WindowCond()
, m_bClosing(false)
{
setupCond(m_WindowCond, "Window");
}

srt::CSndQueue::~CSndQueue()
Expand All @@ -483,14 +496,14 @@ srt::CSndQueue::~CSndQueue()
m_pTimer->interrupt();
}

CSync::lock_signal(m_WindowCond, m_WindowLock);
// Unblock CSndQueue worker thread if it is waiting.
m_pSndUList->signalInterrupt();

if (m_WorkerThread.joinable())
{
HLOGC(rslog.Debug, log << "SndQueue: EXIT");
m_WorkerThread.join();
}
releaseCond(m_WindowCond);

delete m_pSndUList;
}
Expand All @@ -510,12 +523,9 @@ int srt::CSndQueue::m_counter = 0;

void srt::CSndQueue::init(CChannel* c, CTimer* t)
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock;
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList(t);

#if ENABLE_LOGGING
++m_counter;
Expand Down Expand Up @@ -575,14 +585,11 @@ void* srt::CSndQueue::worker(void* param)
self->m_WorkerStats.lNotReadyTs++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */

UniqueLock windlock(self->m_WindowLock);
CSync windsync(self->m_WindowCond, windlock);

// wait here if there is no sockets with data to be sent
THREAD_PAUSED();
if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
if (!self->m_bClosing)
{
windsync.wait();
self->m_pSndUList->waitNonEmpty();

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lCondWait++;
Expand Down
35 changes: 16 additions & 19 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ struct CSNode

class CSndUList
{
friend class CSndQueue;

public:
CSndUList();
CSndUList(sync::CTimer* pTimer);
~CSndUList();

public:
Expand All @@ -175,30 +173,32 @@ class CSndUList
/// Update the timestamp of the UDT instance on the list.
/// @param [in] u pointer to the UDT instance
/// @param [in] reschedule if the timestamp should be rescheduled

void update(const CUDT* u, EReschedule reschedule);

/// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
/// @param [out] addr destination address of the next packet
/// @param [out] pkt the next packet to be sent
/// @return 1 if successfully retrieved, -1 if no packet found.

int pop(sockaddr_any& addr, CPacket& pkt);

/// Remove UDT instance from the list.
/// @param [in] u pointer to the UDT instance

void remove(const CUDT* u);
void remove(const CUDT* u);// EXCLUDES(m_ListLock);

/// Retrieve the next scheduled processing time.
/// @return Scheduled processing time of the first UDT socket in the list.

sync::steady_clock::time_point getNextProcTime();

/// Wait for the list to become non empty.
void waitNonEmpty() const;

/// Signal to stop waiting in waitNonEmpty().
void signalInterrupt() const;

private:
/// Doubles the size of the list.
///
void realloc_();
void realloc_();// REQUIRES(m_ListLock);

/// Insert a new UDT instance into the list with realloc if required.
///
Expand All @@ -211,21 +211,21 @@ class CSndUList
///
/// @param [in] ts time stamp: next processing time
/// @param [in] u pointer to the UDT instance
void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);
void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);// REQUIRES(m_ListLock);

/// Removes CUDT entry from the list.
/// If the last entry is removed, calls sync::CTimer::interrupt().
void remove_(const CUDT* u);

private:
CSNode** m_pHeap; // The heap array
int m_iArrayLength; // physical length of the array
int m_iLastEntry; // position of last entry on the heap array
int m_iLastEntry; // position of last entry on the heap array or -1 if empty.

sync::Mutex m_ListLock;
mutable sync::Mutex m_ListLock; // Protects the list (m_pHeap, m_iArrayLength, m_iLastEntry).
mutable sync::Condition m_ListCond;

sync::Mutex* m_pWindowLock;
sync::Condition* m_pWindowCond;

sync::CTimer* m_pTimer;
sync::CTimer* const m_pTimer;

private:
CSndUList(const CSndUList&);
Expand Down Expand Up @@ -462,9 +462,6 @@ class CSndQueue
CChannel* m_pChannel; // The UDP channel for data sending
srt::sync::CTimer* m_pTimer; // Timing facility

srt::sync::Mutex m_WindowLock;
srt::sync::Condition m_WindowCond;

srt::sync::atomic<bool> m_bClosing; // closing the worker

#if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker
Expand Down

0 comments on commit e8d890c

Please sign in to comment.