Skip to content

Commit

Permalink
Added warning when trying to deploy bundle with --fail-if-running a…
Browse files Browse the repository at this point in the history
…nd running resources (#1163)

## Changes
Deploying bundle when there are bundle resources running at the same
time can be disruptive for jobs and pipelines in progress.

With this change during deployment phase (before uploading any
resources) if there is `--fail-if-running` specified DABs will check if
there are any resources running and if so, will fail the deployment

## Tests
Manual + add tests
  • Loading branch information
andrewnester authored Feb 7, 2024
1 parent b64e113 commit 6edab93
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 9 deletions.
6 changes: 3 additions & 3 deletions bundle/config/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ type Bundle struct {
// For example, where to find the binary, which version to use, etc.
Terraform *Terraform `json:"terraform,omitempty" bundle:"readonly"`

// Lock configures locking behavior on deployment.
Lock Lock `json:"lock" bundle:"readonly"`

// Force-override Git branch validation.
Force bool `json:"force,omitempty" bundle:"readonly"`

Expand All @@ -43,4 +40,7 @@ type Bundle struct {

// Overrides the compute used for jobs and other supported assets.
ComputeID string `json:"compute_id,omitempty"`

// Deployment section specifies deployment related configuration for bundle
Deployment Deployment `json:"deployment"`
}
10 changes: 10 additions & 0 deletions bundle/config/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

type Deployment struct {
// FailOnActiveRuns specifies whether to fail the deployment if there are
// running jobs or pipelines in the workspace. Defaults to false.
FailOnActiveRuns bool `json:"fail_on_active_runs,omitempty"`

// Lock configures locking behavior on deployment.
Lock Lock `json:"lock" bundle:"readonly"`
}
143 changes: 143 additions & 0 deletions bundle/deploy/check_running_resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package deploy

import (
"context"
"fmt"
"strconv"

"github.com/databricks/cli/bundle"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/hashicorp/terraform-exec/tfexec"
tfjson "github.com/hashicorp/terraform-json"
"golang.org/x/sync/errgroup"
)

type ErrResourceIsRunning struct {
resourceType string
resourceId string
}

func (e ErrResourceIsRunning) Error() string {
return fmt.Sprintf("%s %s is running", e.resourceType, e.resourceId)
}

type checkRunningResources struct {
}

func (l *checkRunningResources) Name() string {
return "check-running-resources"
}

func (l *checkRunningResources) Apply(ctx context.Context, b *bundle.Bundle) error {
if !b.Config.Bundle.Deployment.FailOnActiveRuns {
return nil
}

tf := b.Terraform
if tf == nil {
return fmt.Errorf("terraform not initialized")
}

err := tf.Init(ctx, tfexec.Upgrade(true))
if err != nil {
return fmt.Errorf("terraform init: %w", err)
}

state, err := b.Terraform.Show(ctx)
if err != nil {
return err
}

err = checkAnyResourceRunning(ctx, b.WorkspaceClient(), state)
if err != nil {
return fmt.Errorf("deployment aborted, err: %w", err)
}

return nil
}

func CheckRunningResource() *checkRunningResources {
return &checkRunningResources{}
}

func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, state *tfjson.State) error {
if state.Values == nil || state.Values.RootModule == nil {
return nil
}

errs, errCtx := errgroup.WithContext(ctx)

for _, resource := range state.Values.RootModule.Resources {
// Limit to resources.
if resource.Mode != tfjson.ManagedResourceMode {
continue
}

value, ok := resource.AttributeValues["id"]
if !ok {
continue
}
id, ok := value.(string)
if !ok {
continue
}

switch resource.Type {
case "databricks_job":
errs.Go(func() error {
isRunning, err := IsJobRunning(errCtx, w, id)
// If there's an error retrieving the job, we assume it's not running
if err != nil {
return err
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
}
return nil
})
case "databricks_pipeline":
errs.Go(func() error {
isRunning, err := IsPipelineRunning(errCtx, w, id)
// If there's an error retrieving the pipeline, we assume it's not running
if err != nil {
return nil
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
}
return nil
})
}
}

return errs.Wait()
}

func IsJobRunning(ctx context.Context, w *databricks.WorkspaceClient, jobId string) (bool, error) {
id, err := strconv.Atoi(jobId)
if err != nil {
return false, err
}

runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: int64(id), ActiveOnly: true})
if err != nil {
return false, err
}

return len(runs) > 0, nil
}

func IsPipelineRunning(ctx context.Context, w *databricks.WorkspaceClient, pipelineId string) (bool, error) {
resp, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId})
if err != nil {
return false, err
}
switch resp.State {
case pipelines.PipelineStateIdle, pipelines.PipelineStateFailed, pipelines.PipelineStateDeleted:
return false, nil
default:
return true, nil
}
}
125 changes: 125 additions & 0 deletions bundle/deploy/check_running_resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package deploy

import (
"context"
"errors"
"testing"

"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestIsAnyResourceRunningWithEmptyState(t *testing.T) {
mock := mocks.NewMockWorkspaceClient(t)
state := &tfjson.State{}
err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, state)
require.NoError(t, err)
}

func TestIsAnyResourceRunningWithJob(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
state := &tfjson.State{
Values: &tfjson.StateValues{
RootModule: &tfjson.StateModule{
Resources: []*tfjson.StateResource{
{
Type: "databricks_job",
AttributeValues: map[string]interface{}{
"id": "123",
},
Mode: tfjson.ManagedResourceMode,
},
},
},
},
}

jobsApi := m.GetMockJobsAPI()
jobsApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
JobId: 123,
ActiveOnly: true,
}).Return([]jobs.BaseRun{
{RunId: 1234},
}, nil).Once()

err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
require.ErrorContains(t, err, "job 123 is running")

jobsApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
JobId: 123,
ActiveOnly: true,
}).Return([]jobs.BaseRun{}, nil).Once()

err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
require.NoError(t, err)
}

func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
state := &tfjson.State{
Values: &tfjson.StateValues{
RootModule: &tfjson.StateModule{
Resources: []*tfjson.StateResource{
{
Type: "databricks_pipeline",
AttributeValues: map[string]interface{}{
"id": "123",
},
Mode: tfjson.ManagedResourceMode,
},
},
},
},
}

pipelineApi := m.GetMockPipelinesAPI()
pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "123",
}).Return(&pipelines.GetPipelineResponse{
PipelineId: "123",
State: pipelines.PipelineStateRunning,
}, nil).Once()

err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
require.ErrorContains(t, err, "pipeline 123 is running")

pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "123",
}).Return(&pipelines.GetPipelineResponse{
PipelineId: "123",
State: pipelines.PipelineStateIdle,
}, nil).Once()
err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
require.NoError(t, err)
}

func TestIsAnyResourceRunningWithAPIFailure(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
state := &tfjson.State{
Values: &tfjson.StateValues{
RootModule: &tfjson.StateModule{
Resources: []*tfjson.StateResource{
{
Type: "databricks_pipeline",
AttributeValues: map[string]interface{}{
"id": "123",
},
Mode: tfjson.ManagedResourceMode,
},
},
},
},
}

pipelineApi := m.GetMockPipelinesAPI()
pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "123",
}).Return(nil, errors.New("API failure")).Once()

err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
require.NoError(t, err)
}
4 changes: 2 additions & 2 deletions bundle/deploy/lock/acquire.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m *acquire) init(b *bundle.Bundle) error {

func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error {
// Return early if locking is disabled.
if !b.Config.Bundle.Lock.IsEnabled() {
if !b.Config.Bundle.Deployment.Lock.IsEnabled() {
log.Infof(ctx, "Skipping; locking is disabled")
return nil
}
Expand All @@ -45,7 +45,7 @@ func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error {
return err
}

force := b.Config.Bundle.Lock.Force
force := b.Config.Bundle.Deployment.Lock.Force
log.Infof(ctx, "Acquiring deployment lock (force: %v)", force)
err = b.Locker.Lock(ctx, force)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bundle/deploy/lock/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *release) Name() string {

func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error {
// Return early if locking is disabled.
if !b.Config.Bundle.Lock.IsEnabled() {
if !b.Config.Bundle.Deployment.Lock.IsEnabled() {
log.Infof(ctx, "Skipping; locking is disabled")
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/databricks/cli/bundle/artifacts"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/deploy"
"github.com/databricks/cli/bundle/deploy/files"
"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/bundle/deploy/metadata"
Expand All @@ -22,6 +23,8 @@ func Deploy() bundle.Mutator {
lock.Acquire(),
bundle.Defer(
bundle.Seq(
terraform.StatePull(),
deploy.CheckRunningResource(),
mutator.ValidateGitDetails(),
libraries.MatchWithArtifacts(),
artifacts.CleanUp(),
Expand All @@ -31,7 +34,6 @@ func Deploy() bundle.Mutator {
permissions.ApplyWorkspaceRootPermissions(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
bundle.Defer(
terraform.Apply(),
bundle.Seq(
Expand Down
8 changes: 7 additions & 1 deletion cmd/bundle/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,24 @@ func newDeployCommand() *cobra.Command {

var force bool
var forceLock bool
var failOnActiveRuns bool
var computeID string
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())

b.Config.Bundle.Force = force
b.Config.Bundle.Lock.Force = forceLock
b.Config.Bundle.Deployment.Lock.Force = forceLock
b.Config.Bundle.ComputeID = computeID

if cmd.Flag("fail-on-active-runs").Changed {
b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
}

return bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
phases.Build(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/bundle/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newDestroyCommand() *cobra.Command {
b := bundle.Get(ctx)

// If `--force-lock` is specified, force acquisition of the deployment lock.
b.Config.Bundle.Lock.Force = forceDestroy
b.Config.Bundle.Deployment.Lock.Force = forceDestroy

// If `--auto-approve`` is specified, we skip confirmation checks
b.AutoApprove = autoApprove
Expand Down

0 comments on commit 6edab93

Please sign in to comment.