From 6cd5d4049100729a4514189b89dd4c66f5841781 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Tue, 29 May 2018 12:02:00 +0100 Subject: [PATCH 1/4] Threadpool exploration. Max one threadpool request outstanding. When a steal takes place, store the location for the next thread to continue the (steal) search from that point. --- .../src/System/Threading/ThreadPool.cs | 51 +++++++++---------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index ad70726cbe7f..d0484507fa19 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -19,7 +19,6 @@ using System.Runtime.CompilerServices; using System.Runtime.ConstrainedExecution; using System.Runtime.InteropServices; -using System.Security; using Microsoft.Win32; namespace System.Threading @@ -30,8 +29,6 @@ internal static class ThreadPoolGlobals //requests in the current domain. 
public const uint TP_QUANTUM = 30U; - public static readonly int processorCount = Environment.ProcessorCount; - public static volatile bool vmTpInitialized; public static bool enableWorkerTracking; @@ -43,6 +40,8 @@ internal sealed class ThreadPoolWorkQueue { internal static class WorkStealingQueueList { + internal static int QueueSerachStart; + private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0]; public static WorkStealingQueue[] Queues => _queues; @@ -380,7 +379,7 @@ public IThreadPoolWorkItem TrySteal(ref bool missedSteal) private Internal.PaddingFor32 pad1; - private volatile int numOutstandingThreadRequests = 0; + private int threadRequestOutstanding = 0; private Internal.PaddingFor32 pad2; @@ -395,23 +394,13 @@ public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() => internal void EnsureThreadRequested() { - // - // If we have not yet requested #procs threads from the VM, then request a new thread - // as needed + // If we have not yet a thread request outstanding from the VM, then request a new thread // // Note that there is a separate count in the VM which will also be incremented in this case, // which is handled by RequestWorkerThread. - // - int count = numOutstandingThreadRequests; - while (count < ThreadPoolGlobals.processorCount) + if (Interlocked.Exchange(ref threadRequestOutstanding, 1) == 0) { - int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count); - if (prev == count) - { - ThreadPool.RequestWorkerThread(); - break; - } - count = prev; + ThreadPool.RequestWorkerThread(); } } @@ -423,16 +412,7 @@ internal void MarkThreadRequestSatisfied() // Note that there is a separate count in the VM which has already been decremented by the VM // by the time we reach this point. 
// - int count = numOutstandingThreadRequests; - while (count > 0) - { - int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count); - if (prev == count) - { - break; - } - count = prev; - } + Volatile.Write(ref threadRequestOutstanding, 0); } public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) @@ -475,7 +455,15 @@ public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool int c = queues.Length; Debug.Assert(c > 0, "There must at least be a queue for this thread."); int maxIndex = c - 1; - int i = tl.random.Next(c); + + // Get the current search start and set to -1 to indicate no preference + int i = Interlocked.Exchange(ref WorkStealingQueueList.QueueSerachStart, -1); + if (i == -1) + { + // Current start was -1, so choose a random start + i = tl.random.Next(c); + } + while (c > 0) { i = (i < maxIndex) ? i + 1 : 0; @@ -490,6 +478,13 @@ public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool } c--; } + + if (callback != null) + { + // Set the search start to the next queue, if it is currently set to no preference + i = (i < maxIndex) ? 
i + 1 : 0; + Interlocked.CompareExchange(ref WorkStealingQueueList.QueueSerachStart, i, -1); + } } return callback; From dbbfa4840e02933a1b6daf4fc5a417766836f100 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Tue, 29 May 2018 22:36:24 +0100 Subject: [PATCH 2/4] Add Volatile.Read guard --- .../src/System/Threading/ThreadPool.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index d0484507fa19..507636208e3b 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -398,7 +398,8 @@ internal void EnsureThreadRequested() // // Note that there is a separate count in the VM which will also be incremented in this case, // which is handled by RequestWorkerThread. - if (Interlocked.Exchange(ref threadRequestOutstanding, 1) == 0) + if (Volatile.Read(ref threadRequestOutstanding) == 0 && + Interlocked.Exchange(ref threadRequestOutstanding, 1) == 0) { ThreadPool.RequestWorkerThread(); } @@ -412,7 +413,10 @@ internal void MarkThreadRequestSatisfied() // Note that there is a separate count in the VM which has already been decremented by the VM // by the time we reach this point. 
// - Volatile.Write(ref threadRequestOutstanding, 0); + if (Volatile.Read(ref threadRequestOutstanding) == 1) + { + Volatile.Write(ref threadRequestOutstanding, 0); + } } public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) From c705310acf1706087ae6f042c03f1ef524f1c04c Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 30 May 2018 03:44:05 +0100 Subject: [PATCH 3/4] Go back to full random --- .../src/System/Threading/ThreadPool.cs | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index 507636208e3b..770a711ab9ef 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -40,8 +40,6 @@ internal sealed class ThreadPoolWorkQueue { internal static class WorkStealingQueueList { - internal static int QueueSerachStart; - private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0]; public static WorkStealingQueue[] Queues => _queues; @@ -459,15 +457,7 @@ public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool int c = queues.Length; Debug.Assert(c > 0, "There must at least be a queue for this thread."); int maxIndex = c - 1; - - // Get the current search start and set to -1 to indicate no preference - int i = Interlocked.Exchange(ref WorkStealingQueueList.QueueSerachStart, -1); - if (i == -1) - { - // Current start was -1, so choose a random start - i = tl.random.Next(c); - } - + int i = tl.random.Next(c); while (c > 0) { i = (i < maxIndex) ? i + 1 : 0; @@ -482,13 +472,6 @@ public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool } c--; } - - if (callback != null) - { - // Set the search start to the next queue, if it is currently set to no preference - i = (i < maxIndex) ? 
i + 1 : 0; - Interlocked.CompareExchange(ref WorkStealingQueueList.QueueSerachStart, i, -1); - } } return callback; From b60449c39d123111a020edbb266461523d214d86 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 30 May 2018 06:12:45 +0100 Subject: [PATCH 4/4] Feedback --- .../src/System/Threading/ThreadPool.cs | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index 770a711ab9ef..151421d33dae 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -390,19 +390,28 @@ public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() => ThreadPoolWorkQueueThreadLocals.threadLocals ?? (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this)); - internal void EnsureThreadRequested() + internal void RequestThread() { // If we have not yet a thread request outstanding from the VM, then request a new thread // // Note that there is a separate count in the VM which will also be incremented in this case, // which is handled by RequestWorkerThread. - if (Volatile.Read(ref threadRequestOutstanding) == 0 && - Interlocked.Exchange(ref threadRequestOutstanding, 1) == 0) + if (Interlocked.Exchange(ref threadRequestOutstanding, 1) == 0) { ThreadPool.RequestWorkerThread(); } } + internal void EnsureThreadRequested() + { + // The thread is exiting while there are items in the queue, so make an unconditional thread request. + // + // Note that there is a separate count in the VM which will also be incremented in this case, + // which is handled by RequestWorkerThread. 
+ Volatile.Write(ref threadRequestOutstanding, 1); + ThreadPool.RequestWorkerThread(); + } + internal void MarkThreadRequestSatisfied() { // @@ -411,10 +420,7 @@ internal void MarkThreadRequestSatisfied() // Note that there is a separate count in the VM which has already been decremented by the VM // by the time we reach this point. // - if (Volatile.Read(ref threadRequestOutstanding) == 1) - { - Volatile.Write(ref threadRequestOutstanding, 0); - } + Volatile.Write(ref threadRequestOutstanding, 0); } public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) @@ -435,7 +441,7 @@ public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) workItems.Enqueue(callback); } - EnsureThreadRequested(); + RequestThread(); } internal bool LocalFindAndPop(IThreadPoolWorkItem callback) @@ -541,9 +547,10 @@ internal static bool Dispatch() // // If we found work, there may be more work. Ask for another thread so that the other work can be processed - // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. + // in parallel. Make sure there is a thread request for the other work. + // It's capped at 1 outstanding thread request, so it is safe to call for every dequeue. // - workQueue.EnsureThreadRequested(); + workQueue.RequestThread(); // // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.