Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented sendEvent logics #144

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ build/
.project
.settings
.classpath
repo/
repo/
Nuget.Config
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,19 @@ private void handleSubOrchestrationFailed(HistoryEvent e){
task.completeExceptionally(exception);
}

private void handleEventSentEvent(HistoryEvent e) {
int taskId = e.getEventId();
EventSentEvent eventSentEvent = e.getEventSent();
OrchestratorAction taskAction = this.pendingActions.remove(taskId);
if (taskAction == null) {
String message = String.format(
"Non-deterministic orchestrator detected: a history event scheduling an sent-event task with sequence ID %d and name '%s' was replayed but the current orchestrator implementation didn't actually schedule this task. Was a change made to the orchestrator code after this instance had already started running?",
taskId,
eventSentEvent.getName());
throw new NonDeterministicOrchestratorException(message);
}
}

private void handleExecutionTerminated(HistoryEvent e) {
ExecutionTerminatedEvent executionTerminatedEvent = e.getExecutionTerminated();
this.completeInternal(executionTerminatedEvent.getInput().getValue(), null, OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED);
Expand Down Expand Up @@ -772,6 +785,7 @@ private void processEvent(HistoryEvent e) {
this.setCurrentInstant(instant);
break;
case ORCHESTRATORCOMPLETED:
case EXECUTIONCOMPLETED:
// No action
break;
case EXECUTIONSTARTED:
Expand All @@ -791,8 +805,6 @@ private void processEvent(HistoryEvent e) {
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
// case EXECUTIONCOMPLETED:
// break;
// case EXECUTIONFAILED:
// break;
case EXECUTIONTERMINATED:
Expand Down Expand Up @@ -822,8 +834,10 @@ private void processEvent(HistoryEvent e) {
case SUBORCHESTRATIONINSTANCEFAILED:
this.handleSubOrchestrationFailed(e);
break;
// case EVENTSENT:
// break;
case EVENTSENT:
this.handleEventSentEvent(e);
// this.handleEventRaised(e);
break;
case EVENTRAISED:
this.handleEventRaised(e);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,4 +1110,50 @@ private static String getExceptionMessage(String taskName, int expectedTaskId, S
expectedTaskId,
expectedExceptionMessage);
}

@Test
void sendEvent() throws IOException, InterruptedException, TimeoutException {
final String orchestratorOne = "Orchestrator1";
final String orchestratorTwo = "Orchestrator2";
final String instanceId = "testId";
final String eventName = "testEvent";
final String eventPayload = "testPayload";
final String finishMessage = "Finished Sending Event";
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorOne, ctx -> {
String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await();
ctx.complete(awaitInput);
})
.addOrchestrator(orchestratorTwo, ctx -> {
ctx.sendEvent(instanceId, eventName, eventPayload);
ctx.complete(finishMessage);
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.scheduleNewOrchestrationInstance(orchestratorOne, null, instanceId);
String orchestratorTwoID = client.scheduleNewOrchestrationInstance(orchestratorTwo);

OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion(
instanceId,
defaultTimeout,
true);

assertNotNull(orchestratorOneInstance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus());
String outputOne = orchestratorOneInstance.readOutputAs(String.class);
assertEquals(eventPayload, outputOne);

OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion(
orchestratorTwoID,
defaultTimeout,
true);

assertNotNull(orchestratorTwoInstance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus());
String outputTwo = orchestratorTwoInstance.readOutputAs(String.class);
assertEquals(finishMessage, outputTwo);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.functions;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

public class SendEvent {

private final String instanceId = "waitEventID";
private final String eventName = "testEvent";
private final String eventData = "Hello World!";
/**
* This HTTP-triggered function starts the orchestration.
*/
@FunctionName("WaitEventOrchestration")
public HttpResponseMessage waitEventOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) throws TimeoutException, InterruptedException {
context.getLogger().info("Java HTTP trigger processed a request.");
DurableTaskClient client = durableContext.getClient();
client.scheduleNewOrchestrationInstance("WaitEvent", null, instanceId);
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("SendEventOrchestration")
public HttpResponseMessage sendEventOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) throws TimeoutException, InterruptedException {
context.getLogger().info("Java HTTP trigger processed a request.");
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("SendEvent", null, "sendEventID");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}
//
@FunctionName("WaitEvent")
public String waitEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
final ExecutionContext context) {
String await = ctx.waitForExternalEvent(eventName, String.class).await();
context.getLogger().info("Event received with payload: " + await);
return ctx.callActivity("Capitalize", await, String.class).await();
}

@FunctionName("SendEvent")
public void sendEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
final ExecutionContext context) {
ctx.sendEvent(instanceId, eventName, eventData);
context.getLogger().info("Event sent");
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public void basicChain() throws InterruptedException {
assertEquals("Completed", runTimeStatus);
}


@Test
public void continueAsNew() throws InterruptedException {
String startOrchestrationPath = "api/ContinueAsNew";
Expand All @@ -59,4 +58,27 @@ public void continueAsNew() throws InterruptedException {
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
assertEquals("Terminated", runTimeStatus);
}

@Test
public void sendEvent() throws InterruptedException {
String waitEventOrchestrationPath = "api/WaitEventOrchestration";
String SendEventOrchestrationPath = "api/SendEventOrchestration";
Response waitEventOrchestrationResponse = post(waitEventOrchestrationPath);
JsonPath jsonPath = waitEventOrchestrationResponse.jsonPath();
String waitEventOrchestrationStatusQueryGetUri = jsonPath.get("statusQueryGetUri");
String runTimeStatus;
for (int i = 0; i < 10; i++) {
Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri);
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
assertEquals("Running", runTimeStatus);
Thread.sleep(1000);
}

post(SendEventOrchestrationPath);
Thread.sleep(1000);

Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri);
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
assertEquals("Completed", runTimeStatus);
}
}