From ddac895f8a4843b865ec880f834e18f30061b4f8 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Mon, 19 Jun 2023 19:43:02 -0500 Subject: [PATCH 1/4] implemented sendEvent logics - Add thenAccept for Task Class --- .gitignore | 3 +- .../java/com/microsoft/durabletask/Task.java | 7 ++ .../TaskOrchestrationExecutor.java | 28 ++++- .../durabletask/IntegrationTests.java | 115 ++++++++++++++++++ .../main/java/com/functions/SendEvent.java | 64 ++++++++++ .../java/com/functions/EndToEndTests.java | 23 ++++ 6 files changed, 234 insertions(+), 6 deletions(-) create mode 100644 samples-azure-functions/src/main/java/com/functions/SendEvent.java diff --git a/.gitignore b/.gitignore index 3ff2e24..92c13c4 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ build/ .project .settings .classpath -repo/ \ No newline at end of file +repo/ +Nuget.Config \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/Task.java b/client/src/main/java/com/microsoft/durabletask/Task.java index 576034c..2a44c73 100644 --- a/client/src/main/java/com/microsoft/durabletask/Task.java +++ b/client/src/main/java/com/microsoft/durabletask/Task.java @@ -3,6 +3,7 @@ package com.microsoft.durabletask; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * Represents an asynchronous operation in a durable orchestration. @@ -27,6 +28,7 @@ */ public abstract class Task { final CompletableFuture future; + protected Consumer consumer; Task(CompletableFuture future) { this.future = future; @@ -54,4 +56,9 @@ public boolean isCancelled() { * @return the result of the task */ public abstract V await(); + + public Task thenAccept(Consumer consumer) { + this.consumer = consumer; + return this; + } } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 5f7ab9e..67ef645 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -15,6 +15,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.logging.Logger; @@ -693,6 +694,19 @@ private void handleSubOrchestrationFailed(HistoryEvent e){ task.completeExceptionally(exception); } + private void handleEventSentEvent(HistoryEvent e) { + int taskId = e.getEventId(); + EventSentEvent eventSentEvent = e.getEventSent(); + OrchestratorAction taskAction = this.pendingActions.remove(taskId); + if (taskAction == null) { + String message = String.format( + "Non-deterministic orchestrator detected: a history event scheduling an sent-event task with sequence ID %d and name '%s' was replayed but the current orchestrator implementation didn't actually schedule this task. Was a change made to the orchestrator code after this instance had already started running?", + taskId, + eventSentEvent.getName()); + throw new NonDeterministicOrchestratorException(message); + } + } + private void handleExecutionTerminated(HistoryEvent e) { ExecutionTerminatedEvent executionTerminatedEvent = e.getExecutionTerminated(); this.completeInternal(executionTerminatedEvent.getInput().getValue(), null, OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); @@ -772,6 +786,7 @@ private void processEvent(HistoryEvent e) { this.setCurrentInstant(instant); break; case ORCHESTRATORCOMPLETED: + case EXECUTIONCOMPLETED: // No action break; case EXECUTIONSTARTED: @@ -791,8 +806,6 @@ private void processEvent(HistoryEvent e) { TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; -// case EXECUTIONCOMPLETED: -// break; // case EXECUTIONFAILED: // break; case EXECUTIONTERMINATED: @@ -822,8 +835,9 @@ private void processEvent(HistoryEvent e) { case SUBORCHESTRATIONINSTANCEFAILED: this.handleSubOrchestrationFailed(e); break; -// case EVENTSENT: -// break; + case EVENTSENT: + this.handleEventSentEvent(e); + break; case EVENTRAISED: this.handleEventRaised(e); break; @@ -1085,7 +1099,11 @@ public V await() { // If the future is done, return its value right away if (this.future.isDone()) { try { - return this.future.get(); + V result = this.future.get(); + if (this.consumer != null) { + this.consumer.accept(result); + } + return result; } catch (ExecutionException e) { this.handleException(e.getCause()); } catch (Exception e) { diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 5d6a47f..1aab1bf 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -1110,4 +1110,119 @@ private static String getExceptionMessage(String taskName, int expectedTaskId, S expectedTaskId, expectedExceptionMessage); } + + @Test + void sendEvent() throws IOException, InterruptedException, TimeoutException { + final String orchestratorOne = "Orchestrator1"; + final String orchestratorTwo = "Orchestrator2"; + final String instanceId = "testId"; + final String eventName = "testEvent"; + final String eventPayload = "testPayload"; + final String activityName = "Echo"; + final String finishMessage = "Finished Sending Event"; + final String input = Instant.now().toString(); + int start = 1; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorOne, ctx -> { + String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); + String output = ctx.callActivity(activityName, awaitInput, String.class).await(); + ctx.complete(output); + }) + .addOrchestrator(orchestratorTwo, ctx -> { + String activityInput = ctx.getInput(String.class); + ctx.sendEvent(instanceId, eventName, eventPayload); + ctx.complete(finishMessage); + }) + .addActivity(activityName, ctx -> { + return String.format("Hello, %s!", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + client.scheduleNewOrchestrationInstance(orchestratorOne, input, instanceId); + String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorTwo, input); + + OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true); + + assertNotNull(orchestratorOneInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); + String outputTwo = orchestratorOneInstance.readOutputAs(String.class); + String expected = String.format("Hello, %s!", eventPayload); + assertEquals(expected, outputTwo); + + OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( + orchestratorOneID, + defaultTimeout, + true); + + assertNotNull(orchestratorTwoInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); + String outputOne = orchestratorTwoInstance.readOutputAs(String.class); + assertEquals(finishMessage, outputOne); + + } + } + + @Test + void orchestrationThenAccept() throws IOException, InterruptedException, TimeoutException { + final String orchestratorOne = "Orchestrator1"; + final String orchestratorTwo = "Orchestrator2"; + final String instanceId = "testId"; + final String eventName = "testEvent"; + final String eventPayload = "testPayload"; + final String activityName = "Echo"; + final String input = Instant.now().toString(); + int start = 1; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorOne, ctx -> { + String activityInput = ctx.getInput(String.class); + String output = ctx + .callActivity(activityName, activityInput, String.class) + .thenAccept(result -> { + ctx.sendEvent(instanceId, eventName, eventPayload); + }).await(); + ctx.complete(output); + }) + .addOrchestrator(orchestratorTwo, ctx -> { + String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); + String output = ctx.callActivity(activityName, awaitInput, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Hello, %s!", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + client.scheduleNewOrchestrationInstance(orchestratorTwo, input, instanceId); + String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorOne, input); + + OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true); + + assertNotNull(orchestratorTwoInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); + String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); + String expected = String.format("Hello, %s!", eventPayload); + assertEquals(expected, outputTwo); + + OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( + orchestratorOneID, + defaultTimeout, + true); + + assertNotNull(orchestratorOneInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); + String outputOne = orchestratorOneInstance.readOutputAs(String.class); + assertEquals(expected, outputOne); + + } + } } diff --git a/samples-azure-functions/src/main/java/com/functions/SendEvent.java b/samples-azure-functions/src/main/java/com/functions/SendEvent.java new file mode 100644 index 0000000..59be74f --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/SendEvent.java @@ -0,0 +1,64 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeoutException; + +public class SendEvent { + /** + * This HTTP-triggered function starts the orchestration. + */ + @FunctionName("WaitEventOrchestration") + public HttpResponseMessage waitEventOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) throws TimeoutException, InterruptedException { + context.getLogger().info("Java HTTP trigger processed a request."); + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("WaitEvent", null, "123"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("SendEventOrchestration") + public HttpResponseMessage sendEventOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) throws TimeoutException, InterruptedException { + context.getLogger().info("Java HTTP trigger processed a request."); + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("SendEvent"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + // + @FunctionName("WaitEvent") + public String waitEvent( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + String await = ctx.waitForExternalEvent("kcevent", String.class).await(); + return ctx.callActivity("Capitalize", await, String.class).await(); + } + + @FunctionName("SendEvent") + public void sendEvent( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + ctx.sendEvent("123", "kcevent", "Hello World!"); + return; + } +} diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index a174b39..9f1afb8 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -59,4 +59,27 @@ public void continueAsNew() throws InterruptedException { runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); assertEquals("Terminated", runTimeStatus); } + + @Test + public void sendEvent() throws InterruptedException { + String waitEventOrchestrationPath = "api/WaitEventOrchestration"; + String SendEventOrchestrationPath = "api/SendEventOrchestration"; + Response waitEventOrchestrationResponse = post(waitEventOrchestrationPath); + JsonPath jsonPath = waitEventOrchestrationResponse.jsonPath(); + String waitEventOrchestrationStatusQueryGetUri = jsonPath.get("statusQueryGetUri"); + String runTimeStatus; + for (int i = 0; i < 10; i++) { + Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri); + runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Running", runTimeStatus); + Thread.sleep(1000); + } + + post(SendEventOrchestrationPath); + Thread.sleep(1000); + + Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri); + runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Completed", runTimeStatus); + } } From 2081e9bf4659d41b0499a23a7bfce4300465a198 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Tue, 20 Jun 2023 13:02:04 -0500 Subject: [PATCH 2/4] update testing logics - remove thenAccept logics --- .../java/com/microsoft/durabletask/Task.java | 7 - .../TaskOrchestrationExecutor.java | 8 +- .../durabletask/IntegrationTests.java | 133 +++++++++--------- .../main/java/com/functions/SendEvent.java | 15 +- .../java/com/functions/EndToEndTests.java | 1 - 5 files changed, 77 insertions(+), 87 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/Task.java b/client/src/main/java/com/microsoft/durabletask/Task.java index 2a44c73..576034c 100644 --- a/client/src/main/java/com/microsoft/durabletask/Task.java +++ b/client/src/main/java/com/microsoft/durabletask/Task.java @@ -3,7 +3,6 @@ package com.microsoft.durabletask; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; /** * Represents an asynchronous operation in a durable orchestration. @@ -28,7 +27,6 @@ */ public abstract class Task { final CompletableFuture future; - protected Consumer consumer; Task(CompletableFuture future) { this.future = future; @@ -56,9 +54,4 @@ public boolean isCancelled() { * @return the result of the task */ public abstract V await(); - - public Task thenAccept(Consumer consumer) { - this.consumer = consumer; - return this; - } } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 67ef645..9dd6c61 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -15,7 +15,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.logging.Logger; @@ -837,6 +836,7 @@ private void processEvent(HistoryEvent e) { break; case EVENTSENT: this.handleEventSentEvent(e); +// this.handleEventRaised(e); break; case EVENTRAISED: this.handleEventRaised(e); @@ -1099,11 +1099,7 @@ public V await() { // If the future is done, return its value right away if (this.future.isDone()) { try { - V result = this.future.get(); - if (this.consumer != null) { - this.consumer.accept(result); - } - return result; + return this.future.get(); } catch (ExecutionException e) { this.handleException(e.getCause()); } catch (Exception e) { diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 1aab1bf..d0e0183 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -1120,8 +1120,6 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { final String eventPayload = "testPayload"; final String activityName = "Echo"; final String finishMessage = "Finished Sending Event"; - final String input = Instant.now().toString(); - int start = 1; DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorOne, ctx -> { String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); @@ -1129,7 +1127,6 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { ctx.complete(output); }) .addOrchestrator(orchestratorTwo, ctx -> { - String activityInput = ctx.getInput(String.class); ctx.sendEvent(instanceId, eventName, eventPayload); ctx.complete(finishMessage); }) @@ -1140,8 +1137,8 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); try (worker; client) { - client.scheduleNewOrchestrationInstance(orchestratorOne, input, instanceId); - String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorTwo, input); + client.scheduleNewOrchestrationInstance(orchestratorOne, null, instanceId); + String orchestratorTwoID = client.scheduleNewOrchestrationInstance(orchestratorTwo); OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( instanceId, @@ -1150,79 +1147,79 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { assertNotNull(orchestratorOneInstance); assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); - String outputTwo = orchestratorOneInstance.readOutputAs(String.class); + String outputOne = orchestratorOneInstance.readOutputAs(String.class); String expected = String.format("Hello, %s!", eventPayload); - assertEquals(expected, outputTwo); - - OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( - orchestratorOneID, - defaultTimeout, - true); - - assertNotNull(orchestratorTwoInstance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); - String outputOne = orchestratorTwoInstance.readOutputAs(String.class); - assertEquals(finishMessage, outputOne); - - } - } - - @Test - void orchestrationThenAccept() throws IOException, InterruptedException, TimeoutException { - final String orchestratorOne = "Orchestrator1"; - final String orchestratorTwo = "Orchestrator2"; - final String instanceId = "testId"; - final String eventName = "testEvent"; - final String eventPayload = "testPayload"; - final String activityName = "Echo"; - final String input = Instant.now().toString(); - int start = 1; - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorOne, ctx -> { - String activityInput = ctx.getInput(String.class); - String output = ctx - .callActivity(activityName, activityInput, String.class) - .thenAccept(result -> { - ctx.sendEvent(instanceId, eventName, eventPayload); - }).await(); - ctx.complete(output); - }) - .addOrchestrator(orchestratorTwo, ctx -> { - String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); - String output = ctx.callActivity(activityName, awaitInput, String.class).await(); - ctx.complete(output); - }) - .addActivity(activityName, ctx -> { - return String.format("Hello, %s!", ctx.getInput(String.class)); - }) - .buildAndStart(); - - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); - try (worker; client) { - client.scheduleNewOrchestrationInstance(orchestratorTwo, input, instanceId); - String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorOne, input); + assertEquals(expected, outputOne); OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( - instanceId, + orchestratorTwoID, defaultTimeout, true); assertNotNull(orchestratorTwoInstance); assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); - String expected = String.format("Hello, %s!", eventPayload); - assertEquals(expected, outputTwo); - - OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( - orchestratorOneID, - defaultTimeout, - true); - - assertNotNull(orchestratorOneInstance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); - String outputOne = orchestratorOneInstance.readOutputAs(String.class); - assertEquals(expected, outputOne); + assertEquals(finishMessage, outputTwo); } } + +// @Test +// void orchestrationThenAccept() throws IOException, InterruptedException, TimeoutException { +// final String orchestratorOne = "Orchestrator1"; +// final String orchestratorTwo = "Orchestrator2"; +// final String instanceId = "testId"; +// final String eventName = "testEvent"; +// final String eventPayload = "testPayload"; +// final String activityName = "Echo"; +// final String input = Instant.now().toString(); +// int start = 1; +// DurableTaskGrpcWorker worker = this.createWorkerBuilder() +// .addOrchestrator(orchestratorOne, ctx -> { +// String activityInput = ctx.getInput(String.class); +// String output = ctx +// .callActivity(activityName, activityInput, String.class) +// .thenAccept(result -> { +// ctx.sendEvent(instanceId, eventName, eventPayload); +// }).await(); +// ctx.complete(output); +// }) +// .addOrchestrator(orchestratorTwo, ctx -> { +// String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); +// String output = ctx.callActivity(activityName, awaitInput, String.class).await(); +// ctx.complete(output); +// }) +// .addActivity(activityName, ctx -> { +// return String.format("Hello, %s!", ctx.getInput(String.class)); +// }) +// .buildAndStart(); +// +// DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); +// try (worker; client) { +// client.scheduleNewOrchestrationInstance(orchestratorTwo, input, instanceId); +// String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorOne, input); +// +// OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( +// instanceId, +// defaultTimeout, +// true); +// +// assertNotNull(orchestratorTwoInstance); +// assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); +// String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); +// String expected = String.format("Hello, %s!", eventPayload); +// assertEquals(expected, outputTwo); +// +// OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( +// orchestratorOneID, +// defaultTimeout, +// true); +// +// assertNotNull(orchestratorOneInstance); +// assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); +// String outputOne = orchestratorOneInstance.readOutputAs(String.class); +// assertEquals(expected, outputOne); +// +// } +// } } diff --git a/samples-azure-functions/src/main/java/com/functions/SendEvent.java b/samples-azure-functions/src/main/java/com/functions/SendEvent.java index 59be74f..8e550e8 100644 --- a/samples-azure-functions/src/main/java/com/functions/SendEvent.java +++ b/samples-azure-functions/src/main/java/com/functions/SendEvent.java @@ -21,6 +21,10 @@ import java.util.concurrent.TimeoutException; public class SendEvent { + + private final String instanceId = "123"; + private final String eventName = "testEvent"; + private final String eventData = "Hello World!"; /** * This HTTP-triggered function starts the orchestration. */ @@ -31,7 +35,7 @@ public HttpResponseMessage waitEventOrchestration( final ExecutionContext context) throws TimeoutException, InterruptedException { context.getLogger().info("Java HTTP trigger processed a request."); DurableTaskClient client = durableContext.getClient(); - String instanceId = client.scheduleNewOrchestrationInstance("WaitEvent", null, "123"); + client.scheduleNewOrchestrationInstance("WaitEvent", null, instanceId); context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); return durableContext.createCheckStatusResponse(request, instanceId); } @@ -49,16 +53,17 @@ public HttpResponseMessage sendEventOrchestration( } // @FunctionName("WaitEvent") - public String waitEvent( + public void waitEvent( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { - String await = ctx.waitForExternalEvent("kcevent", String.class).await(); - return ctx.callActivity("Capitalize", await, String.class).await(); + String await = ctx.waitForExternalEvent(eventName, String.class).await(); + System.out.println(await); +// return ctx.callActivity("Capitalize", await, String.class).await(); } @FunctionName("SendEvent") public void sendEvent( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { - ctx.sendEvent("123", "kcevent", "Hello World!"); + ctx.sendEvent(instanceId, eventName, eventData); return; } } diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index 9f1afb8..54038a1 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -37,7 +37,6 @@ public void basicChain() throws InterruptedException { assertEquals("Completed", runTimeStatus); } - @Test public void continueAsNew() throws InterruptedException { String startOrchestrationPath = "api/ContinueAsNew"; From f882e8591bcbd3eb9c06c0fa11a5cb91b0331db5 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Tue, 20 Jun 2023 14:33:40 -0500 Subject: [PATCH 3/4] update sendEvent orchestration --- .../src/main/java/com/functions/SendEvent.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/samples-azure-functions/src/main/java/com/functions/SendEvent.java b/samples-azure-functions/src/main/java/com/functions/SendEvent.java index 8e550e8..413bf78 100644 --- a/samples-azure-functions/src/main/java/com/functions/SendEvent.java +++ b/samples-azure-functions/src/main/java/com/functions/SendEvent.java @@ -22,7 +22,7 @@ public class SendEvent { - private final String instanceId = "123"; + private final String instanceId = "waitEventID"; private final String eventName = "testEvent"; private final String eventData = "Hello World!"; /** @@ -47,23 +47,26 @@ public HttpResponseMessage sendEventOrchestration( final ExecutionContext context) throws TimeoutException, InterruptedException { context.getLogger().info("Java HTTP trigger processed a request."); DurableTaskClient client = durableContext.getClient(); - String instanceId = client.scheduleNewOrchestrationInstance("SendEvent"); + String instanceId = client.scheduleNewOrchestrationInstance("SendEvent", null, "sendEventID"); context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); return durableContext.createCheckStatusResponse(request, instanceId); } // @FunctionName("WaitEvent") - public void waitEvent( - @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + public String waitEvent( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx, + final ExecutionContext context) { String await = ctx.waitForExternalEvent(eventName, String.class).await(); - System.out.println(await); -// return ctx.callActivity("Capitalize", await, String.class).await(); + context.getLogger().info("Event received with payload: " + await); + return ctx.callActivity("Capitalize", await, String.class).await(); } @FunctionName("SendEvent") public void sendEvent( - @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx, + final ExecutionContext context) { ctx.sendEvent(instanceId, eventName, eventData); + context.getLogger().info("Event sent"); return; } } From a3a2d14c861290b1e3219c6cf6447ff1a1011b84 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Thu, 22 Jun 2023 12:16:01 -0500 Subject: [PATCH 4/4] simplify integration test --- .../durabletask/IntegrationTests.java | 70 +------------------ 1 file changed, 2 insertions(+), 68 deletions(-) diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index d0e0183..0b20722 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -1118,21 +1118,16 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { final String instanceId = "testId"; final String eventName = "testEvent"; final String eventPayload = "testPayload"; - final String activityName = "Echo"; final String finishMessage = "Finished Sending Event"; DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorOne, ctx -> { String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); - String output = ctx.callActivity(activityName, awaitInput, String.class).await(); - ctx.complete(output); + ctx.complete(awaitInput); }) .addOrchestrator(orchestratorTwo, ctx -> { ctx.sendEvent(instanceId, eventName, eventPayload); ctx.complete(finishMessage); }) - .addActivity(activityName, ctx -> { - return String.format("Hello, %s!", ctx.getInput(String.class)); - }) .buildAndStart(); DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); @@ -1148,8 +1143,7 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { assertNotNull(orchestratorOneInstance); assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); String outputOne = orchestratorOneInstance.readOutputAs(String.class); - String expected = String.format("Hello, %s!", eventPayload); - assertEquals(expected, outputOne); + assertEquals(eventPayload, outputOne); OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( orchestratorTwoID, @@ -1160,66 +1154,6 @@ void sendEvent() throws IOException, InterruptedException, TimeoutException { assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); assertEquals(finishMessage, outputTwo); - } } - -// @Test -// void orchestrationThenAccept() throws IOException, InterruptedException, TimeoutException { -// final String orchestratorOne = "Orchestrator1"; -// final String orchestratorTwo = "Orchestrator2"; -// final String instanceId = "testId"; -// final String eventName = "testEvent"; -// final String eventPayload = "testPayload"; -// final String activityName = "Echo"; -// final String input = Instant.now().toString(); -// int start = 1; -// DurableTaskGrpcWorker worker = this.createWorkerBuilder() -// .addOrchestrator(orchestratorOne, ctx -> { -// String activityInput = ctx.getInput(String.class); -// String output = ctx -// .callActivity(activityName, activityInput, String.class) -// .thenAccept(result -> { -// ctx.sendEvent(instanceId, eventName, eventPayload); -// }).await(); -// ctx.complete(output); -// }) -// .addOrchestrator(orchestratorTwo, ctx -> { -// String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); -// String output = ctx.callActivity(activityName, awaitInput, String.class).await(); -// ctx.complete(output); -// }) -// .addActivity(activityName, ctx -> { -// return String.format("Hello, %s!", ctx.getInput(String.class)); -// }) -// .buildAndStart(); -// -// DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); -// try (worker; client) { -// client.scheduleNewOrchestrationInstance(orchestratorTwo, input, instanceId); -// String orchestratorOneID = client.scheduleNewOrchestrationInstance(orchestratorOne, input); -// -// OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( -// instanceId, -// defaultTimeout, -// true); -// -// assertNotNull(orchestratorTwoInstance); -// assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); -// String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); -// String expected = String.format("Hello, %s!", eventPayload); -// assertEquals(expected, outputTwo); -// -// OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( -// orchestratorOneID, -// defaultTimeout, -// true); -// -// assertNotNull(orchestratorOneInstance); -// assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); -// String outputOne = orchestratorOneInstance.readOutputAs(String.class); -// assertEquals(expected, outputOne); -// -// } -// } }