Skip to content

Commit

Permalink
Set LastHeartbeatDetails on activity failure (#2354)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Jan 6, 2025
1 parent ff333ca commit 9a8894a
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;

public interface ActivityExecutionContextFactory {
ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
InternalActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -61,7 +60,8 @@ public ActivityExecutionContextFactoryImpl(
}

@Override
public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) {
public InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Scope metricsScope) {
return new ActivityExecutionContextImpl(
service,
namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* @see ActivityExecutionContext
*/
@ThreadSafe
class ActivityExecutionContextImpl implements ActivityExecutionContext {
class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
private final Lock lock = new ReentrantLock();
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
private final Functions.Proc completionHandle;
Expand Down Expand Up @@ -165,4 +165,9 @@ public Scope getMetricsScope() {
public ActivityInfo getInfo() {
return info;
}

@Override
public Object getLastHeartbeatValue() {
return heartbeatContext.getLastHeartbeatDetails();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.DynamicActivity;
import io.temporal.api.common.v1.Payload;
Expand Down Expand Up @@ -76,7 +75,8 @@ public BaseActivityTaskExecutor(

@Override
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope);
InternalActivityExecutionContext context =
executionContextFactory.createContext(info, metricsScope);
ActivityInfo activityInfo = context.getInfo();
ActivitySerializationContext serializationContext =
new ActivitySerializationContext(
Expand Down Expand Up @@ -133,7 +133,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
}

return mapToActivityFailure(
ex, info.getActivityId(), metricsScope, local, dataConverterWithActivityContext);
ex,
info.getActivityId(),
context.getLastHeartbeatValue(),
metricsScope,
local,
dataConverterWithActivityContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ public Result handle(ActivityTask activityTask, Scope metricsScope, boolean loca
+ knownTypes);
} catch (Exception exception) {
return mapToActivityFailure(
exception, pollResponse.getActivityId(), metricsScope, localActivity, dataConverter);
exception,
pollResponse.getActivityId(),
null,
metricsScope,
localActivity,
dataConverter);
}
}

Expand Down Expand Up @@ -186,6 +191,7 @@ private void registerActivityImplementation(Object activity) {
static ActivityTaskHandler.Result mapToActivityFailure(
Throwable exception,
String activityId,
@Nullable Object lastHeartbeatDetails,
Scope metricsScope,
boolean isLocalActivity,
DataConverter dataConverter) {
Expand All @@ -212,6 +218,9 @@ static ActivityTaskHandler.Result mapToActivityFailure(
Failure failure = dataConverter.exceptionToFailure(exception);
RespondActivityTaskFailedRequest.Builder result =
RespondActivityTaskFailedRequest.newBuilder().setFailure(failure);
if (lastHeartbeatDetails != null) {
dataConverter.toPayloads(lastHeartbeatDetails).ifPresent(result::setLastHeartbeatDetails);
}
return new ActivityTaskHandler.Result(
activityId,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ interface HeartbeatContext {
* @see io.temporal.activity.ActivityExecutionContext#getHeartbeatDetails(Class, Type)
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

Object getLastHeartbeatDetails();
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
}
}

@Override
public Object getLastHeartbeatDetails() {
lock.lock();
try {
if (receivedAHeartbeat) {
return this.lastDetails;
}
return null;
} finally {
lock.unlock();
}
}

private void doHeartBeatLocked(Object details) {
long nextHeartbeatDelay;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.activity;

import io.temporal.activity.ActivityExecutionContext;

/**
* Internal context object passed to an Activity implementation, providing more internal details
* than the user facing {@link ActivityExecutionContext}.
*/
public interface InternalActivityExecutionContext extends ActivityExecutionContext {
/** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */
Object getLastHeartbeatValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;

public class LocalActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {

public LocalActivityExecutionContextFactoryImpl() {}

@Override
public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) {
public InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Scope metricsScope) {
return new LocalActivityExecutionContextImpl(info, metricsScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
import io.temporal.client.ActivityCompletionException;
import java.lang.reflect.Type;
import java.util.Optional;

class LocalActivityExecutionContextImpl implements ActivityExecutionContext {
class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext {
private final ActivityInfo info;
private final Scope metricsScope;

Expand Down Expand Up @@ -88,4 +87,9 @@ public ManualActivityCompletionClient useLocalManualCompletion() {
public Scope getMetricsScope() {
return metricsScope;
}

@Override
public Object getLastHeartbeatValue() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.activity;

import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import org.junit.Rule;
import org.junit.Test;

public class ActivityHeartbeatSentOnFailureTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(new HeartBeatingActivityImpl())
.build();

/** Tests that the last Activity#heartbeat value is sent if the activity fails. */
@Test
public void activityHeartbeatSentOnFailure() {
TestWorkflows.NoArgsWorkflow workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class);
workflow.execute();
}

public static class TestWorkflowImpl implements TestWorkflows.NoArgsWorkflow {

private final TestActivities.NoArgsActivity activities =
Workflow.newActivityStub(
TestActivities.NoArgsActivity.class,
SDKTestOptions.newActivityOptions20sScheduleToClose());

@Override
public void execute() {
activities.execute();
}
}

public static class HeartBeatingActivityImpl implements TestActivities.NoArgsActivity {
@Override
public void execute() {
// If the heartbeat details are "3", then we know that the last heartbeat was sent.
if (Activity.getExecutionContext().getHeartbeatDetails(String.class).orElse("").equals("3")) {
return;
}
// Send 3 heartbeats and then fail, expecting the last heartbeat to be sent
// even though the activity fails and the last two attempts would normally be throttled.
Activity.getExecutionContext().heartbeat("1");
Activity.getExecutionContext().heartbeat("2");
Activity.getExecutionContext().heartbeat("3");
throw new RuntimeException("simulated failure");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1956,9 +1956,11 @@ private static State failActivityTask(
RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
if (request instanceof RespondActivityTaskFailedRequest) {
RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
data.heartbeatDetails = req.getLastHeartbeatDetails();
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
} else if (request instanceof RespondActivityTaskFailedByIdRequest) {
RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
data.heartbeatDetails = req.getLastHeartbeatDetails();
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
} else {
throw new IllegalArgumentException("Unknown request: " + request);
Expand Down

0 comments on commit 9a8894a

Please sign in to comment.