Skip to content

Commit

Permalink
Test server support for Nexus operation complete before start (#2348)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner authored Jan 8, 2025
1 parent 9a8894a commit cf06131
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.*;
import io.temporal.api.nexus.v1.Link;
import io.temporal.api.nexus.v1.StartOperationResponse;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.workflowservice.v1.*;
Expand Down Expand Up @@ -116,6 +117,9 @@ void startNexusOperation(

void completeNexusOperation(NexusOperationRef ref, Payload result);

void completeAsyncNexusOperation(
NexusOperationRef ref, Payload result, String operationID, Link startLink);

void failNexusOperation(NexusOperationRef ref, Failure failure);

boolean validateOperationTaskToken(NexusTaskToken tt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.testservice;

import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.*;
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
import static io.temporal.internal.testservice.StateMachines.*;
import static io.temporal.internal.testservice.StateUtils.mergeMemo;
Expand Down Expand Up @@ -1711,9 +1712,24 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
log.warn("skipping non-nexus completion callback");
continue;
}

String serializedRef = cb.getNexus().getHeaderOrThrow("operation-reference");
NexusOperationRef ref = NexusOperationRef.fromBytes(serializedRef.getBytes());
service.completeNexusOperation(ref, completionEvent.get());

io.temporal.api.nexus.v1.Link startLink =
workflowEventToNexusLink(
Link.WorkflowEvent.newBuilder()
.setNamespace(ctx.getNamespace())
.setWorkflowId(ctx.getExecution().getWorkflowId())
.setRunId(ctx.getExecution().getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
.build())
.build());

service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
}
}

Expand Down Expand Up @@ -2252,6 +2268,32 @@ public void completeNexusOperation(NexusOperationRef ref, Payload result) {
});
}

@Override
public void completeAsyncNexusOperation(
NexusOperationRef ref,
Payload result,
String operationID,
io.temporal.api.nexus.v1.Link startLink) {
update(
ctx -> {
StateMachine<NexusOperationData> operation =
getPendingNexusOperation(ref.getScheduledEventId());
if (operation.getState() == State.INITIATED) {
// Received completion before start, so fabricate started event.
StartOperationResponse.Async start =
StartOperationResponse.Async.newBuilder()
.setOperationId(operationID)
.addLinks(startLink)
.build();
operation.action(Action.START, ctx, start, 0);
}
operation.action(Action.COMPLETE, ctx, result, 0);
nexusOperations.remove(ref.getScheduledEventId());
scheduleWorkflowTask(ctx);
ctx.unlockTimer("completeNexusOperation");
});
}

@Override
public void failNexusOperation(NexusOperationRef ref, Failure failure) {
update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.nexus.v1.HandlerError;
import io.temporal.api.nexus.v1.Request;
import io.temporal.api.nexus.v1.StartOperationResponse;
import io.temporal.api.nexus.v1.UnsuccessfulOperationError;
import io.temporal.api.nexus.v1.*;
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
import io.temporal.api.testservice.v1.SleepRequest;
import io.temporal.api.testservice.v1.TestServiceGrpc;
Expand Down Expand Up @@ -899,7 +896,8 @@ public void respondNexusTaskFailed(
}
}

public void completeNexusOperation(NexusOperationRef ref, HistoryEvent completionEvent) {
public void completeNexusOperation(
NexusOperationRef ref, String operationID, Link startLink, HistoryEvent completionEvent) {
TestWorkflowMutableState target = getMutableState(ref.getExecutionId());

switch (completionEvent.getEventType()) {
Expand All @@ -912,7 +910,7 @@ public void completeNexusOperation(NexusOperationRef ref, HistoryEvent completio
// Nexus does not support it.
Payload p =
(result.getPayloadsCount() > 0) ? result.getPayloads(0) : Payload.getDefaultInstance();
target.completeNexusOperation(ref, p);
target.completeAsyncNexusOperation(ref, p, operationID, startLink);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
Failure f =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,66 @@ public void testNexusOperationAsyncCompletion() {
}
}

@Test
public void testNexusOperationAsyncCompletionBeforeStart() {
WorkflowStub callerStub = newWorkflowStub("TestNexusOperationAsyncCompletionWorkflow");
WorkflowExecution callerExecution = callerStub.start();

// Get first WFT and respond with ScheduleNexusOperation command
PollWorkflowTaskQueueResponse callerTask = pollWorkflowTask();
completeWorkflowTask(callerTask.getTaskToken(), newScheduleOperationCommand());

// Poll for Nexus task with start request but do not complete it
Request startReq;
try {
startReq = pollNexusTask().get().getRequest();
} catch (Exception e) {
Assert.fail(e.getMessage());
return;
}

// Manually start handler WF with callback
TaskQueue handlerWFTaskQueue = TaskQueue.newBuilder().setName("nexus-handler-tq").build();
testWorkflowRule
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.startWorkflowExecution(
StartWorkflowExecutionRequest.newBuilder()
.setRequestId(UUID.randomUUID().toString())
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
.setWorkflowId("TestNexusOperationAsyncHandlerWorkflow")
.setWorkflowType(WorkflowType.newBuilder().setName("EchoNexusHandlerWorkflowImpl"))
.setTaskQueue(handlerWFTaskQueue)
.setInput(Payloads.newBuilder().addPayloads(defaultInput))
.setIdentity("test")
.addAllLinks(
startReq.getStartOperation().getLinksList().stream()
.map(LinkConverter::nexusLinkToWorkflowEvent)
.collect(Collectors.toList()))
.addCompletionCallbacks(
Callback.newBuilder()
.setNexus(
Callback.Nexus.newBuilder()
.setUrl(startReq.getStartOperation().getCallback())
.putAllHeader(startReq.getStartOperation().getCallbackHeaderMap())))
.build());

// Complete handler workflow
PollWorkflowTaskQueueResponse handlerTask = pollWorkflowTask(handlerWFTaskQueue);
completeWorkflow(
handlerTask.getTaskToken(),
Payload.newBuilder().setData(ByteString.copyFromUtf8("operation result")).build());

// Verify operation start and completion are recorded and triggers caller workflow progress
callerTask = pollWorkflowTask();
testWorkflowRule.assertHistoryEvent(
callerExecution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED);
testWorkflowRule.assertHistoryEvent(
callerExecution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED);
completeWorkflow(callerTask.getTaskToken());
}

@Test
public void testNexusOperationAsyncHandlerCanceled() {
String operationId = UUID.randomUUID().toString();
Expand Down

0 comments on commit cf06131

Please sign in to comment.