Skip to content

Commit

Permalink
Support DisableEagerActivityExecution option (#366)
Browse files Browse the repository at this point in the history
Fixes #365
  • Loading branch information
cretz authored Nov 22, 2024
1 parent c30a2db commit 2400644
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: MetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes));
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,21 @@ public TemporalWorkerOptions()
/// </remarks>
public WorkerTuner? Tuner { get; set; }

/// <summary>
/// Gets or sets a value indicating whether eager activity executions will be disabled from
/// a workflow.
/// </summary>
/// <remarks>
/// Eager activity execution is an optimization on some servers that sends activities back
/// to the same worker as the calling workflow if they can run there.
/// </remarks>
/// <remarks>
/// This should be set to <c>true</c> for <see cref="MaxTaskQueueActivitiesPerSecond" /> to
/// work and in a future version of this API may be implied as such (i.e. this setting will
/// be ignored if that setting is set).
/// </remarks>
public bool DisableEagerActivityExecution { get; set; }

/// <summary>
/// Gets the TEMPORAL_DEBUG environment variable.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly Action<WorkflowInstance> onTaskStarting;
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
private readonly bool disableEagerActivityExecution;
private readonly Handlers inProgressHandlers = new();
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
Expand Down Expand Up @@ -190,6 +191,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
Random = new(details.Start.RandomnessSeed);
TracingEventsEnabled = !details.DisableTracingEvents;
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
disableEagerActivityExecution = details.DisableEagerActivityExecution;
}

/// <summary>
Expand Down Expand Up @@ -1756,6 +1758,7 @@ public override Task<TResult> ScheduleActivityAsync<TResult>(
Arguments = { instance.PayloadConverter.ToPayloads(input.Args) },
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
CancellationType = (Bridge.Api.WorkflowCommands.ActivityCancellationType)input.Options.CancellationType,
DoNotEagerlyExecute = instance.disableEagerActivityExecution || input.Options.DisableEagerActivityExecution,
};
if (input.Headers is IDictionary<string, Payload> headers)
{
Expand Down
4 changes: 3 additions & 1 deletion src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace Temporalio.Worker
/// <param name="OnTaskCompleted">Callback for every instance task complete.</param>
/// <param name="RuntimeMetricMeter">Lazy runtime-level metric meter.</param>
/// <param name="WorkerLevelFailureExceptionTypes">Failure exception types at worker level.</param>
/// <param name="DisableEagerActivityExecution">Whether to disable eager at the worker level.</param>
internal record WorkflowInstanceDetails(
string Namespace,
string TaskQueue,
Expand All @@ -41,5 +42,6 @@ internal record WorkflowInstanceDetails(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableEagerActivityExecution);
}
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: new(() => runtime.MetricMeter),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableEagerActivityExecution: false),
(runId, removeFromCache) => SetResult(removeFromCache));
}
catch
Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: options.RuntimeMetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes));
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes,
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
}
}
}
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ internal record WorkflowWorkerOptions(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableEagerActivityExecution);
}
14 changes: 14 additions & 0 deletions src/Temporalio/Workflows/ActivityOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ public class ActivityOptions : ICloneable
/// </summary>
public VersioningIntent VersioningIntent { get; set; } = VersioningIntent.Unspecified;

/// <summary>
/// Gets or sets a value indicating whether eager activity execution will be disabled for
/// this activity.
/// </summary>
/// <remarks>
/// Eager activity execution is an optimization on some servers that sends activities back
/// to the same worker as the calling workflow if they can run there.
/// </remarks>
/// <remarks>
/// If <c>false</c> (the default), eager execution may still be disabled at the worker level
/// or may not be requested due to lack of available slots.
/// </remarks>
public bool DisableEagerActivityExecution { get; set; }

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
Expand Down

0 comments on commit 2400644

Please sign in to comment.