Skip to content

Commit

Permalink
Added configurable timeouts for databricks_job
Browse files Browse the repository at this point in the history
  • Loading branch information
nfx committed Jul 8, 2021
1 parent 09f4ac4 commit 54ef068
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 26 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
## 0.3.6

* Added support for hybrid pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689))
* Added support for `always_running` jobs, which are restarted on resource updates ([#715](https://github.com/databrickslabs/terraform-provider-databricks/pull/715))
* Azure CLI auth is now forcing JSON output ([#717](https://github.com/databrickslabs/terraform-provider-databricks/pull/717))
* `databricks_permissions` are getting validation on `terraform plan` stage ([#706](https://github.com/databrickslabs/terraform-provider-databricks/pull/706))
* Added `databricks_directory` resource ([#690](https://github.com/databrickslabs/terraform-provider-databricks/pull/690))
* Added support for hybrid instance pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689))
* Added `run_as_role` field to `databricks_sql_query` ([#684](https://github.com/databrickslabs/terraform-provider-databricks/pull/684))
* Added `user_id` attribute for `databricks_user` data resource, so that it's possible to dynamically create resources based on members of the group ([#714](https://github.com/databrickslabs/terraform-provider-databricks/pull/714))

Updated dependency versions:

* Bump github.com/aws/aws-sdk-go from 1.38.51 to 1.38.71
* Bump github.com/Azure/go-autorest/autorest/azure/auth from 0.5.7 to 0.5.8
* Bump github.com/Azure/go-autorest/autorest from 0.11.18 to 0.11.19
* Bump github.com/Azure/go-autorest/autorest/adal from 0.9.13 to 0.9.14
* Bump github.com/zclconf/go-cty from 1.8.3 to 1.8.4
* Bump github.com/hashicorp/terraform-plugin-sdk/v2 from 2.6.1 to 2.7.0

## 0.3.5

Expand Down
32 changes: 18 additions & 14 deletions compute/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (

// NewJobsAPI creates JobsAPI instance from provider meta
func NewJobsAPI(ctx context.Context, m interface{}) JobsAPI {
return JobsAPI{m.(*common.DatabricksClient), ctx, 20 * time.Minute}
return JobsAPI{m.(*common.DatabricksClient), ctx}
}

// JobsAPI exposes the Jobs API
type JobsAPI struct {
client *common.DatabricksClient
context context.Context
timeout time.Duration
}

// List all jobs
Expand All @@ -40,19 +39,19 @@ func (a JobsAPI) RunsList(r JobRunsListRequest) (jrl JobRunsList, err error) {
}

// RunsCancel ...
func (a JobsAPI) RunsCancel(runID int64) error {
func (a JobsAPI) RunsCancel(runID int64, timeout time.Duration) error {
var response interface{}
err := a.client.Post(a.context, "/jobs/runs/cancel", map[string]interface{}{
"run_id": runID,
}, &response)
if err != nil {
return err
}
return a.waitForRunState(runID, "TERMINATED")
return a.waitForRunState(runID, "TERMINATED", timeout)
}

func (a JobsAPI) waitForRunState(runID int64, desiredState string) error {
return resource.RetryContext(a.context, a.timeout, func() *resource.RetryError {
func (a JobsAPI) waitForRunState(runID int64, desiredState string, timeout time.Duration) error {
return resource.RetryContext(a.context, timeout, func() *resource.RetryError {
jobRun, err := a.RunsGet(runID)
if err != nil {
return resource.NonRetryableError(
Expand Down Expand Up @@ -92,15 +91,15 @@ func (a JobsAPI) RunsGet(runID int64) (JobRun, error) {
return jr, err
}

func (a JobsAPI) Start(jobID int64) error {
func (a JobsAPI) Start(jobID int64, timeout time.Duration) error {
runID, err := a.RunNow(jobID)
if err != nil {
return fmt.Errorf("cannot start job run: %v", err)
}
return a.waitForRunState(runID, "RUNNING")
return a.waitForRunState(runID, "RUNNING", timeout)
}

func (a JobsAPI) Restart(id string) error {
func (a JobsAPI) Restart(id string, timeout time.Duration) error {
jobID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
return err
Expand All @@ -111,20 +110,20 @@ func (a JobsAPI) Restart(id string) error {
}
if len(runs.Runs) == 0 {
// nothing to cancel
return a.Start(jobID)
return a.Start(jobID, timeout)
}
if len(runs.Runs) > 1 {
return fmt.Errorf("`always_running` must be specified only with "+
"`max_concurrent_runs = 1`. There are %d active runs", len(runs.Runs))
}
if len(runs.Runs) == 1 {
activeRun := runs.Runs[0]
err = a.RunsCancel(activeRun.RunID)
err = a.RunsCancel(activeRun.RunID, timeout)
if err != nil {
return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunID, err)
}
}
return a.Start(jobID)
return a.Start(jobID, timeout)
}

// Create creates a job on the workspace given the job settings
Expand Down Expand Up @@ -222,6 +221,7 @@ var jobSchema = common.StructToSchema(JobSettings{},
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#")
}
s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#")
s["max_concurrent_runs"].ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(1))
s["url"] = &schema.Schema{
Type: schema.TypeString,
Computed: true,
Expand All @@ -239,6 +239,10 @@ func ResourceJob() *schema.Resource {
return common.Resource{
Schema: jobSchema,
SchemaVersion: 2,
Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(DefaultProvisionTimeout),
Update: schema.DefaultTimeout(DefaultProvisionTimeout),
},
CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error {
alwaysRunning := d.Get("always_running").(bool)
maxConcurrentRuns := d.Get("max_concurrent_runs").(int)
Expand All @@ -265,7 +269,7 @@ func ResourceJob() *schema.Resource {
}
d.SetId(job.ID())
if d.Get("always_running").(bool) {
return jobsAPI.Start(job.JobID)
return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate))
}
return nil
},
Expand Down Expand Up @@ -295,7 +299,7 @@ func ResourceJob() *schema.Resource {
return err
}
if d.Get("always_running").(bool) {
return jobsAPI.Restart(d.Id())
return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate))
}
return nil
},
Expand Down
22 changes: 11 additions & 11 deletions compute/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,38 +766,38 @@ func TestJobRestarts(t *testing.T) {
},
}, func(ctx context.Context, client *common.DatabricksClient) {
ja := NewJobsAPI(ctx, client)
ja.timeout = 500 * time.Millisecond
timeout := 500 * time.Millisecond

err := ja.Start(123)
err := ja.Start(123, timeout)
assert.NoError(t, err)

err = ja.waitForRunState(345, "RUNNING")
err = ja.waitForRunState(345, "RUNNING", timeout)
assert.EqualError(t, err, "cannot get job RUNNING: nope")

err = ja.waitForRunState(456, "TERMINATED")
err = ja.waitForRunState(456, "TERMINATED", timeout)
assert.EqualError(t, err, "cannot get job TERMINATED: Quota exceeded")

err = ja.waitForRunState(890, "RUNNING")
err = ja.waitForRunState(890, "RUNNING", timeout)
assert.EqualError(t, err, "run is SOMETHING: Checking...")

// no active runs for the first time
err = ja.Restart("123")
err = ja.Restart("123", timeout)
assert.NoError(t, err)

// one active run for the second time
err = ja.Restart("123")
err = ja.Restart("123", timeout)
assert.NoError(t, err)

err = ja.Restart("111")
err = ja.Restart("111", timeout)
assert.EqualError(t, err, "cannot cancel run 567: nope")

err = ja.Restart("a")
err = ja.Restart("a", timeout)
assert.EqualError(t, err, "strconv.ParseInt: parsing \"a\": invalid syntax")

err = ja.Restart("222")
err = ja.Restart("222", timeout)
assert.EqualError(t, err, "nope")

err = ja.Restart("678")
err = ja.Restart("678", timeout)
assert.EqualError(t, err, "`always_running` must be specified only "+
"with `max_concurrent_runs = 1`. There are 2 active runs")
})
Expand Down
13 changes: 12 additions & 1 deletion docs/resources/job.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ The following arguments are required:
* `name` - (Optional) An optional name for the job. The default value is Untitled.
* `new_cluster` - (Optional) Same set of parameters as for [databricks_cluster](cluster.md) resource.
* `existing_cluster_id` - (Optional) If existing_cluster_id, the ID of an existing [cluster](cluster.md) that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. We strongly suggest to use `new_cluster` for greater reliability.
* `always_running` - (Optional) (Bool) Whenever the job is always running, like a Spark Streaming application, on every update restart the current active run or start it again if it is not running. False by default. Any job runs are started with `parameters` specified in `spark_jar_task` or `spark_submit_task` or `spark_python_task` or `notebook_task` blocks.
* `library` - (Optional) (Set) An optional list of libraries to be installed on the cluster that will execute the job. Please consult [libraries section](cluster.md#libraries) for [databricks_cluster](cluster.md) resource.
* `retry_on_timeout` - (Optional) (Bool) An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout.
* `max_retries` - (Optional) (Integer) An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with a FAILED result_state or INTERNAL_ERROR life_cycle_state. The value -1 means to retry indefinitely and the value 0 means to never retry. The default behavior is to never retry.
* `timeout_seconds` - (Optional) (Integer) An optional timeout applied to each run of this job. The default behavior is to have no timeout.
* `min_retry_interval_millis` - (Optional) (Integer) An optional minimal interval in milliseconds between the start of the failed run and the subsequent retry run. The default behavior is that unsuccessful runs are immediately retried.
* `max_concurrent_runs` - (Optional) (Integer) An optional maximum allowed number of concurrent runs of the job.
* `always_running` - (Optional) (Bool) Whenever the job is always running, like a Spark Streaming application, on every update restart the current active run or start it again if it is not running. False by default.
* `email_notifications` - (Optional) (List) An optional set of email addresses notified when runs of this job begin and complete and when this job is deleted. The default behavior is to not send any emails. This field is a block and is documented below.
* `schedule` - (Optional) (List) An optional periodic schedule for this job. The default behavior is that the job runs when triggered by clicking Run Now in the Jobs UI or sending an API request to runNow. This field is a block and is documented below.

Expand Down Expand Up @@ -105,6 +105,17 @@ By default, all users can create and modify jobs unless an administrator [enable
* [databricks_permissions](permissions.md#Job-usage) can control which groups or individual users can *Can View*, *Can Manage Run*, and *Can Manage*.
* [databricks_cluster_policy](cluster_policy.md) can control which kinds of clusters users can create for jobs.

## Timeouts

The `timeouts` block allows you to specify `create` and `update` timeouts if you have an `always_running` job. Please launch `TF_LOG=DEBUG terraform apply` whenever you observe timeout issues.

```hcl
timeouts {
create = "20m"
  update = "20m"
}
```

## Import

The resource job can be imported using the id of the job
Expand Down

0 comments on commit 54ef068

Please sign in to comment.