Add functionality to restart always running jobs #715

Merged 6 commits on Jul 8, 2021
CHANGELOG.md: 16 additions, 0 deletions
@@ -3,6 +3,22 @@
## 0.3.6

* Added support for hybrid pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689))
* Added support for `always_running` jobs, which are restarted on resource updates ([#715](https://github.com/databrickslabs/terraform-provider-databricks/pull/715))
* Azure CLI auth now forces JSON output ([#717](https://github.com/databrickslabs/terraform-provider-databricks/pull/717))
* `databricks_permissions` are now validated at the `terraform plan` stage ([#706](https://github.com/databrickslabs/terraform-provider-databricks/pull/706))
* Added `databricks_directory` resource ([#690](https://github.com/databrickslabs/terraform-provider-databricks/pull/690))
* Added support for hybrid instance pools ([#689](https://github.com/databrickslabs/terraform-provider-databricks/pull/689))
* Added `run_as_role` field to `databricks_sql_query` ([#684](https://github.com/databrickslabs/terraform-provider-databricks/pull/684))
* Added `user_id` attribute for `databricks_user` data resource, so that it's possible to dynamically create resources based on members of the group ([#714](https://github.com/databrickslabs/terraform-provider-databricks/pull/714))

Updated dependency versions:

* Bump github.com/aws/aws-sdk-go from 1.38.51 to 1.38.71
* Bump github.com/Azure/go-autorest/autorest/azure/auth from 0.5.7 to 0.5.8
* Bump github.com/Azure/go-autorest/autorest from 0.11.18 to 0.11.19
* Bump github.com/Azure/go-autorest/autorest/adal from 0.9.13 to 0.9.14
* Bump github.com/zclconf/go-cty from 1.8.3 to 1.8.4
* Bump github.com/hashicorp/terraform-plugin-sdk/v2 from 2.6.1 to 2.7.0

## 0.3.5

common/resource.go: 2 additions, 0 deletions
@@ -14,6 +14,7 @@ type Resource struct {
Read func(ctx context.Context, d *schema.ResourceData, c *DatabricksClient) error
Update func(ctx context.Context, d *schema.ResourceData, c *DatabricksClient) error
Delete func(ctx context.Context, d *schema.ResourceData, c *DatabricksClient) error
CustomizeDiff func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error
StateUpgraders []schema.StateUpgrader
Schema map[string]*schema.Schema
SchemaVersion int
@@ -60,6 +61,7 @@ func (r Resource) ToResource() *schema.Resource {
Schema: r.Schema,
SchemaVersion: r.SchemaVersion,
StateUpgraders: r.StateUpgraders,
CustomizeDiff: r.CustomizeDiff,
CreateContext: func(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
c := m.(*DatabricksClient)
err := r.Create(ctx, d, c)
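
The new `CustomizeDiff` hook is passed straight through to the underlying Terraform SDK resource, so a resource built via this wrapper can now reject invalid configurations at `terraform plan` time instead of failing during apply. A minimal sketch of how a resource might use the hook (the `size` field and its rule are illustrative, not part of this PR):

```go
// hypothetical resource wired through the new CustomizeDiff pass-through
r := common.Resource{
	Schema: map[string]*schema.Schema{
		"size": {Type: schema.TypeInt, Optional: true},
	},
	CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error {
		// runs while planning, so invalid configs fail before anything is created
		if d.Get("size").(int) < 0 {
			return fmt.Errorf("`size` must not be negative")
		}
		return nil
	},
	// Create, Read, Update, Delete omitted for brevity
}
_ = r.ToResource()
```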
compute/model.go: 3 additions, 1 deletion
@@ -573,7 +573,9 @@ func (j Job) ID() string {

// RunParameters ...
type RunParameters struct {
// TODO: if we add job_id, it can be also a request to RunNow
// a shortcut field to reuse this type for RunNow
JobID int64 `json:"job_id,omitempty"`
Review thread on this line:

Contributor: should it be `tf:"computed"`?

Author: this entity is not exposed to TF - just internal comms

NotebookParams map[string]string `json:"notebook_params,omitempty"`
JarParams []string `json:"jar_params,omitempty"`
PythonParams []string `json:"python_params,omitempty"`
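
As the thread above notes, `RunParameters` is only used for API payloads and is never mapped to Terraform schema, hence no `tf:` tag. A quick illustration of the "shortcut" the new comment describes, with a made-up job ID: the same struct can now be serialized as the `/jobs/run-now` request body.

```go
// made-up job ID, purely for illustration
body, err := json.Marshal(RunParameters{JobID: 123})
if err != nil {
	log.Fatal(err)
}
// prints {"job_id":123}; all unset fields are dropped by omitempty
fmt.Println(string(body))
```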
compute/resource_job.go: 121 additions, 41 deletions
@@ -6,7 +6,9 @@ import (
"log"
"strconv"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

@@ -36,6 +38,94 @@ func (a JobsAPI) RunsList(r JobRunsListRequest) (jrl JobRunsList, err error) {
return
}

// RunsCancel cancels the run and waits until it reaches the TERMINATED state
func (a JobsAPI) RunsCancel(runID int64, timeout time.Duration) error {
var response interface{}
err := a.client.Post(a.context, "/jobs/runs/cancel", map[string]interface{}{
"run_id": runID,
}, &response)
if err != nil {
return err
}
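// cancellation is asynchronous, so wait until the run actually terminates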
return a.waitForRunState(runID, "TERMINATED", timeout)
}

func (a JobsAPI) waitForRunState(runID int64, desiredState string, timeout time.Duration) error {
return resource.RetryContext(a.context, timeout, func() *resource.RetryError {
jobRun, err := a.RunsGet(runID)
if err != nil {
return resource.NonRetryableError(
fmt.Errorf("cannot get job %s: %v", desiredState, err))
}
state := jobRun.State
if state.LifeCycleState == desiredState {
return nil
}
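// INTERNAL_ERROR is terminal, so retrying would never succeed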
if state.LifeCycleState == "INTERNAL_ERROR" {
return resource.NonRetryableError(
fmt.Errorf("cannot get job %s: %s",
desiredState, state.StateMessage))
}
return resource.RetryableError(
fmt.Errorf("run is %s: %s",
state.LifeCycleState,
state.StateMessage))
})
}

// RunNow triggers the job and returns a run ID
func (a JobsAPI) RunNow(jobID int64) (int64, error) {
var jr JobRun
err := a.client.Post(a.context, "/jobs/run-now", RunParameters{
JobID: jobID,
}, &jr)
return jr.RunID, err
}

// RunsGet retrieves information about the given run
func (a JobsAPI) RunsGet(runID int64) (JobRun, error) {
var jr JobRun
err := a.client.Get(a.context, "/jobs/runs/get", map[string]interface{}{
"run_id": runID,
}, &jr)
return jr, err
}

// Start triggers a new run of the job and waits until it reaches the RUNNING state
func (a JobsAPI) Start(jobID int64, timeout time.Duration) error {
runID, err := a.RunNow(jobID)
if err != nil {
return fmt.Errorf("cannot start job run: %v", err)
}
return a.waitForRunState(runID, "RUNNING", timeout)
}

// Restart cancels the job's active run, if any, and then starts a new one
func (a JobsAPI) Restart(id string, timeout time.Duration) error {
jobID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
return err
}
runs, err := a.RunsList(JobRunsListRequest{JobID: jobID, ActiveOnly: true})
if err != nil {
return err
}
if len(runs.Runs) == 0 {
// nothing to cancel
return a.Start(jobID, timeout)
}
if len(runs.Runs) > 1 {
return fmt.Errorf("`always_running` must be specified only with "+
"`max_concurrent_runs = 1`. There are %d active runs", len(runs.Runs))
}
if len(runs.Runs) == 1 {
activeRun := runs.Runs[0]
err = a.RunsCancel(activeRun.RunID, timeout)
if err != nil {
return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunID, err)
}
}
return a.Start(jobID, timeout)
}
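
Taken together, these helpers form the restart lifecycle: `Restart` lists active runs, cancels the single active one via `RunsCancel`, and then delegates to `Start`, which triggers the job with `RunNow` and blocks in `waitForRunState` until the run reports `RUNNING`. A hypothetical caller, assuming an already configured `*common.DatabricksClient` named `client` and a job with ID `"123"`:

```go
api := NewJobsAPI(ctx, client)
// cancel the active run (if any), trigger a new one, and poll
// /jobs/runs/get until it is RUNNING or the timeout elapses
if err := api.Restart("123", 20*time.Minute); err != nil {
	log.Printf("[ERROR] cannot restart job: %v", err)
}
```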

// Create creates a job on the workspace given the job settings
func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) {
var job Job
@@ -100,25 +190,16 @@ func wrapMissingJobError(err error, id string) error {

var jobSchema = common.StructToSchema(JobSettings{},
func(s map[string]*schema.Schema) map[string]*schema.Schema {
s["existing_cluster_id"].Description = "If existing_cluster_id, the ID " +
"of an existing cluster that will be used for all runs of this job. " +
"When running jobs on an existing cluster, you may need to manually " +
"restart the cluster if it stops responding. We strongly suggest to use " +
"`new_cluster` for greater reliability."
s["new_cluster"].Description = "Same set of parameters as for " +
"[databricks_cluster](cluster.md) resource."
if p, err := common.SchemaPath(s, "new_cluster", "num_workers"); err == nil {
p.Optional = true
p.Default = 0
p.Type = schema.TypeInt
p.ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(0))
p.Required = false
}

if p, err := common.SchemaPath(s, "schedule", "pause_status"); err == nil {
p.ValidateFunc = validation.StringInSlice([]string{"PAUSED", "UNPAUSED"}, false)
}

if v, err := common.SchemaPath(s, "new_cluster", "spark_conf"); err == nil {
v.DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool {
isPossiblyLegacyConfig := k == "new_cluster.0.spark_conf.%" && old == "1" && new == "0"
@@ -130,7 +211,6 @@ var jobSchema = common.StructToSchema(JobSettings{},
return false
}
}

if v, err := common.SchemaPath(s, "new_cluster", "aws_attributes"); err == nil {
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.aws_attributes.#")
}
@@ -140,40 +220,16 @@ var jobSchema = common.StructToSchema(JobSettings{},
if v, err := common.SchemaPath(s, "new_cluster", "gcp_attributes"); err == nil {
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#")
}

s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#")

s["name"].Description = "An optional name for the job. The default value is Untitled."
s["library"].Description = "An optional list of libraries to be installed on " +
"the cluster that will execute the job. The default value is an empty list."
s["email_notifications"].Description = "An optional set of email addresses " +
"notified when runs of this job begin and complete and when this job is " +
"deleted. The default behavior is to not send any emails."
s["timeout_seconds"].Description = "An optional timeout applied to each run " +
"of this job. The default behavior is to have no timeout."
s["max_retries"].Description = "An optional maximum number of times to retry " +
"an unsuccessful run. A run is considered to be unsuccessful if it " +
"completes with a FAILED result_state or INTERNAL_ERROR life_cycle_state. " +
"The value -1 means to retry indefinitely and the value 0 means to never " +
"retry. The default behavior is to never retry."
s["min_retry_interval_millis"].Description = "An optional minimal interval in " +
"milliseconds between the start of the failed run and the subsequent retry run. " +
"The default behavior is that unsuccessful runs are immediately retried."
s["retry_on_timeout"].Description = "An optional policy to specify whether to " +
"retry a job when it times out. The default behavior is to not retry on timeout."
s["schedule"].Description = "An optional periodic schedule for this job. " +
"The default behavior is that the job runs when triggered by clicking " +
"Run Now in the Jobs UI or sending an API request to runNow."
s["max_concurrent_runs"].ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(1))
s["url"] = &schema.Schema{
Type: schema.TypeString,
Computed: true,
}
s["max_concurrent_runs"] = &schema.Schema{
Optional: true,
Default: 1,
Type: schema.TypeInt,
ValidateDiagFunc: validation.ToDiagFunc(validation.IntAtLeast(1)),
Description: "An optional maximum allowed number of concurrent runs of the job.",
}
s["always_running"] = &schema.Schema{
Optional: true,
Default: false,
Type: schema.TypeBool,
}
return s
})
@@ -183,6 +239,18 @@ func ResourceJob() *schema.Resource {
return common.Resource{
Schema: jobSchema,
SchemaVersion: 2,
Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(DefaultProvisionTimeout),
Update: schema.DefaultTimeout(DefaultProvisionTimeout),
},
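// plan-time guard: `always_running` only makes sense with a single concurrent run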
CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error {
alwaysRunning := d.Get("always_running").(bool)
maxConcurrentRuns := d.Get("max_concurrent_runs").(int)
if alwaysRunning && maxConcurrentRuns > 1 {
return fmt.Errorf("`always_running` must be specified only with `max_concurrent_runs = 1`")
}
return nil
},
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
var js JobSettings
err := common.DataToStructPointer(d, jobSchema, &js)
@@ -194,11 +262,15 @@ func ResourceJob() *schema.Resource {
return err
}
}
job, err := NewJobsAPI(ctx, c).Create(js)
jobsAPI := NewJobsAPI(ctx, c)
job, err := jobsAPI.Create(js)
if err != nil {
return err
}
d.SetId(job.ID())
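// an always-running job gets its first run triggered right after creation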
if d.Get("always_running").(bool) {
return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate))
}
return nil
},
Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
@@ -221,7 +293,15 @@ func ResourceJob() *schema.Resource {
return err
}
}
return NewJobsAPI(ctx, c).Update(d.Id(), js)
jobsAPI := NewJobsAPI(ctx, c)
err = jobsAPI.Update(d.Id(), js)
if err != nil {
return err
}
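// restart the active run so that the updated job settings take effect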
if d.Get("always_running").(bool) {
return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate))
}
return nil
},
Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
return NewJobsAPI(ctx, c).Delete(d.Id())