From 89021d0e5454cebb2bba742acb5410917ed6b6ed Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 26 Nov 2024 12:33:07 -0800 Subject: [PATCH] Add type safe API to execute an async update workflow request (#2320) Add typed safe async update API --- .../io/temporal/client/WorkflowClient.java | 285 ++++++++++++++++++ .../client/WorkflowClientInternalImpl.java | 121 ++++++++ .../client/WorkflowInvocationHandler.java | 62 ++++ .../TestMultiArgWorkflowUpdateFunctions.java | 224 ++++++++++++++ .../workflow/updateTest/TypedUpdateTest.java | 159 ++++++++++ .../workflow/updateTest/UpdateTest.java | 66 +++- 6 files changed, 913 insertions(+), 4 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowUpdateFunctions.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java index 80c8044b2..f3295a069 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java @@ -572,6 +572,291 @@ static WorkflowExecution start( return WorkflowClientInternalImpl.start(workflow, arg1, arg2, arg3, arg4, arg5, arg6); } + /** + * Start a zero argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc updateMethod, @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, options); + } + + /** + * Start a one argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc1 updateMethod, A1 arg1, @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, options); + } + + /** + * Start a two argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc2 updateMethod, + A1 arg1, + A2 arg2, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, options); + } + + /** + * Start a three argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, arg3, options); + } + + /** + * Start a four argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, arg3, arg4, options); + } + + /** + * Start a five argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param arg5 fifth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate( + updateMethod, arg1, arg2, arg3, arg4, arg5, options); + } + + /** + * Start a six argument workflow update with a void return type + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param arg5 fifth update method parameter + * @param arg6 sixth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + static WorkflowUpdateHandle startUpdate( + Functions.Proc6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate( + updateMethod, arg1, arg2, arg3, arg4, arg5, arg6, options); + } + + /** + * Start a zero argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func updateMethod, @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, options); + } + + /** + * Start a one argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func1 updateMethod, A1 arg1, @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, options); + } + + /** + * Start a two argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func2 updateMethod, + A1 arg1, + A2 arg2, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, options); + } + + /** + * Start a three argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, arg3, options); + } + + /** + * Start a four argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate(updateMethod, arg1, arg2, arg3, arg4, options); + } + + /** + * Start a five argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param arg5 firth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate( + updateMethod, arg1, arg2, arg3, arg4, arg5, options); + } + + /** + * Start a six argument update workflow request asynchronously. + * + * @param updateMethod method reference annotated with @UpdateMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update method parameter + * @param arg2 second update method parameter + * @param arg3 third update method parameter + * @param arg4 fourth update method parameter + * @param arg5 firth update method parameter + * @param arg6 sixth update method parameter + * @param options update options + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static WorkflowUpdateHandle startUpdate( + Functions.Func6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + @Nonnull UpdateOptions options) { + return WorkflowClientInternalImpl.startUpdate( + updateMethod, arg1, arg2, arg3, arg4, arg5, arg6, options); + } + /** * Executes zero argument workflow with void return type together with an update workflow request. * diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index f53daeb2f..1060bd87f 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -537,6 +537,127 @@ public static CompletableFuture execute( return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6)); } + public static WorkflowUpdateHandle startUpdate( + Functions.Proc updateMethod, UpdateOptions options) { + enforceNonWorkflowThread(); + WorkflowInvocationHandler.initAsyncInvocation(InvocationType.UPDATE, options); + try { + updateMethod.apply(); + return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowUpdateHandle.class); + } finally { + WorkflowInvocationHandler.closeAsyncInvocation(); + } + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc1 updateMethod, A1 arg1, UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc2 updateMethod, A1 arg1, A2 arg2, UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Proc6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func updateMethod, UpdateOptions options) { + return (WorkflowUpdateHandle) startUpdate((Functions.Proc) updateMethod::apply, options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func1 updateMethod, A1 arg1, UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func2 updateMethod, A1 arg1, A2 arg2, UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options); + } + + public static WorkflowUpdateHandle startUpdate( + Functions.Func6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + UpdateOptions options) { + return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options); + } + Stream streamHistory(WorkflowExecution execution) { Preconditions.checkNotNull(execution, "execution is required"); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index cd727e33e..09643dcc3 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -23,6 +23,8 @@ import static io.temporal.internal.common.InternalUtils.createNexusBoundStub; import com.google.common.base.Defaults; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; import io.temporal.common.CronSchedule; @@ -54,6 +56,7 @@ public enum InvocationType { EXECUTE, SIGNAL_WITH_START, START_NEXUS, + UPDATE, UPDATE_WITH_START } @@ -93,6 +96,9 @@ static void initAsyncInvocation(InvocationType type, T value) { } else if (type == InvocationType.START_NEXUS) { NexusStartWorkflowRequest request = (NexusStartWorkflowRequest) value; invocationContext.set(new StartNexusOperationInvocationHandler(request)); + } else if (type == InvocationType.UPDATE) { + UpdateOptions updateOptions = (UpdateOptions) value; + invocationContext.set(new UpdateInvocationHandler(updateOptions)); } else if (type == InvocationType.UPDATE_WITH_START) { UpdateWithStartWorkflowOperation operation = (UpdateWithStartWorkflowOperation) value; invocationContext.set(new UpdateWithStartInvocationHandler(operation)); @@ -391,6 +397,9 @@ public void invoke( case QUERY: throw new IllegalArgumentException( "SignalWithStart batch doesn't accept methods annotated with @QueryMethod"); + case UPDATE: + throw new IllegalArgumentException( + "SignalWithStart batch doesn't accept methods annotated with @UpdateMethod"); case WORKFLOW: batch.start(untyped, args); break; @@ -441,6 +450,59 @@ public R getResult(Class resultClass) { } } + private static class UpdateInvocationHandler implements SpecificInvocationHandler { + private final UpdateOptions options; + private Object result; + + public UpdateInvocationHandler(UpdateOptions options) { + Preconditions.checkNotNull(options, "options"); + this.options = options; + } + + @Override + public InvocationType getInvocationType() { + return InvocationType.UPDATE; + } + + @Override + public void invoke( + POJOWorkflowInterfaceMetadata workflowMetadata, + WorkflowStub untyped, + Method method, + Object[] args) { + UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class); + if (updateMethod == null) { + throw new IllegalArgumentException( + "Only a method annotated with @UpdateMethod can be used to start an Update."); + } + POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method); + UpdateOptions.Builder builder = UpdateOptions.newBuilder(options); + if (Strings.isNullOrEmpty(options.getUpdateName())) { + builder.setUpdateName(methodMetadata.getName()); + } else if (!options.getUpdateName().equals(methodMetadata.getName())) { + throw new IllegalArgumentException( + "Update name in the options doesn't match the method name: " + + options.getUpdateName() + + " != " + + methodMetadata.getName()); + } + if (options.getResultType() == null) { + builder.setResultType(method.getGenericReturnType()); + } + if (options.getResultClass() == null) { + builder.setResultClass(method.getReturnType()); + } + + result = untyped.startUpdate(builder.build(), args); + } + + @Override + @SuppressWarnings("unchecked") + public R getResult(Class resultClass) { + return (R) result; + } + } + private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler { enum State { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowUpdateFunctions.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowUpdateFunctions.java new file mode 100644 index 000000000..b9807ac47 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowUpdateFunctions.java @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.shared; + +import io.temporal.workflow.*; + +public class TestMultiArgWorkflowUpdateFunctions { + + @WorkflowInterface + public interface TestMultiArgUpdateWorkflow + extends TestNoArgsUpdateFunc, + Test1ArgUpdateFunc, + Test2ArgUpdateFunc, + Test3ArgUpdateFunc, + Test4ArgUpdateFunc, + Test5ArgUpdateFunc, + Test6ArgUpdateFunc, + TestNoArgsUpdateProc, + Test1ArgUpdateProc, + Test2ArgUpdateProc, + Test3ArgUpdateProc, + Test4ArgUpdateProc, + Test5ArgUpdateProc, + Test6ArgUpdateProc { + @WorkflowMethod + String execute(); + + @SignalMethod + void complete(); + } + + public interface TestNoArgsUpdateFunc { + @UpdateMethod + String func(); + } + + public interface Test1ArgUpdateFunc { + @UpdateMethod + String func1(String input); + } + + public interface Test2ArgUpdateFunc { + + @UpdateMethod + String func2(String a1, int a2); + } + + public interface Test3ArgUpdateFunc { + + @UpdateMethod + String func3(String a1, int a2, int a3); + } + + public interface Test4ArgUpdateFunc { + + @UpdateMethod + String func4(String a1, int a2, int a3, int a4); + } + + public interface Test5ArgUpdateFunc { + + @UpdateMethod + String func5(String a1, int a2, int a3, int a4, int a5); + } + + public interface Test6ArgUpdateFunc { + + @UpdateMethod + String func6(String a1, int a2, int a3, int a4, int a5, int a6); + } + + public interface ProcInvocationQueryable { + + @QueryMethod(name = "getTrace") + String query(); + } + + public interface TestNoArgsUpdateProc { + + @UpdateMethod + void proc(); + } + + public interface Test1ArgUpdateProc { + + @UpdateMethod + void proc1(String input); + } + + public interface Test2ArgUpdateProc { + + @UpdateMethod + void proc2(String a1, int a2); + } + + public interface Test3ArgUpdateProc { + + @UpdateMethod + void proc3(String a1, int a2, int a3); + } + + public interface Test4ArgUpdateProc { + + @UpdateMethod + void proc4(String a1, int a2, int a3, int a4); + } + + public interface Test5ArgUpdateProc { + + @UpdateMethod + void proc5(String a1, int a2, int a3, int a4, int a5); + } + + public interface Test6ArgUpdateProc { + + @UpdateMethod + void proc6(String a1, int a2, int a3, int a4, int a5, int a6); + } + + public static class TestMultiArgUpdateWorkflowImpl implements TestMultiArgUpdateWorkflow { + + private String procResult = ""; + private boolean signaled; + + @Override + public String func() { + return "func"; + } + + @Override + public String func1(String a1) { + return a1; + } + + @Override + public String func2(String a1, int a2) { + return a1 + a2; + } + + @Override + public String func3(String a1, int a2, int a3) { + return a1 + a2 + a3; + } + + @Override + public String func4(String a1, int a2, int a3, int a4) { + return a1 + a2 + a3 + a4; + } + + @Override + public String func5(String a1, int a2, int a3, int a4, int a5) { + return a1 + a2 + a3 + a4 + a5; + } + + @Override + public String func6(String a1, int a2, int a3, int a4, int a5, int a6) { + return a1 + a2 + a3 + a4 + a5 + a6; + } + + @Override + public void proc() { + procResult += "proc"; + } + + @Override + public void proc1(String a1) { + procResult += a1; + } + + @Override + public void proc2(String a1, int a2) { + procResult += a1 + a2; + } + + @Override + public void proc3(String a1, int a2, int a3) { + procResult += a1 + a2 + a3; + } + + @Override + public void proc4(String a1, int a2, int a3, int a4) { + procResult += a1 + a2 + a3 + a4; + } + + @Override + public void proc5(String a1, int a2, int a3, int a4, int a5) { + procResult += a1 + a2 + a3 + a4 + a5; + } + + @Override + public void proc6(String a1, int a2, int a3, int a4, int a5, int a6) { + procResult += a1 + a2 + a3 + a4 + a5 + a6; + } + + @Override + public String execute() { + Workflow.await(() -> signaled); + return procResult; + } + + @Override + public void complete() { + signaled = true; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java new file mode 100644 index 000000000..1ad77e8fe --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assert.*; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.shared.TestMultiArgWorkflowUpdateFunctions; +import java.util.*; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class TypedUpdateTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerOptions(WorkerOptions.newBuilder().build()) + .setWorkflowTypes( + TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflowImpl.class) + .build(); + + @Test + public void testTypedStubSync() { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow workflow = + workflowClient.newWorkflowStub( + TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow.class, options); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + Assert.assertEquals("func", workflow.func()); + Assert.assertEquals("input", workflow.func1("input")); + Assert.assertEquals("input2", workflow.func2("input", 2)); + Assert.assertEquals("input23", workflow.func3("input", 2, 3)); + Assert.assertEquals("input234", workflow.func4("input", 2, 3, 4)); + Assert.assertEquals("input2345", workflow.func5("input", 2, 3, 4, 5)); + Assert.assertEquals("input23456", workflow.func6("input", 2, 3, 4, 5, 6)); + + workflow.proc(); + workflow.proc1("input"); + workflow.proc2("input", 2); + workflow.proc3("input", 2, 3); + workflow.proc4("input", 2, 3, 4); + workflow.proc5("input", 2, 3, 4, 5); + workflow.proc6("input", 2, 3, 4, 5, 6); + + workflow.complete(); + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("procinputinput2input23input234input2345input23456", result); + } + + @Test + public void testTypedAsync() throws ExecutionException, InterruptedException { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow workflow = + workflowClient.newWorkflowStub( + TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow.class, options); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + UpdateOptions updateOptions = + UpdateOptions.newBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(); + + Assert.assertEquals( + "func", WorkflowClient.startUpdate(workflow::func, updateOptions).getResultAsync().get()); + Assert.assertEquals( + "input", + WorkflowClient.startUpdate(workflow::func1, "input", updateOptions).getResultAsync().get()); + Assert.assertEquals( + "input2", + WorkflowClient.startUpdate(workflow::func2, "input", 2, updateOptions) + .getResultAsync() + .get()); + Assert.assertEquals( + "input23", + WorkflowClient.startUpdate(workflow::func3, "input", 2, 3, updateOptions) + .getResultAsync() + .get()); + Assert.assertEquals( + "input234", + WorkflowClient.startUpdate(workflow::func4, "input", 2, 3, 4, updateOptions) + .getResultAsync() + .get()); + Assert.assertEquals( + "input2345", + WorkflowClient.startUpdate(workflow::func5, "input", 2, 3, 4, 5, updateOptions) + .getResultAsync() + .get()); + Assert.assertEquals( + "input23456", + WorkflowClient.startUpdate(workflow::func6, "input", 2, 3, 4, 5, 6, updateOptions) + .getResultAsync() + .get()); + + UpdateOptions updateVoidOptions = + UpdateOptions.newBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(); + WorkflowClient.startUpdate(workflow::proc, updateVoidOptions).getResultAsync().get(); + WorkflowClient.startUpdate(workflow::proc1, "input", updateVoidOptions).getResultAsync().get(); + WorkflowClient.startUpdate(workflow::proc2, "input", 2, updateVoidOptions) + .getResultAsync() + .get(); + WorkflowClient.startUpdate(workflow::proc3, "input", 2, 3, updateVoidOptions) + .getResultAsync() + .get(); + WorkflowClient.startUpdate(workflow::proc4, "input", 2, 3, 4, updateVoidOptions) + .getResultAsync() + .get(); + WorkflowClient.startUpdate(workflow::proc5, "input", 2, 3, 4, 5, updateVoidOptions) + .getResultAsync() + .get(); + WorkflowClient.startUpdate(workflow::proc6, "input", 2, 3, 4, 5, 6, updateVoidOptions) + .getResultAsync() + .get(); + + workflow.complete(); + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("procinputinput2input23input234input2345input23456", result); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index 3b074180c..4dbe7fa9d 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -20,6 +20,8 @@ package io.temporal.workflow.updateTest; +import static io.temporal.client.WorkflowUpdateStage.ACCEPTED; +import static io.temporal.client.WorkflowUpdateStage.COMPLETED; import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; @@ -174,7 +176,7 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello")); // send an update through the async path WorkflowUpdateHandle updateRef = - workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "World"); + workflowStub.startUpdate("update", ACCEPTED, String.class, 0, "World"); assertEquals("Execute-World", updateRef.getResultAsync().get()); // send a bad update that will be rejected through the sync path assertThrows( @@ -197,15 +199,71 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException // send a bad update that will be rejected through the sync path assertThrows( WorkflowUpdateException.class, - () -> - workflowStub.startUpdate( - "update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "Bad Update")); + () -> workflowStub.startUpdate("update", ACCEPTED, String.class, 0, "Bad Update")); workflowStub.update("complete", void.class); assertEquals("Execute-Hello Execute-World", workflowStub.getResult(String.class)); } + @Test + public void testAsyncTypedUpdate() throws ExecutionException, InterruptedException { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + String workflowType = TestWorkflows.WorkflowWithUpdate.class.getSimpleName(); + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + workflowType, + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())); + workflowStub.start(); + + assertEquals("initial", workflowStub.query("getState", String.class)); + + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub( + TestWorkflows.WorkflowWithUpdate.class, workflowStub.getExecution().getWorkflowId()); + + WorkflowUpdateHandle handle = + WorkflowClient.startUpdate( + workflow::update, + 0, + "Hello", + UpdateOptions.newBuilder().setWaitForStage(COMPLETED).build()); + assertEquals("Execute-Hello", handle.getResultAsync().get()); + + assertEquals( + "Execute-World", + WorkflowClient.startUpdate( + workflow::update, + 0, + "World", + UpdateOptions.newBuilder().setWaitForStage(ACCEPTED).build()) + .getResultAsync() + .get()); + + assertEquals( + "Execute-Hello", + WorkflowClient.startUpdate( + workflow::update, + 0, + "World", + UpdateOptions.newBuilder() + .setWaitForStage(COMPLETED) + .setUpdateId(handle.getId()) + .build()) + .getResultAsync() + .get()); + + assertEquals( + null, + WorkflowClient.startUpdate( + workflow::complete, + UpdateOptions.newBuilder().setWaitForStage(COMPLETED).build()) + .getResultAsync() + .get()); + + assertEquals("Execute-Hello Execute-World", workflowStub.getResult(String.class)); + } + @Test public void testUpdateResets() { assumeTrue(