Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Commit

Permalink
Cache pad threadpool queue segments (false sharing)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Jun 23, 2016
1 parent aef37e4 commit eec3a75
Showing 1 changed file with 65 additions and 57 deletions.
122 changes: 65 additions & 57 deletions src/mscorlib/src/System/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,20 @@ internal struct PaddedWorkItem
public IThreadPoolWorkItem Item;
}

// Ensure a full cache line is occupied so this segment reference does not falsely
// share a cache line with adjacent fields or array elements.
// NOTE(review): assumes a 64-byte cache line; some CPUs fetch adjacent-line pairs
// (effectively 128 bytes) — confirm the padding size is sufficient on target hardware.
[StructLayout(LayoutKind.Explicit, Size = 64)]
internal struct PaddedQueueSegment
{
// The padded segment reference; volatile so cross-thread reads observe the most
// recently published segment without compiler/JIT caching of the field.
[FieldOffset(0)]
public volatile QueueSegment Segment;
}

// Ensure full cache line occupied so no false sharing with other indices
[StructLayout(LayoutKind.Explicit, Size = 64)]
internal struct PaddedIndex
internal struct PaddedInt
{
[FieldOffset(0)]
public volatile int Index;
public volatile int Value;
}

internal class WorkStealingQueue
Expand All @@ -158,14 +166,14 @@ internal class WorkStealingQueue
private const int START_INDEX = 0;
#endif

private PaddedIndex m_headIndex = new PaddedIndex() { Index = START_INDEX };
private PaddedIndex m_tailIndex = new PaddedIndex() { Index = START_INDEX };
private PaddedInt m_headIndex = new PaddedInt() { Value = START_INDEX };
private PaddedInt m_tailIndex = new PaddedInt() { Value = START_INDEX };

private SpinLock m_foreignLock = new SpinLock(false);

public void LocalPush(IThreadPoolWorkItem obj)
{
int tail = m_tailIndex.Index;
int tail = m_tailIndex.Value;

// We're going to increment the tail; if we'll overflow, then we need to reset our counts
if (tail == int.MaxValue)
Expand All @@ -175,7 +183,7 @@ public void LocalPush(IThreadPoolWorkItem obj)
{
m_foreignLock.Enter(ref lockTaken);

if (m_tailIndex.Index == int.MaxValue)
if (m_tailIndex.Value == int.MaxValue)
{
//
// Rather than resetting to zero, we'll just mask off the bits we don't care about.
Expand All @@ -187,9 +195,9 @@ public void LocalPush(IThreadPoolWorkItem obj)
// for the head to end up > than the tail, since you can't set any more bits than all of
// them.
//
m_headIndex.Index = m_headIndex.Index & m_mask;
m_tailIndex.Index = tail = m_tailIndex.Index & m_mask;
Contract.Assert(m_headIndex.Index <= m_tailIndex.Index);
m_headIndex.Value = m_headIndex.Value & m_mask;
m_tailIndex.Value = tail = m_tailIndex.Value & m_mask;
Contract.Assert(m_headIndex.Value <= m_tailIndex.Value);
}
}
finally
Expand All @@ -200,10 +208,10 @@ public void LocalPush(IThreadPoolWorkItem obj)
}

// When there are at least 2 elements' worth of space, we can take the fast path.
if (tail < m_headIndex.Index + m_mask)
if (tail < m_headIndex.Value + m_mask)
{
Volatile.Write(ref m_array[tail & m_mask].Item, obj);
m_tailIndex.Index = tail + 1;
m_tailIndex.Value = tail + 1;
}
else
{
Expand All @@ -213,8 +221,8 @@ public void LocalPush(IThreadPoolWorkItem obj)
{
m_foreignLock.Enter(ref lockTaken);

int head = m_headIndex.Index;
int count = m_tailIndex.Index - m_headIndex.Index;
int head = m_headIndex.Value;
int count = m_tailIndex.Value - m_headIndex.Value;

// If there is still space (one left), just add the element.
if (count >= m_mask)
Expand All @@ -226,13 +234,13 @@ public void LocalPush(IThreadPoolWorkItem obj)

// Reset the field values, incl. the mask.
m_array = newArray;
m_headIndex.Index = 0;
m_tailIndex.Index = tail = count;
m_headIndex.Value = 0;
m_tailIndex.Value = tail = count;
m_mask = (m_mask << 1) | 1;
}

Volatile.Write(ref m_array[tail & m_mask].Item, obj);
m_tailIndex.Index = tail + 1;
m_tailIndex.Value = tail + 1;
}
finally
{
Expand All @@ -246,7 +254,7 @@ public void LocalPush(IThreadPoolWorkItem obj)
public bool LocalFindAndPop(IThreadPoolWorkItem obj)
{
// Fast path: check the tail. If equal, we can skip the lock.
if (m_array[(m_tailIndex.Index - 1) & m_mask].Item == obj)
if (m_array[(m_tailIndex.Value - 1) & m_mask].Item == obj)
{
IThreadPoolWorkItem unused;
if (LocalPop(out unused))
Expand All @@ -265,7 +273,7 @@ public bool LocalFindAndPop(IThreadPoolWorkItem obj)
// problem (although degenerate cases are clearly an issue) because local work
// queues tend to be somewhat shallow in length, and because if we fail to find
// the work item, we are about to block anyway (which is very expensive).
for (int i = m_tailIndex.Index - 2; i >= m_headIndex.Index; i--)
for (int i = m_tailIndex.Value - 2; i >= m_headIndex.Value; i--)
{
if (m_array[i & m_mask].Item == obj)
{
Expand All @@ -285,10 +293,10 @@ public bool LocalFindAndPop(IThreadPoolWorkItem obj)
// And then check to see if we can fix up the indexes (if we're at
// the edge). If we can't, we just leave nulls in the array and they'll
// get filtered out eventually (but may lead to superfluous resizing).
if (i == m_tailIndex.Index)
m_tailIndex.Index -= 1;
else if (i == m_headIndex.Index)
m_headIndex.Index += 1;
if (i == m_tailIndex.Value)
m_tailIndex.Value -= 1;
else if (i == m_headIndex.Value)
m_headIndex.Value += 1;

return true;
}
Expand All @@ -309,18 +317,18 @@ public bool LocalPop(out IThreadPoolWorkItem obj)
while (true)
{
// Decrement the tail using a fence to ensure subsequent read doesn't come before.
int tail = m_tailIndex.Index;
if (m_headIndex.Index >= tail)
int tail = m_tailIndex.Value;
if (m_headIndex.Value >= tail)
{
obj = null;
return false;
}

tail -= 1;
Interlocked.Exchange(ref m_tailIndex.Index, tail);
Interlocked.Exchange(ref m_tailIndex.Value, tail);

// If there is no interaction with a take, we can head down the fast path.
if (m_headIndex.Index <= tail)
if (m_headIndex.Value <= tail)
{
int idx = tail & m_mask;
obj = Volatile.Read(ref m_array[idx].Item);
Expand All @@ -339,7 +347,7 @@ public bool LocalPop(out IThreadPoolWorkItem obj)
{
m_foreignLock.Enter(ref lockTaken);

if (m_headIndex.Index <= tail)
if (m_headIndex.Value <= tail)
{
// Element still available. Take it.
int idx = tail & m_mask;
Expand All @@ -354,7 +362,7 @@ public bool LocalPop(out IThreadPoolWorkItem obj)
else
{
// If we encountered a race condition and element was stolen, restore the tail.
m_tailIndex.Index = tail + 1;
m_tailIndex.Value = tail + 1;
obj = null;
return false;
}
Expand All @@ -379,7 +387,7 @@ private bool TrySteal(out IThreadPoolWorkItem obj, ref bool missedSteal, int mil

while (true)
{
if (m_headIndex.Index >= m_tailIndex.Index)
if (m_headIndex.Value >= m_tailIndex.Value)
return false;

bool taken = false;
Expand All @@ -389,10 +397,10 @@ private bool TrySteal(out IThreadPoolWorkItem obj, ref bool missedSteal, int mil
if (taken)
{
// Increment head, and ensure read of tail doesn't move before it (fence).
int head = m_headIndex.Index;
Interlocked.Exchange(ref m_headIndex.Index, head + 1);
int head = m_headIndex.Value;
Interlocked.Exchange(ref m_headIndex.Value, head + 1);

if (head < m_tailIndex.Index)
if (head < m_tailIndex.Value)
{
int idx = head & m_mask;
obj = Volatile.Read(ref m_array[idx].Item);
Expand All @@ -406,7 +414,7 @@ private bool TrySteal(out IThreadPoolWorkItem obj, ref bool missedSteal, int mil
else
{
// Failed, restore head.
m_headIndex.Index = head;
m_headIndex.Value = head;
obj = null;
missedSteal = true;
}
Expand Down Expand Up @@ -436,17 +444,17 @@ internal class QueueSegment
// Holds the indexes of the lowest and highest valid elements of the nodes array.
// The low index is in the lower 16 bits, high index is in the upper 16 bits.
// Use GetIndexes and CompareExchangeIndexes to manipulate this.
private PaddedIndex indexes = new PaddedIndex();
private PaddedInt indexes = new PaddedInt();

// The next segment in the queue.
public volatile QueueSegment Next;
public PaddedQueueSegment Next;


const int SixteenBits = 0xffff;

void GetIndexes(out int upper, out int lower)
{
int i = indexes.Index;
int i = indexes.Value;
upper = (i >> 16) & SixteenBits;
lower = i & SixteenBits;

Expand All @@ -470,7 +478,7 @@ bool CompareExchangeIndexes(ref int prevUpper, int newUpper, ref int prevLower,

int oldIndexes = (prevUpper << 16) | (prevLower & SixteenBits);
int newIndexes = (newUpper << 16) | (newLower & SixteenBits);
int prevIndexes = Interlocked.CompareExchange(ref indexes.Index, newIndexes, oldIndexes);
int prevIndexes = Interlocked.CompareExchange(ref indexes.Value, newIndexes, oldIndexes);
prevUpper = (prevIndexes >> 16) & SixteenBits;
prevLower = prevIndexes & SixteenBits;
return prevIndexes == oldIndexes;
Expand Down Expand Up @@ -558,17 +566,17 @@ public bool TryDequeue(out IThreadPoolWorkItem node)
}

// The head and tail of the queue. We enqueue to the head, and dequeue from the tail.
internal volatile QueueSegment queueHead;
internal volatile QueueSegment queueTail;
internal PaddedQueueSegment queueHead;
internal PaddedQueueSegment queueTail;
internal bool loggingEnabled;

internal static SparseArray<WorkStealingQueue> allThreadQueues = new SparseArray<WorkStealingQueue>(16);

private volatile int numOutstandingThreadRequests = 0;
private PaddedInt numOutstandingThreadRequests = new PaddedInt() { Value = 0 };

public ThreadPoolWorkQueue()
{
queueTail = queueHead = new QueueSegment();
queueTail.Segment = queueHead.Segment = new QueueSegment();
loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool|FrameworkEventSource.Keywords.ThreadTransfer);
}

Expand All @@ -588,10 +596,10 @@ internal void EnsureThreadRequested()
// Note that there is a separate count in the VM which will also be incremented in this case,
// which is handled by RequestWorkerThread.
//
int count = numOutstandingThreadRequests;
int count = numOutstandingThreadRequests.Value;
while (count < ThreadPoolGlobals.processorCount)
{
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count+1, count);
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests.Value, count+1, count);
if (prev == count)
{
ThreadPool.RequestWorkerThread();
Expand All @@ -610,10 +618,10 @@ internal void MarkThreadRequestSatisfied()
// Note that there is a separate count in the VM which has already been decremented by the VM
// by the time we reach this point.
//
int count = numOutstandingThreadRequests;
int count = numOutstandingThreadRequests.Value;
while (count > 0)
{
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count);
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests.Value, count - 1, count);
if (prev == count)
{
break;
Expand All @@ -638,16 +646,16 @@ public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
}
else
{
QueueSegment head = queueHead;
QueueSegment head = queueHead.Segment;

while (!head.TryEnqueue(callback))
{
Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);
Interlocked.CompareExchange(ref head.Next.Segment, new QueueSegment(), null);

while (head.Next != null)
while (head.Next.Segment != null)
{
Interlocked.CompareExchange(ref queueHead, head.Next, head);
head = queueHead;
Interlocked.CompareExchange(ref queueHead.Segment, head.Next.Segment, head);
head = queueHead.Segment;
}
}
}
Expand Down Expand Up @@ -677,7 +685,7 @@ public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem

if (null == callback)
{
QueueSegment tail = queueTail;
QueueSegment tail = queueTail.Segment;
while (true)
{
if (tail.TryDequeue(out callback))
Expand All @@ -686,14 +694,14 @@ public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem
break;
}

if (null == tail.Next || !tail.IsUsedUp())
if (null == tail.Next.Segment || !tail.IsUsedUp())
{
break;
}
else
{
Interlocked.CompareExchange(ref queueTail, tail.Next, tail);
tail = queueTail;
Interlocked.CompareExchange(ref queueTail.Segment, tail.Next.Segment, tail);
tail = queueTail.Segment;
}
}
}
Expand Down Expand Up @@ -1740,7 +1748,7 @@ internal static bool TryPopCustomWorkItem(IThreadPoolWorkItem workItem)
[SecurityCritical]
internal static IEnumerable<IThreadPoolWorkItem> GetQueuedWorkItems()
{
return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail);
return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail.Segment);
}

internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail)
Expand Down Expand Up @@ -1768,7 +1776,7 @@ internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(Thread
// Now the global queue
for (ThreadPoolWorkQueue.QueueSegment segment = globalQueueTail;
segment != null;
segment = segment.Next)
segment = segment.Next.Segment)
{
ThreadPoolWorkQueue.PaddedWorkItem[] items = segment.nodes;
for (int i = 0; i < items.Length; i++)
Expand All @@ -1790,7 +1798,7 @@ internal static IEnumerable<IThreadPoolWorkItem> GetLocallyQueuedWorkItems()
[SecurityCritical]
internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems()
{
return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail);
return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail.Segment);
}

private static object[] ToObjectArray(IEnumerable<IThreadPoolWorkItem> workitems)
Expand Down

0 comments on commit eec3a75

Please sign in to comment.