Skip to content

Commit

Permalink
Implement batch reset to build id (#5208)
Browse files Browse the repository at this point in the history
## What changed?
Implement batch reset to build-id-based reset point

## Why?
Help recover from bad deployments

## How did you test it?
new functional test
  • Loading branch information
dnr authored Dec 8, 2023
1 parent 7f59d82 commit c5f59e4
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 106 deletions.
14 changes: 12 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3455,8 +3455,18 @@ func (wh *WorkflowHandler) StartBatchOperation(
case *workflowservice.StartBatchOperationRequest_ResetOperation:
identity = op.ResetOperation.GetIdentity()
operationType = batcher.BatchTypeReset
resetParams.ResetType = op.ResetOperation.GetResetType()
resetParams.ResetReapplyType = op.ResetOperation.GetResetReapplyType()
if op.ResetOperation.Options != nil {
encoded, err := op.ResetOperation.Options.Marshal()
if err != nil {
return nil, err
}
resetParams.ResetOptions = encoded
} else {
// TODO: remove support for old fields later
resetParams.ResetType = op.ResetOperation.GetResetType()
resetParams.ResetReapplyType = op.ResetOperation.GetResetReapplyType()
}

default:
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("The operation type %T is not supported", op))
}
Expand Down
101 changes: 92 additions & 9 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"fmt"
"math"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
Expand All @@ -39,6 +40,7 @@ import (
sdkclient "go.temporal.io/sdk/client"
"golang.org/x/time/rate"

"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -85,6 +87,15 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
return hbd, err
}

// Deserialize batch reset options if set
if b := batchParams.ResetParams.ResetOptions; b != nil {
batchParams.ResetParams.resetOptions = &commonpb.ResetOptions{}
if err := batchParams.ResetParams.resetOptions.Unmarshal(b); err != nil {
logger.Error("Failed to deserialize batch reset options", tag.Error(err))
return hbd, err
}
}

sdkClient := a.ClientFactory.NewClient(sdkclient.Options{
Namespace: batchParams.Namespace,
DataConverter: sdk.PreferProtoDataConverter,
Expand Down Expand Up @@ -282,17 +293,29 @@ func startTaskProcessor(
WorkflowId: workflowID,
RunId: runID,
}
eventId, err := getResetEventIDByType(ctx, batchParams.ResetParams.ResetType, batchParams.Namespace, workflowExecution, frontendClient, logger)
var eventId int64
var err error
var resetReapplyType enumspb.ResetReapplyType
if batchParams.ResetParams.resetOptions != nil {
// Using ResetOptions
// Note: getResetEventIDByOptions may modify workflowExecution.RunId, if reset should be to a prior run
eventId, err = getResetEventIDByOptions(ctx, batchParams.ResetParams.resetOptions, batchParams.Namespace, workflowExecution, frontendClient, logger)
resetReapplyType = batchParams.ResetParams.resetOptions.ResetReapplyType
} else {
// Old fields
eventId, err = getResetEventIDByType(ctx, batchParams.ResetParams.ResetType, batchParams.Namespace, workflowExecution, frontendClient, logger)
resetReapplyType = batchParams.ResetParams.ResetReapplyType
}
if err != nil {
return err
}
_, err = frontendClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: batchParams.Namespace,
WorkflowExecution: workflowExecution,
Reason: batchParams.Reason,
WorkflowTaskFinishEventId: eventId,
RequestId: uuid.New(),
ResetReapplyType: batchParams.ResetParams.ResetReapplyType,
WorkflowTaskFinishEventId: eventId,
ResetReapplyType: resetReapplyType,
})
return err
})
Expand Down Expand Up @@ -333,7 +356,7 @@ func processTask(
err = procFn(task.execution.GetWorkflowId(), task.execution.GetRunId())
if err != nil {
// NotFound means wf is not running or deleted
if _, isNotFound := err.(*serviceerror.NotFound); !isNotFound {
if !common.IsNotFoundError(err) {
return err
}
}
Expand All @@ -350,7 +373,8 @@ func isDone(ctx context.Context) bool {
}
}

func getResetEventIDByType(ctx context.Context,
func getResetEventIDByType(
ctx context.Context,
resetType enumspb.ResetType,
namespaceStr string,
workflowExecution *commonpb.WorkflowExecution,
Expand All @@ -368,7 +392,32 @@ func getResetEventIDByType(ctx context.Context,
}
}

func getLastWorkflowTaskEventID(ctx context.Context,
// Note: may modify workflowExecution.RunId
func getResetEventIDByOptions(
ctx context.Context,
resetOptions *commonpb.ResetOptions,
namespaceStr string,
workflowExecution *commonpb.WorkflowExecution,
frontendClient workflowservice.WorkflowServiceClient,
logger log.Logger,
) (int64, error) {
switch target := resetOptions.Target.(type) {
case *commonpb.ResetOptions_FirstWorkflowTask:
return getFirstWorkflowTaskEventID(ctx, namespaceStr, workflowExecution, frontendClient, logger)
case *commonpb.ResetOptions_LastWorkflowTask:
return getLastWorkflowTaskEventID(ctx, namespaceStr, workflowExecution, frontendClient, logger)
case *commonpb.ResetOptions_WorkflowTaskId:
return target.WorkflowTaskId, nil
case *commonpb.ResetOptions_BuildId:
return getResetPoint(ctx, namespaceStr, workflowExecution, frontendClient, logger, target.BuildId, resetOptions.CurrentRunOnly)
default:
errorMsg := fmt.Sprintf("provided reset target (%+v) is not supported.", resetOptions.Target)
return 0, serviceerror.NewInvalidArgument(errorMsg)
}
}

func getLastWorkflowTaskEventID(
ctx context.Context,
namespaceStr string,
workflowExecution *commonpb.WorkflowExecution,
frontendClient workflowservice.WorkflowServiceClient,
Expand All @@ -383,7 +432,7 @@ func getLastWorkflowTaskEventID(ctx context.Context,
for {
resp, err := frontendClient.GetWorkflowExecutionHistoryReverse(ctx, req)
if err != nil {
logger.Error("failed to run GetWorkflowExecutionHistoryReverse")
logger.Error("failed to run GetWorkflowExecutionHistoryReverse", tag.Error(err))
return 0, errors.New("failed to get workflow execution history")
}
for _, e := range resp.GetHistory().GetEvents() {
Expand All @@ -407,7 +456,8 @@ func getLastWorkflowTaskEventID(ctx context.Context,
return
}

func getFirstWorkflowTaskEventID(ctx context.Context,
func getFirstWorkflowTaskEventID(
ctx context.Context,
namespaceStr string,
workflowExecution *commonpb.WorkflowExecution,
frontendClient workflowservice.WorkflowServiceClient,
Expand All @@ -422,7 +472,7 @@ func getFirstWorkflowTaskEventID(ctx context.Context,
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
logger.Error("failed to run GetWorkflowExecutionHistory")
logger.Error("failed to run GetWorkflowExecutionHistory", tag.Error(err))
return 0, errors.New("GetWorkflowExecutionHistory failed")
}
for _, e := range resp.GetHistory().GetEvents() {
Expand All @@ -446,3 +496,36 @@ func getFirstWorkflowTaskEventID(ctx context.Context,
}
return
}

func getResetPoint(
ctx context.Context,
namespaceStr string,
execution *commonpb.WorkflowExecution,
frontendClient workflowservice.WorkflowServiceClient,
logger log.Logger,
buildId string,
currentRunOnly bool,
) (workflowTaskEventID int64, err error) {
res, err := frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: namespaceStr,
Execution: execution,
})
if err != nil {
return 0, err
}
resetPoints := res.GetWorkflowExecutionInfo().GetAutoResetPoints().GetPoints()
for _, point := range resetPoints {
if point.BuildId == buildId {
if !point.Resettable {
return 0, fmt.Errorf("Reset point for %v is not resettable", buildId)
} else if point.ExpireTime != nil && point.ExpireTime.AsTime().Before(time.Now()) {
return 0, fmt.Errorf("Reset point for %v is expired", buildId)
} else if execution.RunId != point.RunId && currentRunOnly {
return 0, fmt.Errorf("Reset point for %v points to previous run and CurrentRunOnly is set", buildId)
}
execution.RunId = point.RunId
return point.FirstWorkflowTaskCompletedId, nil
}
}
return 0, fmt.Errorf("Can't find reset point for %v", buildId)
}
Loading

0 comments on commit c5f59e4

Please sign in to comment.