From 44ee560d516eea043b8b6ab29b547ec7aa01bf5f Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Wed, 7 Sep 2022 13:25:35 -0700 Subject: [PATCH] Move commonly used get & update function to api package (#3311) * Move commonly used get & update function to api package * Update history engine & workflow task handler accordingly --- Makefile | 2 +- service/history/api/update_workflow_util.go | 26 +++ service/history/historyEngine.go | 180 ++++++++---------- .../history/workflowTaskHandlerCallbacks.go | 125 ++++++++---- 4 files changed, 190 insertions(+), 143 deletions(-) diff --git a/Makefile b/Makefile index b3734390361..d6e716b1bc4 100644 --- a/Makefile +++ b/Makefile @@ -117,7 +117,7 @@ update-mockgen: update-proto-plugins: @printf $(COLOR) "Install/update proto plugins..." - @go install github.com/temporalio/gogo-protobuf/protoc-gen-gogoslick@latest + @go install github.com/temporalio/gogo-protobuf/protoc-gen-gogoslick@master # This to download sources of gogo-protobuf which are required to build proto files. @GO111MODULE=off go get github.com/temporalio/gogo-protobuf/protoc-gen-gogoslick @go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest diff --git a/service/history/api/update_workflow_util.go b/service/history/api/update_workflow_util.go index 71a8b34473c..e61897490ca 100644 --- a/service/history/api/update_workflow_util.go +++ b/service/history/api/update_workflow_util.go @@ -27,10 +27,36 @@ package api import ( "context" + clockspb "go.temporal.io/server/api/clock/v1" + "go.temporal.io/server/common/definition" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" ) +func GetAndUpdateWorkflowWithNew( + ctx context.Context, + reqClock *clockspb.VectorClock, + consistencyCheckFn MutableStateConsistencyPredicate, + workflowKey definition.WorkflowKey, + action UpdateWorkflowActionFunc, + newWorkflowFn func() (workflow.Context, workflow.MutableState, error), + shard shard.Context, + workflowConsistencyChecker WorkflowConsistencyChecker, +) (retError error) { + workflowContext, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + reqClock, + consistencyCheckFn, + workflowKey, + ) + if err != nil { + return err + } + defer func() { workflowContext.GetReleaseFn()(retError) }() + + return UpdateWorkflowWithNew(shard, ctx, workflowContext, action, newWorkflowFn) +} + func UpdateWorkflowWithNew( shard shard.Context, ctx context.Context, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 9ec3aef97c7..2b9cc0e7bd0 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -42,7 +42,6 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" - clockspb "go.temporal.io/server/api/clock/v1" enumsspb "go.temporal.io/server/api/enums/v1" historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" @@ -54,7 +53,6 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" - "go.temporal.io/server/common/failure" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -510,7 +508,7 @@ func (e *historyEngineImpl) StartWorkflowExecution( if prevExecutionUpdateAction != nil { // update prev execution and create new execution in one transaction - err := e.updateWorkflowWithNew( + err := api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -534,6 +532,8 @@ func (e *historyEngineImpl) StartWorkflowExecution( } return workflowContext.GetContext(), workflowContext.GetMutableState(), nil }, + e.shard, + e.workflowConsistencyChecker, ) switch err { case nil: @@ -1058,7 +1058,7 @@ func (e *historyEngineImpl) ResetStickyTaskQueue( return nil, err } - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -1079,6 +1079,9 @@ func (e *historyEngineImpl) ResetStickyTaskQueue( CreateWorkflowTask: false, }, nil }, + nil, + e.shard, + e.workflowConsistencyChecker, ) if err != nil { @@ -1250,7 +1253,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( namespace := namespaceEntry.Name() response := &historyservice.RecordActivityTaskStartedResponse{} - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, request.Clock, api.BypassMutableStateConsistencyPredicate, @@ -1338,7 +1341,11 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( Noop: false, CreateWorkflowTask: false, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) if err != nil { return nil, err @@ -1410,7 +1417,7 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted( var activityStartedTime time.Time var taskQueue string var workflowTypeName string - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, api.BypassMutableStateConsistencyPredicate, @@ -1456,7 +1463,11 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted( Noop: false, CreateWorkflowTask: true, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskCompletedScope). @@ -1495,7 +1506,7 @@ func (e *historyEngineImpl) RespondActivityTaskFailed( var activityStartedTime time.Time var taskQueue string var workflowTypeName string - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, api.BypassMutableStateConsistencyPredicate, @@ -1562,7 +1573,11 @@ func (e *historyEngineImpl) RespondActivityTaskFailed( activityStartedTime = *ai.StartedTime taskQueue = ai.TaskQueue return postActions, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskFailedScope). Tagged( @@ -1600,7 +1615,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled( var activityStartedTime time.Time var taskQueue string var workflowTypeName string - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, api.BypassMutableStateConsistencyPredicate, @@ -1658,7 +1673,11 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled( Noop: false, CreateWorkflowTask: true, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskCanceledScope). @@ -1697,7 +1716,7 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( } var cancelRequested bool - err = e.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, api.BypassMutableStateConsistencyPredicate, @@ -1745,7 +1764,11 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( Noop: false, CreateWorkflowTask: false, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) if err != nil { return &historyservice.RecordActivityTaskHeartbeatResponse{}, err @@ -1776,7 +1799,7 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( runID = "" } - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -1828,7 +1851,11 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( } return api.UpdateWorkflowWithNewWorkflowTask, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } func (e *historyEngineImpl) SignalWorkflowExecution( @@ -1846,7 +1873,7 @@ func (e *historyEngineImpl) SignalWorkflowExecution( parentExecution := signalRequest.ExternalWorkflowExecution childWorkflowOnly := signalRequest.GetChildWorkflowOnly() - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -1909,7 +1936,11 @@ func (e *historyEngineImpl) SignalWorkflowExecution( Noop: false, CreateWorkflowTask: createWorkflowTask, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } // SignalWithStartWorkflowExecution signals current workflow (if running) or creates & signals a new workflow @@ -1939,7 +1970,7 @@ func (e *historyEngineImpl) RemoveSignalMutableState( return err } - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -1959,7 +1990,11 @@ func (e *historyEngineImpl) RemoveSignalMutableState( Noop: false, CreateWorkflowTask: false, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } func (e *historyEngineImpl) TerminateWorkflowExecution( @@ -1981,7 +2016,7 @@ func (e *historyEngineImpl) TerminateWorkflowExecution( if len(request.FirstExecutionRunId) != 0 { runID = "" } - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -2022,7 +2057,11 @@ func (e *historyEngineImpl) TerminateWorkflowExecution( request.GetIdentity(), false, ) - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } func (e *historyEngineImpl) DeleteWorkflowExecution( @@ -2083,7 +2122,8 @@ func (e *historyEngineImpl) DeleteWorkflowExecution( true, ) }, - nil) + nil, + ) } } @@ -2118,7 +2158,7 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( parentInitiatedID := completionRequest.ParentInitiatedId parentInitiatedVersion := completionRequest.ParentInitiatedVersion - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, completionRequest.Clock, func(mutableState workflow.MutableState) bool { @@ -2196,7 +2236,11 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( Noop: false, CreateWorkflowTask: true, }, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } func (e *historyEngineImpl) VerifyChildExecutionCompletionRecorded( @@ -2426,84 +2470,6 @@ func (e *historyEngineImpl) ResetWorkflowExecution( }, nil } -func (e *historyEngineImpl) updateWorkflow( - ctx context.Context, - reqClock *clockspb.VectorClock, - consistencyCheckFn api.MutableStateConsistencyPredicate, - workflowKey definition.WorkflowKey, - action api.UpdateWorkflowActionFunc, -) (retError error) { - workflowContext, err := e.workflowConsistencyChecker.GetWorkflowContext( - ctx, - reqClock, - consistencyCheckFn, - workflowKey, - ) - if err != nil { - return err - } - defer func() { workflowContext.GetReleaseFn()(retError) }() - - return api.UpdateWorkflowWithNew(e.shard, ctx, workflowContext, action, nil) -} - -func (e *historyEngineImpl) updateWorkflowWithNew( - ctx context.Context, - reqClock *clockspb.VectorClock, - consistencyCheckFn api.MutableStateConsistencyPredicate, - workflowKey definition.WorkflowKey, - action api.UpdateWorkflowActionFunc, - newWorkflowFn func() (workflow.Context, workflow.MutableState, error), -) (retError error) { - workflowContext, err := e.workflowConsistencyChecker.GetWorkflowContext( - ctx, - reqClock, - consistencyCheckFn, - workflowKey, - ) - if err != nil { - return err - } - defer func() { workflowContext.GetReleaseFn()(retError) }() - - return api.UpdateWorkflowWithNew(e.shard, ctx, workflowContext, action, newWorkflowFn) -} - -func (e *historyEngineImpl) failWorkflowTask( - ctx context.Context, - wfContext workflow.Context, - scheduledEventID int64, - startedEventID int64, - wtFailedCause *workflowTaskFailedCause, - request *workflowservice.RespondWorkflowTaskCompletedRequest, -) (workflow.MutableState, error) { - - // clear any updates we have accumulated so far - wfContext.Clear() - - // Reload workflow execution so we can apply the workflow task failure event - mutableState, err := wfContext.LoadWorkflowExecution(ctx) - if err != nil { - return nil, err - } - - if _, err = mutableState.AddWorkflowTaskFailedEvent( - scheduledEventID, - startedEventID, - wtFailedCause.failedCause, - failure.NewServerFailure(wtFailedCause.Message(), true), - request.GetIdentity(), - request.GetBinaryChecksum(), - "", - "", - 0); err != nil { - return nil, err - } - - // Return new builder back to the caller for further updates - return mutableState, nil -} - func (e *historyEngineImpl) NotifyNewHistoryEvent( notification *events.Notification, ) { @@ -2621,7 +2587,7 @@ func (e *historyEngineImpl) ReapplyEvents( } namespaceID := namespaceEntry.ID() - return e.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, nil, api.BypassMutableStateConsistencyPredicate, @@ -2774,7 +2740,11 @@ func (e *historyEngineImpl) ReapplyEvents( }, nil } return postActions, nil - }) + }, + nil, + e.shard, + e.workflowConsistencyChecker, + ) } func (e *historyEngineImpl) GetDLQMessages( diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index c9cc4e977aa..c4f4a9f441e 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -33,8 +33,10 @@ import ( querypb "go.temporal.io/api/query/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/failure" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/api" @@ -71,39 +73,41 @@ type ( } workflowTaskHandlerCallbacksImpl struct { - currentClusterName string - config *configs.Config - shard shard.Context - timeSource clock.TimeSource - historyEngine *historyEngineImpl - namespaceRegistry namespace.Registry - tokenSerializer common.TaskTokenSerializer - metricsClient metrics.Client - logger log.Logger - throttledLogger log.Logger - commandAttrValidator *commandAttrValidator - searchAttributesMapper searchattribute.Mapper + currentClusterName string + config *configs.Config + shard shard.Context + workflowConsistencyChecker api.WorkflowConsistencyChecker + timeSource clock.TimeSource + namespaceRegistry namespace.Registry + tokenSerializer common.TaskTokenSerializer + metricsClient metrics.Client + logger log.Logger + throttledLogger log.Logger + commandAttrValidator *commandAttrValidator + searchAttributesMapper searchattribute.Mapper + searchAttributesValidator *searchattribute.Validator } ) func newWorkflowTaskHandlerCallback(historyEngine *historyEngineImpl) *workflowTaskHandlerCallbacksImpl { return &workflowTaskHandlerCallbacksImpl{ - currentClusterName: historyEngine.currentClusterName, - config: historyEngine.config, - shard: historyEngine.shard, - timeSource: historyEngine.shard.GetTimeSource(), - historyEngine: historyEngine, - namespaceRegistry: historyEngine.shard.GetNamespaceRegistry(), - tokenSerializer: historyEngine.tokenSerializer, - metricsClient: historyEngine.metricsClient, - logger: historyEngine.logger, - throttledLogger: historyEngine.throttledLogger, + currentClusterName: historyEngine.currentClusterName, + config: historyEngine.config, + shard: historyEngine.shard, + workflowConsistencyChecker: historyEngine.workflowConsistencyChecker, + timeSource: historyEngine.shard.GetTimeSource(), + namespaceRegistry: historyEngine.shard.GetNamespaceRegistry(), + tokenSerializer: historyEngine.tokenSerializer, + metricsClient: historyEngine.metricsClient, + logger: historyEngine.logger, + throttledLogger: historyEngine.throttledLogger, commandAttrValidator: newCommandAttrValidator( historyEngine.shard.GetNamespaceRegistry(), historyEngine.config, historyEngine.searchAttributesValidator, ), - searchAttributesMapper: historyEngine.shard.GetSearchAttributesMapper(), + searchAttributesMapper: historyEngine.shard.GetSearchAttributesMapper(), + searchAttributesValidator: historyEngine.searchAttributesValidator, } } @@ -112,12 +116,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled( req *historyservice.ScheduleWorkflowTaskRequest, ) error { - _, err := handler.historyEngine.getActiveNamespaceEntry(namespace.ID(req.GetNamespaceId())) + _, err := api.GetActiveNamespace(handler.shard, namespace.ID(req.GetNamespaceId())) if err != nil { return err } - return handler.historyEngine.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, req.Clock, api.BypassMutableStateConsistencyPredicate, @@ -149,7 +153,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled( } return &api.UpdateWorkflowAction{}, nil - }) + }, + nil, + handler.shard, + handler.workflowConsistencyChecker, + ) } func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( @@ -157,7 +165,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( req *historyservice.RecordWorkflowTaskStartedRequest, ) (*historyservice.RecordWorkflowTaskStartedResponse, error) { - namespaceEntry, err := handler.historyEngine.getActiveNamespaceEntry(namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(handler.shard, namespace.ID(req.GetNamespaceId())) if err != nil { return nil, err } @@ -166,7 +174,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( requestID := req.GetRequestId() var resp *historyservice.RecordWorkflowTaskStartedResponse - err = handler.historyEngine.updateWorkflow( + err = api.GetAndUpdateWorkflowWithNew( ctx, req.Clock, api.BypassMutableStateConsistencyPredicate, @@ -246,7 +254,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( return nil, err } return updateAction, nil - }) + }, + nil, + handler.shard, + handler.workflowConsistencyChecker, + ) if err != nil { return nil, err @@ -259,7 +271,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( req *historyservice.RespondWorkflowTaskFailedRequest, ) (retError error) { - _, err := handler.historyEngine.getActiveNamespaceEntry(namespace.ID(req.GetNamespaceId())) + _, err := api.GetActiveNamespace(handler.shard, namespace.ID(req.GetNamespaceId())) if err != nil { return err } @@ -270,7 +282,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( return consts.ErrDeserializingToken } - return handler.historyEngine.updateWorkflow( + return api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, api.BypassMutableStateConsistencyPredicate, @@ -300,7 +312,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( Noop: false, CreateWorkflowTask: true, }, nil - }) + }, + nil, + handler.shard, + handler.workflowConsistencyChecker, + ) } func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( @@ -308,7 +324,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( req *historyservice.RespondWorkflowTaskCompletedRequest, ) (resp *historyservice.RespondWorkflowTaskCompletedResponse, retError error) { - namespaceEntry, err := handler.historyEngine.getActiveNamespaceEntry(namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(handler.shard, namespace.ID(req.GetNamespaceId())) if err != nil { return nil, err } @@ -321,7 +337,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( } scheduledEventID := token.GetScheduledEventId() - workflowContext, err := handler.historyEngine.workflowConsistencyChecker.GetWorkflowContext( + workflowContext, err := handler.workflowConsistencyChecker.GetWorkflowContext( ctx, token.Clock, func(mutableState workflow.MutableState) bool { @@ -429,7 +445,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( handler.config.HistoryCountLimitError(namespace.String()), completedEvent.GetEventId(), msBuilder, - handler.historyEngine.searchAttributesValidator, + handler.searchAttributesValidator, executionStats, handler.metricsClient.Scope(metrics.HistoryRespondWorkflowTaskCompletedScope, metrics.NamespaceTag(namespace.String())), handler.throttledLogger, @@ -480,7 +496,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( // drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout. return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) } - msBuilder, err = handler.historyEngine.failWorkflowTask(ctx, weContext, scheduledEventID, startedEventID, wtFailedCause, request) + msBuilder, err = failWorkflowTask(ctx, weContext, scheduledEventID, startedEventID, wtFailedCause, request) if err != nil { return nil, err } @@ -622,7 +638,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule return err } - workflowContext, err := handler.historyEngine.workflowConsistencyChecker.GetWorkflowContext( + workflowContext, err := handler.workflowConsistencyChecker.GetWorkflowContext( ctx, req.Clock, api.BypassMutableStateConsistencyPredicate, @@ -797,3 +813,38 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleBufferedQueries(msBuilder } } } + +func failWorkflowTask( + ctx context.Context, + wfContext workflow.Context, + scheduledEventID int64, + startedEventID int64, + wtFailedCause *workflowTaskFailedCause, + request *workflowservice.RespondWorkflowTaskCompletedRequest, +) (workflow.MutableState, error) { + + // clear any updates we have accumulated so far + wfContext.Clear() + + // Reload workflow execution so we can apply the workflow task failure event + mutableState, err := wfContext.LoadWorkflowExecution(ctx) + if err != nil { + return nil, err + } + + if _, err = mutableState.AddWorkflowTaskFailedEvent( + scheduledEventID, + startedEventID, + wtFailedCause.failedCause, + failure.NewServerFailure(wtFailedCause.Message(), true), + request.GetIdentity(), + request.GetBinaryChecksum(), + "", + "", + 0); err != nil { + return nil, err + } + + // Return new builder back to the caller for further updates + return mutableState, nil +}