Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

[Wip] Improve Threadpool QUWI throughput #5943

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/mscorlib/src/System/IO/Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -652,16 +652,15 @@ public ReadWriteTask(
}

[SecurityCritical] // necessary for CoreCLR
private static void InvokeAsyncCallback(object completedTask)
private static void InvokeAsyncCallback(ReadWriteTask completedTask)
{
var rwc = (ReadWriteTask)completedTask;
var callback = rwc._callback;
rwc._callback = null;
callback(rwc);
var callback = completedTask._callback;
completedTask._callback = null;
callback(completedTask);
}

[SecurityCritical] // necessary for CoreCLR
private static ContextCallback s_invokeAsyncCallback;
private static ContextCallback<ReadWriteTask> s_invokeAsyncCallback;

[SecuritySafeCritical] // necessary for ExecutionContext.Run
void ITaskCompletionAction.Invoke(Task completingTask)
Expand All @@ -680,10 +679,9 @@ void ITaskCompletionAction.Invoke(Task completingTask)
{
_context = null;

var invokeAsyncCallback = s_invokeAsyncCallback;
if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race condition
var invokeAsyncCallback = s_invokeAsyncCallback ?? (s_invokeAsyncCallback = InvokeAsyncCallback); // benign race condition

using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
using(context) ExecutionContext.Run(context, invokeAsyncCallback, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMac
// This allows us to undo any ExecutionContext changes made in MoveNext,
// so that they won't "leak" out of the first await.

Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
RuntimeHelpers.PrepareConstrainedRegions();
Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = new ExecutionContextSwitcher(currentThread);
try
{
ExecutionContext.EstablishCopyOnWriteScope(currentThread, ref ecs);
stateMachine.MoveNext();
}
finally
Expand Down Expand Up @@ -308,12 +307,11 @@ public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMac
// This allows us to undo any ExecutionContext changes made in MoveNext,
// so that they won't "leak" out of the first await.

Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
RuntimeHelpers.PrepareConstrainedRegions();
Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = new ExecutionContextSwitcher(currentThread);
try
{
ExecutionContext.EstablishCopyOnWriteScope(currentThread, ref ecs);
stateMachine.MoveNext();
}
finally
Expand Down Expand Up @@ -464,12 +462,11 @@ public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMac
// This allows us to undo any ExecutionContext changes made in MoveNext,
// so that they won't "leak" out of the first await.

Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
RuntimeHelpers.PrepareConstrainedRegions();
Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecs = new ExecutionContextSwitcher(currentThread);
try
{
ExecutionContext.EstablishCopyOnWriteScope(currentThread, ref ecs);
stateMachine.MoveNext();
}
finally
Expand Down Expand Up @@ -1062,7 +1059,7 @@ internal void RunWithCapturedContext()
try
{
// Use the context and callback to invoke m_stateMachine.MoveNext.
ExecutionContext.Run(m_context, InvokeMoveNextCallback, m_stateMachine, preserveSyncCtx: true);
ExecutionContext.Run(m_context, InvokeMoveNextCallback, m_stateMachine);
}
finally { m_context.Dispose(); }
}
Expand Down Expand Up @@ -1091,26 +1088,26 @@ internal MoveNextRunner(IAsyncStateMachine stateMachine)
internal void RunWithDefaultContext()
{
Contract.Assert(m_stateMachine != null, "The state machine must have been set before calling Run.");
ExecutionContext.Run(ExecutionContext.PreAllocatedDefault, InvokeMoveNextCallback, m_stateMachine, preserveSyncCtx: true);
ExecutionContext.Run(ExecutionContext.PreAllocatedDefault, InvokeMoveNextCallback, m_stateMachine);
}

/// <summary>Gets a delegate to the InvokeMoveNext method.</summary>
protected static ContextCallback InvokeMoveNextCallback
protected static ContextCallback<IAsyncStateMachine> InvokeMoveNextCallback
{
[SecuritySafeCritical]
get { return s_invokeMoveNext ?? (s_invokeMoveNext = InvokeMoveNext); }
}

/// <summary>Cached delegate used with ExecutionContext.Run.</summary>
[SecurityCritical]
private static ContextCallback s_invokeMoveNext; // lazily-initialized due to SecurityCritical attribution
private static ContextCallback<IAsyncStateMachine> s_invokeMoveNext; // lazily-initialized due to SecurityCritical attribution

/// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
/// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
[SecurityCritical] // necessary for ContextCallback in CoreCLR
private static void InvokeMoveNext(object stateMachine)
private static void InvokeMoveNext(IAsyncStateMachine stateMachine)
{
((IAsyncStateMachine)stateMachine).MoveNext();
stateMachine.MoveNext();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ private static void ThrowForNonSuccess(Task task)
internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext)
{
if (continuation == null) throw new ArgumentNullException("continuation");
StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

// If TaskWait* ETW events are enabled, trace a beginning event for this await
// and set up an ending event to be traced when the asynchronous await completes.
Expand All @@ -216,7 +215,7 @@ internal static void OnCompletedInternal(Task task, Action continuation, bool co
}

// Set the continuation onto the awaited task.
task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref stackMark);
task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}

/// <summary>
Expand Down
10 changes: 4 additions & 6 deletions src/mscorlib/src/System/Threading/CancellationTokenSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ internal CancellationCallbackInfo(

// Cached callback delegate that's lazily initialized due to ContextCallback being SecurityCritical
[SecurityCritical]
private static ContextCallback s_executionContextCallback;
private static ContextCallback<CancellationCallbackInfo> s_executionContextCallback;

/// <summary>
/// InternalExecuteCallbackSynchronously_GeneralPath
Expand All @@ -1042,8 +1042,8 @@ internal void ExecuteCallback()
if (TargetExecutionContext != null)
{
// Lazily initialize the callback delegate; benign race condition
var callback = s_executionContextCallback;
if (callback == null) s_executionContextCallback = callback = new ContextCallback(ExecutionContextCallback);
var callback = s_executionContextCallback ??
(s_executionContextCallback = new ContextCallback<CancellationCallbackInfo>(ExecutionContextCallback));

ExecutionContext.Run(
TargetExecutionContext,
Expand All @@ -1060,10 +1060,8 @@ internal void ExecuteCallback()
// the worker method to actually run the callback
// The signature is such that it can be used as a 'ContextCallback'
[SecurityCritical]
private static void ExecutionContextCallback(object obj)
private static void ExecutionContextCallback(CancellationCallbackInfo callbackInfo)
{
CancellationCallbackInfo callbackInfo = obj as CancellationCallbackInfo;
Contract.Assert(callbackInfo != null);
callbackInfo.Callback(callbackInfo.StateForCallback);
}
}
Expand Down
125 changes: 91 additions & 34 deletions src/mscorlib/src/System/Threading/ExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,23 @@ namespace System.Threading
[System.Runtime.InteropServices.ComVisible(true)]
public delegate void ContextCallback(Object state);

internal delegate void ContextCallback<T>(T state);

#if FEATURE_CORECLR

[SecurityCritical]
internal struct ExecutionContextSwitcher
{
internal ExecutionContext m_ec;
internal SynchronizationContext m_sc;
private ExecutionContext m_ec;
private SynchronizationContext m_sc;

public ExecutionContextSwitcher(Thread currentThread)
{
Contract.Assert(currentThread == Thread.CurrentThread);

m_sc = currentThread.SynchronizationContext;
m_ec = currentThread.ExecutionContext;
}

internal void Undo(Thread currentThread)
{
Expand All @@ -55,11 +65,8 @@ internal void Undo(Thread currentThread)
{
currentThread.SynchronizationContext = m_sc;
}

if (currentThread.ExecutionContext != m_ec)
{
ExecutionContext.Restore(currentThread, m_ec);
}

ExecutionContext.Restore(currentThread, m_ec);
}
}

Expand All @@ -85,21 +92,26 @@ private ExecutionContext(Dictionary<IAsyncLocal, object> localValues, IAsyncLoca
[SecuritySafeCritical]
public static ExecutionContext Capture()
{
return Thread.CurrentThread.ExecutionContext ?? ExecutionContext.Default;
return Thread.CurrentThread.ExecutionContext;
}

[SecurityCritical]
[FriendAccessAllowed]
internal static void Run(ExecutionContext executionContext, ContextCallback callback, Object state, bool preserveSyncCtx)
{
Run(executionContext, callback, state);
}

[SecurityCritical]
[HandleProcessCorruptedStateExceptions]
public static void Run(ExecutionContext executionContext, ContextCallback callback, Object state)
{
if (executionContext == null)
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_NullContext"));
if (executionContext == null) ThrowInvalidOperationNullContextException();

Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecsw = default(ExecutionContextSwitcher);
ExecutionContextSwitcher ecsw = new ExecutionContextSwitcher(currentThread);
try
{
EstablishCopyOnWriteScope(currentThread, ref ecsw);
ExecutionContext.Restore(currentThread, executionContext);
callback(state);
}
Expand All @@ -116,30 +128,56 @@ public static void Run(ExecutionContext executionContext, ContextCallback callba
}

[SecurityCritical]
[HandleProcessCorruptedStateExceptions]
internal static void Run<T>(ExecutionContext executionContext, ContextCallback<T> callback, T state)
{
if (executionContext == null) ThrowInvalidOperationNullContextException();

Thread currentThread = Thread.CurrentThread;
ExecutionContextSwitcher ecsw = new ExecutionContextSwitcher(currentThread);
try
{
ExecutionContext.Restore(currentThread, executionContext);
callback(state);
}
catch
{
// Note: we have a "catch" rather than a "finally" because we want
// to stop the first pass of EH here. That way we can restore the previous
// context before any of our callers' EH filters run. That means we need to
// end the scope separately in the non-exceptional case below.
ecsw.Undo(currentThread);
throw;
}
ecsw.Undo(currentThread);
}

private static void ThrowInvalidOperationNullContextException()
{
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_NullContext"));
}

[SecurityCritical]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void Restore(Thread currentThread, ExecutionContext executionContext)
{
Contract.Assert(currentThread == Thread.CurrentThread);

ExecutionContext previous = currentThread.ExecutionContext ?? Default;
currentThread.ExecutionContext = executionContext;

// New EC could be null if that's what ECS.Undo saved off.
// For the purposes of dealing with context change, treat this as the default EC
executionContext = executionContext ?? Default;

if (previous != executionContext)
var previousContext = currentThread.ExecutionContext;

if (executionContext != previousContext)
{
OnContextChanged(previous, executionContext);
RestoreChanged(currentThread, executionContext, previousContext);
}
}

[SecurityCritical]
static internal void EstablishCopyOnWriteScope(Thread currentThread, ref ExecutionContextSwitcher ecsw)
private static void RestoreChanged(Thread currentThread, ExecutionContext executionContext, ExecutionContext previousContext)
{
Contract.Assert(currentThread == Thread.CurrentThread);
ecsw.m_ec = currentThread.ExecutionContext;
ecsw.m_sc = currentThread.SynchronizationContext;

currentThread.ExecutionContext = executionContext;
OnContextChanged(previousContext, executionContext);
}

[SecurityCritical]
Expand Down Expand Up @@ -193,8 +231,11 @@ private static void OnContextChanged(ExecutionContext previous, ExecutionContext
internal static object GetLocalValue(IAsyncLocal local)
{
ExecutionContext current = Thread.CurrentThread.ExecutionContext;
if (current == null)
if (current == ExecutionContext.Default)
{
// Fast-path Default context has no values
return null;
}

object value;
current.m_localValues.TryGetValue(local, out value);
Expand All @@ -204,7 +245,28 @@ internal static object GetLocalValue(IAsyncLocal local)
[SecurityCritical]
internal static void SetLocalValue(IAsyncLocal local, object newValue, bool needChangeNotifications)
{
ExecutionContext current = Thread.CurrentThread.ExecutionContext ?? ExecutionContext.Default;
ExecutionContext current = Thread.CurrentThread.ExecutionContext;
if (current == ExecutionContext.Default)
{
// Fast-path for first/single AsyncLocal store
if (newValue == null && !needChangeNotifications) return;

var values = new Dictionary<IAsyncLocal, object>(1);
values[local] = newValue;

if (!needChangeNotifications)
{
Thread.CurrentThread.ExecutionContext = new ExecutionContext(values, Array.Empty<IAsyncLocal>());
}
else
{
Thread.CurrentThread.ExecutionContext = new ExecutionContext(values, new IAsyncLocal[] { local });

local.OnValueChanged(null, newValue, false);
}

return;
}

object previousValue;
bool hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
Expand All @@ -219,7 +281,9 @@ internal static void SetLocalValue(IAsyncLocal local, object newValue, bool need
Dictionary<IAsyncLocal, object> newValues = new Dictionary<IAsyncLocal, object>(current.m_localValues.Count + (hadPreviousValue ? 0 : 1));

foreach (KeyValuePair<IAsyncLocal, object> pair in current.m_localValues)
{
newValues.Add(pair.Key, pair.Value);
}

newValues[local] = newValue;

Expand Down Expand Up @@ -272,13 +336,6 @@ internal static ExecutionContext FastCapture()
return Capture();
}

[SecurityCritical]
[FriendAccessAllowed]
internal static void Run(ExecutionContext executionContext, ContextCallback callback, Object state, bool preserveSyncCtx)
{
Run(executionContext, callback, state);
}

[SecurityCritical]
internal bool IsDefaultFTContext(bool ignoreSyncCtx)
{
Expand Down
Loading