Skip to content

Commit

Permalink
Update Java SDK for Temporal Sever v1.26.2 (#2357)
Browse files Browse the repository at this point in the history
Lower refreshNexusEndpointsMinWait
  • Loading branch information
Quinn-With-Two-Ns authored Jan 6, 2025
1 parent c3e0e77 commit 82d3c93
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 76 deletions.
4 changes: 3 additions & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ component.nexusoperations.callback.endpoint.template:
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
AllowInsecure: true
AllowInsecure: true
system.refreshNexusEndpointsMinWait:
- value: 1ms
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
}
case FAILUREINFO_NOT_SET:
default:
throw new IllegalArgumentException("Failure info not set");
// All unknown types are considered to be retryable ApplicationError.
return ApplicationFailure.newFromValues(
failure.getMessage(),
"",
false,
new EncodedValues(Optional.empty(), dataConverter),
cause,
null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ public Throwable wrapFailure(ActivityTask t, Throwable failure) {
failure);
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private void sendReply(
ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ private WorkflowTaskHandler.Result handleTask(
}
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
ByteString taskToken,
RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
Expand Down Expand Up @@ -514,6 +516,8 @@ private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
grpcRetryOptions);
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private void sendTaskFailed(
ByteString taskToken,
RespondWorkflowTaskFailedRequest.Builder taskFailed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.*;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;

/**
Expand All @@ -70,13 +69,11 @@
*/
@RunWith(JUnitParamsRunner.class)
public class ActivityTimeoutTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build();

// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quick shutdown
public @Rule Timeout timeout = Timeout.seconds(15);
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build();

/**
* An activity reaches startToClose timeout once, max retries are set to 1. o
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ public void syncOperationImmediatelyCancelled() {
"operation canceled before it was started", canceledFailure.getOriginalMessage());
}

@Test
public void syncOperationCancelled() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure);
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.NexusOperationFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
Expand All @@ -54,10 +55,20 @@ public void terminateAsyncOperation() {
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure);
Assert.assertEquals(
"operation terminated",
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
// TODO(https://github.com/temporalio/sdk-java/issues/2358): Test server needs to be fixed to
// return the correct type
Assert.assertTrue(
nexusFailure.getCause() instanceof ApplicationFailure
|| nexusFailure.getCause() instanceof TerminatedFailure);
if (nexusFailure.getCause() instanceof ApplicationFailure) {
Assert.assertEquals(
"operation terminated",
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
} else {
Assert.assertEquals(
"operation terminated",
((TerminatedFailure) nexusFailure.getCause()).getOriginalMessage());
}
}

@Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ static final class ActivityTaskData {
TestServiceRetryState retryState;
Duration nextBackoffInterval;
String identity;
Timestamp lastAttemptCompleteTime;

ActivityTaskData(
TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
Expand Down Expand Up @@ -2112,6 +2113,7 @@ private static RetryState attemptActivityRetry(
ctx.onCommit(
(historySize) -> {
data.retryState = nextAttempt;
data.lastAttemptCompleteTime = ctx.currentTime();
task.setAttempt(nextAttempt.getAttempt());
task.setCurrentAttemptScheduledTime(ctx.currentTime());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -86,6 +87,17 @@
import org.slf4j.LoggerFactory;

class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
static final Failure FAILED_UPDATE_ON_WF_COMPLETION =
Failure.newBuilder()
.setMessage(
"Workflow Update failed because the Workflow completed before the Update completed.")
.setSource("Server")
.setApplicationFailureInfo(
ApplicationFailureInfo.newBuilder()
.setType("AcceptedUpdateCompletedWorkflow")
.setNonRetryable(true)
.build())
.build();

/**
* If the implementation throws an exception, changes accumulated in the RequestContext will not
Expand Down Expand Up @@ -541,6 +553,7 @@ public void completeWorkflowTask(
|| request.getForceCreateNewWorkflowTask())) {
scheduleWorkflowTask(ctx);
}

workflowTaskStateMachine.getData().bufferedEvents.clear();
Map<String, ConsistentQuery> queries = data.consistentQueryRequests;
Map<String, WorkflowQueryResult> queryResultsMap = request.getQueryResultsMap();
Expand Down Expand Up @@ -1671,6 +1684,27 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
return;
}

updates.forEach(
(k, updateStateMachine) -> {
if (!(updateStateMachine.getState() == StateMachines.State.COMPLETED
|| updateStateMachine.getState() == StateMachines.State.FAILED)) {
updateStateMachine.action(
Action.COMPLETE,
ctx,
Message.newBuilder()
.setBody(
Any.pack(
Response.newBuilder()
.setOutcome(
Outcome.newBuilder()
.setFailure(FAILED_UPDATE_ON_WF_COMPLETION)
.build())
.build()))
.build(),
completionEvent.get().getEventId());
}
});

for (Callback cb : startRequest.getCompletionCallbacksList()) {
if (!cb.hasNexus()) {
// test server only supports nexus callbacks currently
Expand Down Expand Up @@ -3101,6 +3135,10 @@ private static PendingActivityInfo constructPendingActivityInfo(
builder.setLastWorkerIdentity(activityTaskData.identity);
}

if (activityTaskData.lastAttemptCompleteTime != null) {
builder.setLastAttemptCompleteTime(activityTaskData.lastAttemptCompleteTime);
}

// Some ids are only present in the schedule event...
if (activityTaskData.scheduledEvent != null) {
populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent);
Expand Down Expand Up @@ -3145,12 +3183,8 @@ private static void populatePendingActivityInfoFromScheduledEvent(

private static void populatePendingActivityInfoFromPollResponse(
PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) {
// In golang, we set one but never both of these fields, depending on the activity state
if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) {
builder.setScheduledTime(task.getScheduledTime());
} else {
builder.setLastStartedTime(task.getStartedTime());
}
builder.setScheduledTime(task.getScheduledTime());
builder.setLastStartedTime(task.getStartedTime());
}

private static void populatePendingActivityInfoFromHeartbeatDetails(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void testSuccessfulActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.build();
Expand Down Expand Up @@ -266,8 +267,10 @@ public void testFailedActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.setLastAttemptCompleteTime(actual.getLastAttemptCompleteTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
// it.
.setLastWorkerIdentity(actual.getLastWorkerIdentity())
Expand Down Expand Up @@ -333,6 +336,7 @@ private void testKilledWorkflow(
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.testserver.functional;

import static org.junit.Assume.assumeFalse;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payloads;
Expand Down Expand Up @@ -548,7 +546,6 @@ public void updateAndPollByWorkflowId() {
@Test
public void getCompletedUpdateOfCompletedWorkflow() {
// Assert that we can get and poll a completed update from a completed workflow.
assumeFalse("Skipping as real server has a bug", SDKTestWorkflowRule.useExternalService);

WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
Expand Down Expand Up @@ -593,7 +590,7 @@ public void getCompletedUpdateOfCompletedWorkflow() {

@Test
public void getIncompleteUpdateOfCompletedWorkflow() {
// Assert that we can't get an incomplete update of a completed workflow. Expect a NOT_FOUND
// Assert that the server fails an incomplete update if the workflow is completed.
WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();

Expand All @@ -617,48 +614,60 @@ public void getIncompleteUpdateOfCompletedWorkflow() {
workflowStub.signal();
workflowStub.execute();

StatusRuntimeException exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());

response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());

PollWorkflowExecutionUpdateResponse pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());

pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());
}

private void assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(Outcome outcome) {
Assert.assertEquals(
"Workflow Update failed because the Workflow completed before the Update completed.",
outcome.getFailure().getMessage());
Assert.assertEquals(
"AcceptedUpdateCompletedWorkflow",
outcome.getFailure().getApplicationFailureInfo().getType());
}

private UpdateWorkflowExecutionResponse updateWorkflow(
Expand Down

0 comments on commit 82d3c93

Please sign in to comment.