Skip to content

Commit

Permalink
Update with start (#381)
Browse files Browse the repository at this point in the history
Fixes #346
  • Loading branch information
cretz authored Dec 17, 2024
1 parent d073176 commit 855047e
Show file tree
Hide file tree
Showing 16 changed files with 1,314 additions and 152 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ Some things to note about the above code:
* A shortcut extension `ExecuteWorkflowAsync` is available that is just `StartWorkflowAsync` + `GetResultAsync`.
* `SignalWithStart` method is present on the workflow options to make the workflow call a signal-with-start call which
means it will only start the workflow if it's not running, but send a signal to it regardless.
* Separate `StartUpdateWithStartWorkflowAsync` and `ExecuteUpdateWithStartWorkflowAsync` methods are present on the
client to make the workflow call an update-with-start call which means it may start the workflow if it's not running,
but perform an update on it regardless.

#### Invoking Activities

Expand Down
65 changes: 64 additions & 1 deletion src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ protected virtual IDictionary<string, Payload> HeadersFromContext(
return Enumerable.Empty<KeyValuePair<string, object?>>();
}

/// <summary>
/// Create tag collection for the given workflow and update ID.
/// </summary>
/// <param name="workflowId">Workflow ID.</param>
/// <param name="updateId">Update ID.</param>
/// <returns>Tags.</returns>
protected virtual IEnumerable<KeyValuePair<string, object?>> CreateUpdateTags(
string workflowId, string? updateId)
{
var ret = new List<KeyValuePair<string, object?>>(2);
if (Options.TagNameWorkflowId is string wfName)
{
ret.Add(new(wfName, workflowId));
}
if (Options.TagNameUpdateId is string updateName && updateId is { } nonNullUpdateId)
{
ret.Add(new(updateName, nonNullUpdateId));
}
return ret;
}

/// <summary>
/// Create tag collection from the current workflow environment. Must be called within a
/// workflow.
Expand Down Expand Up @@ -208,6 +229,48 @@ public override async Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsyn
}
}

public override async Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateWithStartWorkflowAsync<TUpdateResult>(
StartUpdateWithStartWorkflowInput input)
{
// Ignore if for some reason the start operation is not set by this interceptor
if (input.Options.StartWorkflowOperation == null)
{
return await base.StartUpdateWithStartWorkflowAsync<TUpdateResult>(input).ConfigureAwait(false);
}

using (var activity = ClientSource.StartActivity(
$"UpdateWithStartWorkflow:{input.Options.StartWorkflowOperation.Workflow}",
kind: ActivityKind.Client,
parentContext: default,
tags: root.CreateUpdateTags(
workflowId: input.Options.StartWorkflowOperation.Options.Id!,
updateId: input.Options.Id)))
{
// We want the header on _both_ start and update
if (HeadersFromContext(input.Headers) is Dictionary<string, Payload> updateHeaders)
{
input = input with { Headers = updateHeaders };
}
if (HeadersFromContext(input.Options.StartWorkflowOperation.Headers) is Dictionary<string, Payload> startHeaders)
{
// We copy the operation but still mutate the existing headers. This is
// similar to what is done by other interceptors (they copy the input
// object but still mutate the original header dictionary if there).
input.Options.StartWorkflowOperation = (WithStartWorkflowOperation)input.Options.StartWorkflowOperation.Clone();
input.Options.StartWorkflowOperation.Headers = startHeaders;
}
try
{
return await base.StartUpdateWithStartWorkflowAsync<TUpdateResult>(input).ConfigureAwait(false);
}
catch (Exception e)
{
RecordExceptionWithStatus(activity, e);
throw;
}
}
}

public override async Task SignalWorkflowAsync(SignalWorkflowInput input)
{
using (var activity = ClientSource.StartActivity(
Expand Down Expand Up @@ -263,7 +326,7 @@ public override async Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsy
$"UpdateWorkflow:{input.Update}",
kind: ActivityKind.Client,
parentContext: default,
tags: root.CreateWorkflowTags(input.Id)))
tags: root.CreateUpdateTags(workflowId: input.Id, updateId: input.Options.Id)))
{
if (HeadersFromContext(input.Headers) is Dictionary<string, Payload> headers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class TracingInterceptorOptions : ICloneable
/// </summary>
public string? TagNameActivityId { get; set; } = "temporalActivityID";

/// <summary>
/// Gets or sets the tag name for update IDs. If null, no tag is created.
/// </summary>
public string? TagNameUpdateId { get; set; } = "temporalUpdateID";

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
Expand Down
78 changes: 78 additions & 0 deletions src/Temporalio/Client/ITemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,84 @@ WorkflowHandle<TWorkflow> GetWorkflowHandle<TWorkflow>(
WorkflowHandle<TWorkflow, TResult> GetWorkflowHandle<TWorkflow, TResult>(
string id, string? runId = null, string? firstExecutionRunId = null);

/// <summary>
/// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the
/// workflow at the same time. Note that in some cases this call may fail but the workflow
/// will still be started.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <exception cref="ArgumentException">Invalid run call or options.</exception>
/// <exception cref="Exceptions.WorkflowAlreadyStartedException">
/// Workflow was already started according to ID reuse and conflict policy.
/// </exception>
/// <exception cref="Exceptions.RpcException">Server-side error.</exception>
/// <remarks>WARNING: Workflow update with start is experimental and APIs may change.
/// </remarks>
Task<WorkflowUpdateHandle> StartUpdateWithStartWorkflowAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> updateCall,
WorkflowStartUpdateWithStartOptions options);

/// <summary>
/// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the
/// workflow at the same time. Note that in some cases this call may fail but the workflow
/// will still be started.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <exception cref="ArgumentException">Invalid run call or options.</exception>
/// <exception cref="Exceptions.WorkflowAlreadyStartedException">
/// Workflow was already started according to ID reuse and conflict policy.
/// </exception>
/// <exception cref="Exceptions.RpcException">Server-side error.</exception>
/// <remarks>WARNING: Workflow update with start is experimental and APIs may change.
/// </remarks>
Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateWithStartWorkflowAsync<TWorkflow, TUpdateResult>(
Expression<Func<TWorkflow, Task<TUpdateResult>>> updateCall,
WorkflowStartUpdateWithStartOptions options);

/// <summary>
/// Start an update using its name, possibly starting the workflow at the same time. Note
/// that in some cases this call may fail but the workflow will still be started.
/// </summary>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <exception cref="ArgumentException">Invalid run call or options.</exception>
/// <exception cref="Exceptions.WorkflowAlreadyStartedException">
/// Workflow was already started according to ID reuse and conflict policy.
/// </exception>
/// <exception cref="Exceptions.RpcException">Server-side error.</exception>
/// <remarks>WARNING: Workflow update with start is experimental and APIs may change.
/// </remarks>
Task<WorkflowUpdateHandle> StartUpdateWithStartWorkflowAsync(
string update, IReadOnlyCollection<object?> args, WorkflowStartUpdateWithStartOptions options);

/// <summary>
/// Start an update using its name, possibly starting the workflow at the same time. Note
/// that in some cases this call may fail but the workflow will still be started.
/// </summary>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <exception cref="ArgumentException">Invalid run call or options.</exception>
/// <exception cref="Exceptions.WorkflowAlreadyStartedException">
/// Workflow was already started according to ID reuse and conflict policy.
/// </exception>
/// <exception cref="Exceptions.RpcException">Server-side error.</exception>
/// <remarks>WARNING: Workflow update with start is experimental and APIs may change.
/// </remarks>
Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateWithStartWorkflowAsync<TUpdateResult>(
string update, IReadOnlyCollection<object?> args, WorkflowStartUpdateWithStartOptions options);

#if NETCOREAPP3_0_OR_GREATER
/// <summary>
/// List workflows with the given query.
Expand Down
Loading

0 comments on commit 855047e

Please sign in to comment.