Skip to content

Commit

Permalink
Wrap GRPC::CANCELED and DEADLINE_EXCEEDED in new exception type (#2172)
Browse files Browse the repository at this point in the history
* Wrap GRPC::CANCELED and DEADLINE_EXCEEDED
  • Loading branch information
Quinn-With-Two-Ns authored Aug 6, 2024
1 parent 98b2e78 commit 531d3cb
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.temporal.api.common.v1.WorkflowExecution;

/**
* Error that occurs when an update call times out or is cancelled.
*
* <p>Note, this is not related to any general concept of timing out or cancelling a running update,
* this is only related to the client call itself.
*/
public class WorkflowUpdateTimeoutOrCancelledException extends WorkflowServiceException {
public WorkflowUpdateTimeoutOrCancelledException(
WorkflowExecution execution, String updateId, String updateName, Throwable cause) {
super(execution, "", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
// does not exist or because the update ID does not exist.
throw sre;
}
throw sre;
} else if (failure instanceof WorkflowException) {
throw (WorkflowException) failure;
} else if (failure instanceof TimeoutException) {
throw new CompletionException((TimeoutException) failure);
throw new CompletionException(failure);
}
throw new WorkflowServiceException(execution, workflowType, (Throwable) failure);
throw new WorkflowServiceException(execution, workflowType, failure);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,18 @@ public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage();
do {
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
result = genericClient.update(updateRequest, pollTimeoutDeadline);
} while (result.getStage().getNumber() < waitForStage.getNumber()
try {
result = genericClient.update(updateRequest, pollTimeoutDeadline);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED
|| e.getStatus().getCode() == Status.Code.CANCELLED) {
throw new WorkflowUpdateTimeoutOrCancelledException(
input.getWorkflowExecution(), input.getUpdateName(), input.getUpdateId(), e);
}
throw e;
}

} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
&& result.getStage().getNumber()
< UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
Expand Down Expand Up @@ -466,17 +476,17 @@ private void pollWorkflowUpdateHelper(
return;
}
if ((e instanceof StatusRuntimeException
&& ((StatusRuntimeException) e).getStatus().getCode()
== Status.Code.DEADLINE_EXCEEDED)
&& (((StatusRuntimeException) e).getStatus().getCode()
== Status.Code.DEADLINE_EXCEEDED
|| ((StatusRuntimeException) e).getStatus().getCode()
== Status.Code.CANCELLED))
|| deadline.isExpired()) {
resultCF.completeExceptionally(
new TimeoutException(
"WorkflowId="
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId()
+ ", runId="
+ request.getUpdateRef().getWorkflowExecution().getRunId()
+ ", updateId="
+ request.getUpdateRef().getUpdateId()));
new WorkflowUpdateTimeoutOrCancelledException(
request.getUpdateRef().getWorkflowExecution(),
request.getUpdateRef().getUpdateId(),
"",
e));
} else if (e != null) {
resultCF.completeExceptionally(e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@
import static org.junit.Assert.assertEquals;

import com.google.common.base.Stopwatch;
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowStub;
import io.temporal.client.WorkflowUpdateStage;
import io.temporal.client.*;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -111,7 +107,9 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
// Verify get throws the correct exception in around the right amount of time
Stopwatch stopWatch = Stopwatch.createStarted();
ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
assertThat(executionException.getCause(), is(instanceOf(TimeoutException.class)));
assertThat(
executionException.getCause(),
is(instanceOf(WorkflowUpdateTimeoutOrCancelledException.class)));
stopWatch.stop();
long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS);
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.updateTest;

import static org.junit.Assert.assertThrows;

import io.grpc.Context;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.*;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.CompletablePromise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class UpdateExceptionWrapped {

private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(WorkerOptions.newBuilder().build())
.setWorkflowTypes(TestUpdateWorkflowImpl.class)
.build();

@Test
public void testUpdateStart() {
String workflowId = UUID.randomUUID().toString();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setWorkflowId(workflowId)
.build();
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
// To execute workflow client.execute() would do. But we want to start workflow and immediately
// return.
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
testWorkflowRule.getTestEnvironment().shutdownNow();
testWorkflowRule.getTestEnvironment().awaitTermination(1000, TimeUnit.MILLISECONDS);

final AtomicReference<WorkflowUpdateTimeoutOrCancelledException> exception =
new AtomicReference<>();

Context.current()
.withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduledExecutor)
.run(
() ->
exception.set(
assertThrows(
WorkflowUpdateTimeoutOrCancelledException.class,
() -> workflow.update(0, ""))));
Assert.assertEquals(execution.getWorkflowId(), exception.get().getExecution().getWorkflowId());
}

public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
String state = "initial";
List<String> updates = new ArrayList<>();
CompletablePromise<Void> promise = Workflow.newPromise();

@Override
public String execute() {
promise.get();
return "";
}

@Override
public String getState() {
return state;
}

@Override
public String update(Integer index, String value) {
Workflow.await(() -> false);
return "";
}

@Override
public void updateValidator(Integer index, String value) {}

@Override
public void complete() {
promise.complete(null);
}

@Override
public void completeValidator() {}
}
}

0 comments on commit 531d3cb

Please sign in to comment.