Skip to content

Commit

Permalink
Warn on unfinished workflow handlers (#294)
Browse files Browse the repository at this point in the history
Fixes #261
  • Loading branch information
cretz authored Jul 2, 2024
1 parent 0d0334f commit 72f6fb2
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 39 deletions.
179 changes: 148 additions & 31 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -68,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
private readonly bool disableCompletionCommandReordering;
private readonly Handlers inProgressHandlers = new();
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
private Lazy<object?[]>? startArgs;
Expand Down Expand Up @@ -204,6 +206,9 @@ public WorkflowInstance(WorkflowInstanceDetails details)
/// </summary>
public bool TracingEventsEnabled { get; private init; }

/// <inheritdoc />
public bool AllHandlersFinished => inProgressHandlers.Count == 0;

/// <inheritdoc />
public CancellationToken CancellationToken => cancellationTokenSource.Token;

Expand Down Expand Up @@ -576,7 +581,13 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
}

// Maybe apply workflow completion command reordering logic
ApplyCompletionCommandReordering(act, completion);
ApplyCompletionCommandReordering(act, completion, out var workflowComplete);

// Log warnings if we have completed
if (workflowComplete && !IsReplaying)
{
inProgressHandlers.WarnIfAnyLeftOver(Info.WorkflowId, logger);
}

// Unset the completion
var toReturn = completion;
Expand Down Expand Up @@ -886,6 +897,10 @@ private void ApplyDoUpdate(DoUpdate update)
// Queue it up so it can run in workflow environment
_ = QueueNewTaskAsync(() =>
{
// Make sure we have loaded the instance which may invoke the constructor thereby
// letting the constructor register update handlers at runtime
var ignored = Instance;

// Set the current update for the life of this task
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name);

Expand Down Expand Up @@ -998,9 +1013,12 @@ private void ApplyDoUpdate(DoUpdate update)
Definition: updateDefn,
Args: argsForUpdate,
Headers: update.Headers));
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
update.Name, update.Id, updateDefn.UnfinishedPolicy));
return task.ContinueWith(
_ =>
{
inProgressHandlers.Remove(inProgress);
// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
Expand Down Expand Up @@ -1080,6 +1098,10 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
// Queue it up so it can run in workflow environment
_ = QueueNewTaskAsync(() =>
{
// Make sure we have loaded the instance which may invoke the constructor thereby
// letting the constructor register query handlers at runtime
var ignored = Instance;

var origCmdCount = completion?.Successful?.Commands?.Count;
try
{
Expand Down Expand Up @@ -1241,11 +1263,21 @@ private void ApplySignalWorkflow(SignalWorkflow signal)
return;
}

await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: args,
Headers: signal.Headers)).ConfigureAwait(true);
// Handle signal
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
signal.SignalName, null, signalDefn.UnfinishedPolicy));
try
{
await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: args,
Headers: signal.Headers)).ConfigureAwait(true);
}
finally
{
inProgressHandlers.Remove(inProgress);
}
}));
}

Expand Down Expand Up @@ -1394,7 +1426,9 @@ private string GetStackTrace()
}

private void ApplyCompletionCommandReordering(
WorkflowActivation act, WorkflowActivationCompletion completion)
WorkflowActivation act,
WorkflowActivationCompletion completion,
out bool workflowComplete)
{
// In earlier versions of the SDK we allowed commands to be sent after workflow
// completion. These ended up being removed effectively making the result of the
Expand All @@ -1404,40 +1438,42 @@ private void ApplyCompletionCommandReordering(
//
// Note this only applies for successful activations that don't have completion
// reordering disabled and that are either not replaying or have the flag set.
if (completion.Successful == null || disableCompletionCommandReordering)
{
return;
}
if (IsReplaying && !act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion))
{
return;
}

// We know we're on a newer SDK and can move completion to the end if we need to. First,
// find the completion command.
// Find the index of the completion command
var completionCommandIndex = -1;
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
if (completion.Successful != null)
{
var cmd = completion.Successful.Commands[i];
// Set completion index if the command is a completion
if (cmd.CancelWorkflowExecution != null ||
cmd.CompleteWorkflowExecution != null ||
cmd.ContinueAsNewWorkflowExecution != null ||
cmd.FailWorkflowExecution != null)
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
{
completionCommandIndex = i;
break;
var cmd = completion.Successful.Commands[i];
// Set completion index if the command is a completion
if (cmd.CancelWorkflowExecution != null ||
cmd.CompleteWorkflowExecution != null ||
cmd.ContinueAsNewWorkflowExecution != null ||
cmd.FailWorkflowExecution != null)
{
completionCommandIndex = i;
break;
}
}
}

// If there is no completion command or it's already at the end, nothing to do
if (completionCommandIndex == -1 ||
completionCommandIndex == completion.Successful.Commands.Count - 1)
workflowComplete = completionCommandIndex >= 0;

// This only applies for successful activations that have a completion not at the end,
// don't have completion reordering disabled, and that are either not replaying or have
// the flag set.
if (completion.Successful == null ||
completionCommandIndex == -1 ||
completionCommandIndex == completion.Successful.Commands.Count - 1 ||
disableCompletionCommandReordering ||
(IsReplaying && !act.AvailableInternalFlags.Contains(
(uint)WorkflowLogicFlag.ReorderWorkflowCompletion)))
{
return;
}

// Now we know the completion is in the wrong spot, so set the SDK flag and move it
// Now we know the completion is in the wrong spot and we're on a newer SDK, so set the
// SDK flag and move it
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion);
var compCmd = completion.Successful.Commands[completionCommandIndex];
completion.Successful.Commands.RemoveAt(completionCommandIndex);
Expand Down Expand Up @@ -2230,5 +2266,86 @@ public override Task SignalAsync(
public override Task CancelAsync() =>
instance.outbound.Value.CancelExternalWorkflowAsync(new(Id: Id, RunId: RunId));
}

private class Handlers : LinkedList<Handlers.Handler>
{
#pragma warning disable SA1118 // We're ok w/ string literals spanning lines
private static readonly Action<ILogger, string, WarnableSignals, Exception?> SignalWarning =
LoggerMessage.Define<string, WarnableSignals>(
LogLevel.Warning,
0,
"Workflow {Id} finished while signal handlers are still running. This may " +
"have interrupted work that the signal handler was doing. You can wait for " +
"all update and signal handlers to complete by using `await " +
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
"Alternatively, if both you and the clients sending the signal are okay with " +
"interrupting running handlers when the workflow finishes, and causing " +
"clients to receive errors, then you can disable this warning via the signal " +
"handler attribute: " +
"`[WorkflowSignal(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
"following signals were unfinished (and warnings were not disabled for their " +
"handler): {Handlers}");

private static readonly Action<ILogger, string, WarnableUpdates, Exception?> UpdateWarning =
LoggerMessage.Define<string, WarnableUpdates>(
LogLevel.Warning,
0,
"Workflow {Id} finished while update handlers are still running. This may " +
"have interrupted work that the update handler was doing, and the client " +
"that sent the update will receive a 'workflow execution already completed' " +
"RpcException instead of the update result. You can wait for all update and " +
"signal handlers to complete by using `await " +
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
"Alternatively, if both you and the clients sending the update are okay with " +
"interrupting running handlers when the workflow finishes, and causing " +
"clients to receive errors, then you can disable this warning via the update " +
"handler attribute: " +
"`[WorkflowUpdate(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
"following updates were unfinished (and warnings were not disabled for their " +
"handler): {Handlers}");
#pragma warning restore SA1118

public void WarnIfAnyLeftOver(string id, ILogger logger)
{
var signals = this.
Where(h => h.UpdateId == null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
GroupBy(h => h.Name).
Select(h => (h.Key, h.Count())).
ToArray();
if (signals.Length > 0)
{
SignalWarning(logger, id, new WarnableSignals { NamesAndCounts = signals }, null);
}
var updates = this.
Where(h => h.UpdateId != null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
Select(h => (h.Name, h.UpdateId!)).
ToArray();
if (updates.Length > 0)
{
UpdateWarning(logger, id, new WarnableUpdates { NamesAndIds = updates }, null);
}
}

public readonly struct WarnableSignals
{
public (string, int)[] NamesAndCounts { get; init; }

public override string ToString() => JsonSerializer.Serialize(
NamesAndCounts.Select(v => new { name = v.Item1, count = v.Item2 }).ToArray());
}

public readonly struct WarnableUpdates
{
public (string, string)[] NamesAndIds { get; init; }

public override string ToString() => JsonSerializer.Serialize(
NamesAndIds.Select(v => new { name = v.Item1, id = v.Item2 }).ToArray());
}

public record Handler(
string Name,
string? UpdateId,
HandlerUnfinishedPolicy UnfinishedPolicy);
}
}
}
27 changes: 27 additions & 0 deletions src/Temporalio/Workflows/HandlerUnfinishedPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace Temporalio.Workflows
{
    /// <summary>
    /// Policy for what happens to update or signal handlers that are still running when the
    /// workflow exits.
    /// </summary>
    /// <remarks>
    /// A workflow exit here covers successful return, failure, cancellation, and
    /// continue-as-new.
    /// </remarks>
    public enum HandlerUnfinishedPolicy
    {
        /// <summary>
        /// Abandon the handler and also issue a warning.
        /// </summary>
        WarnAndAbandon = 0,

        /// <summary>
        /// Abandon the handler without a warning.
        /// </summary>
        /// <remarks>
        /// For an update handler, abandoning means the client receives an error instead of the
        /// update result.
        /// </remarks>
        Abandon = 1,
    }
}
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/IWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ namespace Temporalio.Workflows
/// </summary>
internal interface IWorkflowContext
{
/// <summary>
/// Gets a value indicating whether <see cref="Workflow.AllHandlersFinished" /> is true.
/// </summary>
bool AllHandlersFinished { get; }

/// <summary>
/// Gets value for <see cref="Workflow.CancellationToken" />.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ namespace Temporalio.Workflows
/// </summary>
public static class Workflow
{
/// <summary>
/// Gets a value indicating whether all update and signal handlers have finished executing.
/// </summary>
/// <remarks>
/// Consider waiting on this condition before workflow return or continue-as-new, to prevent
/// interruption of in-progress handlers by workflow return:
/// <c>await Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)</c>.
/// </remarks>
public static bool AllHandlersFinished => Context.AllHandlersFinished;

/// <summary>
/// Gets the cancellation token for the workflow.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Temporalio/Workflows/WorkflowSignalAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,11 @@ public WorkflowSignalAttribute(string name)
/// an array of <see cref="Converters.IRawValue" />.
/// </summary>
public bool Dynamic { get; set; }

/// <summary>
/// Gets or sets the actions taken if a workflow exits with a running instance of this
/// handler. Default is <see cref="HandlerUnfinishedPolicy.WarnAndAbandon" />.
/// </summary>
public HandlerUnfinishedPolicy UnfinishedPolicy { get; set; } = HandlerUnfinishedPolicy.WarnAndAbandon;
}
}
22 changes: 18 additions & 4 deletions src/Temporalio/Workflows/WorkflowSignalDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ public class WorkflowSignalDefinition
{
private static readonly ConcurrentDictionary<MethodInfo, WorkflowSignalDefinition> Definitions = new();

private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del)
private WorkflowSignalDefinition(
string? name,
MethodInfo? method,
Delegate? del,
HandlerUnfinishedPolicy unfinishedPolicy)
{
Name = name;
Method = method;
Delegate = del;
UnfinishedPolicy = unfinishedPolicy;
}

/// <summary>
Expand All @@ -39,6 +44,11 @@ private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del
/// </summary>
internal Delegate? Delegate { get; private init; }

/// <summary>
/// Gets the unfinished policy.
/// </summary>
internal HandlerUnfinishedPolicy UnfinishedPolicy { get; private init; }

/// <summary>
/// Get a signal definition from a method or fail. The result is cached.
/// </summary>
Expand All @@ -63,12 +73,16 @@ public static WorkflowSignalDefinition FromMethod(MethodInfo method)
/// </summary>
/// <param name="name">Signal name. Null for dynamic signal.</param>
/// <param name="del">Signal delegate.</param>
/// <param name="unfinishedPolicy">Actions taken if a workflow exits with a running instance
/// of this handler.</param>
/// <returns>Signal definition.</returns>
public static WorkflowSignalDefinition CreateWithoutAttribute(
string? name, Delegate del)
string? name,
Delegate del,
HandlerUnfinishedPolicy unfinishedPolicy = HandlerUnfinishedPolicy.WarnAndAbandon)
{
AssertValid(del.Method, dynamic: name == null);
return new(name, null, del);
return new(name, null, del, unfinishedPolicy);
}

/// <summary>
Expand Down Expand Up @@ -103,7 +117,7 @@ private static WorkflowSignalDefinition CreateFromMethod(MethodInfo method)
name = name.Substring(0, name.Length - 5);
}
}
return new(name, method, null);
return new(name, method, null, attr.UnfinishedPolicy);
}

private static void AssertValid(MethodInfo method, bool dynamic)
Expand Down
Loading

0 comments on commit 72f6fb2

Please sign in to comment.