-
-
Notifications
You must be signed in to change notification settings - Fork 963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Dispatcher] improve api, reduce overhead, improve performances for items > 1k #2083
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,14 @@ | ||
// Copyright (c) .NET Foundation and Contributors (https://dotnetfoundation.org/ & https://stride3d.net) and Silicon Studio Corp. (https://www.siliconstudio.co.jp) | ||
// Distributed under the MIT license. See the LICENSE.md file in the project root for more information. | ||
|
||
#pragma warning disable CS8500 // Pointer not constrained to managed type | ||
|
||
using Stride.Core.Annotations; | ||
using Stride.Core.Diagnostics; | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Reflection; | ||
using System.Runtime.InteropServices; | ||
using System.Threading; | ||
|
||
namespace Stride.Core.Threading | ||
|
@@ -28,9 +32,9 @@ public sealed partial class ThreadPool : IDisposable | |
|
||
private static readonly ProfilingKey ProcessWorkItemKey = new ProfilingKey($"{nameof(ThreadPool)}.ProcessWorkItem"); | ||
|
||
private readonly ConcurrentQueue<Action> workItems = new ConcurrentQueue<Action>(); | ||
private readonly SemaphoreW semaphore; | ||
private readonly ConcurrentQueue<Work> workItems = new ConcurrentQueue<Work>(); | ||
private readonly ISemaphore semaphore; | ||
|
||
private long completionCounter; | ||
private int workScheduled, threadsBusy; | ||
private int disposing; | ||
|
@@ -47,8 +51,30 @@ public sealed partial class ThreadPool : IDisposable | |
|
||
public ThreadPool(int? threadCount = null) | ||
{ | ||
semaphore = new SemaphoreW(spinCountParam:70); | ||
|
||
int spinCount = 70; | ||
|
||
if(RuntimeInformation.ProcessArchitecture is Architecture.Arm or Architecture.Arm64) | ||
{ | ||
// Dotnet: | ||
// On systems with ARM processors, more spin-waiting seems to be necessary to avoid perf regressions from incurring | ||
// the full wait when work becomes available soon enough. This is more noticeable after reducing the number of | ||
// thread requests made to the thread pool because otherwise the extra thread requests cause threads to do more | ||
// busy-waiting instead and adding to contention in trying to look for work items, which is less preferable. | ||
spinCount *= 4; | ||
} | ||
try | ||
{ | ||
semaphore = new DotnetLifoSemaphore(spinCount); | ||
} | ||
catch | ||
{ | ||
// For net6+ this should not happen, logging instead of throwing as this is just a performance regression | ||
if(Environment.Version.Major >= 6) | ||
Console.Out?.WriteLine($"{typeof(ThreadPool).FullName}: Falling back to suboptimal semaphore"); | ||
|
||
semaphore = new SemaphoreW(spinCountParam:70); | ||
} | ||
|
||
WorkerThreadsCount = threadCount ?? (Environment.ProcessorCount == 1 ? 1 : Environment.ProcessorCount - 1); | ||
leftToDispose = WorkerThreadsCount; | ||
for (int i = 0; i < WorkerThreadsCount; i++) | ||
|
@@ -66,7 +92,7 @@ static ThreadPool() | |
/// Queue an action to run on one of the available threads, | ||
/// it is strongly recommended that the action takes less than a millisecond. | ||
/// </summary> | ||
public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | ||
public unsafe void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | ||
{ | ||
// Throw right here to help debugging | ||
if (workItem == null) | ||
|
@@ -85,10 +111,55 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | |
} | ||
|
||
Interlocked.Add(ref workScheduled, amount); | ||
var work = new Work { WorkHandler = &ActionHandler, Data = workItem }; | ||
for (int i = 0; i < amount; i++) | ||
{ | ||
PooledDelegateHelper.AddReference(workItem); | ||
workItems.Enqueue(workItem); | ||
workItems.Enqueue(work); | ||
} | ||
semaphore.Release(amount); | ||
} | ||
|
||
/// <summary>
/// Work handler used for delegate-based work items: runs the queued <see cref="Action"/>
/// and then releases it through <see cref="PooledDelegateHelper"/>.
/// </summary>
/// <param name="param">The queued work item; it is cast to <see cref="Action"/>.</param>
static void ActionHandler(object param)
{
    var pooledAction = (Action)param;
    try
    {
        pooledAction.Invoke();
    }
    finally
    {
        // Released even when the action throws, so the reference is always given back.
        PooledDelegateHelper.Release(pooledAction);
    }
}
|
||
/// <summary>
/// Queues a work item, described by a raw function pointer and the argument to pass to it,
/// to run on one of the available threads.
/// It is strongly recommended that the work takes less than a millisecond.
/// </summary>
/// <remarks>
/// The same <paramref name="parameter"/> instance is handed to every one of the
/// <paramref name="amount"/> invocations. As stated by the original contract, the parameter
/// provided must be fixed from this call onward until the work has finished executing.
/// This method performs no pooled-delegate reference counting on <paramref name="parameter"/>.
/// </remarks>
/// <param name="parameter">Argument passed to <paramref name="obj"/> on each invocation; must not be null.</param>
/// <param name="obj">Function pointer invoked by the processing thread; must not be null.</param>
/// <param name="amount">How many times the work item is enqueued; must be at least 1.</param>
/// <exception cref="NullReferenceException"><paramref name="parameter"/> is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="obj"/> is a null function pointer.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="amount"/> is less than 1.</exception>
/// <exception cref="ObjectDisposedException">The pool is being disposed.</exception>
public unsafe void QueueUnsafeWorkItem(object parameter, delegate*<object, void> obj, int amount = 1)
{
    if (parameter == null)
    {
        // Exception type kept as-is so callers observe the same behavior as before.
        throw new NullReferenceException(nameof(parameter));
    }

    if (obj == null)
    {
        // Fail here, on the caller's thread, instead of invoking a null function pointer
        // later on whichever thread dequeues the item.
        throw new ArgumentNullException(nameof(obj));
    }

    if (amount < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(amount));
    }

    if (disposing > 0)
    {
        throw new ObjectDisposedException(ToString());
    }

    // Account for the new work before it becomes visible in the queue.
    Interlocked.Add(ref workScheduled, amount);

    // Work is a struct, so every Enqueue stores its own copy of the same handler/data pair.
    var work = new Work { WorkHandler = obj, Data = parameter };
    for (int i = 0; i < amount; i++)
    {
        workItems.Enqueue(work);
    }

    // One release per queued item.
    semaphore.Release(amount);
}
|
@@ -98,20 +169,19 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | |
/// If you absolutely have to block inside one of the thread pool's threads for whatever | ||
/// reason, do a busy loop over this function. | ||
/// </summary> | ||
public bool TryCooperate() | ||
public unsafe bool TryCooperate() | ||
{ | ||
if (workItems.TryDequeue(out var workItem)) | ||
{ | ||
Interlocked.Increment(ref threadsBusy); | ||
Interlocked.Decrement(ref workScheduled); | ||
try | ||
{ | ||
using var _ = Profiler.Begin(ProcessWorkItemKey); | ||
workItem.Invoke(); | ||
using (Profiler.Begin(ProcessWorkItemKey)) | ||
workItem.WorkHandler(workItem.Data); | ||
} | ||
finally | ||
{ | ||
PooledDelegateHelper.Release(workItem); | ||
Interlocked.Decrement(ref threadsBusy); | ||
Interlocked.Increment(ref completionCounter); | ||
} | ||
|
@@ -174,12 +244,9 @@ public void Dispose() | |
} | ||
|
||
semaphore.Release(WorkerThreadsCount); | ||
semaphore.Dispose(); | ||
while (Volatile.Read(ref leftToDispose) != 0) | ||
{ | ||
if (semaphore.SignalCount == 0) | ||
{ | ||
semaphore.Release(1); | ||
} | ||
Thread.Yield(); | ||
} | ||
|
||
|
@@ -189,5 +256,36 @@ public void Dispose() | |
|
||
} | ||
} | ||
|
||
/// <summary>
/// A queued unit of work: a function pointer together with the single argument it is called with.
/// </summary>
private unsafe struct Work
{
    /// <summary>Function pointer invoked with <see cref="Data"/> when the item is processed.</summary>
    public delegate*<object, void> WorkHandler;

    /// <summary>Argument handed to <see cref="WorkHandler"/>.</summary>
    public object Data;
}
|
||
/// <summary>
/// Minimal semaphore contract used by the pool, so that different semaphore
/// implementations can be stored in the same field.
/// </summary>
private interface ISemaphore : IDisposable
{
    /// <summary>
    /// Releases the semaphore; the pool passes the number of work items it just queued.
    /// </summary>
    void Release(int Count);

    /// <summary>
    /// Waits on the semaphore.
    /// </summary>
    /// <param name="timeout">
    /// Defaults to -1. NOTE(review): presumably milliseconds, with -1 meaning "no timeout" -
    /// confirm against the implementations.
    /// </param>
    void Wait(int timeout = -1);
}
|
||
private sealed class DotnetLifoSemaphore : ISemaphore | ||
{ | ||
private readonly IDisposable semaphore; | ||
private readonly Func<int, bool, bool> wait; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Curious: can C# 9.0 function pointers (and GetFunctionPointer()) be useful in this scenario? (Not sure it would make an actual perf difference, but I'm curious whether they are usable enough for this use case, as this would mean I can probably use them in some other places.) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From my limited testing, yes, although I do not know the implications this has for the JIT and such; I do remember that the address you get when taking a static function's pointer is not fixed. If the method is 'moved' after the JIT has processed it, then we might have to retrieve the function pointer from its runtime method handle on every call to make sure we run the optimal version ... |
||
private readonly Action<int> release; | ||
|
||
/// <summary>
/// Creates a wrapper around the runtime's <c>System.Threading.LowLevelLifoSemaphore</c>,
/// binding its <c>Wait</c> and <c>Release</c> methods to delegates.
/// </summary>
/// <param name="spinCount">Spin count forwarded to the runtime semaphore's constructor.</param>
/// <exception cref="NotSupportedException">
/// The runtime type cannot be found, or the created instance is not <see cref="IDisposable"/>.
/// </exception>
/// <exception cref="MissingMethodException">
/// The type has no public instance <c>Wait</c> or <c>Release</c> method.
/// </exception>
public DotnetLifoSemaphore(int spinCount)
{
    const string TypeName = "System.Threading.LowLevelLifoSemaphore";
    const BindingFlags PublicInstance = BindingFlags.Instance | BindingFlags.Public;

    // LowLevelLifoSemaphore is an internal runtime type, so it is reached through reflection.
    // Type.GetType returns null when the type does not exist; fail with a clear message
    // instead of letting a null flow into Activator.CreateInstance.
    Type lifoType = Type.GetType(TypeName)
        ?? throw new NotSupportedException($"Runtime type '{TypeName}' is not available.");

    // NOTE(review): the arguments presumably map to
    // (initialSignalCount, maximumSignalCount, spinCount, onWait) - confirm against the runtime source.
    object instance = Activator.CreateInstance(
        lifoType,
        new object[] { 0, short.MaxValue, spinCount, new Action(() => { }) });

    // A failed 'as' cast would otherwise only show up as a null dereference later on.
    semaphore = instance as IDisposable
        ?? throw new NotSupportedException($"Runtime type '{TypeName}' does not implement {nameof(IDisposable)}.");

    // Bind the instance methods once so later calls go through delegates rather than MethodInfo.Invoke.
    MethodInfo FindMethod(string name) =>
        lifoType.GetMethod(name, PublicInstance) ?? throw new MissingMethodException(TypeName, name);

    wait = FindMethod("Wait").CreateDelegate<Func<int, bool, bool>>(semaphore);
    release = FindMethod("Release").CreateDelegate<Action<int>>(semaphore);
}
|
||
/// <summary>Disposes the wrapped semaphore instance.</summary>
public void Dispose()
{
    semaphore.Dispose();
}
/// <summary>Forwards <paramref name="count"/> to the bound <c>Release</c> delegate.</summary>
public void Release(int count)
{
    release.Invoke(count);
}
/// <summary>
/// Forwards <paramref name="timeout"/> to the bound <c>Wait</c> delegate and ignores its result.
/// </summary>
public void Wait(int timeout = -1)
{
    // The second argument is always true. NOTE(review): presumably the "spin before waiting" flag - confirm.
    _ = wait.Invoke(timeout, true);
}
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log errors to Stride's logging system instead of Console? A lot of the time on Windows you're not going to have a console attached.