Skip to content

Commit

Permalink
Merge pull request #53 from OrleansContrib/feature/50/cancellationToken
Browse files Browse the repository at this point in the history
Adds cancellation token support for long running work grains.
  • Loading branch information
Kritner authored Feb 10, 2024
2 parents 495ae24 + 32c6e89 commit 135ad58
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 32 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ This package introduces a few "requirements" against Orleans:
Create an interface for the grain, which implements `ISyncWorker<TRequest, TResult>`, as well as one of the `IGrainWith...Key` interfaces. Then create a new class that extends the `SyncWorker<TRequest, TResult>` abstract class, and implements the new interface that was introduced:

```cs
public interface IPasswordVerifierGrain : ISyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IGrainWithGuidKey
public interface IPasswordVerifierGrain
: ISyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IGrainWithGuidKey;

public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IPasswordVerifierGrain
{
Expand All @@ -79,7 +80,8 @@ public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, Passwor
_passwordVerifier = passwordVerifier;
}

protected override async Task<PasswordVerifierResult> PerformWork(PasswordVerifierRequest request)
protected override async Task<PasswordVerifierResult> PerformWork(
PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken)
{
var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password);

Expand All @@ -89,6 +91,7 @@ public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, Passwor
};
}
}

public class PasswordVerifierRequest
{
public string Password { get; set; }
Expand Down
58 changes: 58 additions & 0 deletions samples/Orleans.SyncWork.Demo.Services/Grains/CancellableGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Orleans.SyncWork.Demo.Services.Grains;

public class CancellableGrain : SyncWorker<SampleCancellationRequest, SampleCancellationResult>, ICancellableGrain
{
public CancellableGrain(
ILogger<CancellableGrain> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<SampleCancellationResult> PerformWork(
SampleCancellationRequest request, GrainCancellationToken grainCancellationToken)
{
var startingValue = request.StartingValue;

for (var i = 0; i < request.EnumerationMax; i++)
{
if (grainCancellationToken.CancellationToken.IsCancellationRequested)
{
Logger.LogInformation("Task cancelled on iteration {Iteration}", i);

if (request.ThrowOnCancel)
throw new OperationCanceledException(grainCancellationToken.CancellationToken);

return new SampleCancellationResult() { EndingValue = startingValue };
}

startingValue += 1;
await Task.Delay(request.EnumerationDelay);
}

return new SampleCancellationResult() { EndingValue = startingValue };
}
}

[GenerateSerializer]
public class SampleCancellationRequest
{
[Id(0)]
public TimeSpan EnumerationDelay { get; init; }
[Id(1)]
public int StartingValue { get; init; }
[Id(2)]
public int EnumerationMax { get; init; } = 1_000;
[Id(3)]
public bool ThrowOnCancel { get; init; }
}

[GenerateSerializer]
public class SampleCancellationResult
{
[Id(0)]
public int EndingValue { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Orleans.SyncWork.Demo.Services.Grains;

public interface ICancellableGrain
: ISyncWorker<SampleCancellationRequest, SampleCancellationResult>, IGrainWithGuidKey;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public PasswordVerifierGrain(
_passwordVerifier = passwordVerifier;
}

protected override async Task<PasswordVerifierResult> PerformWork(PasswordVerifierRequest request)
protected override async Task<PasswordVerifierResult> PerformWork(
PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken)
{
var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

namespace Orleans.SyncWork.Demo.Services.TestGrains;

public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable : SyncWorker<TestDelayExceptionRequest, TestDelayExceptionResult>, IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable
public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable :
SyncWorker<TestDelayExceptionRequest, TestDelayExceptionResult>,
IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable
{
public GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable(
ILogger<GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler
) : base(logger, limitedConcurrencyScheduler) { }
) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<TestDelayExceptionResult> PerformWork(TestDelayExceptionRequest request)
protected override async Task<TestDelayExceptionResult> PerformWork(TestDelayExceptionRequest request,
GrainCancellationToken grainCancellationToken)
{
Logger.LogInformation($"Waiting {request.MsDelayPriorToResult} on {this.IdentityString}");
await Task.Delay(request.MsDelayPriorToResult);
Expand All @@ -35,4 +40,3 @@ public class TestDelayExceptionRequest

[GenerateSerializer]
public class TestDelayExceptionResult;

Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@

namespace Orleans.SyncWork.Demo.Services.TestGrains;

public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable : SyncWorker<TestDelaySuccessRequest, TestDelaySuccessResult>, IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable
public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable :
SyncWorker<TestDelaySuccessRequest, TestDelaySuccessResult>,
IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable
{
public GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable(
ILogger<GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler
) : base(logger, limitedConcurrencyScheduler) { }
) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<TestDelaySuccessResult> PerformWork(TestDelaySuccessRequest request)
protected override async Task<TestDelaySuccessResult> PerformWork(TestDelaySuccessRequest request,
GrainCancellationToken grainCancellationToken)
{
await Task.Delay(request.MsDelayPriorToResult);

return new TestDelaySuccessResult()
{
Started = request.Started,
Ended = DateTime.UtcNow
};
return new TestDelaySuccessResult() { Started = request.Started, Ended = DateTime.UtcNow };
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.SyncWork/Enums/SyncWorkStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public enum SyncWorkStatus
/// <summary>
/// The work has been completed, though an exception was thrown.
/// </summary>
Faulted
Faulted,
}
14 changes: 14 additions & 0 deletions src/Orleans.SyncWork/ISyncWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ public interface ISyncWorker<in TRequest, TResult> : IGrain
/// <returns>true if work is started, false if it was already started.</returns>
Task<bool> Start(TRequest request);
/// <summary>
/// <para>
/// Start long running work with the provided parameter.
/// </para>
/// <para>
/// Supports cancellation, but any cancellation logic is up to the grain implementation. It could
/// conceivably return a result at the point of cancellation, or throw, depending on what makes sense for
/// the particular grain.
/// </para>
/// </summary>
/// <param name="request">The parameter containing all necessary information to start the workload.</param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>true if work is started, false if it was already started.</returns>
Task<bool> Start(TRequest request, GrainCancellationToken grainCancellationToken);
/// <summary>
/// Gets the long running work status.
/// </summary>
/// <returns>The status of the long running work.</returns>
Expand Down
25 changes: 17 additions & 8 deletions src/Orleans.SyncWork/SyncWorker.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.SyncWork.Enums;
Expand Down Expand Up @@ -40,7 +39,15 @@ protected SyncWorker(ILogger logger, LimitedConcurrencyLevelTaskScheduler limite
}

/// <inheritdoc />
public Task<bool> Start(TRequest request)
public async Task<bool> Start(TRequest request)
{
var token = new GrainCancellationTokenSource().Token;

return await Start(request, token);
}

/// <inheritdoc />
public Task<bool> Start(TRequest request, GrainCancellationToken grainCancellationToken)
{
if (_task != null)
{
Expand All @@ -50,7 +57,7 @@ public Task<bool> Start(TRequest request)

Logger.LogDebug("{Method}: Starting task, set status to running.", nameof(Start));
_status = SyncWorkStatus.Running;
_task = CreateTask(request);
_task = CreateTask(request, grainCancellationToken);

return Task.FromResult(true);
}
Expand Down Expand Up @@ -94,22 +101,24 @@ public Task<bool> Start(TRequest request)
/// The method that actually performs the long running work.
/// </summary>
/// <param name="request">The request/parameters used for the execution of the method.</param>
/// <returns></returns>
protected abstract Task<TResult> PerformWork(TRequest request);
/// <param name="grainCancellationToken">The cancellation token.</param>
/// <returns>A result once available.</returns>
protected abstract Task<TResult> PerformWork(TRequest request, GrainCancellationToken grainCancellationToken);

/// <summary>
/// The task creation that fires off the long running work to the <see cref="LimitedConcurrencyLevelTaskScheduler"/>.
/// </summary>
/// <param name="request">The request to use for the invoke of the long running work.</param>
/// <param name="grainCancellationToken">The cancellation token.</param>
/// <returns>a <see cref="Task"/> representing the fact that the work has been dispatched.</returns>
private Task CreateTask(TRequest request)
private Task CreateTask(TRequest request, GrainCancellationToken grainCancellationToken)
{
return Task.Factory.StartNew(async () =>
{
try
{
Logger.LogInformation("{Method}: Beginning work for task.", nameof(CreateTask));
_result = await PerformWork(request);
_result = await PerformWork(request, grainCancellationToken);
_exception = default;
_status = SyncWorkStatus.Completed;
Logger.LogInformation("{Method}: Completed work for task.", nameof(CreateTask));
Expand All @@ -121,6 +130,6 @@ private Task CreateTask(TRequest request)
_exception = e;
_status = SyncWorkStatus.Faulted;
}
}, CancellationToken.None, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler);
}, grainCancellationToken.CancellationToken, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler);
}
}
57 changes: 53 additions & 4 deletions src/Orleans.SyncWork/SyncWorkerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ namespace Orleans.SyncWork;
public static class SyncWorkerExtensions
{
/// <summary>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available, then returns it.
///
/// <para>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available,
/// then returns it.
/// </para>
/// <para>
/// Polls the <see cref="ISyncWorker{TRequest, TResult}"/> every 1000ms for a result, until one is available.
/// </para>
/// </summary>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
Expand All @@ -23,7 +27,29 @@ public static class SyncWorkerExtensions
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request)
{
return worker.StartWorkAndPollUntilResult(request, 1000);
var grainCancellationToken = new GrainCancellationTokenSource().Token;
return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken);
}

/// <summary>
/// <para>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available,
/// then returns it.
/// </para>
/// <para>
/// Polls the <see cref="ISyncWorker{TRequest, TResult}"/> every 1000ms for a result, until one is available.
/// </para>
/// </summary>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
/// <param name="worker">The <see cref="ISyncWorker{TRequest, TResult}"/> doing the work.</param>
/// <param name="request">The request to be dispatched.</param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, GrainCancellationToken grainCancellationToken)
{
return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken);
}

/// <summary>
Expand All @@ -43,6 +69,29 @@ public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, int msDelayPerStatusPoll)
{
var grainCancellationToken = new GrainCancellationTokenSource().Token;
return await StartWorkAndPollUntilResult(worker, request, msDelayPerStatusPoll, grainCancellationToken);
}

/// <summary>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available, then returns it.
/// </summary>
/// <remarks>
/// Caution is advised when setting the msDelayPerStatusPoll "too low" - 1000 ms seems to be pretty safe,
/// but if the cluster is under *enough* load, that much grain polling could overwhelm it.
/// </remarks>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
/// <param name="worker">The <see cref="ISyncWorker{TRequest, TResult}"/> doing the work.</param>
/// <param name="request">The request to be dispatched.</param>
/// <param name="msDelayPerStatusPoll">
/// The ms delay per attempt to poll for a <see cref="SyncWorkStatus.Completed"/> or <see cref="SyncWorkStatus.Faulted"/> status.
/// </param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, int msDelayPerStatusPoll, GrainCancellationToken grainCancellationToken)
{
await worker.Start(request);
await Task.Delay(100);
Expand All @@ -64,7 +113,7 @@ public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>
case SyncWorkStatus.NotStarted:
throw new InvalidStateException("This shouldn't happen, but if it does, it probably means the cluster may have died and restarted, and/or a timeout occurred and the grain got reinstantiated without firing off the work.");
default:
throw new Exception("How did we even get here...?");
throw new InvalidStateException("How did we even get here...?");
}
}
}
Expand Down
Loading

0 comments on commit 135ad58

Please sign in to comment.