Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a mechanism to disable automatic retries for workflows and remote bazel #8172

Merged
merged 6 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions enterprise/server/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func (s *APIServer) ExecuteWorkflow(ctx context.Context, req *apipb.ExecuteWorkf
Visibility: req.GetVisibility(),
Async: req.GetAsync(),
Env: req.GetEnv(),
DisableRetry: req.GetDisableRetry(),
}
rsp, err := wfs.ExecuteWorkflow(ctx, r)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions enterprise/server/hostedrunner/hostedrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hostedrunner
import (
"context"
"encoding/base64"
"fmt"
"io"
"sort"
"strings"
Expand Down Expand Up @@ -198,6 +199,8 @@ func (r *runnerService) createAction(ctx context.Context, req *rnpb.RunRequest,
image = req.GetContainerImage()
}

retry := !req.GetDisableRetry()

// Hosted Bazel shares the same pool with workflows.
cmd := &repb.Command{
EnvironmentVariables: []*repb.Command_EnvironmentVariable{
Expand All @@ -219,6 +222,7 @@ func (r *runnerService) createAction(ctx context.Context, req *rnpb.RunRequest,
{Name: platform.EstimatedComputeUnitsPropertyName, Value: "3"},
{Name: platform.EstimatedFreeDiskPropertyName, Value: "20000000000"}, // 20GB
{Name: platform.DockerUserPropertyName, Value: user},
{Name: platform.RetryPropertyName, Value: fmt.Sprintf("%v", retry)},
},
},
}
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/executor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//server/util/log",
"//server/util/metricsutil",
"//server/util/proto",
"//server/util/rexec",
"//server/util/status",
"//server/util/tracing",
"@com_github_prometheus_client_golang//prometheus",
Expand Down
16 changes: 6 additions & 10 deletions enterprise/server/remote_execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/metricsutil"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/rexec"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/buildbuddy-io/buildbuddy/server/util/tracing"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -144,23 +145,18 @@ func parseTimeouts(task *repb.ExecutionTask) (*executionTimeouts, error) {
return timeouts, nil
}

// isTaskMisconfigured reports whether a task failed because of a
// configuration problem (invalid arguments, unmet preconditions, or missing
// authentication) that would make the action fail again even if retried.
func isTaskMisconfigured(err error) bool {
	switch {
	case status.IsInvalidArgumentError(err):
		return true
	case status.IsFailedPreconditionError(err):
		return true
	case status.IsUnauthenticatedError(err):
		return true
	default:
		return false
	}
}

// isClientBazel reports whether the task appears to have been submitted by a
// bazel client rather than by a CI runner / workflow execution.
func isClientBazel(task *repb.ExecutionTask) bool {
	// TODO(bduffany): Find a more reliable way to determine this.
	plat := platform.GetProto(task.GetAction(), task.GetCommand())
	isCI := platform.IsCICommand(task.GetCommand(), plat)
	return !isCI
}

func shouldRetry(task *repb.ExecutionTask, taskError error) bool {
if !platform.Retryable(task) {
return false
}

// If the task is invalid / misconfigured, more attempts won't help.
if isTaskMisconfigured(taskError) {
if !rexec.Retryable(taskError) {
return false
}
// If the task timed out, respect the timeout and don't keep retrying.
Expand Down
15 changes: 15 additions & 0 deletions enterprise/server/remote_execution/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
DefaultTimeoutPropertyName = "default-timeout"
TerminationGracePeriodPropertyName = "termination-grace-period"
SnapshotKeyOverridePropertyName = "snapshot-key-override"
RetryPropertyName = "retry"

OperatingSystemPropertyName = "OSFamily"
LinuxOperatingSystemName = "linux"
Expand Down Expand Up @@ -250,6 +251,14 @@ type Properties struct {
// from.
// Only applies to recyclable firecracker actions.
OverrideSnapshotKey *fcpb.SnapshotKey

// Retry determines whether the scheduler should automatically retry
// transient errors.
// Should be set to false for non-idempotent commands, and clients should
// handle more fine-grained retry behavior.
// This property is ignored for bazel executions, because the bazel client
// handles retries itself.
Retry bool
}

// ContainerType indicates the type of containerization required by an executor.
Expand Down Expand Up @@ -388,6 +397,7 @@ func ParseProperties(task *repb.ExecutionTask) (*Properties, error) {
ExtraArgs: stringListProp(m, extraArgsPropertyName),
EnvOverrides: envOverrides,
OverrideSnapshotKey: overrideSnapshotKey,
Retry: boolProp(m, RetryPropertyName, true),
}, nil
}

Expand Down Expand Up @@ -794,3 +804,8 @@ func IsCICommand(cmd *repb.Command, platform *repb.Platform) bool {
}
return false
}

// Retryable reports whether the scheduler may automatically retry the given
// task. Retries are on by default: an unset/empty "retry" platform property
// or an explicit "true" enables them; any other value disables them.
func Retryable(task *repb.ExecutionTask) bool {
	switch FindEffectiveValue(task, RetryPropertyName) {
	case "", "true":
		return true
	default:
		return false
	}
}
1 change: 1 addition & 0 deletions enterprise/server/scheduling/scheduler_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
embed = [":scheduler_server"],
deps = [
"//enterprise/server/remote_execution/execution_server",
"//enterprise/server/remote_execution/platform",
"//enterprise/server/testutil/enterprise_testenv",
"//enterprise/server/testutil/testredis",
"//proto:remote_execution_go_proto",
Expand Down
42 changes: 30 additions & 12 deletions enterprise/server/scheduling/scheduler_server/scheduler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ type enqueueTaskReservationOpts struct {
scheduleOnConnectedExecutors bool
}

func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRequest *scpb.EnqueueTaskReservationRequest, serializedTask []byte, opts enqueueTaskReservationOpts) error {
func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRequest *scpb.EnqueueTaskReservationRequest, task *repb.ExecutionTask, opts enqueueTaskReservationOpts) error {
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, enqueueRequest.GetTaskId())

os := enqueueRequest.GetSchedulingMetadata().GetOs()
Expand Down Expand Up @@ -1781,10 +1781,6 @@ func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRe
time.Since(startTime), strings.Join(successfulReservations, ", "))
}()

task := &repb.ExecutionTask{}
if err := proto.Unmarshal(serializedTask, task); err != nil {
return status.InternalErrorf("failed to unmarshal ExecutionTask: %s", err)
}
cmd := task.GetCommand()
remoteInstanceName := task.GetExecuteRequest().GetInstanceName()

Expand Down Expand Up @@ -1970,7 +1966,11 @@ func (s *SchedulerServer) ScheduleTask(ctx context.Context, req *scpb.ScheduleTa
numReplicas: probesPerTask,
scheduleOnConnectedExecutors: false,
}
if err := s.enqueueTaskReservations(ctx, enqueueRequest, req.GetSerializedTask(), opts); err != nil {
task := &repb.ExecutionTask{}
if err := proto.Unmarshal(req.GetSerializedTask(), task); err != nil {
return nil, status.InternalErrorf("failed to unmarshal ExecutionTask: %s", err)
}
if err := s.enqueueTaskReservations(ctx, enqueueRequest, task, opts); err != nil {
return nil, err
}
return &scpb.ScheduleTaskResponse{}, nil
Expand Down Expand Up @@ -2004,15 +2004,16 @@ func (s *SchedulerServer) reEnqueueTask(ctx context.Context, taskID, leaseID, re
if taskID == "" {
return status.FailedPreconditionError("A task_id is required")
}
task, err := s.readTask(ctx, taskID)
scheduledTask, err := s.readTask(ctx, taskID)
if err != nil {
return err
}
if task.attemptCount >= maxTaskAttemptCount {

if scheduledTask.attemptCount >= maxTaskAttemptCount {
if _, err := s.deleteTask(ctx, taskID); err != nil {
return err
}
msg := fmt.Sprintf("Task %q already attempted %d times.", taskID, task.attemptCount)
msg := fmt.Sprintf("Task %q already attempted %d times.", taskID, scheduledTask.attemptCount)
if reason != "" {
msg += " Last failure: " + reason
}
Expand All @@ -2021,6 +2022,23 @@ func (s *SchedulerServer) reEnqueueTask(ctx context.Context, taskID, leaseID, re
}
return status.ResourceExhaustedError(msg)
}

task := &repb.ExecutionTask{}
if err := proto.Unmarshal(scheduledTask.serializedTask, task); err != nil {
return status.InternalErrorf("failed to unmarshal ExecutionTask: %s", err)
}
if !platform.Retryable(task) {
if _, err := s.deleteTask(ctx, taskID); err != nil {
return err
}
msg := fmt.Sprintf("Task %q does not have retries enabled. Not re-enqueuing. Last failure: %s", taskID, reason)
log.Infof(msg)
if err := s.env.GetRemoteExecutionService().MarkExecutionFailed(ctx, taskID, status.InternalError(msg)); err != nil {
log.CtxWarningf(ctx, "Could not mark execution failed for task %q: %s", taskID, err)
}
return nil
}

if err := s.unclaimTask(ctx, taskID, leaseID, reconnectToken); err != nil {
// A "permission denied" error means the task is already claimed
// by a different leaseholder so we shouldn't touch it.
Expand All @@ -2037,15 +2055,15 @@ func (s *SchedulerServer) reEnqueueTask(ctx context.Context, taskID, leaseID, re
}
enqueueRequest := &scpb.EnqueueTaskReservationRequest{
TaskId: taskID,
TaskSize: task.metadata.GetTaskSize(),
SchedulingMetadata: task.metadata,
TaskSize: scheduledTask.metadata.GetTaskSize(),
SchedulingMetadata: scheduledTask.metadata,
Delay: durationpb.New(delay),
}
opts := enqueueTaskReservationOpts{
numReplicas: numReplicas,
scheduleOnConnectedExecutors: false,
}
if err := s.enqueueTaskReservations(ctx, enqueueRequest, task.serializedTask, opts); err != nil {
if err := s.enqueueTaskReservations(ctx, enqueueRequest, task, opts); err != nil {
// Unavailable indicates that it's impossible to schedule the task (no executors in pool).
if status.IsUnavailableError(err) {
if markErr := s.env.GetRemoteExecutionService().MarkExecutionFailed(ctx, taskID, err); markErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/execution_server"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/platform"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/enterprise_testenv"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/testredis"
"github.com/buildbuddy-io/buildbuddy/server/environment"
Expand Down Expand Up @@ -548,6 +549,28 @@ func TestExecutorReEnqueue_NonMatchingLeaseID(t *testing.T) {
require.True(t, status.IsPermissionDeniedError(err))
}

func TestExecutorReEnqueue_RetriesDisabled(t *testing.T) {
	env, ctx := getEnv(t, &schedulerOpts{}, "user1")

	executor := newFakeExecutor(ctx, t, env.GetSchedulerClient())
	executor.Register()

	// Schedule a task that explicitly opts out of automatic retries, let the
	// executor claim it, then clear its task list so we can observe whether
	// the scheduler hands the task back.
	taskID := scheduleTask(ctx, t, env, map[string]string{platform.RetryPropertyName: "false"})
	executor.WaitForTask(taskID)
	lease := executor.Claim(taskID)
	executor.ResetTasks()

	req := &scpb.ReEnqueueTaskRequest{
		TaskId:  taskID,
		Reason:  "for fun",
		LeaseId: lease.leaseID,
	}
	_, err := env.GetSchedulerClient().ReEnqueueTask(ctx, req)
	require.NoError(t, err)

	// With retries disabled, the scheduler should drop the task rather than
	// re-enqueue it.
	executor.EnsureTaskNotReceived(taskID)
}

func TestLeaseExpiration(t *testing.T) {
flags.Set(t, "remote_execution.lease_duration", 10*time.Second)
flags.Set(t, "remote_execution.lease_grace_period", 10*time.Second)
Expand Down
19 changes: 12 additions & 7 deletions enterprise/server/workflow/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ func (ws *workflowService) ExecuteWorkflow(ctx context.Context, req *wfpb.Execut
// The workflow execution is trusted since we're authenticated as a member of
// the BuildBuddy org that owns the workflow.
isTrusted := true
executionID, err := ws.executeWorkflowAction(executionCtx, apiKey, wf, wd, isTrusted, action, invocationID, extraCIRunnerArgs, req.GetEnv())
shouldRetry := !req.GetDisableRetry()
executionID, err := ws.executeWorkflowAction(executionCtx, apiKey, wf, wd, isTrusted, action, invocationID, extraCIRunnerArgs, req.GetEnv(), shouldRetry)
if err != nil {
statusErr = status.WrapErrorf(err, "failed to execute workflow action %q", action.Name)
log.CtxWarning(executionCtx, statusErr.Error())
Expand Down Expand Up @@ -1064,7 +1065,7 @@ func (ws *workflowService) createBBURL(ctx context.Context, path string) (string
// Creates an action that executes the CI runner for the given workflow and params.
// Returns the digest of the action as well as the invocation ID that the CI runner
// will assign to the workflow invocation.
func (ws *workflowService) createActionForWorkflow(ctx context.Context, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, ak *tables.APIKey, instanceName string, workflowAction *config.Action, invocationID string, extraArgs []string, env map[string]string) (*repb.Digest, error) {
func (ws *workflowService) createActionForWorkflow(ctx context.Context, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, ak *tables.APIKey, instanceName string, workflowAction *config.Action, invocationID string, extraArgs []string, env map[string]string, retry bool) (*repb.Digest, error) {
cache := ws.env.GetCache()
if cache == nil {
return nil, status.UnavailableError("No cache configured.")
Expand Down Expand Up @@ -1217,6 +1218,7 @@ func (ws *workflowService) createActionForWorkflow(ctx context.Context, wf *tabl
{Name: platform.EstimatedFreeDiskPropertyName, Value: estimatedDisk},
{Name: platform.EstimatedMemoryPropertyName, Value: workflowAction.ResourceRequests.GetEstimatedMemory()},
{Name: platform.EstimatedCPUPropertyName, Value: workflowAction.ResourceRequests.GetEstimatedCPU()},
{Name: platform.RetryPropertyName, Value: fmt.Sprintf("%v", retry)},
},
},
}
Expand Down Expand Up @@ -1470,7 +1472,10 @@ func (ws *workflowService) startWorkflow(ctx context.Context, gitProvider interf
wg.Add(1)
go func() {
defer wg.Done()
if _, err := ws.executeWorkflowAction(ctx, apiKey, wf, wd, isTrusted, action, invocationID, nil /*=extraCIRunnerArgs*/, env); err != nil {
// Webhook triggered workflows should always be retried, because they
// don't have a client to retry for them
shouldRetry := true
if _, err := ws.executeWorkflowAction(ctx, apiKey, wf, wd, isTrusted, action, invocationID, nil /*=extraCIRunnerArgs*/, env, shouldRetry); err != nil {
log.CtxErrorf(ctx, "Failed to execute workflow %s (%s) action %q: %s", wf.WorkflowID, wf.RepoURL, action.Name, err)
}
}()
Expand All @@ -1480,13 +1485,13 @@ func (ws *workflowService) startWorkflow(ctx context.Context, gitProvider interf
}

// Starts a CI runner execution to execute a single workflow action, and returns the execution ID.
func (ws *workflowService) executeWorkflowAction(ctx context.Context, key *tables.APIKey, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, action *config.Action, invocationID string, extraCIRunnerArgs []string, env map[string]string) (string, error) {
func (ws *workflowService) executeWorkflowAction(ctx context.Context, key *tables.APIKey, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, action *config.Action, invocationID string, extraCIRunnerArgs []string, env map[string]string, shouldRetry bool) (string, error) {
opts := retry.DefaultOptions()
opts.MaxRetries = executeWorkflowMaxRetries
r := retry.New(ctx, opts)
var lastErr error
for r.Next() {
executionID, err := ws.attemptExecuteWorkflowAction(ctx, key, wf, wd, isTrusted, action, invocationID, nil /*=extraCIRunnerArgs*/, env)
executionID, err := ws.attemptExecuteWorkflowAction(ctx, key, wf, wd, isTrusted, action, invocationID, nil /*=extraCIRunnerArgs*/, env, shouldRetry)
if err == ApprovalRequired {
log.CtxInfof(ctx, "Skipping workflow action %s (%s) %q (requires approval)", wf.WorkflowID, wf.RepoURL, action.Name)
if err := ws.createApprovalRequiredStatus(ctx, wf, wd, action.Name); err != nil {
Expand All @@ -1507,14 +1512,14 @@ func (ws *workflowService) executeWorkflowAction(ctx context.Context, key *table
return "", lastErr
}

func (ws *workflowService) attemptExecuteWorkflowAction(ctx context.Context, key *tables.APIKey, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, workflowAction *config.Action, invocationID string, extraCIRunnerArgs []string, env map[string]string) (string, error) {
func (ws *workflowService) attemptExecuteWorkflowAction(ctx context.Context, key *tables.APIKey, wf *tables.Workflow, wd *interfaces.WebhookData, isTrusted bool, workflowAction *config.Action, invocationID string, extraCIRunnerArgs []string, env map[string]string, retry bool) (string, error) {
ctx = ws.env.GetAuthenticator().AuthContextFromAPIKey(ctx, key.Value)
ctx, err := prefix.AttachUserPrefixToContext(ctx, ws.env)
if err != nil {
return "", err
}
in := instanceName(wf, wd, workflowAction.Name, workflowAction.GitCleanExclude)
ad, err := ws.createActionForWorkflow(ctx, wf, wd, isTrusted, key, in, workflowAction, invocationID, extraCIRunnerArgs, env)
ad, err := ws.createActionForWorkflow(ctx, wf, wd, isTrusted, key, in, workflowAction, invocationID, extraCIRunnerArgs, env, retry)
if err != nil {
return "", err
}
Expand Down
4 changes: 4 additions & 0 deletions proto/api/v1/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ message ExecuteWorkflowRequest {
// buildbuddy.yaml will still apply.
map<string, string> env = 7;

// By default, the scheduler will automatically retry transient errors.
// For non-idempotent workloads, set to true to disable this behavior.
bool disable_retry = 10;

// DEPRECATED: Use `branch` and/or `commit_sha` instead.
string ref = 2 [deprecated = true];
}
Expand Down
4 changes: 4 additions & 0 deletions proto/runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ message RunRequest {
// the command you're running.
string name = 18;

// By default, the scheduler will automatically retry transient errors.
// For non-idempotent workloads, set to true to disable this behavior.
bool disable_retry = 19;

// DEPRECATED: Use `steps` instead.
string bazel_command = 4 [deprecated = true];
}
Expand Down
4 changes: 4 additions & 0 deletions proto/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ message ExecuteWorkflowRequest {
// overrides will take precedence. Otherwise all env vars set in
// buildbuddy.yaml will still apply.
map<string, string> env = 15;

// By default, the scheduler will automatically retry transient errors.
// For non-idempotent workloads, set to true to disable this behavior.
bool disable_retry = 16;
}

message ExecuteWorkflowResponse {
Expand Down
9 changes: 9 additions & 0 deletions server/util/rexec/rexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,12 @@ func AuxiliaryMetadata(md *repb.ExecutedActionMetadata, pb proto.Message) (ok bo
}
return false, nil
}

// Retryable reports whether retrying could plausibly succeed for the given
// error. It returns false for configuration errors (invalid arguments, unmet
// preconditions, or authentication failures), since those will persist no
// matter how many times the task is attempted.
func Retryable(err error) bool {
	if status.IsInvalidArgumentError(err) {
		return false
	}
	if status.IsFailedPreconditionError(err) {
		return false
	}
	if status.IsUnauthenticatedError(err) {
		return false
	}
	return true
}
Loading