Skip to content

Commit

Permalink
Improve verification APIs error handling (#3064)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 18, 2022
1 parent db09e28 commit d0ebe68
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 33 deletions.
11 changes: 2 additions & 9 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,22 +2258,15 @@ func (e *historyEngineImpl) VerifyChildExecutionCompletionRecorded(
),
)
if err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
// workflow not found error, verification logic needs to keep waiting in this case
// if we return NotFound directly, caller can't tell if it's workflow not found or child not found
// standby logic will continue verification
return consts.ErrWorkflowNotReady
}
return err
}
defer func() { workflowContext.GetReleaseFn()(retError) }()

mutableState := workflowContext.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() &&
mutableState.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
// standby logic will stop verification as the parent has already completed
// and can't be blocked after failover.
return consts.ErrWorkflowCompleted
// parent has already completed and can't be blocked after failover.
return nil
}

onCurrentBranch, err := historyEventOnCurrentBranch(mutableState, request.ParentInitiatedId, request.ParentInitiatedVersion)
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1727,7 +1727,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_WorkflowNotExi
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.NotFound{})

err := s.historyEngine.VerifyChildExecutionCompletionRecorded(metrics.AddMetricsContext(context.Background()), request)
s.IsType(&serviceerror.WorkflowNotReady{}, err)
s.IsType(&serviceerror.NotFound{}, err)
}

func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_WorkflowClosed() {
Expand Down Expand Up @@ -1763,7 +1763,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_WorkflowClosed
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)

err = s.historyEngine.VerifyChildExecutionCompletionRecorded(metrics.AddMetricsContext(context.Background()), request)
s.IsType(&serviceerror.NotFound{}, err)
s.NoError(err)
}

func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_InitiatedEventNotFound() {
Expand Down
12 changes: 4 additions & 8 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,9 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution(
Clock: executionInfo.ParentClock,
})
switch err.(type) {
case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
// NOTE: NotFound is only returned when workflow already completed
// If workflow can't be found at all, WorkflowNotReady error will be returned.
case nil, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
return nil, nil
case *serviceerror.WorkflowNotReady:
case *serviceerror.NotFound, *serviceerror.WorkflowNotReady:
return verifyChildCompletionRecordedInfo, nil
default:
t.logger.Error("Failed to verify child execution completion recoreded",
Expand Down Expand Up @@ -450,11 +448,9 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
Clock: childWorkflowInfo.Clock,
})
switch err.(type) {
case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
// NOTE: NotFound is only returned when workflow already completed
// If workflow can't be found at all, WorkflowNotReady error will be returned.
case nil, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
return nil, nil
case *serviceerror.WorkflowNotReady:
case *serviceerror.NotFound, *serviceerror.WorkflowNotReady:
return &startChildExecutionPostActionInfo{}, nil
default:
t.logger.Error("Failed to verify first workflow task scheduled",
Expand Down
20 changes: 14 additions & 6 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,21 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() {

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).Times(5)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).AnyTimes()

s.mockShard.SetCurrentTime(s.clusterName, now)
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, nil)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, consts.ErrWorkflowExecutionNotFound)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, consts.ErrWorkflowNotReady)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, serviceerror.NewUnimplemented("not implemented"))
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand All @@ -638,12 +646,12 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() {
s.Equal(errVerificationFailed, err)

s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration))
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{})
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, consts.ErrWorkflowNotReady)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.discardDuration))
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{})
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, consts.ErrWorkflowNotReady)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskDiscarded, err)
}
Expand Down Expand Up @@ -1054,11 +1062,11 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, consts.ErrWorkflowCompleted)
s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, consts.ErrWorkflowNotReady)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
s.Equal(consts.ErrTaskRetry, err)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, consts.ErrWorkflowNotReady)
s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, consts.ErrWorkflowExecutionNotFound)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

Expand Down
7 changes: 1 addition & 6 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,19 +634,14 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule
),
)
if err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
// workflow not found error, verification logic need to keep waiting in this case
// as it's possible that replication has not replicated this workflow yet.
return consts.ErrWorkflowNotReady
}
return err
}
defer func() { workflowContext.GetReleaseFn()(retError) }()

mutableState := workflowContext.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() &&
mutableState.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
return consts.ErrWorkflowCompleted
return nil
}

if !mutableState.HasProcessedOrPendingWorkflowTask() {
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflowTaskHandlerCallbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) TestVerifyFirstWorkflowTaskScheduled_
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.NotFound{})

err := s.workflowTaskHandlerCallback.verifyFirstWorkflowTaskScheduled(context.Background(), request)
s.IsType(&serviceerror.WorkflowNotReady{}, err)
s.IsType(&serviceerror.NotFound{}, err)
}

func (s *WorkflowTaskHandlerCallbackSuite) TestVerifyFirstWorkflowTaskScheduled_WorkflowCompleted() {
Expand Down Expand Up @@ -177,7 +177,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) TestVerifyFirstWorkflowTaskScheduled_
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)

err = s.workflowTaskHandlerCallback.verifyFirstWorkflowTaskScheduled(context.Background(), request)
s.IsType(&serviceerror.NotFound{}, err)
s.NoError(err)
}

func (s *WorkflowTaskHandlerCallbackSuite) TestVerifyFirstWorkflowTaskScheduled_WorkflowZombie() {
Expand Down

0 comments on commit d0ebe68

Please sign in to comment.