Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Threading] Threadpool, lifo semaphore #1190

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion sources/core/Stride.Core/Threading/ThreadPool.SemaphoreW.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public sealed partial class ThreadPool
/// <summary>
/// Mostly lifted from dotnet's LowLevelLifoSemaphore
/// </summary>
private class SemaphoreW
private sealed class SemaphoreW : ISemaphore
{
private const int SpinSleep0Threshold = 10;

Expand All @@ -35,6 +35,8 @@ private class SemaphoreW
static SemaphoreW()
{
// Workaround as Thread.OptimalMaxSpinWaitsPerSpinIteration is internal and only implemented in core
// Note: as of .NET 6 (https://github.com/dotnet/runtime/issues/53509 & https://github.com/dotnet/runtime/pull/55295)
// the underlying value is periodically updated in case the timing changes, so this property shouldn't be used under those runtimes.
BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static;
var f = typeof(Thread).GetProperty("OptimalMaxSpinWaitsPerSpinIteration", flags);
int opti = 7;
Expand All @@ -57,6 +59,11 @@ public SemaphoreW(int spinCountParam)

lifoSemaphore = new Semaphore(0, int.MaxValue);
}

/// <summary>
/// Disposes the underlying <c>lifoSemaphore</c>, if one has been created.
/// </summary>
public void Dispose() => lifoSemaphore?.Dispose();

/// <summary>
/// Waits on this semaphore by forwarding to <c>internals.Wait</c> with the configured
/// spin count and the backing <c>lifoSemaphore</c>.
/// </summary>
/// <param name="timeout">Timeout forwarded as-is to <c>internals.Wait</c>; defaults to -1.</param>
public void Wait(int timeout = -1)
{
    internals.Wait(spinCount, lifoSemaphore, timeout);
}

Expand Down
74 changes: 63 additions & 11 deletions sources/core/Stride.Core/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using Stride.Core.Annotations;
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;

namespace Stride.Core.Threading
Expand All @@ -19,14 +21,15 @@ public sealed partial class ThreadPool : IDisposable
/// </summary>
public static ThreadPool Instance = new ThreadPool();

/// <summary> Is the thread reading this property a worker thread </summary>
public static bool IsWorkerThread => isWorkerThread;

private static readonly bool SingleCore;
[ThreadStatic]
private static bool isWorkedThread;
/// <summary> Is the thread reading this property a worker thread </summary>
public static bool IsWorkedThread => isWorkedThread;
private static bool isWorkerThread;

private readonly ConcurrentQueue<Action> workItems = new ConcurrentQueue<Action>();
private readonly SemaphoreW semaphore;
private readonly ISemaphore semaphore;

private long completionCounter;
private int workScheduled, threadsBusy;
Expand All @@ -44,8 +47,29 @@ public sealed partial class ThreadPool : IDisposable

public ThreadPool(int? threadCount = null)
{
semaphore = new SemaphoreW(spinCountParam:70);

int spinCount = 70;
if(RuntimeInformation.ProcessArchitecture is Architecture.Arm or Architecture.Arm64)
{
// Dotnet:
// On systems with ARM processors, more spin-waiting seems to be necessary to avoid perf regressions from incurring
// the full wait when work becomes available soon enough. This is more noticeable after reducing the number of
// thread requests made to the thread pool because otherwise the extra thread requests cause threads to do more
// busy-waiting instead and adding to contention in trying to look for work items, which is less preferable.
spinCount *= 4;
}
try
{
semaphore = new DotnetLifoSemaphore(spinCount);
}
catch
{
// On .NET 6+ this should not happen; we log instead of throwing because falling back is only a performance regression
if(Environment.Version.Major >= 6)
Console.Out?.WriteLine($"{typeof(ThreadPool).FullName}: Falling back to suboptimal semaphore");

semaphore = new SemaphoreW(spinCountParam:70);
}

WorkerThreadsCount = threadCount ?? (Environment.ProcessorCount == 1 ? 1 : Environment.ProcessorCount - 1);
leftToDispose = WorkerThreadsCount;
for (int i = 0; i < WorkerThreadsCount; i++)
Expand Down Expand Up @@ -129,7 +153,7 @@ private void NewWorker()

private void WorkerThreadScope()
{
isWorkedThread = true;
isWorkerThread = true;
try
{
do
Expand Down Expand Up @@ -170,12 +194,9 @@ public void Dispose()
}

semaphore.Release(WorkerThreadsCount);
semaphore.Dispose();
while (Volatile.Read(ref leftToDispose) != 0)
{
if (semaphore.SignalCount == 0)
{
semaphore.Release(1);
}
Thread.Yield();
}

Expand All @@ -185,5 +206,36 @@ public void Dispose()

}
}



/// <summary>
/// Minimal semaphore contract used by the pool so that different semaphore
/// implementations can be swapped behind a single field.
/// </summary>
private interface ISemaphore : IDisposable
{
    /// <summary> Releases the semaphore <paramref name="Count"/> times. </summary>
    /// <param name="Count">Number of releases to perform.</param>
    void Release(int Count);

    /// <summary> Blocks the calling thread until the semaphore is signaled. </summary>
    /// <param name="timeout">
    /// Timeout handed to the implementation; defaults to -1.
    /// NOTE(review): presumably milliseconds with -1 meaning "wait indefinitely" - confirm against implementations.
    /// </param>
    void Wait(int timeout = -1);
}



/// <summary>
/// <see cref="ISemaphore"/> adapter over the runtime's <c>System.Threading.LowLevelLifoSemaphore</c>,
/// which is looked up and driven through reflection.
/// </summary>
/// <remarks>
/// The constructor throws when the type or the expected members cannot be found on the current runtime;
/// callers are expected to catch that and fall back to another <see cref="ISemaphore"/> implementation.
/// </remarks>
private sealed class DotnetLifoSemaphore : ISemaphore
{
    private const string LifoTypeName = "System.Threading.LowLevelLifoSemaphore";
    private const BindingFlags PublicInstance = BindingFlags.Instance | BindingFlags.Public;

    private readonly IDisposable semaphore;
    private readonly Func<int, bool, bool> wait;
    private readonly Action<int> release;

    /// <summary>
    /// Creates the runtime semaphore and binds delegates to its <c>Wait</c> and <c>Release</c> methods.
    /// </summary>
    /// <param name="spinCount">Spin count forwarded to the runtime semaphore's constructor.</param>
    /// <exception cref="PlatformNotSupportedException">
    /// The runtime type does not exist or does not implement <see cref="IDisposable"/>.
    /// </exception>
    /// <exception cref="MissingMethodException">
    /// The runtime type lacks the expected <c>Wait(int, bool)</c> or <c>Release(int)</c> method.
    /// </exception>
    public DotnetLifoSemaphore(int spinCount)
    {
        // Resolve everything that can be missing BEFORE creating the instance, so that a
        // failed lookup never leaves an undisposed semaphore behind.
        Type lifoType = Type.GetType(LifoTypeName)
            ?? throw new PlatformNotSupportedException($"Type '{LifoTypeName}' is not available on this runtime");

        // Parameter types are spelled out so an added overload cannot cause an ambiguous match
        // and a changed signature is reported here rather than at delegate creation.
        MethodInfo waitMethod = lifoType.GetMethod("Wait", PublicInstance, null, new[] { typeof(int), typeof(bool) }, null)
            ?? throw new MissingMethodException(LifoTypeName, "Wait");
        MethodInfo releaseMethod = lifoType.GetMethod("Release", PublicInstance, null, new[] { typeof(int) }, null)
            ?? throw new MissingMethodException(LifoTypeName, "Release");

        // Arguments: initial signal count, maximum signal count, spin count, on-wait callback (no-op here).
        // NOTE(review): argument meaning is taken from the runtime's constructor - confirm when targeting a new runtime.
        object instance = Activator.CreateInstance(lifoType, new object[] { 0, short.MaxValue, spinCount, new Action(() => { }) });
        semaphore = (instance as IDisposable)
            ?? throw new PlatformNotSupportedException($"Type '{LifoTypeName}' does not implement {nameof(IDisposable)}");

        try
        {
            wait = waitMethod.CreateDelegate<Func<int, bool, bool>>(semaphore);
            release = releaseMethod.CreateDelegate<Action<int>>(semaphore);
        }
        catch
        {
            // Delegate binding failed: release the native resources we already own, then let the caller fall back.
            semaphore.Dispose();
            throw;
        }
    }

    /// <summary> Disposes the wrapped runtime semaphore. </summary>
    public void Dispose() => semaphore.Dispose();

    /// <summary> Releases the wrapped semaphore <paramref name="count"/> times. </summary>
    public void Release(int count) => release(count);

    /// <summary>
    /// Waits on the wrapped semaphore, always passing <c>true</c> as the second argument
    /// of the runtime's <c>Wait</c> method; the boolean result is discarded.
    /// </summary>
    /// <param name="timeout">Timeout forwarded to the runtime semaphore; defaults to -1.</param>
    public void Wait(int timeout = -1) => wait(timeout, true);
}
}
}