diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs index dd7b77f71f94..e71cbe6e7593 100644 --- a/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs @@ -69,8 +69,16 @@ protected internal override void QueueTask(Task task) protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { // If the task was previously scheduled, and we can't pop it, then return false. - if (taskWasPreviouslyQueued && !ThreadPool.TryPopCustomWorkItem(task)) - return false; + if (taskWasPreviouslyQueued) + { + // do not inline in a nontrivial sync context - it could be stricter than what enqueuer had. + SynchronizationContext? syncCtx = SynchronizationContext.Current; + if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext)) + return false; + + if (!ThreadPool.TryPopCustomWorkItem(task)) + return false; + } try { diff --git a/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs index 68cdc6ceeccf..010e4a37d9e5 100644 --- a/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs @@ -11,20 +11,20 @@ ** =============================================================================*/ -using System.Collections.Concurrent; +using Internal.Runtime.CompilerServices; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Diagnostics.Tracing; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading.Tasks; -using Internal.Runtime.CompilerServices; namespace System.Threading { internal static class ThreadPoolGlobals { + public static readonly int outstandingRequestLimit = Environment.ProcessorCount; + public static volatile bool threadPoolInitialized; public static bool enableWorkerTracking; @@ -46,395 +46,1240 @@ internal static class ThreadPoolGlobals [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing internal sealed class ThreadPoolWorkQueue { - internal static class WorkStealingQueueList + internal class WorkQueueBase { -#pragma warning disable CA1825 // avoid the extra generic instantation for Array.Empty(); this is the only place we'll ever create this array - private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0]; -#pragma warning restore CA1825 - - public static WorkStealingQueue[] Queues => _queues; + // This implementation provides an unbounded, multi-producer multi-consumer queue + // that supports the standard Enqueue/Dequeue operations. + // It is composed of a linked list of bounded ring buffers, each of which has an enqueue + // and a dequeue index, isolated from each other to minimize false sharing. As long as + // the number of elements in the queue remains less than the size of the current + // buffer (Segment), no additional allocations are required for enqueued items. When + // the number of items exceeds the size of the current segment, the current segment is + // "frozen" to prevent further enqueues, and a new segment is linked from it and set + // as the new tail segment for subsequent enqueues. 
As old segments are consumed by + // dequeues, the dequeue reference is updated to point to the segment that dequeuers should + // try next. + + /// + /// Initial length of the segments used in the queue. + /// + internal const int InitialSegmentLength = 32; + + /// + /// Maximum length of the segments used in the queue. This is a somewhat arbitrary limit: + /// larger means that as long as we don't exceed the size, we avoid allocating more segments, + /// but if we do exceed it, then the segment becomes garbage. + /// + internal const int MaxSegmentLength = 1024 * 1024; + + /// + /// Lock used to make sure only one thread allocate the new segment. + /// + internal object _addSegmentLock = new object(); + + [StructLayout(LayoutKind.Explicit, Size = 3 * Internal.PaddingHelpers.CACHE_LINE_SIZE)] // padding before/between/after fields + internal struct PaddedQueueEnds + { + [FieldOffset(1 * Internal.PaddingHelpers.CACHE_LINE_SIZE)] public int Dequeue; + [FieldOffset(2 * Internal.PaddingHelpers.CACHE_LINE_SIZE)] public int Enqueue; + } - public static void Add(WorkStealingQueue queue) + internal class QueueSegmentBase { - Debug.Assert(queue != null); - while (true) + // Segment design is inspired by the algorithm outlined at: + // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + + /// The array of items in this queue. Each slot contains the item in that slot and its "sequence number". + internal readonly Slot[] _slots; + + /// Mask for quickly accessing a position within the queue's array. + internal readonly int _slotsMask; + + /// The queue end positions, with padding to help avoid false sharing contention. + internal PaddedQueueEnds _queueEnds; // mutable struct: do not make this readonly + + /// Indicates whether the segment has been marked such that no additional items may be enqueued. + internal bool _frozenForEnqueues; + + internal const int Empty = 0; + internal const int Full = 1; + + /// Creates the segment. + /// + /// The maximum number of elements the segment can contain. Must be a power of 2. + /// + internal QueueSegmentBase(int length) { - WorkStealingQueue[] oldQueues = _queues; - Debug.Assert(Array.IndexOf(oldQueues, queue) == -1); + // Validate the length + Debug.Assert(length >= 2, $"Must be >= 2, got {length}"); + Debug.Assert((length & (length - 1)) == 0, $"Must be a power of 2, got {length}"); + + // Initialize the slots and the mask. The mask is used as a way of quickly doing "% _slots.Length", + // instead letting us do "& _slotsMask". + var slots = new Slot[length]; + _slotsMask = length - 1; + + // Initialize the sequence number for each slot. The sequence number provides a ticket that + // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can + // enqueue. An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer + // for position N can dequeue when the sequence number is N + 1. When an enqueuer is done writing + // at position N, it sets the sequence number to N + 1 so that a dequeuer will be able to dequeue, + // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length, + // so that when an enqueuer loops around the slots, it'll find that the sequence number at + // position N is N. This also means that when an enqueuer finds that at position N the sequence + // number is < N, there is still a value in that slot, i.e. 
the segment is full, and when a + // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to + // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into + // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc. + // may have values, but the 0th slot may still be being filled... in that case, TryDequeue will + // return false.) + for (int i = 0; i < slots.Length; i++) + { + slots[i].SequenceNumber = i; + } - var newQueues = new WorkStealingQueue[oldQueues.Length + 1]; - Array.Copy(oldQueues, 0, newQueues, 0, oldQueues.Length); - newQueues[^1] = queue; - if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) + _slots = slots; + } + + /// Represents a slot in the queue. + [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")] + [StructLayout(LayoutKind.Auto)] + internal struct Slot + { + /// The item. + internal object? Item; + /// The sequence number for this slot, used to synchronize between enqueuers and dequeuers. + internal int SequenceNumber; + } + + internal ref Slot this[int i] + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get { - break; + return ref Unsafe.Add(ref Unsafe.As(ref _slots.GetRawSzArrayData()), i & _slotsMask); + } + } + + /// Gets the "freeze offset" for this segment. + internal int FreezeOffset => _slots.Length * 2; + } + } + + [DebuggerDisplay("Count = {Count}")] + internal sealed class GlobalQueue : WorkQueueBase + { + /// The current enqueue segment. + internal GlobalQueueSegment _enqSegment; + /// The current dequeue segment. + internal GlobalQueueSegment _deqSegment; + + /// + /// Initializes a new instance of the class. + /// + internal GlobalQueue() + { + _enqSegment = _deqSegment = new GlobalQueueSegment(InitialSegmentLength); + } + + // for debugging + internal int Count + { + get + { + int count = 0; + for (GlobalQueueSegment? s = _deqSegment; s != null; s = s._nextSegment) + { + count += s.Count; } + return count; } } - public static void Remove(WorkStealingQueue queue) + /// + /// Adds an object to the top of the queue + /// + internal void Enqueue(object item, LocalQueue lQueue) { - Debug.Assert(queue != null); - while (true) + // try enqueuing. Should normally succeed unless we need a new segment. + if (!_enqSegment.TryEnqueue(item, lQueue)) { - WorkStealingQueue[] oldQueues = _queues; - if (oldQueues.Length == 0) + // If we're unable to enque, this segment will never take enqueues again. + // we need to take a slow path that will try adding a new segment. + EnqueueSlow(item, lQueue); + } + } + + /// + /// Slow path for enqueue, adding a new segment if necessary. + /// + private void EnqueueSlow(object item, LocalQueue lQueue) + { + for (; ; ) + { + GlobalQueueSegment currentSegment = _enqSegment; + if (currentSegment.TryEnqueue(item, lQueue)) { return; } - int pos = Array.IndexOf(oldQueues, queue); - if (pos == -1) + // take the lock to add a new segment + // we can make this optimistically lock free, but it is a rare code path + // and we do not want stampeding enqueuers allocating a lot of new segments when only one will win. + lock (_addSegmentLock) { - Debug.Fail("Should have found the queue"); - return; + if (currentSegment == _enqSegment) + { + // Make sure that no more items could be added to the current segment. + // NB: there may be some strugglers still finishing up out-of-order enqueues + // TryDequeue knows how to deal with that. 
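+ // (For example, with a 32-slot segment EnsureFrozenForEnqueues bumps Enqueue by
+ // FreezeOffset = 64, i.e. two whole generations, so no slot's sequence number can
+ // ever catch up with an enqueuer's position again.)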
+ currentSegment.EnsureFrozenForEnqueues(); + + // We determine the new segment's length based on the old length. + // In general, we double the size of the segment, to make it less likely + // that we'll need to grow again. + int nextSize = Math.Min(currentSegment._slots.Length * 2, MaxSegmentLength); + var newEnq = new GlobalQueueSegment(nextSize); + + // Hook up the new enqueue segment. + currentSegment._nextSegment = newEnq; + _enqSegment = newEnq; + } } + } + } + + /// + /// Removes an object at the bottom of the queue + /// Returns null if the queue is empty. + /// + internal object? Dequeue(LocalQueue lQueue) + { + var currentSegment = _deqSegment; + if (currentSegment.IsEmpty) + { + return null; + } + + object? result = currentSegment.TryDequeue(lQueue); + + if (result == null && currentSegment._nextSegment != null) + { + // slow path that fixes up segments + result = TryDequeueSlow(currentSegment, lQueue); + } - var newQueues = new WorkStealingQueue[oldQueues.Length - 1]; - if (pos == 0) + return result; + } + + /// + /// Slow path for Dequeue, removing frozen segments as needed. + /// + private object? TryDequeueSlow(GlobalQueueSegment currentSegment, LocalQueue lQueue) + { + object? result; + for (; ; ) + { + // At this point we know that this segment has been frozen for additional enqueues. But between + // the time that we ran TryDequeue and checked for a next segment, + // another item could have been added. Try to dequeue one more time + // to confirm that the segment is indeed empty. + Debug.Assert(currentSegment._nextSegment != null); + result = currentSegment.TryDequeue(lQueue); + if (result != null) { - Array.Copy(oldQueues, 1, newQueues, 0, newQueues.Length); + return result; } - else if (pos == oldQueues.Length - 1) + + // Current segment is frozen (nothing more can be added) and empty (nothing is in it). + // Update _deqSegment to point to the next segment in the list, assuming no one's beat us to it. + if (currentSegment == _deqSegment) { - Array.Copy(oldQueues, 0, newQueues, 0, newQueues.Length); + Interlocked.CompareExchange(ref _deqSegment, currentSegment._nextSegment, currentSegment); } - else + + currentSegment = _deqSegment; + + // Try to take. If we're successful, we're done. + result = currentSegment.TryDequeue(lQueue); + if (result != null) { - Array.Copy(oldQueues, 0, newQueues, 0, pos); - Array.Copy(oldQueues, pos + 1, newQueues, pos, newQueues.Length - pos); + return result; } - if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) + // Check to see whether this segment is the last. If it is, we can consider + // this to be a moment-in-time when the queue is empty. + if (currentSegment._nextSegment == null) { - break; + return null; } } } - } - internal sealed class WorkStealingQueue - { - private const int INITIAL_SIZE = 32; - internal volatile object?[] m_array = new object[INITIAL_SIZE]; // SOS's ThreadPool command depends on this name - private volatile int m_mask = INITIAL_SIZE - 1; + /// + /// Provides a multi-producer, multi-consumer thread-safe bounded segment. When the queue is full, + /// enqueues fail and return false. When the queue is empty, dequeues fail and return null. + /// These segments are linked together to form the unbounded queue. + /// + /// The "global" flavor of the queue does not support Pop or Remove and that allows for some simplifications. 
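+ /// For illustration: a fresh 4-slot segment starts with sequence numbers {0, 1, 2, 3};
+ /// an enqueue at position 0 publishes sequence number 1 (0 + Full), and the matching dequeue
+ /// publishes sequence number 4 (0 + 1 + _slotsMask), marking the slot Empty for the next generation.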
+ /// + [DebuggerDisplay("Count = {Count}")] + internal sealed class GlobalQueueSegment : QueueSegmentBase + { + /// The segment following this one in the queue, or null if this segment is the last in the queue. + internal GlobalQueueSegment? _nextSegment; -#if DEBUG - // in debug builds, start at the end so we exercise the index reset logic. - private const int START_INDEX = int.MaxValue; -#else - private const int START_INDEX = 0; -#endif + /// Creates the segment. + /// + /// The maximum number of elements the segment can contain. Must be a power of 2. + /// + internal GlobalQueueSegment(int length) : base(length) { } - private volatile int m_headIndex = START_INDEX; - private volatile int m_tailIndex = START_INDEX; + // for debugging + internal int Count => _queueEnds.Enqueue - _queueEnds.Dequeue; - private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false); + private static int sMaxReadCollisionDelay = Thread.OptimalMaxSpinWaitsPerSpinIteration * Environment.ProcessorCount / 2; - public void LocalPush(object obj) - { - int tail = m_tailIndex; + internal bool IsEmpty + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + // Read Deq and then Enq. If not the same, there could be work for a dequeuer. + // Order of reads is unimportant here since if there is work we are responsible for, we must see it. + // + // NB: Frozen segments have artificially increased Enqueue and will appear as having work even when there are no items. + // And they indeed require work - at very least to retire them. + return _queueEnds.Dequeue == _queueEnds.Enqueue; + } + } - // We're going to increment the tail; if we'll overflow, then we need to reset our counts - if (tail == int.MaxValue) + /// Tries to dequeue an element from the queue. + internal object? TryDequeue(LocalQueue lQueue) { - bool lockTaken = false; - try + // Loop in case of contention... + var spinner = new SpinWait(); + int delay = lQueue._coreContext.LastGlobalDequeueDelay >> 1; + + for (; ; ) { - m_foreignLock.Enter(ref lockTaken); + int position = _queueEnds.Dequeue; + ref Slot slot = ref this[position]; + + // Read the sequence number for the slot. + // Should read before reading Item, but we read Item after CAS, so ordinary read is ok. + int sequenceNumber = slot.SequenceNumber; - if (m_tailIndex == int.MaxValue) + // Check if the slot is considered Full in the current generation. + if (sequenceNumber == position + Full) { - // - // Rather than resetting to zero, we'll just mask off the bits we don't care about. - // This way we don't need to rearrange the items already in the queue; they'll be found - // correctly exactly where they are. One subtlety here is that we need to make sure that - // if head is currently < tail, it remains that way. This happens to just fall out from - // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all - // bits are set, so all of the bits we're keeping will also be set. Thus it's impossible - // for the head to end up > than the tail, since you can't set any more bits than all of - // them. - // - m_headIndex &= m_mask; - m_tailIndex = tail = m_tailIndex & m_mask; - Debug.Assert(m_headIndex <= m_tailIndex); + // Attempt to acquire the slot for Dequeuing. 
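+ // (E.g. at position 5 the slot carries sequence number 6; winning the CAS below moves
+ // Dequeue from 5 to 6 and gives this thread exclusive ownership of the item.)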
+ if (Interlocked.CompareExchange(ref _queueEnds.Dequeue, position + 1, position) == position) + { + var item = slot.Item; + slot.Item = null; + + // make the slot appear empty in the next generation + Volatile.Write(ref slot.SequenceNumber, position + 1 + _slotsMask); + return item; + } + } + else if (sequenceNumber - position < Full) + { + // The sequence number was less than what we needed, which means we cannot return this item. + // Check if we have reached Enqueue and return null indicating the segment is in empty state. + // NB: reading stale _frozenForEnqueues is fine - we would just spin once more + var currentEnqueue = Volatile.Read(ref _queueEnds.Enqueue); + if (currentEnqueue == position || (_frozenForEnqueues && currentEnqueue == position + FreezeOffset)) + { + return null; + } + + // The enqueuer went ahead and took a slot, but it has not finished filling the value. + // We cannot return `null` since the segment is not empty, so we must retry. + // We should be careful though to not starve the enqueuer that we are waiting for. + // In a worst case it may have lower priority than us, so we have to sleep(1) at some point. + // Let the SpinWait handle that. + spinner.SpinOnce(); + continue; + } + + // We have a stale dequeue value. Another dequeuer was quicker than us. + // We should retry with a new dequeue and do not need to wait for anyone. + // However there could be many dequeuers trying the same and only one at a time makes progress. + // Trying and failing has indirect costs as it keeps memory busy, heats up CPU, etc.. + // We will throttle the rate of polling on our side exponentially with every clash. + // + // Also since there is a cost of "learning" a good delay value, we will record the last one that worked + // and use 1/2 as a starting point when we are again in a similar situation. + // + // For more background about spinning see: + // [The Performance of Spin Lock Alternatives for Shared-Memory Multiprocessors] (https://homes.cs.washington.edu/~tom/pubs/spinlock.pdf) + // + lQueue._coreContext.LastGlobalDequeueDelay = delay; + Thread.SpinWait(lQueue.NextRnd() & delay); + + if (delay < sMaxReadCollisionDelay) + { + delay = (delay << 1) + 1; } - } - finally - { - if (lockTaken) - m_foreignLock.Exit(useMemoryBarrier: true); } } - // When there are at least 2 elements' worth of space, we can take the fast path. - if (tail < m_headIndex + m_mask) + /// + /// Attempts to enqueue the item. If successful, the item will be stored + /// in the queue and true will be returned; otherwise, the item won't be stored, the segment will be frozen + /// and false will be returned. + /// + public bool TryEnqueue(object item, LocalQueue lQueue) { - Volatile.Write(ref m_array[tail & m_mask], obj); - m_tailIndex = tail + 1; - } - else - { - // We need to contend with foreign pops, so we lock. - bool lockTaken = false; - try + // Loop in case of contention... + int delay = lQueue._coreContext.LastGlobalEnqueueDelay >> 1; + for (; ; ) { - m_foreignLock.Enter(ref lockTaken); + int position = _queueEnds.Enqueue; + ref Slot slot = ref this[position]; - int head = m_headIndex; - int count = m_tailIndex - m_headIndex; + // Read the sequence number for the enqueue position. + // Should read before writing Item, but our write is after CAS, so ordinary read is ok. + int sequenceNumber = slot.SequenceNumber; - // If there is still space (one left), just add the element. - if (count >= m_mask) + // The slot is empty and ready for us to enqueue into it if its sequence number matches the slot. 
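+ // Note the publication order used below: the Enqueue index is claimed first via CAS,
+ // the item is stored next, and the sequence number is published last. A dequeuer that
+ // observes the claimed index before the published sequence number simply spins
+ // (see the SpinOnce path in TryDequeue).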
+ if (sequenceNumber == position) { - // We're full; expand the queue by doubling its size. - var newArray = new object?[m_array.Length << 1]; - for (int i = 0; i < m_array.Length; i++) - newArray[i] = m_array[(i + head) & m_mask]; - - // Reset the field values, incl. the mask. - m_array = newArray; - m_headIndex = 0; - m_tailIndex = tail = count; - m_mask = (m_mask << 1) | 1; + // Reserve the slot for Enqueuing. + if (Interlocked.CompareExchange(ref _queueEnds.Enqueue, position + 1, position) == position) + { + slot.Item = item; + Volatile.Write(ref slot.SequenceNumber, position + Full); + return true; + } } + else if (sequenceNumber - position < 0) + { + // The sequence number was less than what we needed, which means we have caught up with previous generation + // Technically it's possible that we have dequeuers in progress and spaces are or about to be available. + // We still would be better off with a new segment. + return false; + } + + // Lost a race to another enqueue. Need to retry, but throttle the rate of retries if keep failing. + // See more detailed coments on the same pattern in TryDequeue. + lQueue._coreContext.LastGlobalEnqueueDelay = delay; + Thread.SpinWait(lQueue.NextRnd() & delay); - Volatile.Write(ref m_array[tail & m_mask], obj); - m_tailIndex = tail + 1; + if (delay < sMaxReadCollisionDelay) + { + delay = (delay << 1) + 1; + } } - finally + } + + internal void EnsureFrozenForEnqueues() + { + // flag used to ensure we don't increase the enqueue more than once + if (!_frozenForEnqueues) { - if (lockTaken) - m_foreignLock.Exit(useMemoryBarrier: false); + // Increase the enqueue by FreezeOffset atomically. + // enqueuing will be impossible after that + // dequeuers would need to dequeue 2 generations to catch up, and they can't + Interlocked.Add(ref _queueEnds.Enqueue, FreezeOffset); + _frozenForEnqueues = true; } } } + } - public bool LocalFindAndPop(object obj) + /// + /// The "local" flavor of the queue is similar to the "global", but also supports Pop, Remove and Rob operations. + /// + /// - Pop is used to implement Busy-Leaves scheduling strategy. + /// - Remove is used when the caller finds it benefitial to execute a workitem "inline" after it has been scheduled. + /// (such as waiting on a task completion). + /// - Rob is used to rebalance queues if one is found to be "rich" by stealing 1/2 of its queue. + /// (this way we avoid having to continue stealing from the same single queue, Rob may lead to 2 rich queues, then 4, then ...) + /// + /// We create multiple local queues and softly affinitize them with CPU cores. + /// + [DebuggerDisplay("Count = {Count}")] + internal sealed class LocalQueue : WorkQueueBase + { + /// The current enqueue segment. + internal LocalQueueSegment _enqSegment; + /// The current dequeue segment. + internal LocalQueueSegment _deqSegment; + + // The above fields change infrequently, but accessible from other threads when stealing. + // The fields below are more dynamic, but most of the use is from the same core thus there is little sharing. + // separate this set in a padded struct + + [StructLayout(LayoutKind.Explicit, Size = 2 * Internal.PaddingHelpers.CACHE_LINE_SIZE)] // padding before/after fields + internal struct CoreContext { - // Fast path: check the tail. If equal, we can skip the lock. - if (m_array[(m_tailIndex - 1) & m_mask] == obj) - { - object? 
unused = LocalPop(); - Debug.Assert(unused == null || unused == obj); - return unused != null; - } + [FieldOffset(1 * Internal.PaddingHelpers.CACHE_LINE_SIZE + 0 * sizeof(uint))] internal uint rndVal; + [FieldOffset(1 * Internal.PaddingHelpers.CACHE_LINE_SIZE + 1 * sizeof(uint))] internal int LastGlobalEnqueueDelay; + [FieldOffset(1 * Internal.PaddingHelpers.CACHE_LINE_SIZE + 2 * sizeof(uint))] internal int LastGlobalDequeueDelay; + } + + internal CoreContext _coreContext = new CoreContext() { rndVal = 6247 }; - // Else, do an O(N) search for the work item. The theory of work stealing and our - // inlining logic is that most waits will happen on recently queued work. And - // since recently queued work will be close to the tail end (which is where we - // begin our search), we will likely find it quickly. In the worst case, we - // will traverse the whole local queue; this is typically not going to be a - // problem (although degenerate cases are clearly an issue) because local work - // queues tend to be somewhat shallow in length, and because if we fail to find - // the work item, we are about to block anyway (which is very expensive). - for (int i = m_tailIndex - 2; i >= m_headIndex; i--) + // Very cheap random sequence generator. We keep one per-local queue. + // We do not need a lot of randomness, I think even _rnd++ would be fairly good here. + // Sequences attached to different queues go out of sync quickly and that could be sufficient. + // However this sequence is a bit more random at a very modest additional cost. + // [Fast High Quality Parallel Random Number] (http://www.drdobbs.com/tools/fast-high-quality-parallel-random-number/231000484) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal int NextRnd() + { + var r = _coreContext.rndVal; + r -= Numerics.BitOperations.RotateRight(r, 11); + return (int)(_coreContext.rndVal = r); + } + + /// + /// Initializes a new instance of the class. + /// + internal LocalQueue() + { + _addSegmentLock = new object(); + _enqSegment = _deqSegment = new LocalQueueSegment(InitialSegmentLength); + } + + // for debugging + internal int Count + { + get { - if (m_array[i & m_mask] == obj) + int count = 0; + for (LocalQueueSegment? s = _deqSegment; s != null; s = s._nextSegment) { - // If we found the element, block out steals to avoid interference. - bool lockTaken = false; - try - { - m_foreignLock.Enter(ref lockTaken); + count += s.Count; + } + return count; + } + } - // If we encountered a race condition, bail. - if (m_array[i & m_mask] == null) - return false; + /// + /// Adds an object to the top of the queue + /// + internal void Enqueue(object item) + { + // try enqueuing. Should normally succeed unless we need a new segment. + if (!_enqSegment.TryEnqueue(item)) + { + // If we're unable to enque, this segment is full. + // we need to take a slow path that will try adding a new segment. + EnqueueSlow(item); + } + } - // Otherwise, null out the element. - Volatile.Write(ref m_array[i & m_mask], null); + /// + /// Slow path for Enqueue, adding a new segment if necessary. + /// + private void EnqueueSlow(object item) + { + LocalQueueSegment currentSegment = _enqSegment; + for (; ; ) + { + if (currentSegment.TryEnqueue(item)) + { + return; + } + currentSegment = EnsureNextSegment(currentSegment); + } + } - // And then check to see if we can fix up the indexes (if we're at - // the edge). If we can't, we just leave nulls in the array and they'll - // get filtered out eventually (but may lead to superfluous resizing). 
- if (i == m_tailIndex) - m_tailIndex--; - else if (i == m_headIndex) - m_headIndex++; + private LocalQueueSegment EnsureNextSegment(LocalQueueSegment currentSegment) + { + var nextSegment = currentSegment._nextSegment; + if (nextSegment != null) + { + return nextSegment; + } - return true; - } - finally - { - if (lockTaken) - m_foreignLock.Exit(useMemoryBarrier: false); - } + // take the lock to add a new segment + // we can make this optimistically lock free, but it is a rare code path + // and we do not want stampeding enqueuers allocating a lot of new segments when only one will win. + lock (_addSegmentLock) + { + if (currentSegment._nextSegment == null) + { + // We determine the new segment's length based on the old length. + // In general, we double the size of the segment, to make it less likely + // that we'll need to grow again. + int nextSize = Math.Min(currentSegment._slots.Length * 2, MaxSegmentLength); + var newEnq = new LocalQueueSegment(nextSize); + + // Hook up the new enqueue segment. + currentSegment._nextSegment = newEnq; + _enqSegment = newEnq; } } - return false; + return currentSegment._nextSegment; } - public object? LocalPop() => m_headIndex < m_tailIndex ? LocalPopCore() : null; + /// + /// Removes an object at the bottom of the queue + /// Returns null if the queue is empty or if there is a contention + /// (no point to dwell on one local queue and make problem worse when there are other queues). + /// + internal object? Dequeue(ref bool missedSteal) + { + var currentSegment = _deqSegment; + object? result = currentSegment.TryDequeue(ref missedSteal); - private object? LocalPopCore() + // if there is a new segment, we must help with retiring the current. + if (result == null && currentSegment._nextSegment != null) + { + result = TryDequeueSlow(currentSegment, ref missedSteal); + } + + return result; + } + + /// + /// Tries to dequeue an item, removing frozen segments as needed. + /// + private object? TryDequeueSlow(LocalQueueSegment currentSegment, ref bool missedSteal) { - while (true) + object? result; + for (; ; ) { - int tail = m_tailIndex; - if (m_headIndex >= tail) + // At this point we know that this segment has been frozen for additional enqueues. But between + // the time that we ran TryDequeue and checked for a next segment, + // another item could have been added. Try to dequeue one more time + // to confirm that the segment is indeed empty. + Debug.Assert(currentSegment._nextSegment != null); + + // spin through missed steals, we must know for sure the segment is quiescent and empty + bool localMissedSteal; + do { - return null; + localMissedSteal = false; + result = currentSegment.TryDequeue(ref localMissedSteal); + if (result != null) + { + return result; + } + } while (localMissedSteal == true); + + // Current segment is frozen (nothing more can be added) and empty (nothing is in it). + // Update _deqSegment to point to the next segment in the list, assuming no one's beat us to it. + if (currentSegment == _deqSegment) + { + Interlocked.CompareExchange(ref _deqSegment, currentSegment._nextSegment, currentSegment); } - // Decrement the tail using a fence to ensure subsequent read doesn't come before. - tail--; - Interlocked.Exchange(ref m_tailIndex, tail); + currentSegment = _deqSegment; - // If there is no interaction with a take, we can head down the fast path. - if (m_headIndex <= tail) + // Try to dequeue. If we're successful, we're done. 
+ result = currentSegment.TryDequeue(ref missedSteal); + if (result != null) { - int idx = tail & m_mask; - object? obj = Volatile.Read(ref m_array[idx]); + return result; + } + + // Check to see whether this segment is the last. If it is, we can consider + // this to be a moment-in-time when the queue is empty. + if (currentSegment._nextSegment == null) + { + return null; + } + } + } - // Check for nulls in the array. - if (obj == null) continue; + /// + /// Pops an item from the top of the queue. + /// Returns null if there is nothing to pop or there is a contention. + /// + internal object? TryPop() + { + return _enqSegment.TryPop(); + } - m_array[idx] = null; - return obj; + /// + /// Performs a search for the given item in the queue and removes the item if found. + /// Returns true if item was indeed removed. + /// Returns false if item was not found or is no longer inlineable. + /// + internal bool TryRemove(Task callback) + { + var enqSegment = _enqSegment; + if (enqSegment.TryRemove(callback)) + { + return true; + } + + for (LocalQueueSegment? segment = _deqSegment; + segment != null && segment != enqSegment; + segment = segment._nextSegment) + { + if ((callback.m_stateFlags & TASK_STATE_COMPLETED_OR_INVOKED) != 0) + { + // task cannot be inlined + return false; } - else + + if (segment.TryRemove(callback)) { - // Interaction with takes: 0 or 1 elements left. - bool lockTaken = false; - try - { - m_foreignLock.Enter(ref lockTaken); + return true; + } + } - if (m_headIndex <= tail) - { - // Element still available. Take it. - int idx = tail & m_mask; - object? obj = Volatile.Read(ref m_array[idx]); + return false; + } - // Check for nulls in the array. - if (obj == null) continue; + /// + /// Provides a multi-producer, multi-consumer thread-safe bounded segment. When the queue is full, + /// enqueues fail and return false. When the queue is empty, dequeues fail and return null. + /// These segments are linked together to form the unbounded queue. + /// + /// The main difference of "local" segment from "global" is support for Pop as needed by the Busy Leaves algorithm. + /// For more details on Busy Leaves see: + /// [Scheduling Multithreaded Computations by Work Stealing] (http://supertech.csail.mit.edu/papers/steal.pdf) + /// + /// Supporting Pop leads to some additional complexity compared to "global" segment. + /// In particular enqueue index may move both forward and backward and therefore we cannot rely on atomic + /// update of the enqueue index as a way to acquire access to an Enqueue/Pop slot. + /// Another issue that complicates things is that all operation (Dequeue, Enqueue, Pop, etc.) may be concurrently + /// performed and should coordinate access to the slots when necessary. + /// + /// We resolve these issues by observing the following rules: + /// + /// - In an empty segment dequeue and enqueue ends point to the same slot. This is also the initial state. + /// + /// - The enqueue end is never behind the dequeue end. + /// + /// - Slots in "Full" state form a contiguous range. + /// + /// - Dequeue operation must acquire access to the dequeue slot by atomically moving it to the "Change" state. + /// Setting the slot to the next state can be done via regular write. After the Dequeue is complete and dequeue end is moved forward. + /// + /// - Pop and Enqueue operation must acquire the access to enqueue slot by atomically moving the previous slot to the "Change" state. + /// Setting the next state can be done via regular write. 
After the Enqueue/Pop is complete and enqueue end is moved appropriately. + /// + /// To summarise: + /// In a rare case of concurrent Pop and Enqueue, the threads coordinate the access by locking the same slot. + /// Similarly, concurrent Dequeues will coordinate the access on the other end of the segment. + /// When the segment shrinks to just 1 element, all Pop/Enqueue/Dequeue operations would end up using the same coordinating slot. + /// This indirectly guarantees that concurrent Dequeue and Pop cannot use the same slot and move enqueue/dequeue ends across each other. + /// + /// - Rob/Remove operations get exclusive access to appropriate ranges of slots by setting "Change" on both sides of the range. + /// + /// - The enqueue and dequeue ends are updated only when corresponding slots are in the "Change" state. + /// + /// - It is possible, in a case of a contention, to move a wrong slot to the "Change" state. + /// (Ex: the slot is not longer previous to an enqueue slot because enqueue index has moved forward) + /// When such situation is possible, it can be detected by re-examining the condition after the slot has moved to "Change" state. + /// This kind of contentions are rare and handled by reverting the slot(s) to the original state and performing an appropriate backoff. + /// The reverting of failed enqueue slot is done via CAS - in case if the slot has changed the state due to robbing (that new state should win). + /// The reverting of failed dequeue slot is an ordinary write. Since dequeue moves monotonically, it cannot end up in a robbing range. + /// + /// + [DebuggerDisplay("Count = {Count}")] + internal sealed class LocalQueueSegment : QueueSegmentBase + { + /// The segment following this one in the queue, or null if this segment is the last in the queue. + internal LocalQueueSegment? _nextSegment; + + /// + /// Another state of the slot in addition to Empty and Full. + /// "Change" means that the slot is reserved for possible modifications. + /// The state is used for mutual communication between Enqueue/Dequeue/Pop/Remove. + /// NB: Enqueue reserves the slot "to the left" of the slot that is targeted by Enqueue. + /// This ensures that "Full" slots occupy a contiguous range (not a requirement and is not true for the "global" flavor of the queue) + /// + private const int Change = 2; + + /// + /// When a segment has more than this, we steal half of its slots. + /// + private const int RichCount = 32; + + /// Creates the segment. + /// + /// The maximum number of elements the segment can contain. Must be a power of 2. + /// + internal LocalQueueSegment(int length) : base(length) { } + + /// + /// Attempts to enqueue the item. If successful, the item will be stored + /// in the queue and true will be returned; otherwise, the item won't be stored, and false + /// will be returned. + /// + internal bool TryEnqueue(object item) + { + // Loop in case of contention... 
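+ // In brief, an enqueue at position P: (1) locks slot[P-1] by adding Change to its
+ // sequence number, which keeps concurrent Pop/Enqueue/Dequeue from touching that slot,
+ // (2) re-checks that Enqueue is still P, (3) advances Enqueue to P + 1, stores the item
+ // and publishes slot[P] as Full, and (4) restores slot[P-1] to unlock it.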
+ int position = _queueEnds.Enqueue; + for (; ; ) + { + ref Slot prevSlot = ref this[position - 1]; + int prevSequenceNumber = prevSlot.SequenceNumber; + ref Slot slot = ref this[position]; - m_array[idx] = null; - return obj; - } - else + // check if prev slot is empty in the next generation or full + // otherwise retry - we have some kind of race, most likely the prev item is being dequeued + if (prevSequenceNumber == position + _slotsMask | prevSequenceNumber == position) + { + // lock the previous slot (so noone could dequeue past us, pop the prev slot or enqueue into the same position) + if (Interlocked.CompareExchange(ref prevSlot.SequenceNumber, prevSequenceNumber + Change, prevSequenceNumber) == prevSequenceNumber) { - // If we encountered a race condition and element was stolen, restore the tail. - m_tailIndex = tail + 1; - return null; + // confirm that enqueue did not change while we were locking the slot + // it is extremely rare, but we may see another Pop or Enqueue on the same segment. + if (_queueEnds.Enqueue == position) + { + // Successfully locked prev slot. + // is the slot empty? (most common path) + int sequenceNumber = slot.SequenceNumber; + if (sequenceNumber == position) + { + // update Enqueue - must do before marking the slot full. + // otherwise someone could lock the full slot while having stale Enqueue. + _queueEnds.Enqueue = position + 1; + slot.Item = item; + + // make the slot appear full in the current generation. + // since the slot on the left is still locked, only poppers/enqueuers can use it, but can use immediately + Volatile.Write(ref slot.SequenceNumber, position + Full); + + // unlock prev slot + // must be after we moved enq to the next slot, or someone may pop prev and break continuity of full slots. + prevSlot.SequenceNumber = prevSequenceNumber; + return true; + } + + // do we see the prev generation? + if (position - sequenceNumber > 0) + { + // Set Enqueue to throw off anyone else trying to enqueue or pop, unless we have already done that. + // we need a fence between writing to Enqueue and unlocking, but we unlock with a CAS anyways + _queueEnds.Enqueue = position + FreezeOffset; + _frozenForEnqueues = true; + } + } + + // enqueue changed or segment is full (rare cases) + // unlock the slot through CAS in case slot was robbed + Interlocked.CompareExchange(ref prevSlot.SequenceNumber, prevSequenceNumber, prevSequenceNumber + Change); } } - finally + + if (_frozenForEnqueues) { - if (lockTaken) - m_foreignLock.Exit(useMemoryBarrier: false); + return false; } + + // Lost a race. Most likely to the dequeuer of the last remaining item, which will be gone shortly. Try again. + // NOTE: We only need a compiler fence here. As long as we re-read Enqueue, it could be an ordinary read. + // This is not a common code path though, so volatile read will do. + position = Volatile.Read(ref _queueEnds.Enqueue); } } - } - public bool CanSteal => m_headIndex < m_tailIndex; + // for debugging + internal int Count => _queueEnds.Enqueue - _queueEnds.Dequeue; - public object? TrySteal(ref bool missedSteal) - { - while (true) + internal object? TryPop() { - if (CanSteal) + for (; ; ) { - bool taken = false; - try + int position = _queueEnds.Enqueue - 1; + ref Slot slot = ref this[position]; + + // Read the sequence number for the slot. + int sequenceNumber = slot.SequenceNumber; + + // Check if the slot is considered Full in the current generation (other likely state - Empty). 
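+ // (E.g. with Enqueue == 8 we examine slot 7 and expect sequence number 8 (7 + Full);
+ // anything else means this end of the segment is empty or contended, and we give up
+ // rather than spin here.)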
+ if (sequenceNumber == position + Full) { - m_foreignLock.TryEnter(ref taken); - if (taken) + // lock the slot. + if (Interlocked.CompareExchange(ref slot.SequenceNumber, position + Change, sequenceNumber) == sequenceNumber) { - // Increment head, and ensure read of tail doesn't move before it (fence). - int head = m_headIndex; - Interlocked.Exchange(ref m_headIndex, head + 1); - - if (head < m_tailIndex) + // confirm that enqueue did not change while we were locking the slot + // it is extremely rare, but we may see another Pop or Enqueue on the same segment. + // if (_queueEnds.Enqueue == position + 1) + if (_queueEnds.Enqueue == sequenceNumber) { - int idx = head & m_mask; - object? obj = Volatile.Read(ref m_array[idx]); + var item = slot.Item; + slot.Item = null; + + // update Enqueue before marking slot empty. - if enqueue update is later than that it may happen after the slot is enqueued. + _queueEnds.Enqueue = position; + + // make the slot appear empty in the current generation and update enqueue + // that unlocks the slot + Volatile.Write(ref slot.SequenceNumber, position); - // Check for nulls in the array. - if (obj == null) continue; + if (item == null) + { + // item was removed + // this is not a lost race though, so continue. + continue; + } - m_array[idx] = null; - return obj; + return item; } else { - // Failed, restore head. - m_headIndex = head; + // enqueue changed, in this rare case we just retry. + // unlock the slot through CAS in case the slot was robbed + Interlocked.CompareExchange(ref slot.SequenceNumber, sequenceNumber, position + Change); + continue; } } } - finally + + // found no items or encountered a contention (most likely with a dequeuer) + return null; + } + } + + /// + /// Tries to dequeue an element from the queue. + /// + /// "missedSteal" is set to true when we find the segment in a state where we cannot take an element and + /// cannot claim the segment is empty. + /// That generally happens when another thread did or is doing modifications and we do not see all the changes. + /// We could spin here until we see a consistent state, but it makes more sense to look in other queues. + /// + internal object? TryDequeue(ref bool missedSteal) + { + for (; ; ) + { + int position = _queueEnds.Dequeue; + + // if prev is not empty (in next generation), there might be more work in the segment. + // NB: enqueues are initiated by CAS-locking the prev slot. + // it is unkikely, but theoretically possible that we will arrive here and see only that, + // while all other changes are still write-buffered in the other core. + // We cannot claim that the queue is empty, and should treat this as a missed steal + // lest we risk that noone comes for this workitem, ever... + // Also make sure we read the prev slot before reading the actual slot, reading after is pointless. + if (!missedSteal) { - if (taken) - m_foreignLock.Exit(useMemoryBarrier: false); + missedSteal = Volatile.Read(ref this[position - 1].SequenceNumber) != (position + _slotsMask); } + // Read the sequence number for the slot. + ref Slot slot = ref this[position]; + int sequenceNumber = slot.SequenceNumber; + + // Check if the slot is considered Full in the current generation. + int diff = sequenceNumber - position; + if (diff == Full) + { + // Reserve the slot for Dequeuing. + if (Interlocked.CompareExchange(ref slot.SequenceNumber, position + Change, sequenceNumber) == sequenceNumber) + { + object? 
item; + var enqPos = _queueEnds.Enqueue; + + if (enqPos - position < RichCount || + // take from the rich and give to the needy!! + // NB: "this" is a sentinel for a failed robbing attempt + (item = TryRob(position, enqPos)) == this) + { + _queueEnds.Dequeue = position + 1; + item = slot.Item; + slot.Item = null; + } + + // unlock the slot for enqueuing by making the slot empty in the next generation + Volatile.Write(ref slot.SequenceNumber, position + 1 + _slotsMask); + + if (item == null) + { + // the item was removed, so we have nothing to return. + // this is not a lost race though, so must continue. + continue; + } + + return item; + } + } + else if (diff == 0) + { + // reached an empty slot + // since full slots are contiguous, finding an empty slot means that + // for our purposes and for the moment in time the segment is empty + return null; + } + + // contention with other thread + // must check this segment again later missedSteal = true; + return null; } + } - return null; + internal bool IsEmpty + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + int position = _queueEnds.Dequeue; + int sequenceNumber = this[position].SequenceNumber; + + // "position == sequenceNumber" means that we have reached an empty slot. + // since full slots are contiguous, finding an empty slot means that + // for our purposes and for the moment in time the segment is empty + return position == sequenceNumber; + } } - } - public int Count - { - get + internal object? TryRob(int deqPosition, int enqPosition) { - bool lockTaken = false; - try + LocalQueueSegment other = ThreadPoolGlobals.workQueue.GetOrAddLocalQueue()._enqSegment; + if (this != other) { - m_foreignLock.Enter(ref lockTaken); - return Math.Max(0, m_tailIndex - m_headIndex); + // same stanza as in TryEnqueue + int otherEnqPosition = other._queueEnds.Enqueue; + ref Slot enqPrevSlot = ref other[otherEnqPosition - 1]; + int prevSequenceNumber = enqPrevSlot.SequenceNumber; + + var srcSlotsMask = _slotsMask; + // mask in case the segment is frozen and enqueue is inflated + var count = (enqPosition - deqPosition) & srcSlotsMask; + int halfPosition = deqPosition + count / 2; + ref Slot halfSlot = ref this[halfPosition]; + + // unlike Enqueue, we require prev slot be empty + // not just to prevent rich getting richer + // we also do not want a possibility that the same segment is both robbed from and robbed to, which would be messy + if (prevSequenceNumber == otherEnqPosition + other._slotsMask) + { + // lock the other segment for enqueuing + if (Interlocked.CompareExchange(ref enqPrevSlot.SequenceNumber, prevSequenceNumber + Change, prevSequenceNumber) == prevSequenceNumber) + { + // confirm that enqueue did not change while we were locking the slot + // it is extremely rare, but we may see another Pop or Enqueue on the same segment. + if (other._queueEnds.Enqueue == otherEnqPosition) + { + // lock halfslot, it must be full + if (Interlocked.CompareExchange(ref halfSlot.SequenceNumber, halfPosition + Change, halfPosition + Full) == halfPosition + Full) + { + // our enqueue could have changed before we locked half + // make sure that half-way slot is still before enqueue + // in fact give it more space - we do not want to rob all the items, especially if someone else popping them fast. 
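+ // (For example, in a 64-slot segment with Dequeue at 0 and Enqueue at 32, the half-way
+ // slot is 16: roughly the first half of the items migrates to the other segment, the
+ // last of them is returned directly to this dequeuer, and slots 16..31 stay behind,
+ // so both queues end up with work instead of one rich queue.)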
+ var enq = deqPosition + ((_queueEnds.Enqueue - deqPosition) & _slotsMask); + if (enq - halfPosition > (RichCount / 4)) + { + int i = deqPosition, j = otherEnqPosition; + ref Slot last = ref this[i++]; + + while (true) + { + ref Slot next = ref this[i]; + ref Slot to = ref other[j]; + + // the other slot must be empty + // next slot must be full + if (to.SequenceNumber != j | next.SequenceNumber != i + Full) + { + break; + } + + to.Item = last.Item; + // NB: enables "to" for dequeuing, which may immediately happen, + // but not for popping, yet - since the other enq is locked + Volatile.Write(ref to.SequenceNumber, j + Full); + + last.Item = null; + + // we are going to take from next, mark it empty already + next.SequenceNumber = i + 1 + srcSlotsMask; + last = ref next; + + i++; + j++; + } + + // return the last slot value + // (it should already be marked empty, or will be, if it is at deqPosition) + var result = last.Item; + last.Item = null; + + // restore the half slot, must be after all the full->empty slot transitioning + // to make sure that poppers cannot see robbed slots as still incorrectly full when moving to the left of half. + Volatile.Write(ref halfSlot.SequenceNumber, halfPosition + Full); + + // advance the other enq, must be done before unlocking other prev slot, or someone could pop prev once unlocked. + // enables enq/pop + other._queueEnds.Enqueue = j; + + // advance Dequeue, must be after halfSlot is restored - someone could immediately start robbing. + Volatile.Write(ref _queueEnds.Dequeue, i); + + // unlock other prev slot + // must be after we moved other enq to the next slot, or someone may pop prev and break continuity of full slots. + enqPrevSlot.SequenceNumber = prevSequenceNumber; + return result; + } + + // failed to lock the half-way slot. + // restore via CAS, in case target slot has been robbed to + Interlocked.CompareExchange(ref halfSlot.SequenceNumber, halfPosition + Full, halfPosition + Change); + } + } + + // failed to lock actual enqueue end, restore with CAS, in case target slot has been robbed to/from + Interlocked.CompareExchange(ref enqPrevSlot.SequenceNumber, prevSequenceNumber, prevSequenceNumber + Change); + } + } } - finally + + // "this" is a sentinel value for a failed robbing attempt + return this; + } + + /// + /// Searches for the given callback and removes it. + /// Returns "true" if actually removed the item. + /// + internal bool TryRemove(Task callback) + { + for (int position = _queueEnds.Enqueue - 1; ; position--) { - if (lockTaken) + if ((callback.m_stateFlags & TASK_STATE_COMPLETED_OR_INVOKED) != 0) { - m_foreignLock.Exit(useMemoryBarrier: false); + // task cannot be inlined + return false; + } + + ref Slot slot = ref this[position]; + if (slot.Item == callback) + { + // lock Dequeue (so that the slot would not be robbed while we are removing) + var deqPosition = _queueEnds.Dequeue; + ref var deqSlot = ref this[deqPosition]; + if (Interlocked.CompareExchange(ref deqSlot.SequenceNumber, deqPosition + Change, deqPosition + Full) == deqPosition + Full) + { + // lock the slot, + // unless it is the same as Dequeue, in which case we have already locked it + if (position == deqPosition || + Interlocked.CompareExchange(ref slot.SequenceNumber, position + Change, position + Full) == position + Full) + { + // Successfully locked the slot. + // check if the item is still there + if (slot.Item == callback) + { + slot.Item = null; + + // unlock the slot. 
+ // must happen after setting slot to null + Volatile.Write(ref slot.SequenceNumber, position + Full); + + // unlock Dequeue (if different) and return success. + if (position != deqPosition) + { + deqSlot.SequenceNumber = deqPosition + Full; + } + return true; + } + + // unlock the slot and exit + if (position != deqPosition) + { + slot.SequenceNumber = position + Full; + } + } + + // unlock Dequeue + deqSlot.SequenceNumber = deqPosition + Full; + } + + // lost the item to someone else, will not see it again + break; + } + else if (slot.SequenceNumber - position > Change) + { + // reached the next gen + break; } } + return false; } } } + internal readonly LocalQueue[] _localQueues; + internal readonly GlobalQueue _globalQueue = new GlobalQueue(); + internal bool loggingEnabled; - internal readonly ConcurrentQueue workItems = new ConcurrentQueue(); // SOS's ThreadPool command depends on this name private readonly Internal.PaddingFor32 pad1; - - private volatile int numOutstandingThreadRequests = 0; - + private int numOutstandingThreadRequests = 0; private readonly Internal.PaddingFor32 pad2; - public ThreadPoolWorkQueue() + internal ThreadPoolWorkQueue() { loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); + _localQueues = new LocalQueue[RoundUpToPowerOf2(Environment.ProcessorCount)]; + } + + /// + /// Round the specified value up to the next power of 2, if it isn't one already. + /// + private static int RoundUpToPowerOf2(int i) + { + // Based on https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 + --i; + i |= i >> 1; + i |= i >> 2; + i |= i >> 4; + i |= i >> 8; + i |= i >> 16; + return i + 1; + } + + /// + /// Returns a local queue softly affinitized with the current thread. + /// + internal LocalQueue? GetLocalQueue() + { + return _localQueues[GetLocalQueueIndex()]; } - public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals() => - ThreadPoolWorkQueueThreadLocals.threadLocals ?? CreateThreadLocals(); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal LocalQueue GetOrAddLocalQueue() + { + var index = GetLocalQueueIndex(); + var result = _localQueues[index]; + + if (result == null) + { + result = EnsureLocalQueue(index); + } + + return result; + } [MethodImpl(MethodImplOptions.NoInlining)] - private ThreadPoolWorkQueueThreadLocals CreateThreadLocals() + private LocalQueue EnsureLocalQueue(int index) { - Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null); + var newQueue = new LocalQueue(); + Interlocked.CompareExchange(ref _localQueues[index], newQueue, null!); + return _localQueues[index]; + } - return ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this); + internal int GetLocalQueueIndex() + { + return GetLocalQueueIndex(Threading.Thread.GetCurrentProcessorId()); } - internal void EnsureThreadRequested() + internal int GetLocalQueueIndex(int procId) + { + return procId & (_localQueues.Length - 1); + } + + internal void RequestThread() { // // If we have not yet requested #procs threads, then request a new thread. // - // CoreCLR: Note that there is a separate count in the VM which has already been incremented - // by the VM by the time we reach this point. 
- // int count = numOutstandingThreadRequests; - while (count < Environment.ProcessorCount) + while (count < ThreadPoolGlobals.outstandingRequestLimit) { int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count); if (prev == count) @@ -442,15 +1287,35 @@ internal void EnsureThreadRequested() ThreadPool.RequestWorkerThread(); break; } + count = prev; } } + internal void EnsureThreadRequested() + { + if (numOutstandingThreadRequests == 0 && + Interlocked.CompareExchange(ref numOutstandingThreadRequests, 1, 0) == 0) + { + ThreadPool.RequestWorkerThread(); + } + } + + internal void KeepThread() + { + // + // The thread is leaving to VM, but we like the current concurrency level. + // Don't bother about numOutstandingThreadRequests. Just ask for a thread. + // + Interlocked.Increment(ref numOutstandingThreadRequests); + ThreadPool.RequestWorkerThread(); + } + internal void MarkThreadRequestSatisfied() { // // One of our outstanding thread requests has been satisfied. - // Decrement the count so that future calls to EnsureThreadRequested will succeed. + // Decrement the count so that future calls to RequestThread will succeed. // // CoreCLR: Note that there is a separate count in the VM which has already been decremented // by the VM by the time we reach this point. @@ -474,55 +1339,90 @@ public void Enqueue(object callback, bool forceGlobal) if (loggingEnabled) System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); - ThreadPoolWorkQueueThreadLocals? tl = null; - if (!forceGlobal) - tl = ThreadPoolWorkQueueThreadLocals.threadLocals; - - if (null != tl) + var localQueue = GetOrAddLocalQueue(); + if (forceGlobal) { - tl.workStealingQueue.LocalPush(callback); + _globalQueue.Enqueue(callback, localQueue); } else { - workItems.Enqueue(callback); + localQueue.Enqueue(callback); } - EnsureThreadRequested(); + // make sure there is at least one worker request + // as long as there is one worker to come, this item will be noticed. + // in fact ask anyways, up to #proc + RequestThread(); } - internal bool LocalFindAndPop(object callback) + /// + /// Task in these states cannot be inlined + /// + private const int TASK_STATE_COMPLETED_OR_INVOKED = Task.TASK_STATE_CANCELED | Task.TASK_STATE_FAULTED | Task.TASK_STATE_RAN_TO_COMPLETION | Task.TASK_STATE_DELEGATE_INVOKED; + + internal bool TryRemove(Task callback) { - ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals; - return tl != null && tl.workStealingQueue.LocalFindAndPop(callback); + int localQueueIndex = GetLocalQueueIndex(); + LocalQueue? localQueue = _localQueues[localQueueIndex]; + if (localQueue != null && localQueue.TryRemove(callback)) + { + return true; + } + + LocalQueue[] queues = _localQueues; + for (int i = 0; i < queues.Length; ++i) + { + if (i == localQueueIndex) + { + continue; + } + + if ((callback.m_stateFlags & TASK_STATE_COMPLETED_OR_INVOKED) != 0) + { + // task cannot be inlined + return false; + } + + localQueue = queues[i]; + if (localQueue != null && localQueue.TryRemove(callback)) + { + return true; + } + } + + return false; } - public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) + public object? DequeueAny(ref bool missedSteal, LocalQueue localQueue) { - WorkStealingQueue localWsq = tl.workStealingQueue; - object? callback; + // We come here after Pop failed. + // We look at the global queue and then do a sweep through all local queues for any work remaining. 
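+ // (The sweep below XORs a random start index with the loop counter - e.g. with 8 local
+ // queues and a start of 3 it visits 3, 2, 1, 0, 7, 6, 5, 4 - so concurrent sweepers tend
+ // to walk the queues in different orders.)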
+ //
+ // However, in a rare case when the local queue has multiple segments, do a local Dequeue first
+ // to ensure that a continuously nonempty global queue does not delay retirement of old segments.
+ if (localQueue._enqSegment != localQueue._deqSegment)
+ {
+ object? locallyDequeued = localQueue.Dequeue(ref missedSteal);
+ if (locallyDequeued != null)
+ return locallyDequeued;
+ }

- if ((callback = localWsq.LocalPop()) == null && // first try the local queue
- !workItems.TryDequeue(out callback)) // then try the global queue
+ object? callback = _globalQueue.Dequeue(localQueue);
+ if (callback == null)
 {
- // finally try to steal from another thread's local queue
- WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
- int c = queues.Length;
- Debug.Assert(c > 0, "There must at least be a queue for this thread.");
- int maxIndex = c - 1;
- int i = tl.random.Next(c);
- while (c > 0)
+ LocalQueue[] queues = _localQueues;
+ int localQueueIndex = localQueue.NextRnd() & (_localQueues.Length - 1);
+
+ // then traverse all local queues, starting with those that differ in lower bits and going gradually up.
+ // this way we minimize the chances that two threads concurrently go through the same sequence of queues.
+ for (int i = 0; i < queues.Length; i++)
 {
- i = (i < maxIndex) ? i + 1 : 0;
- WorkStealingQueue otherQueue = queues[i];
- if (otherQueue != localWsq && otherQueue.CanSteal)
+ var localWsq = queues[localQueueIndex ^ i];
+ callback = localWsq?.Dequeue(ref missedSteal);
+ if (callback != null)
 {
- callback = otherQueue.TrySteal(ref missedSteal);
- if (callback != null)
- {
- break;
- }
+ break;
 }
- c--;
 }
 }

@@ -534,15 +1434,18 @@ public long LocalCount
 get
 {
 long count = 0;
- foreach (WorkStealingQueue workStealingQueue in WorkStealingQueueList.Queues)
+ foreach (LocalQueue workStealingQueue in ThreadPoolGlobals.workQueue._localQueues)
 {
- count += workStealingQueue.Count;
+ if (workStealingQueue != null)
+ {
+ count += workStealingQueue.Count;
+ }
 }
 return count;
 }
 }

- public long GlobalCount => workItems.Count;
+ public long GlobalCount => ThreadPoolGlobals.workQueue._globalQueue.Count;

 ///
 /// Dispatches work items to this thread.
@@ -553,77 +1456,113 @@ public long LocalCount
 ///
 internal static bool Dispatch()
 {
- ThreadPoolWorkQueue outerWorkQueue = ThreadPoolGlobals.workQueue;
-
- //
- // Save the start time
+ // "Progress Guarantee"
+ // To ensure eventual dequeue of every enqueued item we must guarantee that:
+ // 1) after an enqueue, there is at least one worker request pending.
+ // (we can't guarantee that workers which are already in dispatch will see the new item)
+ // 2) the newly dispatched worker will see the work, if it is still there.
+ // 3) ensure another thread request when leaving Dequeue, unless certain that all the work that
+ // waited for us has been dequeued or that someone else will issue a thread request.
 //
- int startTickCount = Environment.TickCount;
+ // NB: We are not responsible for new work enqueued after we entered dispatch.
+ // We will do our best, but ultimately it is #1 that guarantees that someone will take care of
+ // work enqueued in the future.
+
+ ThreadPoolWorkQueue outerWorkQueue = ThreadPoolGlobals.workQueue;

 //
 // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
 // From this point on, we are responsible for requesting another thread if we stop working for any
- // reason, and we believe there might still be work in the queue.
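The steal sweep above visits queues[start ^ i] for i = 0..N-1 over a power-of-two table; XOR with a fixed start is a bijection on that range, so every worker still touches each queue exactly once while workers with different starts walk the table in different orders. A small standalone illustration of that ordering (the fixed table size is assumed for the demo):

    using System;

    static class XorSweepDemo
    {
        static void Main()
        {
            const int queueCount = 8;   // must be a power of two, like _localQueues.Length

            // Two workers starting at different indices each cover all 8 queues,
            // but in different orders, which reduces the chance they chase the same queue.
            foreach (int start in new[] { 3, 6 })
            {
                Console.Write($"start {start}: ");
                for (int i = 0; i < queueCount; i++)
                {
                    Console.Write(start ^ i);
                    Console.Write(i == queueCount - 1 ? "\n" : ", ");
                }
            }
        }
    }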
- // - // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will - // record a thread request on our behalf. So we don't need to worry about getting aborted right here. + // reason and are unsure whether the queue is completely empty. // + // CoreCLR: this whole scheme does not support thread aborts. + // FWIW a thread could be aborted right after dequeuing an item and before executing. + // We do not handle or expect that. outerWorkQueue.MarkThreadRequestSatisfied(); + // + // The clock is ticking! We have ThreadPoolGlobals.TP_QUANTUM milliseconds to get some work done, and then + // we need to return to the VM. + // + int quantumStartTime = Environment.TickCount; + // Has the desire for logging changed since the last time we entered? - outerWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); + var enabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); + if (outerWorkQueue.loggingEnabled != enabled) + { + // writing shared state. + outerWorkQueue.loggingEnabled = enabled; + } // // Assume that we're going to need another thread if this one returns to the VM. We'll set this to // false later, but only if we're absolutely certain that the queue is empty. // - bool needAnotherThread = true; + bool keepThreadSpinning = true; + try { - // - // Set up our thread-local data - // - // Use operate on workQueue local to try block so it can be enregistered - ThreadPoolWorkQueue workQueue = outerWorkQueue; - ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals(); - Thread currentThread = tl.currentThread; - + Thread currentThread = Thread.CurrentThread; // Start on clean ExecutionContext and SynchronizationContext currentThread._executionContext = null; currentThread._synchronizationContext = null; + int tasksDispatched = 0; + + // + // Use operate on workQueue local to try block so it can be enregistered + ThreadPoolWorkQueue workQueue = ThreadPoolGlobals.workQueue; + // // Loop until our quantum expires or there is no work. // - while (ThreadPool.KeepDispatching(startTickCount)) + do { - bool missedSteal = false; - // Use operate on workItem local to try block so it can be enregistered - object? workItem = workQueue.Dequeue(tl, ref missedSteal); + var localQueue = workQueue.GetOrAddLocalQueue(); + object? workItem = localQueue.TryPop(); if (workItem == null) { - // - // No work. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - needAnotherThread = missedSteal; + var spinner = new SpinWait(); + for (; ; ) + { + // we could not pop, try stealing + bool missedSteal = false; + workItem = workQueue.DequeueAny(ref missedSteal, localQueue); + if (workItem != null) + { + break; + } - // Tell the VM we're returning normally, not because Hill Climbing asked us to return. - return true; + // if there is no more work, start reducing parallelism exponentially + if (!missedSteal && (localQueue.NextRnd() & 1) == 0) + { + keepThreadSpinning = false; + // Tell the VM we're returning normally, not because Hill Climbing asked us to return. 
+ return true;
+ }
+
+ // so back off a little and try again (as long as the quantum has not expired)
+ spinner.SpinOnce();
+ }
 }

 if (workQueue.loggingEnabled)
 System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem);

- //
- // If we found work, there may be more work. Ask for another thread so that the other work can be processed
- // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
- //
- workQueue.EnsureThreadRequested();
+ // We are about to execute external code, which can take a while, block or even wait on something from other tasks.
+ // Make sure there is a request, so that starvation is noticed if we do not come back for a while.
+ // If this is our first workitem, be more aggressive.
+ if (tasksDispatched++ == 0)
+ {
+ // Every new worker that finds work will ask for a parallelism increase, but only once.
+ // This helps with front-edge ramping up from cold states.
+ workQueue.RequestThread();
+ }
+ else
+ {
+ workQueue.EnsureThreadRequested();
+ }

 //
 // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
@@ -667,9 +1606,6 @@ internal static bool Dispatch()

 currentThread.ResetThreadPoolThread();

- // Release refs
- workItem = null;
-
 // Return to clean ExecutionContext and SynchronizationContext
 ExecutionContext.ResetThreadPoolThread(currentThread);

@@ -678,8 +1614,23 @@ internal static bool Dispatch()
 // us to return the thread to the pool or not.
 //
 if (!ThreadPool.NotifyWorkItemComplete())
+ {
 return false;
+ }
+
+ if ((tasksDispatched & 15) == 0)
+ {
+ // we have dispatched another 16 tasks. Make sure the core Id is not stale due to caching.
+ // Running with a stale core Id is very rare, but when it happens it may result in latency outliers
+ // due to neglect of our "real" local queue.
+ //
+ // "16" was picked based on typical latency distributions in a task scheduling benchmark
+ // as happening often enough to cap the effects of "neglect" while also being cheap
+ // even when compared to nearly no-op tasks.
+ Thread.FlushCurrentProcessorId();
+ }
 }
+ while (ThreadPool.KeepDispatching(quantumStartTime));

 // If we get here, it's because our quantum expired. Tell the VM we're returning normally.
 return true;
@@ -687,79 +1638,18 @@ internal static bool Dispatch()
 finally
 {
 //
- // If we are exiting for any reason other than that the queue is definitely empty, ask for another
- // thread to pick up where we left off.
+ // We are exiting, but not because we failed to find work, so we want to keep the same number of spinners.
+ // Make a request for a thread (up to #procs) to account for our leaving.
 //
- if (needAnotherThread)
- outerWorkQueue.EnsureThreadRequested();
- }
- }
- }
-
- // Simple random number generator. We don't need great randomness, we just need a little and for it to be fast.
- internal struct FastRandom // xorshift prng
- {
- private uint _w, _x, _y, _z;
-
- public FastRandom(int seed)
- {
- _x = (uint)seed;
- _w = 88675123;
- _y = 362436069;
- _z = 521288629;
- }
-
- public int Next(int maxValue)
- {
- Debug.Assert(maxValue > 0);
+ if (keepThreadSpinning)
+ ThreadPoolGlobals.workQueue.KeepThread();

- uint t = _x ^ (_x << 11);
- _x = _y; _y = _z; _z = _w;
- _w = _w ^ (_w >> 19) ^ (t ^ (t >> 8));
-
- return (int)(_w % (uint)maxValue);
- }
- }
-
- // Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced.
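The retirement check above lets each idle worker stop spinning with probability one half per fruitless sweep, so an excess of spinning-but-idle workers thins out roughly geometrically instead of all leaving or all staying at once. A toy model of that behavior; NextRnd's implementation is not shown in this hunk, so the xorshift generator below is an assumption modeled loosely on the FastRandom struct being removed above:

    using System;

    static class RetirementModel
    {
        static void Main()
        {
            uint x = 2463534242;   // xorshift32 state; any nonzero seed works
            int NextRnd()
            {
                x ^= x << 13;
                x ^= x >> 17;
                x ^= x << 5;
                return (int)x;
            }

            int spinningWorkers = 64;
            for (int round = 0; spinningWorkers > 0; round++)
            {
                int survivors = 0;
                for (int w = 0; w < spinningWorkers; w++)
                {
                    // Mirror of "(NextRnd() & 1) == 0": retire on an even draw, keep spinning on an odd one.
                    if ((NextRnd() & 1) != 0)
                    {
                        survivors++;
                    }
                }

                Console.WriteLine($"after sweep {round}: {survivors} of {spinningWorkers} still spinning");
                spinningWorkers = survivors;
            }
        }
    }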
- internal sealed class ThreadPoolWorkQueueThreadLocals
- {
- [ThreadStatic]
- public static ThreadPoolWorkQueueThreadLocals? threadLocals;
-
- public readonly ThreadPoolWorkQueue workQueue;
- public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
- public readonly Thread currentThread;
- public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly
-
- public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
- {
- workQueue = tpq;
- workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
- ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
- currentThread = Thread.CurrentThread;
- }
-
- ~ThreadPoolWorkQueueThreadLocals()
- {
- // Transfer any pending workitems into the global queue so that they will be executed by another thread
- if (null != workStealingQueue)
- {
- if (null != workQueue)
- {
- object? cb;
- while ((cb = workStealingQueue.LocalPop()) != null)
- {
- Debug.Assert(null != cb);
- workQueue.Enqueue(cb, forceGlobal: true);
- }
- }
-
- ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
+ // we are releasing an unneeded thread back to the VM, or the thread has run for a full quantum.
+ // in either case it makes sense to flush the cached core Id.
+ Thread.FlushCurrentProcessorId();
 }
 }
 }
-
 public delegate void WaitCallback(object? state);

 public delegate void WaitOrTimerCallback(object? state, bool timedOut); // signaled or timed out
@@ -1191,32 +2081,55 @@ internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool prefe
 }

 // This method tries to take the target callback out of the current thread's queue.
- internal static bool TryPopCustomWorkItem(object workItem)
+ internal static bool TryPopCustomWorkItem(Task workItem)
 {
 Debug.Assert(null != workItem);
+
 return
 ThreadPoolGlobals.threadPoolInitialized && // if not initialized, so there's no way this workitem was ever queued.
- ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem);
+ ThreadPoolGlobals.workQueue.TryRemove(workItem);
 }

 // Get all workitems. Called by TaskScheduler in its debugger hooks.
 internal static IEnumerable GetQueuedWorkItems()
 {
 // Enumerate global queue
- foreach (object workItem in ThreadPoolGlobals.workQueue.workItems)
+ foreach (object o in GetGloballyQueuedWorkItems())
 {
- yield return workItem;
+ yield return o;
+ }
+
+ // Enumerate each local queue
+ var workQueue = ThreadPoolGlobals.workQueue;
+ foreach (ThreadPoolWorkQueue.LocalQueue wsq in workQueue._localQueues)
+ {
+ if (wsq != null)
+ {
+ for (ThreadPoolWorkQueue.LocalQueue.LocalQueueSegment? s = wsq._deqSegment; s != null; s = s._nextSegment)
+ {
+ foreach (var slot in s._slots)
+ {
+ object? item = slot.Item;
+ if (item != null)
+ {
+ yield return item;
+ }
+ }
+ }
+ }
 }
+ }

- // Enumerate each local queue
- foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
+ internal static IEnumerable GetLocallyQueuedWorkItems()
+ {
+ ThreadPoolWorkQueue.LocalQueue? wsq = ThreadPoolGlobals.workQueue.GetLocalQueue();
+ if (wsq != null)
 {
- if (wsq != null && wsq.m_array != null)
+ for (ThreadPoolWorkQueue.LocalQueue.LocalQueueSegment? s = wsq._deqSegment; s != null; s = s._nextSegment)
 {
- object?[] items = wsq.m_array;
- for (int i = 0; i < items.Length; i++)
 {
- object? item = items[i];
+ foreach (var slot in s._slots)
 {
- object?
item = slot.Item; if (item != null) { yield return item; @@ -1226,23 +2139,24 @@ internal static IEnumerable GetQueuedWorkItems() } } - internal static IEnumerable GetLocallyQueuedWorkItems() + internal static IEnumerable GetGloballyQueuedWorkItems() { - ThreadPoolWorkQueue.WorkStealingQueue? wsq = ThreadPoolWorkQueueThreadLocals.threadLocals?.workStealingQueue; - if (wsq != null && wsq.m_array != null) + var workQueue = ThreadPoolGlobals.workQueue; + + // Enumerate global queue + for (ThreadPoolWorkQueue.GlobalQueue.GlobalQueueSegment? s = workQueue._globalQueue._deqSegment; s != null; s = s._nextSegment) { - object?[] items = wsq.m_array; - for (int i = 0; i < items.Length; i++) + foreach (var slot in s._slots) { - object? item = items[i]; + object? item = slot.Item; if (item != null) + { yield return item; + } } } } - internal static IEnumerable GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems; - private static object[] ToObjectArray(IEnumerable workitems) { int i = 0; diff --git a/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs b/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs index 5e3899a92dfb..6974c90375a9 100644 --- a/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs +++ b/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs @@ -519,6 +519,14 @@ private static int RefreshCurrentProcessorId() return currentProcessorId; } + // Clear the cached processor Id + // This can be used before/after blocking the thread for nontrivial amount of time + // or around other operations which are likely to result in changing executing core. + internal static void FlushCurrentProcessorId() + { + t_currentProcessorIdCache = default; + } + // Cached processor id used as a hint for which per-core stack to access. It is periodically // refreshed to trail the actual thread core affinity. 
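FlushCurrentProcessorId simply resets the thread-static cache so the next processor-id query is answered fresh; callers use it around points where the thread has likely migrated (blocking, returning to the VM, every 16 dispatched items above). A simplified model of such a cache; the real t_currentProcessorIdCache packs the id and a refresh countdown differently, so the layout below is only an assumption:

    using System;
    using System.Threading;

    static class CachedProcessorId
    {
        // Assumed layout: upper bits hold (procId + 1), low bits hold a countdown until the next refresh.
        [ThreadStatic]
        private static int t_cache;

        private const int CacheShift = 16;
        private const int RefreshInterval = 50;

        public static int GetCurrentProcessorId()
        {
            int cache = t_cache;
            if ((cache & ((1 << CacheShift) - 1)) > 0)
            {
                t_cache = cache - 1;                        // still fresh enough; just tick the countdown
                return (cache >> CacheShift) - 1;
            }

            int procId = Thread.GetCurrentProcessorId();    // the expensive query
            t_cache = ((procId + 1) << CacheShift) | RefreshInterval;
            return procId;
        }

        // Analogue of FlushCurrentProcessorId: forget the cached value so the next call re-queries.
        public static void Flush() => t_cache = 0;
    }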
[MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/src/vm/threadpoolrequest.cpp b/src/vm/threadpoolrequest.cpp index 65c9c512c01d..43169ad8ede5 100644 --- a/src/vm/threadpoolrequest.cpp +++ b/src/vm/threadpoolrequest.cpp @@ -537,18 +537,9 @@ void ManagedPerAppDomainTPCount::SetAppDomainRequestsActive() _ASSERTE(m_index.m_dwIndex != UNUSED_THREADPOOL_INDEX); #ifndef DACCESS_COMPILE - LONG count = VolatileLoad(&m_numRequestsPending); - while (true) - { - LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, count+1, count); - if (prev == count) - { - ThreadpoolMgr::MaybeAddWorkingWorker(); - ThreadpoolMgr::EnsureGateThreadRunning(); - break; - } - count = prev; - } + FastInterlockIncrement(&m_numRequestsPending); + ThreadpoolMgr::MaybeAddWorkingWorker(); + ThreadpoolMgr::EnsureGateThreadRunning(); #endif } @@ -560,20 +551,13 @@ void ManagedPerAppDomainTPCount::ClearAppDomainRequestsActive() _ASSERTE(m_index.m_dwIndex != UNUSED_THREADPOOL_INDEX); - LONG count = VolatileLoad(&m_numRequestsPending); - while (count > 0) - { - LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, 0, count); - if (prev == count) - break; - count = prev; - } + m_numRequestsPending = 0; } bool ManagedPerAppDomainTPCount::TakeActiveRequest() { LIMITED_METHOD_CONTRACT; - LONG count = VolatileLoad(&m_numRequestsPending); + LONG count = VolatileLoadWithoutBarrier(&m_numRequestsPending); while (count > 0) { LONG prev = FastInterlockCompareExchange(&m_numRequestsPending, count-1, count); diff --git a/src/vm/threadpoolrequest.h b/src/vm/threadpoolrequest.h index 3b2da28b5663..7ecd8705f273 100644 --- a/src/vm/threadpoolrequest.h +++ b/src/vm/threadpoolrequest.h @@ -135,11 +135,9 @@ class ManagedPerAppDomainTPCount : public IPerAppDomainTPCount { private: TPIndex m_index; - struct DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) { - BYTE m_padding1[MAX_CACHE_LINE_SIZE - sizeof(LONG)]; + struct ALIGNED(MAX_CACHE_LINE_SIZE) { // Only use with VolatileLoad+VolatileStore+FastInterlockCompareExchange LONG m_numRequestsPending; - BYTE m_padding2[MAX_CACHE_LINE_SIZE]; }; }; @@ -230,11 +228,9 @@ class UnManagedPerAppDomainTPCount : public IPerAppDomainTPCount { private: SpinLock m_lock; ULONG m_NumRequests; - struct DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) { - BYTE m_padding1[MAX_CACHE_LINE_SIZE - sizeof(LONG)]; + struct ALIGNED(MAX_CACHE_LINE_SIZE) { // Only use with VolatileLoad+VolatileStore+FastInterlockCompareExchange LONG m_outstandingThreadRequestCount; - BYTE m_padding2[MAX_CACHE_LINE_SIZE]; }; }; diff --git a/src/vm/util.hpp b/src/vm/util.hpp index 41cf728c59bc..91a4bfc71814 100644 --- a/src/vm/util.hpp +++ b/src/vm/util.hpp @@ -48,6 +48,12 @@ #define MAX_CACHE_LINE_SIZE 64 #endif +#ifdef _MSC_VER +#define ALIGNED(x) __declspec(align(x)) +#else +#define ALIGNED(x) __attribute__((aligned(x))) +#endif + //======================================================================== // More convenient names for integer types of a guaranteed size. 
//======================================================================== diff --git a/src/vm/win32threadpool.cpp b/src/vm/win32threadpool.cpp index cb7d84bfabc5..3bca3177a47c 100644 --- a/src/vm/win32threadpool.cpp +++ b/src/vm/win32threadpool.cpp @@ -77,6 +77,7 @@ SetWaitableTimerExProc g_pufnSetWaitableTimerEx = NULL; BOOL ThreadpoolMgr::InitCompletionPortThreadpool = FALSE; HANDLE ThreadpoolMgr::GlobalCompletionPort; // used for binding io completions on file handles +// Cacheline aligned SVAL_IMPL(ThreadpoolMgr::ThreadCounter,ThreadpoolMgr,CPThreadCounter); SVAL_IMPL_INIT(LONG,ThreadpoolMgr,MaxLimitTotalCPThreads,1000); // = MaxLimitCPThreadsPerCPU * number of CPUS @@ -85,8 +86,8 @@ SVAL_IMPL(LONG,ThreadpoolMgr,MaxFreeCPThreads); // = MaxFreeCP Volatile ThreadpoolMgr::NumCPInfrastructureThreads = 0; // number of threads currently busy handling draining cycle -// Cacheline aligned, hot variable -DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter); +// Cacheline aligned +SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter); SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS SVAL_IMPL(LONG,ThreadpoolMgr,MaxLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS @@ -97,12 +98,9 @@ LONG ThreadpoolMgr::cpuUtilizationAverage = 0; HillClimbing ThreadpoolMgr::HillClimbingInstance; // Cacheline aligned, 3 hot variables updated in a group -DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::PriorCompletedWorkRequests = 0; -DWORD ThreadpoolMgr::PriorCompletedWorkRequestsTime; -DWORD ThreadpoolMgr::NextCompletedWorkRequestsTime; +ThreadpoolMgr::WorkRequestDataT ThreadpoolMgr::WorkRequestData; LARGE_INTEGER ThreadpoolMgr::CurrentSampleStartTime; - unsigned int ThreadpoolMgr::WorkerThreadSpinLimit; bool ThreadpoolMgr::IsHillClimbingDisabled; int ThreadpoolMgr::ThreadAdjustmentInterval; @@ -924,7 +922,7 @@ void ThreadpoolMgr::AdjustMaxWorkersActive() DWORD currentTicks = GetTickCount(); LONG totalNumCompletions = (LONG)Thread::GetTotalWorkerThreadPoolCompletionCount(); - LONG numCompletions = totalNumCompletions - VolatileLoad(&PriorCompletedWorkRequests); + LONG numCompletions = totalNumCompletions - VolatileLoad(&WorkRequestData.PriorCompletedWorkRequests); LARGE_INTEGER startTime = CurrentSampleStartTime; LARGE_INTEGER endTime; @@ -986,10 +984,12 @@ void ThreadpoolMgr::AdjustMaxWorkersActive() } } - PriorCompletedWorkRequests = totalNumCompletions; - NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval; - MemoryBarrier(); // flush previous writes (especially NextCompletedWorkRequestsTime) - PriorCompletedWorkRequestsTime = currentTicks; + WorkRequestData.PriorCompletedWorkRequests = totalNumCompletions; + WorkRequestData.NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval; + // make sure that NextCompletedWorkRequestsTime is updated before PriorCompletedWorkRequestsTime + // so that reader never sees newer PriorCompletedWorkRequestsTime while having stale NextCompletedWorkRequestsTime + // NB: we are holding the ThreadAdjustmentLock and therefore the order cannot be violated by two threads writing. 
+ VolatileStore(&WorkRequestData.PriorCompletedWorkRequestsTime, currentTicks); CurrentSampleStartTime = endTime;; } } @@ -1011,13 +1011,14 @@ void ThreadpoolMgr::MaybeAddWorkingWorker() while (true) { newCounts = counts; - newCounts.NumWorking = max(counts.NumWorking, min(counts.NumWorking + 1, counts.MaxWorking)); - newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking); - newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive)); - if (newCounts == counts) + if (counts.NumWorking + 1 > counts.MaxWorking) return; + newCounts.NumWorking++; + newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking); + newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive)); + ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); if (oldCounts == counts) diff --git a/src/vm/win32threadpool.h b/src/vm/win32threadpool.h index 12b8e204808d..a46fd4a443cd 100644 --- a/src/vm/win32threadpool.h +++ b/src/vm/win32threadpool.h @@ -106,7 +106,9 @@ class ThreadpoolMgr friend struct _DacGlobals; public: - struct ThreadCounter + + // ThreadCounter is always cacheline aligned + struct ALIGNED(MAX_CACHE_LINE_SIZE) ThreadCounter { static const int MaxPossibleCount = 0x7fff; @@ -299,7 +301,7 @@ class ThreadpoolMgr static inline void UpdateLastDequeueTime() { LIMITED_METHOD_CONTRACT; - VolatileStore(&LastDequeueTime, (unsigned int)GetTickCount()); + LastDequeueTime = (unsigned int)GetTickCount(); } static BOOL CreateTimerQueueTimer(PHANDLE phNewTimer, @@ -851,9 +853,11 @@ class ThreadpoolMgr { WRAPPER_NO_CONTRACT; - DWORD priorTime = PriorCompletedWorkRequestsTime; - MemoryBarrier(); // read fresh value for NextCompletedWorkRequestsTime below - DWORD requiredInterval = NextCompletedWorkRequestsTime - priorTime; + // make sure that PriorCompletedWorkRequestsTime is read before NextCompletedWorkRequestsTime + // to make sure that NextCompletedWorkRequestsTime is not older than PriorCompletedWorkRequestsTime + // NB: we write them in reverse order while holding a lock. + DWORD priorTime = VolatileLoad(&WorkRequestData.PriorCompletedWorkRequestsTime); + DWORD requiredInterval = WorkRequestData.NextCompletedWorkRequestsTime - priorTime; DWORD elapsedInterval = GetTickCount() - priorTime; if (elapsedInterval >= requiredInterval) { @@ -1016,13 +1020,21 @@ class ThreadpoolMgr static HillClimbing HillClimbingInstance; - DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) static LONG PriorCompletedWorkRequests; - static DWORD PriorCompletedWorkRequestsTime; - static DWORD NextCompletedWorkRequestsTime; + // the following 3 hot variables are nearly always used together. + // put on a separate cache line. + struct ALIGNED(MAX_CACHE_LINE_SIZE) WorkRequestDataT + { + public: + LONG PriorCompletedWorkRequests; + DWORD PriorCompletedWorkRequestsTime; + DWORD NextCompletedWorkRequestsTime; + }; + + static WorkRequestDataT WorkRequestData; static LARGE_INTEGER CurrentSampleStartTime; - static unsigned int WorkerThreadSpinLimit; + static unsigned int ThreadpoolMgr::WorkerThreadSpinLimit; static bool IsHillClimbingDisabled; static int ThreadAdjustmentInterval; @@ -1044,7 +1056,7 @@ class ThreadpoolMgr static const DWORD WorkerTimeout = 20 * 1000; static const DWORD WorkerTimeoutAppX = 5 * 1000; // shorter timeout to allow threads to exit prior to app suspension - DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_DECL(ThreadCounter,WorkerCounter); + SVAL_DECL(ThreadCounter, WorkerCounter); // // WorkerSemaphore is an UnfairSemaphore because:
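The ordering contract spelled out in the comments above (publish NextCompletedWorkRequestsTime before the volatile store of PriorCompletedWorkRequestsTime, then read them back in the opposite order) can be expressed the same way in C# with Volatile.Write/Volatile.Read. A hedged sketch with field names mirroring the C++ ones; the writer is assumed to run under the adjustment lock, as the comment notes:

    using System;
    using System.Threading;

    // Sketch of the release/acquire pairing used for the completed-work-request timestamps.
    sealed class WorkRequestSample
    {
        private int _priorCompletedWorkRequests;
        private int _priorCompletedWorkRequestsTime;
        private int _nextCompletedWorkRequestsTime;

        // Writer (assumed to hold the adjustment lock, so only ordering against readers matters).
        public void Publish(int totalCompletions, int nowTicks, int adjustmentInterval)
        {
            _priorCompletedWorkRequests = totalCompletions;
            _nextCompletedWorkRequestsTime = nowTicks + adjustmentInterval;

            // Release store: everything written above is visible before the new "prior time" is observed.
            Volatile.Write(ref _priorCompletedWorkRequestsTime, nowTicks);
        }

        // Reader: acquire-load the prior time first, so the interval it derives can never pair a
        // fresh prior time with a stale next time.
        public bool ShouldAdjust(int nowTicks)
        {
            int priorTime = Volatile.Read(ref _priorCompletedWorkRequestsTime);
            int requiredInterval = _nextCompletedWorkRequestsTime - priorTime;
            int elapsedInterval = nowTicks - priorTime;
            return elapsedInterval >= requiredInterval;
        }
    }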