Skip to content

Commit

Permalink
Deflake/rewrite TestActivityHeartbeatDetailsDuringRetry test (#2304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan authored Dec 15, 2021
1 parent 67b06d6 commit 8750b1c
Showing 1 changed file with 117 additions and 172 deletions.
289 changes: 117 additions & 172 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ package host

import (
"bytes"
"context"
"encoding/binary"
"errors"
"time"

"go.temporal.io/sdk/activity"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/common/convert"

"github.com/pborman/uuid"
Expand All @@ -42,7 +47,6 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/failure"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
Expand Down Expand Up @@ -174,177 +178,6 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
}
}

func (s *integrationSuite) TestActivityHeartbeatDetailsDuringRetry() {
id := "integration-heartbeat-details-retry-test"
wt := "integration-heartbeat-details-retry-type"
tl := "integration-heartbeat-details-retry-taskqueue"
identity := "worker1"
activityName := "activity_heartbeat_retry"

workflowType := &commonpb.WorkflowType{Name: wt}

taskQueue := &taskqueuepb.TaskQueue{Name: tl}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
}

we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

workflowComplete := false
activitiesScheduled := false

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
if !activitiesScheduled {
activitiesScheduled = true
return []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "0",
ActivityType: &commonpb.ActivityType{Name: activityName},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: nil,
ScheduleToCloseTimeout: timestamp.DurationPtr(8 * time.Second),
ScheduleToStartTimeout: timestamp.DurationPtr(8 * time.Second),
StartToCloseTimeout: timestamp.DurationPtr(8 * time.Second),
HeartbeatTimeout: timestamp.DurationPtr(1 * time.Second),
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: timestamp.DurationPtr(1 * time.Second),
MaximumAttempts: 3,
MaximumInterval: timestamp.DurationPtr(1 * time.Second),
BackoffCoefficient: 1,
},
},
}},
}, nil
}

workflowComplete = true
s.Logger.Info("Completing Workflow")
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

activityExecutedCount := 0
heartbeatDetails := payloads.EncodeString("details")
atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType,
activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) {
s.Equal(id, execution.GetWorkflowId())
s.Equal(activityName, activityType.GetName())

var err error
if activityExecutedCount == 0 {
s.Logger.Info("Heartbeating for activity:", tag.WorkflowActivityID(activityID))
_, err = s.engine.RecordActivityTaskHeartbeat(NewContext(), &workflowservice.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken, Details: heartbeatDetails})
s.NoError(err)
// Trigger heartbeat timeout and retry
time.Sleep(time.Second * 4)
} else if activityExecutedCount == 1 {
// return an error and retry
err = errors.New("retryable-error")
}

activityExecutedCount++
return nil, false, err
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

describeWorkflowExecution := func() (*workflowservice.DescribeWorkflowExecutionResponse, error) {
return s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err, err)

for i := 0; i != 3; i++ {
err = poller.PollAndProcessActivityTask(false)
if i == 0 {
// first time, hearbeat timeout, respond activity complete will fail
s.Error(err)
} else {
// second time, retryable error
s.NoError(err)
}

dweResponse, err := describeWorkflowExecution()
s.NoError(err)

pendingActivities := dweResponse.GetPendingActivities()
if i == 2 {
// third time, complete activity, no pending info
s.Equal(0, len(pendingActivities))
} else {
s.Equal(1, len(pendingActivities))
pendingActivity := pendingActivities[0]

s.Equal(int32(3), pendingActivity.GetMaximumAttempts())
s.Equal(int32(i+2), pendingActivity.GetAttempt())
s.Equal(enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, pendingActivity.GetState())
if i == 0 {
s.Equal(failure.NewTimeoutFailure("activity timeout", enumspb.TIMEOUT_TYPE_HEARTBEAT), pendingActivity.GetLastFailure())
} else { // i == 1
expectedErrString := "retryable-error"
s.NotNil(pendingActivity.GetLastFailure().GetApplicationFailureInfo())
s.Equal(expectedErrString, pendingActivity.GetLastFailure().GetMessage())
s.False(pendingActivity.GetLastFailure().GetApplicationFailureInfo().GetNonRetryable())
}
s.Equal(identity, pendingActivity.GetLastWorkerIdentity())

scheduledTS := pendingActivity.ScheduledTime
lastHeartbeatTS := pendingActivity.LastHeartbeatTime
expirationTS := pendingActivity.ExpirationTime
s.NotZero(scheduledTS)
s.NotZero(lastHeartbeatTS)
s.NotZero(expirationTS)
s.Zero(pendingActivity.LastStartedTime)
s.True(scheduledTS.After(timestamp.TimeValue(lastHeartbeatTS)))
s.True(expirationTS.After(timestamp.TimeValue(scheduledTS)))

s.Equal(heartbeatDetails, pendingActivity.GetHeartbeatDetails())
}
}

_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)

s.True(workflowComplete)
s.Equal(3, activityExecutedCount)
}

func (s *integrationSuite) TestActivityRetry() {
id := "integration-activity-retry-test"
wt := "integration-activity-retry-type"
Expand Down Expand Up @@ -1028,3 +861,115 @@ func (s *integrationSuite) TestActivityCancellationNotStarted() {
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
}

func (s *clientIntegrationSuite) TestActivityHeartbeatDetailsDuringRetry() {
// Latest reported heartbeat on activity should be available throughout workflow execution or until activity succeeds.
// 1. Start workflow with single activity
// 2. First invocation of activity sets heartbeat details and times out.
// 3. Second invocation triggers retriable error.
// 4. Next invocations succeed.
// 5. Test should start polling for heartbeat details once first heartbeat was reported.
// 6. Once workflow completes -- we're done.

activityTimeout := time.Second

activityExecutedCount := 0
heartbeatDetails := 7771 // any value
heartbeatDetailsPayload, err := payloads.Encode(heartbeatDetails)
s.NoError(err)
activityFn := func(ctx context.Context) error {
var err error
if activityExecutedCount == 0 {
activity.RecordHeartbeat(ctx, heartbeatDetails)
time.Sleep(activityTimeout + time.Second)
} else if activityExecutedCount == 1 {
time.Sleep(activityTimeout / 2)
err = errors.New("retryable-error")
}

if activityExecutedCount > 0 {
s.True(activity.HasHeartbeatDetails(ctx))
var details int
s.NoError(activity.GetHeartbeatDetails(ctx, &details))
s.Equal(details, heartbeatDetails)
}

activityExecutedCount++
return err
}

var err1 error

activityId := "heartbeat_retry"
workflowFn := func(ctx workflow.Context) error {
activityRetryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second * 2,
BackoffCoefficient: 1,
MaximumInterval: time.Second * 2,
MaximumAttempts: 3,
}

ctx1 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ActivityID: activityId,
ScheduleToStartTimeout: 2 * time.Second,
StartToCloseTimeout: 2 * time.Second,
RetryPolicy: activityRetryPolicy,
})
f1 := workflow.ExecuteActivity(ctx1, activityFn)

err1 = f1.Get(ctx1, nil)

return nil
}

s.worker.RegisterActivity(activityFn)
s.worker.RegisterWorkflow(workflowFn)

wfId := "integration-test-heartbeat-details-during-retry"
workflowOptions := sdkclient.StartWorkflowOptions{
ID: wfId,
TaskQueue: s.taskQueue,
WorkflowRunTimeout: 20 * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
if err != nil {
s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
}

s.NotNil(workflowRun)
s.True(workflowRun.GetRunID() != "")

runId := workflowRun.GetRunID()

describeWorkflowExecution := func() (*workflowservice.DescribeWorkflowExecutionResponse, error) {
return s.engine.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: wfId,
RunId: runId,
},
})
}

time.Sleep(time.Second) // wait for the timeout to trigger

for dweResult, dweErr := describeWorkflowExecution(); dweResult.GetWorkflowExecutionInfo().GetCloseTime() == nil; dweResult, dweErr = describeWorkflowExecution() {
s.NoError(dweErr)
s.NotNil(dweResult.GetWorkflowExecutionInfo())
s.LessOrEqual(len(dweResult.PendingActivities), 1)

if dweResult.PendingActivities != nil && len(dweResult.PendingActivities) == 1 {
details := dweResult.PendingActivities[0].GetHeartbeatDetails()
s.Equal(heartbeatDetailsPayload, details)
}

time.Sleep(time.Millisecond * 100)
}

err = workflowRun.Get(ctx, nil)
s.NoError(err)

s.NoError(err1)
}

0 comments on commit 8750b1c

Please sign in to comment.