Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Associating threadpool queues with CPU cores. #18403

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,16 @@ protected internal override void QueueTask(Task task)
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If the task was previously scheduled, and we can't pop it, then return false.
if (taskWasPreviouslyQueued && !ThreadPool.TryPopCustomWorkItem(task))
return false;
if (taskWasPreviouslyQueued)
{
// do not inline in a nontrivial sync context - it could be stricter than what enqueuer had.
SynchronizationContext? syncCtx = SynchronizationContext.Current;
if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
return false;

if (!ThreadPool.TryPopCustomWorkItem(task))
return false;
}

try
{
Expand Down
1,732 changes: 1,323 additions & 409 deletions src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,14 @@ private static int RefreshCurrentProcessorId()
return currentProcessorId;
}

// Clear the cached processor Id
// This can be used before/after blocking the thread for nontrivial amount of time
// or around other operations which are likely to result in changing executing core.
internal static void FlushCurrentProcessorId()
{
t_currentProcessorIdCache = default;
}

// Cached processor id used as a hint for which per-core stack to access. It is periodically
// refreshed to trail the actual thread core affinity.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
26 changes: 5 additions & 21 deletions src/vm/threadpoolrequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,18 +537,9 @@ void ManagedPerAppDomainTPCount::SetAppDomainRequestsActive()
_ASSERTE(m_index.m_dwIndex != UNUSED_THREADPOOL_INDEX);

#ifndef DACCESS_COMPILE
LONG count = VolatileLoad(&m_numRequestsPending);
while (true)
{
LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, count+1, count);
if (prev == count)
{
ThreadpoolMgr::MaybeAddWorkingWorker();
ThreadpoolMgr::EnsureGateThreadRunning();
break;
}
count = prev;
}
FastInterlockIncrement(&m_numRequestsPending);
ThreadpoolMgr::MaybeAddWorkingWorker();
ThreadpoolMgr::EnsureGateThreadRunning();
#endif
}

Expand All @@ -560,20 +551,13 @@ void ManagedPerAppDomainTPCount::ClearAppDomainRequestsActive()

_ASSERTE(m_index.m_dwIndex != UNUSED_THREADPOOL_INDEX);

LONG count = VolatileLoad(&m_numRequestsPending);
while (count > 0)
{
LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, 0, count);
if (prev == count)
break;
count = prev;
}
m_numRequestsPending = 0;
}

bool ManagedPerAppDomainTPCount::TakeActiveRequest()
{
LIMITED_METHOD_CONTRACT;
LONG count = VolatileLoad(&m_numRequestsPending);
LONG count = VolatileLoadWithoutBarrier(&m_numRequestsPending);
while (count > 0)
{
LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, count-1, count);
Expand Down
8 changes: 2 additions & 6 deletions src/vm/threadpoolrequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,9 @@ class ManagedPerAppDomainTPCount : public IPerAppDomainTPCount {

private:
TPIndex m_index;
struct DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) {
BYTE m_padding1[MAX_CACHE_LINE_SIZE - sizeof(LONG)];
struct ALIGNED(MAX_CACHE_LINE_SIZE) {
Copy link
Member

@kouvel kouvel Sep 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also ensure that there is sufficient padding after this struct (by increasing the size of the containing struct for not only alignment but padding afterwards) such that another object cannot be in the same cache line as this struct?

Similarly elsewhere where struct ALIGNED() is used. #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is trailing padding. https://docs.microsoft.com/en-us/cpp/cpp/align-cpp?view=vs-2019

  • The size of a structure is the smallest multiple of its alignment greater than or equal the offset of the end of its last member

In reply to: 328285485 [](ancestors = 328285485)

// Only use with VolatileLoad+VolatileStore+FastInterlockCompareExchange
LONG m_numRequestsPending;
BYTE m_padding2[MAX_CACHE_LINE_SIZE];
};
};

Expand Down Expand Up @@ -230,11 +228,9 @@ class UnManagedPerAppDomainTPCount : public IPerAppDomainTPCount {
private:
SpinLock m_lock;
ULONG m_NumRequests;
struct DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) {
BYTE m_padding1[MAX_CACHE_LINE_SIZE - sizeof(LONG)];
struct ALIGNED(MAX_CACHE_LINE_SIZE) {
// Only use with VolatileLoad+VolatileStore+FastInterlockCompareExchange
LONG m_outstandingThreadRequestCount;
BYTE m_padding2[MAX_CACHE_LINE_SIZE];
};
};

Expand Down
6 changes: 6 additions & 0 deletions src/vm/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
#define MAX_CACHE_LINE_SIZE 64
#endif

#ifdef _MSC_VER
#define ALIGNED(x) __declspec(align(x))
#else
#define ALIGNED(x) __attribute__((aligned(x)))
#endif

//========================================================================
// More convenient names for integer types of a guaranteed size.
//========================================================================
Expand Down
31 changes: 16 additions & 15 deletions src/vm/win32threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ SetWaitableTimerExProc g_pufnSetWaitableTimerEx = NULL;
BOOL ThreadpoolMgr::InitCompletionPortThreadpool = FALSE;
HANDLE ThreadpoolMgr::GlobalCompletionPort; // used for binding io completions on file handles

// Cacheline aligned
SVAL_IMPL(ThreadpoolMgr::ThreadCounter,ThreadpoolMgr,CPThreadCounter);

SVAL_IMPL_INIT(LONG,ThreadpoolMgr,MaxLimitTotalCPThreads,1000); // = MaxLimitCPThreadsPerCPU * number of CPUS
Expand All @@ -85,8 +86,8 @@ SVAL_IMPL(LONG,ThreadpoolMgr,MaxFreeCPThreads); // = MaxFreeCP

Volatile<LONG> ThreadpoolMgr::NumCPInfrastructureThreads = 0; // number of threads currently busy handling draining cycle

// Cacheline aligned, hot variable
DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter);
// Cacheline aligned
SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter);

SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
SVAL_IMPL(LONG,ThreadpoolMgr,MaxLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
Expand All @@ -97,12 +98,9 @@ LONG ThreadpoolMgr::cpuUtilizationAverage = 0;
HillClimbing ThreadpoolMgr::HillClimbingInstance;

// Cacheline aligned, 3 hot variables updated in a group
DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::PriorCompletedWorkRequests = 0;
DWORD ThreadpoolMgr::PriorCompletedWorkRequestsTime;
DWORD ThreadpoolMgr::NextCompletedWorkRequestsTime;
ThreadpoolMgr::WorkRequestDataT ThreadpoolMgr::WorkRequestData;

LARGE_INTEGER ThreadpoolMgr::CurrentSampleStartTime;

unsigned int ThreadpoolMgr::WorkerThreadSpinLimit;
bool ThreadpoolMgr::IsHillClimbingDisabled;
int ThreadpoolMgr::ThreadAdjustmentInterval;
Expand Down Expand Up @@ -924,7 +922,7 @@ void ThreadpoolMgr::AdjustMaxWorkersActive()

DWORD currentTicks = GetTickCount();
LONG totalNumCompletions = (LONG)Thread::GetTotalWorkerThreadPoolCompletionCount();
LONG numCompletions = totalNumCompletions - VolatileLoad(&PriorCompletedWorkRequests);
LONG numCompletions = totalNumCompletions - VolatileLoad(&WorkRequestData.PriorCompletedWorkRequests);

LARGE_INTEGER startTime = CurrentSampleStartTime;
LARGE_INTEGER endTime;
Expand Down Expand Up @@ -986,10 +984,12 @@ void ThreadpoolMgr::AdjustMaxWorkersActive()
}
}

PriorCompletedWorkRequests = totalNumCompletions;
NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval;
MemoryBarrier(); // flush previous writes (especially NextCompletedWorkRequestsTime)
PriorCompletedWorkRequestsTime = currentTicks;
WorkRequestData.PriorCompletedWorkRequests = totalNumCompletions;
WorkRequestData.NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval;
// make sure that NextCompletedWorkRequestsTime is updated before PriorCompletedWorkRequestsTime
// so that reader never sees newer PriorCompletedWorkRequestsTime while having stale NextCompletedWorkRequestsTime
// NB: we are holding the ThreadAdjustmentLock and therefore the order cannot be violated by two threads writing.
VolatileStore(&WorkRequestData.PriorCompletedWorkRequestsTime, currentTicks);
CurrentSampleStartTime = endTime;;
}
}
Expand All @@ -1011,13 +1011,14 @@ void ThreadpoolMgr::MaybeAddWorkingWorker()
while (true)
{
newCounts = counts;
newCounts.NumWorking = max(counts.NumWorking, min(counts.NumWorking + 1, counts.MaxWorking));
newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking);
newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive));

if (newCounts == counts)
if (counts.NumWorking + 1 > counts.MaxWorking)
return;

newCounts.NumWorking++;
newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking);
newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive));

ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);

if (oldCounts == counts)
Expand Down
32 changes: 22 additions & 10 deletions src/vm/win32threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ class ThreadpoolMgr
friend struct _DacGlobals;

public:
struct ThreadCounter

// ThreadCounter is always cacheline aligned
struct ALIGNED(MAX_CACHE_LINE_SIZE) ThreadCounter
{
static const int MaxPossibleCount = 0x7fff;

Expand Down Expand Up @@ -299,7 +301,7 @@ class ThreadpoolMgr
static inline void UpdateLastDequeueTime()
{
LIMITED_METHOD_CONTRACT;
VolatileStore(&LastDequeueTime, (unsigned int)GetTickCount());
LastDequeueTime = (unsigned int)GetTickCount();
}

static BOOL CreateTimerQueueTimer(PHANDLE phNewTimer,
Expand Down Expand Up @@ -851,9 +853,11 @@ class ThreadpoolMgr
{
WRAPPER_NO_CONTRACT;

DWORD priorTime = PriorCompletedWorkRequestsTime;
MemoryBarrier(); // read fresh value for NextCompletedWorkRequestsTime below
DWORD requiredInterval = NextCompletedWorkRequestsTime - priorTime;
// make sure that PriorCompletedWorkRequestsTime is read before NextCompletedWorkRequestsTime
// to make sure that NextCompletedWorkRequestsTime is not older than PriorCompletedWorkRequestsTime
// NB: we write them in reverse order while holding a lock.
DWORD priorTime = VolatileLoad(&WorkRequestData.PriorCompletedWorkRequestsTime);
DWORD requiredInterval = WorkRequestData.NextCompletedWorkRequestsTime - priorTime;
DWORD elapsedInterval = GetTickCount() - priorTime;
if (elapsedInterval >= requiredInterval)
{
Expand Down Expand Up @@ -1016,13 +1020,21 @@ class ThreadpoolMgr

static HillClimbing HillClimbingInstance;

DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) static LONG PriorCompletedWorkRequests;
static DWORD PriorCompletedWorkRequestsTime;
static DWORD NextCompletedWorkRequestsTime;
// the following 3 hot variables are nearly always used together.
// put on a separate cache line.
struct ALIGNED(MAX_CACHE_LINE_SIZE) WorkRequestDataT
{
public:
LONG PriorCompletedWorkRequests;
DWORD PriorCompletedWorkRequestsTime;
DWORD NextCompletedWorkRequestsTime;
};

static WorkRequestDataT WorkRequestData;

static LARGE_INTEGER CurrentSampleStartTime;

static unsigned int WorkerThreadSpinLimit;
static unsigned int ThreadpoolMgr::WorkerThreadSpinLimit;
static bool IsHillClimbingDisabled;
static int ThreadAdjustmentInterval;

Expand All @@ -1044,7 +1056,7 @@ class ThreadpoolMgr
static const DWORD WorkerTimeout = 20 * 1000;
static const DWORD WorkerTimeoutAppX = 5 * 1000; // shorter timeout to allow threads to exit prior to app suspension

DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_DECL(ThreadCounter,WorkerCounter);
SVAL_DECL(ThreadCounter, WorkerCounter);

//
// WorkerSemaphore is an UnfairSemaphore because:
Expand Down