Improve thread pool worker thread's spinning for work
Closes https://github.com/dotnet/coreclr/issues/5928

Replaced UnfairSemaphore with a new implementation in CLRLifoSemaphore
- UnfairSemaphore had some benefits:
  - It tracked the number of spinners and avoided waking up waiters as long as the signal count could be satisfied by spinners
  - Since spinners get priority over waiters, that is the main "unfair" part of it: it allows hot threads to remain hot and cold threads to remain cold. However, waiters were still released in FIFO order.
  - Spinning helps with throughput when incoming work is bursty
- All of the above benefits were retained in CLRLifoSemaphore and some were improved:
  - Similarly to UnfairSemaphore, the number of spinners is tracked, and spinners are preferred over waiters, to avoid waking up waiters
  - For waiting on Windows, an I/O completion port is used, since it releases waiters in LIFO order. For Unix, a prioritized wait function was added to the PAL to register waiters in reverse order for LIFO release behavior. This allows cold waiters to time out more easily, since they will be used less frequently.
  - Similarly to SemaphoreSlim, the number of waiters that were signaled to wake but have not yet woken is tracked to help avoid waking up an excessive number of waiters
  - Added some YieldProcessorNormalized() calls to the spin loop. This avoids thrashing on Sleep(0) by adding a delay to the spin loop to allow it to be more effective when there are no threads to switch to, or the only other threads to switch to are other similar spinners.
  - Removed the processor count multiplier on the max spin count and retuned the default max spin count. The processor count multiplier was causing excessive CPU usage on machines with many processors.
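The spinner and waiter accounting described above can be sketched as follows. This is a simplified, hypothetical model (the class name and structure are illustrative): it uses a `std::mutex` and `std::condition_variable`, whereas the real CLRLifoSemaphore packs its counts into a single interlocked word and waits on an I/O completion port or a prioritized PAL wait, so this sketch does not reproduce the LIFO release order.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Simplified sketch of the CLRLifoSemaphore spinner/waiter accounting.
// Signals are consumed by spinning threads first; blocked waiters are only
// woken for signals that the spinners cannot satisfy.
class LifoSemaphoreSketch
{
public:
    explicit LifoSemaphoreSketch(int maxSpinCount = 0x46) : m_maxSpinCount(maxSpinCount) {}

    void Release(int count = 1)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_signalCount += count;

        // Wake blocked waiters only for signals the spinners will not consume
        for (int wake = count - m_spinnerCount; wake > 0; --wake)
            m_condition.notify_one();
    }

    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);

        // Spin phase: poll for a signal before blocking
        ++m_spinnerCount;
        for (int spin = 0; spin < m_maxSpinCount && m_signalCount == 0; ++spin)
        {
            lock.unlock();
            std::this_thread::yield(); // stand-in for YieldProcessorNormalized()
            lock.lock();
        }
        --m_spinnerCount;

        // Wait phase: block until a signal is available, then consume it
        m_condition.wait(lock, [this] { return m_signalCount != 0; });
        --m_signalCount;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    int m_signalCount = 0;
    int m_spinnerCount = 0;
    const int m_maxSpinCount;
};
```

The key invariant carried over from UnfairSemaphore is in `Release`: a signal wakes a blocked waiter only when no spinner is available to consume it, so hot (spinning) threads stay hot and cold waiters stay asleep.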

Perf results

For the test case in https://github.com/dotnet/coreclr/issues/5928, CPU time spent in UnfairSemaphore::Wait was halved, and the percentage of CPU time spent in UnfairSemaphore::Wait relative to WorkerThreadStart decreased from about 88% to 78%.

Updated spin perf code here: dotnet#13670
- NPc = (N * proc count) threads
- MPcWi = (M * proc count) work items
- BurstWorkThroughput queues that many work items in a burst, then releases the thread pool threads to process all of them, and once all are processed, repeats
- SustainedWorkThroughput starts with some initial number of work items and has each work item queue another of itself, such that the work item count never reaches zero
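The two workload shapes can be sketched with a hypothetical single-threaded work queue (the names below are illustrative only, not the actual perf test code from dotnet#13670):

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in for the thread pool's work queue,
// just to illustrate the two workload shapes; the real tests drive actual
// thread pool worker threads.
using WorkItem = std::function<void()>;

class WorkQueue
{
public:
    void Enqueue(WorkItem item) { m_items.push(std::move(item)); }

    // Process items until the queue is empty, as worker threads would
    int Drain()
    {
        int processed = 0;
        while (!m_items.empty())
        {
            WorkItem item = std::move(m_items.front());
            m_items.pop();
            item();
            ++processed;
        }
        return processed;
    }

private:
    std::queue<WorkItem> m_items;
};

// BurstWorkThroughput shape: queue a burst of work items, process all, repeat
int RunBurst(WorkQueue &queue, int burstSize, int repeats)
{
    int total = 0;
    for (int r = 0; r < repeats; ++r)
    {
        for (int i = 0; i < burstSize; ++i)
            queue.Enqueue([] {});
        total += queue.Drain();
    }
    return total;
}

// SustainedWorkThroughput shape: each work item queues another of itself, so
// the pending count never reaches zero (bounded here so the sketch terminates)
int RunSustained(WorkQueue &queue, int initialCount, int totalToProcess)
{
    int processed = 0;
    int scheduled = 0;
    std::function<void()> selfQueueing;
    selfQueueing = [&] {
        ++processed;
        if (scheduled < totalToProcess)
        {
            ++scheduled;
            queue.Enqueue(selfQueueing);
        }
    };
    for (int i = 0; i < initialCount && scheduled < totalToProcess; ++i)
    {
        ++scheduled;
        queue.Enqueue(selfQueueing);
    }
    queue.Drain();
    return processed;
}
```

The burst shape repeatedly refills the queue from empty, exercising the semaphore's wake-up path, while the sustained shape keeps the pending count above zero so workers can keep consuming signals without blocking.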

```
Spin                                          Left score      Right score     ∆ Score %
--------------------------------------------  --------------  --------------  ---------
ThreadPoolBurstWorkThroughput 1Pc 000.25PcWi   276.10 ±1.09%   268.90 ±1.36%     -2.61%
ThreadPoolBurstWorkThroughput 1Pc 000.50PcWi   362.63 ±0.47%   388.82 ±0.33%      7.22%
ThreadPoolBurstWorkThroughput 1Pc 001.00PcWi   498.33 ±0.32%   797.01 ±0.29%     59.94%
ThreadPoolBurstWorkThroughput 1Pc 004.00PcWi  1222.52 ±0.42%  1348.78 ±0.47%     10.33%
ThreadPoolBurstWorkThroughput 1Pc 016.00PcWi  1672.72 ±0.48%  1724.06 ±0.47%      3.07%
ThreadPoolBurstWorkThroughput 1Pc 064.00PcWi  1853.94 ±0.25%  1868.36 ±0.45%      0.78%
ThreadPoolBurstWorkThroughput 1Pc 256.00PcWi  1849.30 ±0.24%  1902.58 ±0.48%      2.88%
ThreadPoolSustainedWorkThroughput 1Pc         1495.62 ±0.78%  1505.89 ±0.20%      0.69%
--------------------------------------------  --------------  --------------  ---------
Total                                          922.22 ±0.51%  1004.59 ±0.51%      8.93%
```

Numbers on Linux were similar with a slightly different spread and no regressions.

I also tried the plaintext benchmark from https://github.com/aspnet/benchmarks on Windows (couldn't get it to build on Linux at the time). There was no noticeable change to throughput or latency, and CPU time spent in the semaphore wait decreased a little, from ~2% in UnfairSemaphore::Wait to ~0.5% in CLRLifoSemaphore::Wait.
kouvel committed Sep 17, 2017
1 parent 7f2b64a commit 2a518e9
Showing 13 changed files with 687 additions and 355 deletions.
2 changes: 1 addition & 1 deletion src/inc/clrconfigvalues.h
@@ -943,7 +943,7 @@ RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_ForceMaxWorkerThreads, W("ThreadPoo
RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_DisableStarvationDetection, W("ThreadPool_DisableStarvationDetection"), 0, "Disables the ThreadPool feature that forces new threads to be added when workitems run for too long")
RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_DebugBreakOnWorkerStarvation, W("ThreadPool_DebugBreakOnWorkerStarvation"), 0, "Breaks into the debugger if the ThreadPool detects work queue starvation")
RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_EnableWorkerTracking, W("ThreadPool_EnableWorkerTracking"), 0, "Enables extra expensive tracking of how many workers threads are working simultaneously")
RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit, W("ThreadPool_UnfairSemaphoreSpinLimit"), 50, "Per processor limit used when calculating spin duration in UnfairSemaphore::Wait")
RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit, W("ThreadPool_UnfairSemaphoreSpinLimit"), 0x46, "Maximum number of spins a thread pool worker thread performs before waiting for work")
RETAIL_CONFIG_DWORD_INFO(EXTERNAL_Thread_UseAllCpuGroups, W("Thread_UseAllCpuGroups"), 0, "Specifies if to automatically distribute thread across CPU Groups")

CONFIG_DWORD_INFO(INTERNAL_ThreadpoolTickCountAdjustment, W("ThreadpoolTickCountAdjustment"), 0, "")
7 changes: 7 additions & 0 deletions src/pal/inc/pal.h
@@ -1464,6 +1464,13 @@ WaitForSingleObject(
IN HANDLE hHandle,
IN DWORD dwMilliseconds);

PALIMPORT
DWORD
PALAPI
PAL_WaitForSingleObjectPrioritized(
IN HANDLE hHandle,
IN DWORD dwMilliseconds);

PALIMPORT
DWORD
PALAPI
3 changes: 2 additions & 1 deletion src/pal/src/include/pal/corunix.hpp
@@ -773,7 +773,8 @@ namespace CorUnix
RegisterWaitingThread(
WaitType eWaitType,
DWORD dwIndex,
bool fAltertable
bool fAltertable,
bool fPrioritize
) = 0;

//
3 changes: 2 additions & 1 deletion src/pal/src/include/pal/synchobjects.hpp
@@ -40,7 +40,8 @@ namespace CorUnix
CONST HANDLE *lpHandles,
BOOL bWaitAll,
DWORD dwMilliseconds,
BOOL bAlertable);
BOOL bAlertable,
BOOL bPrioritize = FALSE);

PAL_ERROR InternalSleepEx(
CPalThread * pthrCurrent,
152 changes: 111 additions & 41 deletions src/pal/src/synchmgr/synchcontrollers.cpp
@@ -262,7 +262,8 @@ namespace CorUnix
PAL_ERROR CSynchWaitController::RegisterWaitingThread(
WaitType wtWaitType,
DWORD dwIndex,
bool fAlertable)
bool fAlertable,
bool fPrioritize)
{
VALIDATEOBJECT(m_psdSynchData);

@@ -421,12 +422,12 @@
// Add new node to queue
if (fSharedObject)
{
m_psdSynchData->SharedWaiterEnqueue(shridNewNode);
m_psdSynchData->SharedWaiterEnqueue(shridNewNode, fPrioritize);
ptwiWaitInfo->lSharedObjCount += 1;
}
else
{
m_psdSynchData->WaiterEnqueue(pwtlnNewNode);
m_psdSynchData->WaiterEnqueue(pwtlnNewNode, fPrioritize);
}

// Succeeded: update object count
@@ -1821,7 +1822,7 @@
Note: this method must be called while holding the local process
synchronization lock.
--*/
void CSynchData::WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode)
void CSynchData::WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode, bool fPrioritize)
{
VALIDATEOBJECT(this);
VALIDATEOBJECT(pwtlnNewNode);
@@ -1833,26 +1834,55 @@
"Trying to add a WaitingThreadsListNode marked as shared "
"as it was a local one\n");

WaitingThreadsListNode * pwtlnCurrLast = m_ptrWTLTail.ptr;

pwtlnNewNode->ptrNext.ptr = NULL;
if (NULL == pwtlnCurrLast)
if (!fPrioritize)
{
_ASSERT_MSG(NULL == m_ptrWTLHead.ptr,
"Corrupted waiting list on local CSynchData @ %p\n",
this);
// Enqueue normally to the end of the queue
WaitingThreadsListNode * pwtlnCurrLast = m_ptrWTLTail.ptr;

pwtlnNewNode->ptrNext.ptr = NULL;
if (NULL == pwtlnCurrLast)
{
_ASSERT_MSG(NULL == m_ptrWTLHead.ptr,
"Corrupted waiting list on local CSynchData @ %p\n",
this);

pwtlnNewNode->ptrPrev.ptr = NULL;
m_ptrWTLHead.ptr = pwtlnNewNode;
m_ptrWTLTail.ptr = pwtlnNewNode;
pwtlnNewNode->ptrPrev.ptr = NULL;
m_ptrWTLHead.ptr = pwtlnNewNode;
m_ptrWTLTail.ptr = pwtlnNewNode;
}
else
{
VALIDATEOBJECT(pwtlnCurrLast);

pwtlnNewNode->ptrPrev.ptr = pwtlnCurrLast;
pwtlnCurrLast->ptrNext.ptr = pwtlnNewNode;
m_ptrWTLTail.ptr = pwtlnNewNode;
}
}
else
{
VALIDATEOBJECT(pwtlnCurrLast);
// The wait is prioritized, enqueue to the beginning of the queue
WaitingThreadsListNode * pwtlnCurrFirst = m_ptrWTLHead.ptr;

pwtlnNewNode->ptrPrev.ptr = NULL;
if (NULL == pwtlnCurrFirst)
{
_ASSERT_MSG(NULL == m_ptrWTLTail.ptr,
"Corrupted waiting list on local CSynchData @ %p\n",
this);

pwtlnNewNode->ptrNext.ptr = NULL;
m_ptrWTLHead.ptr = pwtlnNewNode;
m_ptrWTLTail.ptr = pwtlnNewNode;
}
else
{
VALIDATEOBJECT(pwtlnCurrFirst);

pwtlnNewNode->ptrPrev.ptr = pwtlnCurrLast;
pwtlnCurrLast->ptrNext.ptr = pwtlnNewNode;
m_ptrWTLTail.ptr = pwtlnNewNode;
pwtlnNewNode->ptrNext.ptr = pwtlnCurrFirst;
pwtlnCurrFirst->ptrPrev.ptr = pwtlnNewNode;
m_ptrWTLHead.ptr = pwtlnNewNode;
}
}

m_ulcWaitingThreads += 1;
@@ -1872,45 +1902,85 @@
Note: this method must be called while holding both local and shared
synchronization locks.
--*/
void CSynchData::SharedWaiterEnqueue(SharedID shridNewNode)
void CSynchData::SharedWaiterEnqueue(SharedID shridNewNode, bool fPrioritize)
{
VALIDATEOBJECT(this);

_ASSERT_MSG(SharedObject == GetObjectDomain(),
"Trying to enqueue a WaitingThreadsListNode as shared "
"on a local object\n");

SharedID shridCurrLast;
WaitingThreadsListNode * pwtlnCurrLast, * pwtlnNewNode;
if (!fPrioritize)
{
// Enqueue normally to the end of the queue
SharedID shridCurrLast;
WaitingThreadsListNode * pwtlnCurrLast, * pwtlnNewNode;

shridCurrLast = m_ptrWTLTail.shrid;
pwtlnCurrLast = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrLast);
pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode);
shridCurrLast = m_ptrWTLTail.shrid;
pwtlnCurrLast = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrLast);
pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode);

_ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags),
"Trying to add a WaitingThreadsListNode marked as local "
"as it was a shared one\n");
_ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags),
"Trying to add a WaitingThreadsListNode marked as local "
"as it was a shared one\n");

VALIDATEOBJECT(pwtlnNewNode);
VALIDATEOBJECT(pwtlnNewNode);

pwtlnNewNode->ptrNext.shrid = NULL;
if (NULL == pwtlnCurrLast)
{
_ASSERT_MSG(NULL == m_ptrWTLHead.shrid,
"Corrupted waiting list on shared CSynchData at "
"{shrid=%p, p=%p}\n", m_shridThis, this);
pwtlnNewNode->ptrNext.shrid = NULL;
if (NULL == pwtlnCurrLast)
{
_ASSERT_MSG(NULL == m_ptrWTLHead.shrid,
"Corrupted waiting list on shared CSynchData at "
"{shrid=%p, p=%p}\n", m_shridThis, this);

pwtlnNewNode->ptrPrev.shrid = NULL;
m_ptrWTLHead.shrid = shridNewNode;
m_ptrWTLTail.shrid = shridNewNode;
pwtlnNewNode->ptrPrev.shrid = NULL;
m_ptrWTLHead.shrid = shridNewNode;
m_ptrWTLTail.shrid = shridNewNode;
}
else
{
VALIDATEOBJECT(pwtlnCurrLast);

pwtlnNewNode->ptrPrev.shrid = shridCurrLast;
pwtlnCurrLast->ptrNext.shrid = shridNewNode;
m_ptrWTLTail.shrid = shridNewNode;
}
}
else
{
VALIDATEOBJECT(pwtlnCurrLast);
// The wait is prioritized, enqueue to the beginning of the queue
SharedID shridCurrFirst;
WaitingThreadsListNode * pwtlnCurrFirst, * pwtlnNewNode;

shridCurrFirst = m_ptrWTLHead.shrid;
pwtlnCurrFirst = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrFirst);
pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode);

_ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags),
"Trying to add a WaitingThreadsListNode marked as local "
"as it was a shared one\n");

VALIDATEOBJECT(pwtlnNewNode);

pwtlnNewNode->ptrPrev.shrid = NULL;
if (NULL == pwtlnCurrFirst)
{
_ASSERT_MSG(NULL == m_ptrWTLTail.shrid,
"Corrupted waiting list on shared CSynchData at "
"{shrid=%p, p=%p}\n", m_shridThis, this);

pwtlnNewNode->ptrPrev.shrid = shridCurrLast;
pwtlnCurrLast->ptrNext.shrid = shridNewNode;
m_ptrWTLTail.shrid = shridNewNode;
pwtlnNewNode->ptrNext.shrid = NULL;
m_ptrWTLHead.shrid = shridNewNode;
m_ptrWTLTail.shrid = shridNewNode;
}
else
{
VALIDATEOBJECT(pwtlnCurrFirst);

pwtlnNewNode->ptrNext.shrid = shridCurrFirst;
pwtlnCurrFirst->ptrPrev.shrid = shridNewNode;
m_ptrWTLHead.shrid = shridNewNode;
}
}

m_ulcWaitingThreads += 1;
2 changes: 1 addition & 1 deletion src/pal/src/synchmgr/synchmanager.cpp
@@ -3867,7 +3867,7 @@ namespace CorUnix
pwtlnNew->shridWaitingState = pwtlnOld->shridWaitingState;
pwtlnNew->ptwiWaitInfo = pwtlnOld->ptwiWaitInfo;

psdShared->SharedWaiterEnqueue(rgshridWTLNodes[i]);
psdShared->SharedWaiterEnqueue(rgshridWTLNodes[i], false);
psdShared->AddRef();

_ASSERTE(pwtlnOld = pwtlnOld->ptwiWaitInfo->rgpWTLNodes[pwtlnOld->dwObjIndex]);
7 changes: 4 additions & 3 deletions src/pal/src/synchmgr/synchmanager.hpp
@@ -206,8 +206,8 @@ namespace CorUnix
CPalThread * pthrCurrent,
CPalThread * pthrTarget);

void WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode);
void SharedWaiterEnqueue(SharedID shridNewNode);
void WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode, bool fPrioritize);
void SharedWaiterEnqueue(SharedID shridNewNode, bool fPrioritize);

// Object Domain accessor methods
ObjectDomain GetObjectDomain(void)
@@ -464,7 +464,8 @@
virtual PAL_ERROR RegisterWaitingThread(
WaitType wtWaitType,
DWORD dwIndex,
bool fAlertable);
bool fAlertable,
bool fPrioritize);

virtual void ReleaseController(void);

35 changes: 33 additions & 2 deletions src/pal/src/synchmgr/wait.cpp
@@ -73,6 +73,35 @@ WaitForSingleObject(IN HANDLE hHandle,
}


/*++
Function:
WaitForSingleObjectPrioritized
Similar to WaitForSingleObject, except uses a LIFO release policy for waiting threads by prioritizing new waiters (registering
them at the beginning of the wait queue rather than at the end).
--*/
DWORD
PALAPI
PAL_WaitForSingleObjectPrioritized(IN HANDLE hHandle,
IN DWORD dwMilliseconds)
{
DWORD dwRet;

PERF_ENTRY(PAL_WaitForSingleObjectPrioritized);
ENTRY("PAL_WaitForSingleObjectPrioritized(hHandle=%p, dwMilliseconds=%u)\n",
hHandle, dwMilliseconds);

CPalThread * pThread = InternalGetCurrentThread();

dwRet = InternalWaitForMultipleObjectsEx(pThread, 1, &hHandle, FALSE,
dwMilliseconds, FALSE, TRUE /* bPrioritize */);

LOGEXIT("PAL_WaitForSingleObjectPrioritized returns DWORD %u\n", dwRet);
PERF_EXIT(PAL_WaitForSingleObjectPrioritized);
return dwRet;
}


/*++
Function:
WaitForSingleObjectEx
@@ -285,7 +314,8 @@ DWORD CorUnix::InternalWaitForMultipleObjectsEx(
CONST HANDLE *lpHandles,
BOOL bWaitAll,
DWORD dwMilliseconds,
BOOL bAlertable)
BOOL bAlertable,
BOOL bPrioritize)
{
DWORD dwRet = WAIT_FAILED;
PAL_ERROR palErr = NO_ERROR;
@@ -530,7 +560,8 @@ DWORD CorUnix::InternalWaitForMultipleObjectsEx(
palErr = ppISyncWaitCtrlrs[i]->RegisterWaitingThread(
wtWaitType,
i,
(TRUE == bAlertable));
(TRUE == bAlertable),
bPrioritize != FALSE);
if (NO_ERROR != palErr)
{
ERROR("RegisterWaitingThread() failed for %d-th object "