From 1010a0d2dadc88041f8f9d00f215df8550eb40fc Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Thu, 8 Jul 2021 19:06:51 +0200 Subject: [PATCH] Added configurable timeouts for databricks_job --- CHANGELOG.md | 16 ++++++++++++++++ compute/resource_job.go | 32 ++++++++++++++++++-------------- compute/resource_job_test.go | 22 +++++++++++----------- docs/resources/job.md | 13 ++++++++++++- 4 files changed, 57 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00b386a59f..9806d87886 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,22 @@ ## 0.3.6 * Added support for hybrid pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689)) +* Added support for `always_running` jobs, which are restarted on resource updates ([#715](https://github.com/databrickslabs/terraform-provider-databricks/pull/715)) +* Azure CLI auth is now forcing JSON output ([#717](https://github.com/databrickslabs/terraform-provider-databricks/pull/717)) +* `databricks_permissions` are getting validation on `terraform plan` stage ([#706](https://github.com/databrickslabs/terraform-provider-databricks/pull/706)) +* Added `databricks_directory` resource ([#690](https://github.com/databrickslabs/terraform-provider-databricks/pull/690)) +* Added support for hybrid instance pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689)) +* Added `run_as_role` field to `databricks_sql_query` ([#684](https://github.com/databrickslabs/terraform-provider-databricks/pull/684)) +* Added `user_id` attribute for `databricks_user` data resource, so that it's possible to dynamically create resources based on members of the group ([#714](https://github.com/databrickslabs/terraform-provider-databricks/pull/714)) + +Updated dependency versions: + +* Bump github.com/aws/aws-sdk-go from 1.38.51 to 1.38.71 +* Bump github.com/Azure/go-autorest/autorest/azure/auth from 0.5.7 to 0.5.8 +* Bump github.com/Azure/go-autorest/autorest from 0.11.18 to 0.11.19 +* Bump github.com/Azure/go-autorest/autorest/adal from 0.9.13 to 0.9.14 +* Bump github.com/zclconf/go-cty from 1.8.3 to 1.8.4 +* Bump github.com/hashicorp/terraform-plugin-sdk/v2 from 2.6.1 to 2.7.0 ## 0.3.5 diff --git a/compute/resource_job.go b/compute/resource_job.go index 09d38ef6fe..d498d95e1f 100644 --- a/compute/resource_job.go +++ b/compute/resource_job.go @@ -17,14 +17,13 @@ import ( // NewJobsAPI creates JobsAPI instance from provider meta func NewJobsAPI(ctx context.Context, m interface{}) JobsAPI { - return JobsAPI{m.(*common.DatabricksClient), ctx, 20 * time.Minute} + return JobsAPI{m.(*common.DatabricksClient), ctx} } // JobsAPI exposes the Jobs API type JobsAPI struct { client *common.DatabricksClient context context.Context - timeout time.Duration } // List all jobs @@ -40,7 +39,7 @@ func (a JobsAPI) RunsList(r JobRunsListRequest) (jrl JobRunsList, err error) { } // RunsCancel ... -func (a JobsAPI) RunsCancel(runID int64) error { +func (a JobsAPI) RunsCancel(runID int64, timeout time.Duration) error { var response interface{} err := a.client.Post(a.context, "/jobs/runs/cancel", map[string]interface{}{ "run_id": runID, @@ -48,11 +47,11 @@ func (a JobsAPI) RunsCancel(runID int64) error { if err != nil { return err } - return a.waitForRunState(runID, "TERMINATED") + return a.waitForRunState(runID, "TERMINATED", timeout) } -func (a JobsAPI) waitForRunState(runID int64, desiredState string) error { - return resource.RetryContext(a.context, a.timeout, func() *resource.RetryError { +func (a JobsAPI) waitForRunState(runID int64, desiredState string, timeout time.Duration) error { + return resource.RetryContext(a.context, timeout, func() *resource.RetryError { jobRun, err := a.RunsGet(runID) if err != nil { return resource.NonRetryableError( @@ -92,15 +91,15 @@ func (a JobsAPI) RunsGet(runID int64) (JobRun, error) { return jr, err } -func (a JobsAPI) Start(jobID int64) error { +func (a JobsAPI) Start(jobID int64, timeout time.Duration) error { runID, err := a.RunNow(jobID) if err != nil { return fmt.Errorf("cannot start job run: %v", err) } - return a.waitForRunState(runID, "RUNNING") + return a.waitForRunState(runID, "RUNNING", timeout) } -func (a JobsAPI) Restart(id string) error { +func (a JobsAPI) Restart(id string, timeout time.Duration) error { jobID, err := strconv.ParseInt(id, 10, 32) if err != nil { return err @@ -111,7 +110,7 @@ func (a JobsAPI) Restart(id string) error { } if len(runs.Runs) == 0 { // nothing to cancel - return a.Start(jobID) + return a.Start(jobID, timeout) } if len(runs.Runs) > 1 { return fmt.Errorf("`always_running` must be specified only with "+ @@ -119,12 +118,12 @@ func (a JobsAPI) Restart(id string) error { } if len(runs.Runs) == 1 { activeRun := runs.Runs[0] - err = a.RunsCancel(activeRun.RunID) + err = a.RunsCancel(activeRun.RunID, timeout) if err != nil { return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunID, err) } } - return a.Start(jobID) + return a.Start(jobID, timeout) } // Create creates a job on the workspace given the job settings @@ -222,6 +221,7 @@ var jobSchema = common.StructToSchema(JobSettings{}, v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#") } s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#") + s["max_concurrent_runs"].ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(1)) s["url"] = &schema.Schema{ Type: schema.TypeString, Computed: true, @@ -239,6 +239,10 @@ func ResourceJob() *schema.Resource { return common.Resource{ Schema: jobSchema, SchemaVersion: 2, + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(DefaultProvisionTimeout), + Update: schema.DefaultTimeout(DefaultProvisionTimeout), + }, CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error { alwaysRunning := d.Get("always_running").(bool) maxConcurrentRuns := d.Get("max_concurrent_runs").(int) @@ -265,7 +269,7 @@ func ResourceJob() *schema.Resource { } d.SetId(job.ID()) if d.Get("always_running").(bool) { - return jobsAPI.Start(job.JobID) + return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate)) } return nil }, @@ -295,7 +299,7 @@ func ResourceJob() *schema.Resource { return err } if d.Get("always_running").(bool) { - return jobsAPI.Restart(d.Id()) + return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate)) } return nil }, diff --git a/compute/resource_job_test.go b/compute/resource_job_test.go index dd5adaa2af..1243240b8d 100644 --- a/compute/resource_job_test.go +++ b/compute/resource_job_test.go @@ -766,38 +766,38 @@ func TestJobRestarts(t *testing.T) { }, }, func(ctx context.Context, client *common.DatabricksClient) { ja := NewJobsAPI(ctx, client) - ja.timeout = 500 * time.Millisecond + timeout := 500 * time.Millisecond - err := ja.Start(123) + err := ja.Start(123, timeout) assert.NoError(t, err) - err = ja.waitForRunState(345, "RUNNING") + err = ja.waitForRunState(345, "RUNNING", timeout) assert.EqualError(t, err, "cannot get job RUNNING: nope") - err = ja.waitForRunState(456, "TERMINATED") + err = ja.waitForRunState(456, "TERMINATED", timeout) assert.EqualError(t, err, "cannot get job TERMINATED: Quota exceeded") - err = ja.waitForRunState(890, "RUNNING") + err = ja.waitForRunState(890, "RUNNING", timeout) assert.EqualError(t, err, "run is SOMETHING: Checking...") // no active runs for the first time - err = ja.Restart("123") + err = ja.Restart("123", timeout) assert.NoError(t, err) // one active run for the second time - err = ja.Restart("123") + err = ja.Restart("123", timeout) assert.NoError(t, err) - err = ja.Restart("111") + err = ja.Restart("111", timeout) assert.EqualError(t, err, "cannot cancel run 567: nope") - err = ja.Restart("a") + err = ja.Restart("a", timeout) assert.EqualError(t, err, "strconv.ParseInt: parsing \"a\": invalid syntax") - err = ja.Restart("222") + err = ja.Restart("222", timeout) assert.EqualError(t, err, "nope") - err = ja.Restart("678") + err = ja.Restart("678", timeout) assert.EqualError(t, err, "`always_running` must be specified only "+ "with `max_concurrent_runs = 1`. There are 2 active runs") }) diff --git a/docs/resources/job.md b/docs/resources/job.md index bf0feb02fc..608c129b8c 100644 --- a/docs/resources/job.md +++ b/docs/resources/job.md @@ -54,13 +54,13 @@ The following arguments are required: * `name` - (Optional) An optional name for the job. The default value is Untitled. * `new_cluster` - (Optional) Same set of parameters as for [databricks_cluster](cluster.md) resource. * `existing_cluster_id` - (Optional) If existing_cluster_id, the ID of an existing [cluster](cluster.md) that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. We strongly suggest to use `new_cluster` for greater reliability. +* `always_running` - (Optional) (Bool) Whenever the job is always running, like a Spark Streaming application, on every update restart the current active run or start it again, if nothing it is not running. False by default. Any job runs are started with `parameters` specified in `spark_jar_task` or `spark_submit_task` or `spark_python_task` or `notebook_task` blocks. * `library` - (Optional) (Set) An optional list of libraries to be installed on the cluster that will execute the job. Please consult [libraries section](cluster.md#libraries) for [databricks_cluster](cluster.md) resource. * `retry_on_timeout` - (Optional) (Bool) An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout. * `max_retries` - (Optional) (Integer) An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with a FAILED result_state or INTERNAL_ERROR life_cycle_state. The value -1 means to retry indefinitely and the value 0 means to never retry. The default behavior is to never retry. * `timeout_seconds` - (Optional) (Integer) An optional timeout applied to each run of this job. The default behavior is to have no timeout. * `min_retry_interval_millis` - (Optional) (Integer) An optional minimal interval in milliseconds between the start of the failed run and the subsequent retry run. The default behavior is that unsuccessful runs are immediately retried. * `max_concurrent_runs` - (Optional) (Integer) An optional maximum allowed number of concurrent runs of the job. -* `always_running` - (Optional) (Bool) Whenever the job is always running, like a Spark Streaming application, on every update restart the current active run or start it again, if nothing it is not running. False by default. * `email_notifications` - (Optional) (List) An optional set of email addresses notified when runs of this job begin and complete and when this job is deleted. The default behavior is to not send any emails. This field is a block and is documented below. * `schedule` - (Optional) (List) An optional periodic schedule for this job. The default behavior is that the job runs when triggered by clicking Run Now in the Jobs UI or sending an API request to runNow. This field is a block and is documented below. @@ -105,6 +105,17 @@ By default, all users can create and modify jobs unless an administrator [enable * [databricks_permissions](permissions.md#Job-usage) can control which groups or individual users can *Can View*, *Can Manage Run*, and *Can Manage*. * [databricks_cluster_policy](cluster_policy.md) can control which kinds of clusters users can create for jobs. +## Timeouts + +The `timeouts` block allows you to specify `create` and `update` timeouts if you have an `always_running` job. Please launch `TF_LOG=DEBUG terraform apply` whenever you observe timeout issues. + +```hcl +timeouts { + create = "20m" + update = "20m +} +``` + ## Import The resource job can be imported using the id of the job