diff --git a/CHANGELOG.md b/CHANGELOG.md index adf3c19965..00ae1c8115 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.3.9 +* Added initial support for multiple task orchestration in `databricks_job` [#853](https://github.com/databrickslabs/terraform-provider-databricks/pull/853) * Fixed provider crash for new terraform states related to bug [#813](https://github.com/hashicorp/terraform-plugin-sdk/issues/813) in Terraform SDK v2.8.0 ([#854](https://github.com/databrickslabs/terraform-provider-databricks/issues/854)) Updated dependency versions: diff --git a/access/resource_sql_permissions_test.go b/access/resource_sql_permissions_test.go index 0a36ceb816..ddcaee2f8c 100644 --- a/access/resource_sql_permissions_test.go +++ b/access/resource_sql_permissions_test.go @@ -431,5 +431,5 @@ func TestResourceSqlPermissions_Delete(t *testing.T) { } func TestResourceSqlPermissions_CornerCases(t *testing.T) { - qa.ResourceCornerCases(t, ResourceSqlPermissions(), "database/foo") + qa.ResourceCornerCases(t, ResourceSqlPermissions(), qa.CornerCaseID("database/foo")) } diff --git a/common/http.go b/common/http.go index 71c4aed5ba..b99d3d941c 100644 --- a/common/http.go +++ b/common/http.go @@ -244,7 +244,7 @@ func (c *DatabricksClient) checkHTTPRetry(ctx context.Context, resp *http.Respon // Get on path func (c *DatabricksClient) Get(ctx context.Context, path string, request interface{}, response interface{}) error { - body, err := c.authenticatedQuery(ctx, http.MethodGet, path, request, c.api2) + body, err := c.authenticatedQuery(ctx, http.MethodGet, path, request, c.completeUrl) if err != nil { return err } @@ -253,7 +253,7 @@ func (c *DatabricksClient) Get(ctx context.Context, path string, request interfa // Post on path func (c *DatabricksClient) Post(ctx context.Context, path string, request interface{}, response interface{}) error { - body, err := c.authenticatedQuery(ctx, http.MethodPost, path, request, c.api2) + body, err := c.authenticatedQuery(ctx, 
http.MethodPost, path, request, c.completeUrl) if err != nil { return err } @@ -262,19 +262,19 @@ func (c *DatabricksClient) Post(ctx context.Context, path string, request interf // Delete on path func (c *DatabricksClient) Delete(ctx context.Context, path string, request interface{}) error { - _, err := c.authenticatedQuery(ctx, http.MethodDelete, path, request, c.api2) + _, err := c.authenticatedQuery(ctx, http.MethodDelete, path, request, c.completeUrl) return err } // Patch on path func (c *DatabricksClient) Patch(ctx context.Context, path string, request interface{}) error { - _, err := c.authenticatedQuery(ctx, http.MethodPatch, path, request, c.api2) + _, err := c.authenticatedQuery(ctx, http.MethodPatch, path, request, c.completeUrl) return err } // Put on path func (c *DatabricksClient) Put(ctx context.Context, path string, request interface{}) error { - _, err := c.authenticatedQuery(ctx, http.MethodPut, path, request, c.api2) + _, err := c.authenticatedQuery(ctx, http.MethodPut, path, request, c.completeUrl) return err } @@ -298,28 +298,26 @@ func (c *DatabricksClient) unmarshall(path string, body []byte, response interfa } } -func (c *DatabricksClient) api2(r *http.Request) error { +type ApiVersion string + +const ( + API_1_2 ApiVersion = "1.2" + API_2_0 ApiVersion = "2.0" + API_2_1 ApiVersion = "2.1" +) + +func (c *DatabricksClient) completeUrl(r *http.Request) error { if r.URL == nil { return fmt.Errorf("no URL found in request") } - r.URL.Path = fmt.Sprintf("/api/2.0%s", r.URL.Path) - r.Header.Set("Content-Type", "application/json") - url, err := url.Parse(c.Host) - if err != nil { - return err + ctx := r.Context() + av, ok := ctx.Value(Api).(ApiVersion) + if !ok { + av = API_2_0 } - r.URL.Host = url.Host - r.URL.Scheme = url.Scheme - return nil -} - -func (c *DatabricksClient) api12(r *http.Request) error { - if r.URL == nil { - return fmt.Errorf("no URL found in request") - } - r.URL.Path = fmt.Sprintf("/api/1.2%s", r.URL.Path) + r.URL.Path = 
fmt.Sprintf("/api/%s%s", av, r.URL.Path) r.Header.Set("Content-Type", "application/json") url, err := url.Parse(c.Host) @@ -334,7 +332,7 @@ func (c *DatabricksClient) api12(r *http.Request) error { // Scim sets SCIM headers func (c *DatabricksClient) Scim(ctx context.Context, method, path string, request interface{}, response interface{}) error { - body, err := c.authenticatedQuery(ctx, method, path, request, c.api2, func(r *http.Request) error { + body, err := c.authenticatedQuery(ctx, method, path, request, c.completeUrl, func(r *http.Request) error { r.Header.Set("Content-Type", "application/scim+json") if c.isAccountsClient() && c.AccountID != "" { // until `/preview` is there for workspace scim @@ -348,15 +346,6 @@ func (c *DatabricksClient) Scim(ctx context.Context, method, path string, reques return c.unmarshall(path, body, &response) } -// OldAPI performs call on context api -func (c *DatabricksClient) OldAPI(ctx context.Context, method, path string, request interface{}, response interface{}) error { - body, err := c.authenticatedQuery(ctx, method, path, request, c.api12) - if err != nil { - return err - } - return c.unmarshall(path, body, &response) -} - func (c *DatabricksClient) authenticatedQuery(ctx context.Context, method, requestURL string, data interface{}, visitors ...func(*http.Request) error) (body []byte, err error) { err = c.Authenticate(ctx) @@ -462,7 +451,7 @@ func (c *DatabricksClient) genericQuery(ctx context.Context, method, requestURL headers += "\n" } } - log.Printf("[DEBUG] %s %s %s%v", method, requestURL, headers, c.redactedDump(requestBody)) // lgtm[go/clear-text-logging] + log.Printf("[DEBUG] %s %s %s%v", method, request.URL.Path, headers, c.redactedDump(requestBody)) // lgtm[go/clear-text-logging] r, err := retryablehttp.FromRequest(request) if err != nil { @@ -486,7 +475,7 @@ func (c *DatabricksClient) genericQuery(ctx context.Context, method, requestURL if err != nil { return nil, err } - log.Printf("[DEBUG] %s %v <- %s %s", 
resp.Status, c.redactedDump(body), method, requestURL) + log.Printf("[DEBUG] %s %v <- %s %s", resp.Status, c.redactedDump(body), method, request.URL.Path) return body, nil } diff --git a/common/http_test.go b/common/http_test.go index 0857402606..f628ded105 100644 --- a/common/http_test.go +++ b/common/http_test.go @@ -288,12 +288,12 @@ func TestUnmarshall(t *testing.T) { func TestAPI2(t *testing.T) { ws := DatabricksClient{Host: "ht_tp://example.com/"} - err := ws.api2(&http.Request{}) + err := ws.completeUrl(&http.Request{}) require.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "no URL found in request"), "Actual message: %s", err.Error()) - err = ws.api2(&http.Request{ + err = ws.completeUrl(&http.Request{ Header: http.Header{}, URL: &url.URL{ Path: "/x/y/x", @@ -314,15 +314,6 @@ func TestScim(t *testing.T) { require.NoError(t, err) } -func TestOldAPI(t *testing.T) { - ws, server := singleRequestServer(t, "GET", "/api/1.2/imaginary/endpoint", `{"a": "b"}`) - defer server.Close() - - var resp map[string]string - err := ws.OldAPI(context.Background(), "GET", "/imaginary/endpoint", nil, &resp) - require.NoError(t, err) -} - func TestMakeRequestBody(t *testing.T) { type x struct { Scope string `json:"scope" url:"scope"` diff --git a/common/reflect_resource.go b/common/reflect_resource.go index e74b13fb24..3493bbcd2b 100644 --- a/common/reflect_resource.go +++ b/common/reflect_resource.go @@ -66,10 +66,18 @@ func SchemaPath(s map[string]*schema.Schema, path ...string) (*schema.Schema, er return nil, fmt.Errorf("%v does not compute", path) } +func MustSchemaPath(s map[string]*schema.Schema, path ...string) *schema.Schema { + sch, err := SchemaPath(s, path...) 
+ if err != nil { + panic(err) + } + return sch +} + // StructToSchema makes schema from a struct type & applies customizations from callback given func StructToSchema(v interface{}, customize func(map[string]*schema.Schema) map[string]*schema.Schema) map[string]*schema.Schema { rv := reflect.ValueOf(v) - scm := typeToSchema(rv, rv.Type()) + scm := typeToSchema(rv, rv.Type(), []string{}) if customize != nil { scm = customize(scm) } @@ -128,7 +136,9 @@ func chooseFieldName(typeField reflect.StructField) string { return strings.Split(jsonTag, ",")[0] } -func typeToSchema(v reflect.Value, t reflect.Type) map[string]*schema.Schema { +// typeToSchema converts struct into terraform schema. `path` is used for block suppressions +// special path element `"0"` is used to denote either arrays or sets of elements +func typeToSchema(v reflect.Value, t reflect.Type, path []string) map[string]*schema.Schema { scm := map[string]*schema.Schema{} rk := v.Kind() if rk != reflect.Struct { @@ -180,8 +190,14 @@ func typeToSchema(v reflect.Value, t reflect.Type) map[string]*schema.Schema { scm[fieldName].Type = schema.TypeList elem := typeField.Type.Elem() sv := reflect.New(elem).Elem() + if strings.Contains(tfTag, "suppress_diff") { + // TODO: we may also suppress count diffs on all json:"..,omitempty" without tf:"force_new" + // find . 
-type f -name '*.go' -not -path "vendor/*" | xargs grep ',omitempty' | grep '*' + blockCount := strings.Join(append(path, fieldName, "#"), ".") + scm[fieldName].DiffSuppressFunc = makeEmptyBlockSuppressFunc(blockCount) + } scm[fieldName].Elem = &schema.Resource{ - Schema: typeToSchema(sv, elem), + Schema: typeToSchema(sv, elem, append(path, fieldName, "0")), } case reflect.Slice: ft := schema.TypeList @@ -202,7 +218,7 @@ func typeToSchema(v reflect.Value, t reflect.Type) map[string]*schema.Schema { case reflect.Struct: sv := reflect.New(elem).Elem() scm[fieldName].Elem = &schema.Resource{ - Schema: typeToSchema(sv, elem), + Schema: typeToSchema(sv, elem, append(path, fieldName, "0")), } } default: diff --git a/common/reflect_resource_test.go b/common/reflect_resource_test.go index 17f7684df9..86c1b4bd41 100644 --- a/common/reflect_resource_test.go +++ b/common/reflect_resource_test.go @@ -194,7 +194,7 @@ type Dummy struct { Unique []Address `json:"unique,omitempty" tf:"slice_set"` Things []string `json:"things,omitempty" tf:"slice_set"` Tags map[string]string `json:"tags,omitempty" tf:"max_items:5"` - Home *Address `json:"home,omitempty" tf:"group:v"` + Home *Address `json:"home,omitempty" tf:"group:v,suppress_diff"` House *Address `json:"house,omitempty" tf:"group:v"` } @@ -368,6 +368,10 @@ func TestStructToData(t *testing.T) { assert.Equal(t, false, d.Get("enabled")) assert.Equal(t, 2, d.Get("addresses.#")) + assert.NotNil(t, s["home"].DiffSuppressFunc) + assert.True(t, s["home"].DiffSuppressFunc("home.#", "1", "0", d)) + assert.False(t, s["home"].DiffSuppressFunc("home.#", "1", "1", d)) + { //lint:ignore SA1019 Empty optional string should not be set. 
_, ok := d.GetOkExists("addresses.0.optional_string") diff --git a/common/resource.go b/common/resource.go index c9d1964dea..ee3e1098d0 100644 --- a/common/resource.go +++ b/common/resource.go @@ -3,6 +3,8 @@ package common import ( "context" "log" + "regexp" + "strings" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -96,11 +98,18 @@ func (r Resource) ToResource() *schema.Resource { } } -func MakeEmptyBlockSuppressFunc(name string) func(k, old, new string, d *schema.ResourceData) bool { +func MustCompileKeyRE(name string) *regexp.Regexp { + regexFromName := strings.ReplaceAll(name, ".", "\\.") + regexFromName = strings.ReplaceAll(regexFromName, ".0", ".\\d+") + return regexp.MustCompile(regexFromName) +} + +func makeEmptyBlockSuppressFunc(name string) func(k, old, new string, d *schema.ResourceData) bool { + re := MustCompileKeyRE(name) return func(k, old, new string, d *schema.ResourceData) bool { - log.Printf("[DEBUG] k='%v', old='%v', new='%v'", k, old, new) - if k == name && old == "1" && new == "0" { - log.Printf("[DEBUG] Disable removal of empty block") + log.Printf("[DEBUG] name=%s k='%v', old='%v', new='%v'", name, k, old, new) + if re.Match([]byte(name)) && old == "1" && new == "0" { + log.Printf("[DEBUG] Suppressing diff for name=%s k=%#v old=%#v new=%#v", name, k, old, new) return true } return false diff --git a/common/resource_test.go b/common/resource_test.go index e979c0b486..d6e14c772f 100644 --- a/common/resource_test.go +++ b/common/resource_test.go @@ -56,12 +56,16 @@ func TestUpdate(t *testing.T) { }, }.ToResource() + client := &DatabricksClient{} + ctx := context.Background() d := r.TestResourceData() - datas, err := r.Importer.StateContext( - context.Background(), d, - &DatabricksClient{}) + datas, err := r.Importer.StateContext(ctx, d, client) require.NoError(t, err) assert.Len(t, datas, 1) assert.False(t, r.Schema["foo"].ForceNew) assert.Equal(t, "", d.Id()) + + diags := 
r.UpdateContext(ctx, d, client) + assert.True(t, diags.HasError()) + assert.Equal(t, "nope", diags[0].Summary) } diff --git a/common/version.go b/common/version.go index 54ec256e98..90abc9f85d 100644 --- a/common/version.go +++ b/common/version.go @@ -12,6 +12,8 @@ var ( Current contextKey = 3 // If current resource is data IsData contextKey = 4 + // apiVersion + Api contextKey = 5 ) type contextKey int diff --git a/compute/acceptance/job_test.go b/compute/acceptance/job_test.go index ba5f007208..ccaa800d08 100644 --- a/compute/acceptance/job_test.go +++ b/compute/acceptance/job_test.go @@ -51,7 +51,7 @@ func TestAwsAccJobsCreate(t *testing.T) { }, }, }, - EmailNotifications: &JobEmailNotifications{ + EmailNotifications: &EmailNotifications{ OnStart: []string{}, OnSuccess: []string{}, OnFailure: []string{}, @@ -88,6 +88,82 @@ func TestAwsAccJobsCreate(t *testing.T) { assert.True(t, job.Settings.NewCluster.SparkVersion == newSparkVersion, "Something is wrong with spark version") } +func TestAccJobTasks(t *testing.T) { + acceptance.Test(t, []acceptance.Step{ + { + Template: ` + data "databricks_current_user" "me" {} + data "databricks_spark_version" "latest" {} + data "databricks_node_type" "smallest" { + local_disk = true + } + + resource "databricks_notebook" "this" { + path = "${data.databricks_current_user.me.home}/Terraform{var.RANDOM}" + language = "PYTHON" + content_base64 = base64encode(<<-EOT + # created from ${abspath(path.module)} + display(spark.range(10)) + EOT + ) + } + + resource "databricks_job" "this" { + name = "{var.RANDOM}" + task { + task_key = "a" + + new_cluster { + num_workers = 1 + spark_version = data.databricks_spark_version.latest.id + node_type_id = data.databricks_node_type.smallest.id + } + + notebook_task { + notebook_path = databricks_notebook.this.path + } + } + + task { + task_key = "b" + + depends_on { + task_key = "a" + } + + new_cluster { + num_workers = 8 + spark_version = data.databricks_spark_version.latest.id + node_type_id = 
data.databricks_node_type.smallest.id + } + + notebook_task { + notebook_path = databricks_notebook.this.path + } + } + + task { + task_key = "c" + + depends_on { + task_key = "b" + } + + new_cluster { + num_workers = 20 + spark_version = data.databricks_spark_version.latest.id + node_type_id = data.databricks_node_type.smallest.id + } + + notebook_task { + notebook_path = databricks_notebook.this.path + } + } + }`, + }, + }) +} + func TestAccJobResource(t *testing.T) { if _, ok := os.LookupEnv("CLOUD_ENV"); !ok { t.Skip("Acceptance tests skipped unless env 'CLOUD_ENV' is set") diff --git a/compute/commands.go b/compute/commands.go index 33d46ea6e9..f6e9f6d630 100644 --- a/compute/commands.go +++ b/compute/commands.go @@ -16,7 +16,7 @@ import ( func NewCommandsAPI(ctx context.Context, m interface{}) CommandsAPI { return CommandsAPI{ client: m.(*common.DatabricksClient), - context: ctx, + context: context.WithValue(ctx, common.Api, common.API_1_2), } } @@ -29,7 +29,9 @@ type CommandsAPI struct { // Execute creates a spark context and executes a command and then closes context // Any leading whitespace is trimmed func (a CommandsAPI) Execute(clusterID, language, commandStr string) common.CommandResults { - cluster, err := NewClustersAPI(a.context, a.client).Get(clusterID) + // this is the place, where API version propagation through context looks strange + ctx := context.WithValue(a.context, common.Api, common.API_2_0) + cluster, err := NewClustersAPI(ctx, a.client).Get(clusterID) if err != nil { return common.CommandResults{ ResultType: "error", @@ -107,7 +109,7 @@ type genericCommandRequest struct { func (a CommandsAPI) createCommand(contextID, clusterID, language, commandStr string) (string, error) { var command Command - err := a.client.OldAPI(a.context, "POST", "/commands/execute", genericCommandRequest{ + err := a.client.Post(a.context, "/commands/execute", genericCommandRequest{ Language: language, ClusterID: clusterID, ContextID: contextID, @@ -118,7 +120,7 
@@ func (a CommandsAPI) createCommand(contextID, clusterID, language, commandStr st func (a CommandsAPI) getCommand(commandID, contextID, clusterID string) (Command, error) { var commandResp Command - err := a.client.OldAPI(a.context, "GET", "/commands/status", genericCommandRequest{ + err := a.client.Get(a.context, "/commands/status", genericCommandRequest{ CommandID: commandID, ContextID: contextID, ClusterID: clusterID, @@ -127,7 +129,7 @@ func (a CommandsAPI) getCommand(commandID, contextID, clusterID string) (Command } func (a CommandsAPI) deleteContext(contextID, clusterID string) error { - return a.client.OldAPI(a.context, "POST", "/contexts/destroy", genericCommandRequest{ + return a.client.Post(a.context, "/contexts/destroy", genericCommandRequest{ ContextID: contextID, ClusterID: clusterID, }, nil) @@ -135,7 +137,7 @@ func (a CommandsAPI) deleteContext(contextID, clusterID string) error { func (a CommandsAPI) getContext(contextID, clusterID string) (string, error) { var contextStatus Command // internal hack, yes - err := a.client.OldAPI(a.context, "GET", "/contexts/status", genericCommandRequest{ + err := a.client.Get(a.context, "/contexts/status", genericCommandRequest{ ContextID: contextID, ClusterID: clusterID, }, &contextStatus) @@ -144,7 +146,7 @@ func (a CommandsAPI) getContext(contextID, clusterID string) (string, error) { func (a CommandsAPI) createContext(language, clusterID string) (string, error) { var context Command // internal hack, yes - err := a.client.OldAPI(a.context, "POST", "/contexts/create", genericCommandRequest{ + err := a.client.Post(a.context, "/contexts/create", genericCommandRequest{ Language: language, ClusterID: clusterID, }, &context) diff --git a/compute/model.go b/compute/model.go index e3d9667f4b..1a2b43a22a 100644 --- a/compute/model.go +++ b/compute/model.go @@ -273,16 +273,16 @@ type Cluster struct { NumWorkers int32 `json:"num_workers" tf:"group:size"` Autoscale *AutoScale `json:"autoscale,omitempty" tf:"group:size"` 
EnableElasticDisk bool `json:"enable_elastic_disk,omitempty" tf:"computed"` - EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty"` + EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty" tf:"computed"` NodeTypeID string `json:"node_type_id,omitempty" tf:"group:node_type,computed"` DriverNodeTypeID string `json:"driver_node_type_id,omitempty" tf:"group:node_type,computed"` InstancePoolID string `json:"instance_pool_id,omitempty" tf:"group:node_type"` DriverInstancePoolID string `json:"driver_instance_pool_id,omitempty" tf:"group:node_type,computed"` PolicyID string `json:"policy_id,omitempty"` - AwsAttributes *AwsAttributes `json:"aws_attributes,omitempty" tf:"conflicts:instance_pool_id"` - AzureAttributes *AzureAttributes `json:"azure_attributes,omitempty" tf:"conflicts:instance_pool_id"` - GcpAttributes *GcpAttributes `json:"gcp_attributes,omitempty" tf:"conflicts:instance_pool_id"` + AwsAttributes *AwsAttributes `json:"aws_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"` + AzureAttributes *AzureAttributes `json:"azure_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"` + GcpAttributes *GcpAttributes `json:"gcp_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"` AutoterminationMinutes int32 `json:"autotermination_minutes,omitempty"` SparkConf map[string]string `json:"spark_conf,omitempty"` @@ -413,8 +413,8 @@ type InstancePool struct { MinIdleInstances int32 `json:"min_idle_instances,omitempty"` MaxCapacity int32 `json:"max_capacity,omitempty"` IdleInstanceAutoTerminationMinutes int32 `json:"idle_instance_autotermination_minutes"` - AwsAttributes *InstancePoolAwsAttributes `json:"aws_attributes,omitempty" tf:"force_new"` - AzureAttributes *InstancePoolAzureAttributes `json:"azure_attributes,omitempty" tf:"force_new"` + AwsAttributes *InstancePoolAwsAttributes `json:"aws_attributes,omitempty" tf:"force_new,suppress_diff"` + AzureAttributes 
*InstancePoolAzureAttributes `json:"azure_attributes,omitempty" tf:"force_new,suppress_diff"` NodeTypeID string `json:"node_type_id" tf:"force_new"` CustomTags map[string]string `json:"custom_tags,omitempty" tf:"force_new"` EnableElasticDisk bool `json:"enable_elastic_disk,omitempty" tf:"force_new"` @@ -517,8 +517,8 @@ type SparkSubmitTask struct { Parameters []string `json:"parameters,omitempty"` } -// JobEmailNotifications contains the information for email notifications after job completion -type JobEmailNotifications struct { +// EmailNotifications contains the information for email notifications after job completion +type EmailNotifications struct { OnStart []string `json:"on_start,omitempty"` OnSuccess []string `json:"on_success,omitempty"` OnFailure []string `json:"on_failure,omitempty"` @@ -532,27 +532,66 @@ type CronSchedule struct { PauseStatus string `json:"pause_status,omitempty" tf:"computed"` } -// JobSettings contains the information for configuring a job on databricks -type JobSettings struct { - Name string `json:"name,omitempty" tf:"default:Untitled"` +type TaskDependency struct { + TaskKey string `json:"task_key,omitempty"` +} - ExistingClusterID string `json:"existing_cluster_id,omitempty" tf:"group:cluster_type"` - NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"` +type JobTaskSettings struct { + TaskKey string `json:"task_key,omitempty"` + Description string `json:"description,omitempty"` + DependsOn []TaskDependency `json:"depends_on,omitempty"` - NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"` - SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"` - SparkPythonTask *SparkPythonTask `json:"spark_python_task,omitempty" tf:"group:task_type"` - SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"` + // TODO: add PipelineTask, PythonWheelTask + ExistingClusterID string `json:"existing_cluster_id,omitempty" 
tf:"group:cluster_type"` + NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"` + Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"` + NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"` + SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"` + SparkPythonTask *SparkPythonTask `json:"spark_python_task,omitempty" tf:"group:task_type"` + SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"` + EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"` + TimeoutSeconds int32 `json:"timeout_seconds,omitempty"` + MaxRetries int32 `json:"max_retries,omitempty"` + MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"` + RetryOnTimeout bool `json:"retry_on_timeout,omitempty" tf:"computed"` +} - Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"` - TimeoutSeconds int32 `json:"timeout_seconds,omitempty"` - MaxRetries int32 `json:"max_retries,omitempty"` - MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"` - RetryOnTimeout bool `json:"retry_on_timeout,omitempty"` - Schedule *CronSchedule `json:"schedule,omitempty"` - MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"` +// JobSettings contains the information for configuring a job on databricks +type JobSettings struct { + Name string `json:"name,omitempty" tf:"default:Untitled"` - EmailNotifications *JobEmailNotifications `json:"email_notifications,omitempty"` + // BEGIN Jobs API 2.0 + ExistingClusterID string `json:"existing_cluster_id,omitempty" tf:"group:cluster_type"` + NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"` + NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"` + SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"` + SparkPythonTask *SparkPythonTask 
`json:"spark_python_task,omitempty" tf:"group:task_type"` + SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"` + Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"` + TimeoutSeconds int32 `json:"timeout_seconds,omitempty"` + MaxRetries int32 `json:"max_retries,omitempty"` + MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"` + RetryOnTimeout bool `json:"retry_on_timeout,omitempty"` + // END Jobs API 2.0 + + // BEGIN Jobs API 2.1 + Tasks []JobTaskSettings `json:"tasks,omitempty" tf:"alias:task"` + Format string `json:"format,omitempty" tf:"computed"` + // END Jobs API 2.1 + + Schedule *CronSchedule `json:"schedule,omitempty"` + MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"` + EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"` +} + +func (js *JobSettings) isMultiTask() bool { + return js.Format == "MULTI_TASK" || len(js.Tasks) > 0 +} + +func (js *JobSettings) sortTasksByKey() { + sort.Slice(js.Tasks, func(i, j int) bool { + return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey + }) } // JobList ... 
diff --git a/compute/resource_cluster.go b/compute/resource_cluster.go index ae31fa4c40..3d95799007 100644 --- a/compute/resource_cluster.go +++ b/compute/resource_cluster.go @@ -72,10 +72,6 @@ func resourceClusterSchema() map[string]*schema.Schema { s["aws_attributes"].ConflictsWith = []string{"azure_attributes", "gcp_attributes"} s["azure_attributes"].ConflictsWith = []string{"aws_attributes", "gcp_attributes"} s["gcp_attributes"].ConflictsWith = []string{"aws_attributes", "azure_attributes"} - s["aws_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("aws_attributes.#") - s["azure_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("azure_attributes.#") - s["gcp_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("gcp_attributes.#") - s["instance_pool_id"].ConflictsWith = []string{"driver_node_type_id", "node_type_id"} s["driver_instance_pool_id"].ConflictsWith = []string{"driver_node_type_id", "node_type_id"} s["driver_node_type_id"].ConflictsWith = []string{"driver_instance_pool_id", "instance_pool_id"} @@ -115,6 +111,7 @@ func resourceClusterSchema() map[string]*schema.Schema { } func validateClusterDefinition(cluster Cluster) error { + // TODO: rewrite with CustomizeDiff if cluster.NumWorkers > 0 || cluster.Autoscale != nil { return nil } diff --git a/compute/resource_instance_pool.go b/compute/resource_instance_pool.go index 9f3c652ebd..d5753979fa 100644 --- a/compute/resource_instance_pool.go +++ b/compute/resource_instance_pool.go @@ -59,8 +59,6 @@ func ResourceInstancePool() *schema.Resource { s["enable_elastic_disk"].Default = true s["aws_attributes"].ConflictsWith = []string{"azure_attributes"} s["azure_attributes"].ConflictsWith = []string{"aws_attributes"} - s["aws_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("aws_attributes.#") - s["azure_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("azure_attributes.#") if v, err := common.SchemaPath(s, "aws_attributes", 
"availability"); err == nil { v.Default = AwsAvailabilitySpot v.ValidateFunc = validation.StringInSlice([]string{ diff --git a/compute/resource_job.go b/compute/resource_job.go index 64f3359b15..a9c0387a2d 100644 --- a/compute/resource_job.go +++ b/compute/resource_job.go @@ -17,7 +17,8 @@ import ( // NewJobsAPI creates JobsAPI instance from provider meta func NewJobsAPI(ctx context.Context, m interface{}) JobsAPI { - return JobsAPI{m.(*common.DatabricksClient), ctx} + client := m.(*common.DatabricksClient) + return JobsAPI{client, ctx} } // JobsAPI exposes the Jobs API @@ -129,6 +130,7 @@ func (a JobsAPI) Restart(id string, timeout time.Duration) error { // Create creates a job on the workspace given the job settings func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) { var job Job + jobSettings.sortTasksByKey() err := a.client.Post(a.context, "/jobs/create", jobSettings, &job) return job, err } @@ -154,6 +156,9 @@ func (a JobsAPI) Read(id string) (job Job, err error) { err = wrapMissingJobError(a.client.Get(a.context, "/jobs/get", map[string]int64{ "job_id": jobID, }, &job), id) + if job.Settings != nil { + job.Settings.sortTasksByKey() + } return } @@ -188,39 +193,36 @@ func wrapMissingJobError(err error, id string) error { return err } +func jobSettingsSchema(s *map[string]*schema.Schema, prefix string) { + if p, err := common.SchemaPath(*s, "new_cluster", "num_workers"); err == nil { + p.Optional = true + p.Default = 0 + p.Type = schema.TypeInt + p.ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(0)) + p.Required = false + } + if v, err := common.SchemaPath(*s, "new_cluster", "spark_conf"); err == nil { + reSize := common.MustCompileKeyRE(prefix + "new_cluster.0.spark_conf.%") + reConf := common.MustCompileKeyRE(prefix + "new_cluster.0.spark_conf.spark.databricks.delta.preview.enabled") + v.DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool { + isPossiblyLegacyConfig := reSize.Match([]byte(k)) && old == "1" && new 
== "0" + isLegacyConfig := reConf.Match([]byte(k)) + if isPossiblyLegacyConfig || isLegacyConfig { + log.Printf("[DEBUG] Suppressing diff for k=%#v old=%#v new=%#v", k, old, new) + return true + } + return false + } + } +} + var jobSchema = common.StructToSchema(JobSettings{}, func(s map[string]*schema.Schema) map[string]*schema.Schema { - if p, err := common.SchemaPath(s, "new_cluster", "num_workers"); err == nil { - p.Optional = true - p.Default = 0 - p.Type = schema.TypeInt - p.ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(0)) - p.Required = false - } + jobSettingsSchema(&s, "") + jobSettingsSchema(&s["task"].Elem.(*schema.Resource).Schema, "task.0.") if p, err := common.SchemaPath(s, "schedule", "pause_status"); err == nil { p.ValidateFunc = validation.StringInSlice([]string{"PAUSED", "UNPAUSED"}, false) } - if v, err := common.SchemaPath(s, "new_cluster", "spark_conf"); err == nil { - v.DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool { - isPossiblyLegacyConfig := k == "new_cluster.0.spark_conf.%" && old == "1" && new == "0" - isLegacyConfig := k == "new_cluster.0.spark_conf.spark.databricks.delta.preview.enabled" - if isPossiblyLegacyConfig || isLegacyConfig { - log.Printf("[DEBUG] Suppressing diff for k=%#v old=%#v new=%#v", k, old, new) - return true - } - return false - } - } - if v, err := common.SchemaPath(s, "new_cluster", "aws_attributes"); err == nil { - v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.aws_attributes.#") - } - if v, err := common.SchemaPath(s, "new_cluster", "azure_attributes"); err == nil { - v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.azure_attributes.#") - } - if v, err := common.SchemaPath(s, "new_cluster", "gcp_attributes"); err == nil { - v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#") - } - s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#") 
s["max_concurrent_runs"].ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(1)) s["max_concurrent_runs"].Default = 1 s["url"] = &schema.Schema{ @@ -237,6 +239,18 @@ var jobSchema = common.StructToSchema(JobSettings{}, // ResourceJob ... func ResourceJob() *schema.Resource { + getReadCtx := func(ctx context.Context, d *schema.ResourceData) context.Context { + var js JobSettings + err := common.DataToStructPointer(d, jobSchema, &js) + if err != nil { + log.Printf("[INFO] no job resource data available. Returning default context") + return ctx + } + if js.isMultiTask() { + return context.WithValue(ctx, common.Api, common.API_2_1) + } + return ctx + } return common.Resource{ Schema: jobSchema, SchemaVersion: 2, @@ -244,12 +258,31 @@ func ResourceJob() *schema.Resource { Create: schema.DefaultTimeout(DefaultProvisionTimeout), Update: schema.DefaultTimeout(DefaultProvisionTimeout), }, - CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error { + CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, m interface{}) error { + var js JobSettings + err := common.DiffToStructPointer(d, jobSchema, &js) + if err != nil { + return err + } alwaysRunning := d.Get("always_running").(bool) - maxConcurrentRuns := d.Get("max_concurrent_runs").(int) - if alwaysRunning && maxConcurrentRuns > 1 { + if alwaysRunning && js.MaxConcurrentRuns > 1 { return fmt.Errorf("`always_running` must be specified only with `max_concurrent_runs = 1`") } + for _, task := range js.Tasks { + if task.NewCluster == nil { + continue + } + err = validateClusterDefinition(*task.NewCluster) + if err != nil { + return fmt.Errorf("task %s invalid: %w", task.TaskKey, err) + } + } + if js.NewCluster != nil { + err = validateClusterDefinition(*js.NewCluster) + if err != nil { + return fmt.Errorf("invalid job cluster: %w", err) + } + } return nil }, Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { @@ -258,10 +291,8 @@ func 
ResourceJob() *schema.Resource { if err != nil { return err } - if js.NewCluster != nil { - if err = validateClusterDefinition(*js.NewCluster); err != nil { - return err - } + if js.isMultiTask() { + ctx = context.WithValue(ctx, common.Api, common.API_2_1) } jobsAPI := NewJobsAPI(ctx, c) job, err := jobsAPI.Create(js) @@ -275,6 +306,7 @@ func ResourceJob() *schema.Resource { return nil }, Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + ctx = getReadCtx(ctx, d) job, err := NewJobsAPI(ctx, c).Read(d.Id()) if err != nil { return err @@ -288,11 +320,8 @@ func ResourceJob() *schema.Resource { if err != nil { return err } - if js.NewCluster != nil { - err = validateClusterDefinition(*js.NewCluster) - if err != nil { - return err - } + if js.isMultiTask() { + ctx = context.WithValue(ctx, common.Api, common.API_2_1) } jobsAPI := NewJobsAPI(ctx, c) err = jobsAPI.Update(d.Id(), js) @@ -305,6 +334,7 @@ func ResourceJob() *schema.Resource { return nil }, Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + ctx = getReadCtx(ctx, d) return NewJobsAPI(ctx, c).Delete(d.Id()) }, }.ToResource() diff --git a/compute/resource_job_test.go b/compute/resource_job_test.go index dc5a8acd8f..80c06b6b48 100644 --- a/compute/resource_job_test.go +++ b/compute/resource_job_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/databrickslabs/terraform-provider-databricks/common" - "github.com/databrickslabs/terraform-provider-databricks/qa" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -106,6 +105,106 @@ func TestResourceJobCreate(t *testing.T) { assert.Equal(t, "789", d.Id()) } +func TestResourceJobCreate_MultiTask(t *testing.T) { + d, err := qa.ResourceFixture{ + Fixtures: []qa.HTTPFixture{ + { + Method: "POST", + Resource: "/api/2.1/jobs/create", + ExpectedRequest: JobSettings{ + Name: "Featurizer", + Tasks: []JobTaskSettings{ + { + TaskKey: "a", + ExistingClusterID: "abc", 
+ Libraries: []Library{ + { + Jar: "dbfs://aa/bb/cc.jar", + }, + }, + SparkJarTask: &SparkJarTask{ + MainClassName: "com.labs.BarMain", + }, + }, + { + TaskKey: "b", + NewCluster: &Cluster{ + SparkVersion: "a", + NodeTypeID: "b", + NumWorkers: 1, + AzureAttributes: &AzureAttributes{ + SpotBidMaxPrice: 0.99, + }, + }, + NotebookTask: &NotebookTask{ + NotebookPath: "/Stuff", + }, + }, + }, + MaxConcurrentRuns: 1, + }, + Response: Job{ + JobID: 789, + }, + }, + { + Method: "GET", + Resource: "/api/2.1/jobs/get?job_id=789", + Response: Job{ + // good enough for mock + Settings: &JobSettings{ + Tasks: []JobTaskSettings{ + { + TaskKey: "b", + }, + { + TaskKey: "a", + }, + }, + }, + }, + }, + }, + Create: true, + Resource: ResourceJob(), + HCL: ` + name = "Featurizer" + + task { + task_key = "a" + + existing_cluster_id = "abc" + + spark_jar_task { + main_class_name = "com.labs.BarMain" + } + + library { + jar = "dbfs://aa/bb/cc.jar" + } + } + + task { + task_key = "b" + + new_cluster { + spark_version = "a" + node_type_id = "b" + num_workers = 1 + azure_attributes { + spot_bid_max_price = 0.99 + } + } + + notebook_task { + notebook_path = "/Stuff" + } + }`, + }.Apply(t) + assert.NoError(t, err, err) + assert.Equal(t, "789", d.Id()) +} + func TestResourceJobCreate_AlwaysRunning(t *testing.T) { d, err := qa.ResourceFixture{ Fixtures: []qa.HTTPFixture{ @@ -357,42 +456,6 @@ func TestResourceJobCreateSingleNode_Fail(t *testing.T) { require.Equal(t, true, strings.Contains(err.Error(), "NumWorkers could be 0 only for SingleNode clusters")) } -func TestResourceJobCreate_Error(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/jobs/create", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", - }, - Status: 400, - }, - }, - Resource: ResourceJob(), - HCL: `existing_cluster_id = "abc" - max_concurrent_runs = 1 - max_retries = 3 - min_retry_interval_millis = 5000 
- name = "Featurizer" - retry_on_timeout = true - - spark_jar_task { - main_class_name = "com.labs.BarMain" - } - library { - jar = "dbfs://aa/bb/cc.jar" - } - library { - jar = "dbfs://ff/gg/hh.jar" - }`, - Create: true, - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "Internal error happened") - assert.Equal(t, "", d.Id(), "Id should be empty for error creates") -} - func TestResourceJobRead(t *testing.T) { d, err := qa.ResourceFixture{ Fixtures: []qa.HTTPFixture{ @@ -576,6 +639,62 @@ func TestResourceJobUpdate(t *testing.T) { assert.Equal(t, "Featurizer New", d.Get("name")) } +func TestResourceJobUpdate_Tasks(t *testing.T) { + qa.ResourceFixture{ + Fixtures: []qa.HTTPFixture{ + { + Method: "POST", + Resource: "/api/2.1/jobs/reset", + ExpectedRequest: UpdateJobRequest{ + JobID: 789, + NewSettings: &JobSettings{ + Name: "Featurizer New", + Tasks: []JobTaskSettings{ + { + ExistingClusterID: "abc", + SparkJarTask: &SparkJarTask{ + MainClassName: "com.labs.BarMain", + }, + }, + }, + MaxConcurrentRuns: 1, + }, + }, + Response: Job{ + JobID: 789, + }, + }, + { + Method: "GET", + Resource: "/api/2.1/jobs/get?job_id=789", + Response: Job{ + Settings: &JobSettings{ + Tasks: []JobTaskSettings{ + { + ExistingClusterID: "abc", + SparkJarTask: &SparkJarTask{ + MainClassName: "com.labs.BarMain", + }, + }, + }, + }, + }, + }, + }, + ID: "789", + Update: true, + Resource: ResourceJob(), + HCL: ` + name = "Featurizer New" + task { + existing_cluster_id = "abc" + spark_jar_task { + main_class_name = "com.labs.BarMain" + } + }`, + }.ApplyNoError(t) +} + func TestResourceJobUpdate_Restart(t *testing.T) { d, err := qa.ResourceFixture{ Fixtures: []qa.HTTPFixture{ @@ -805,44 +924,6 @@ func TestJobRestarts(t *testing.T) { }) } -func TestResourceJobUpdate_Error(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/jobs/reset", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error 
happened", - }, - Status: 400, - }, - }, - ID: "789", - Update: true, - Resource: ResourceJob(), - HCL: `existing_cluster_id = "abc" - max_concurrent_runs = 1 - max_retries = 3 - min_retry_interval_millis = 5000 - name = "Featurizer New" - retry_on_timeout = true - - spark_jar_task { - main_class_name = "com.labs.BarMain" - parameters = ["--cleanup", "full"] - } - library { - jar = "dbfs://aa/bb/cc.jar" - } - library { - jar = "dbfs://ff/gg/hh.jar" - }`, - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "Internal error happened") - assert.Equal(t, "789", d.Id()) -} - func TestResourceJobDelete(t *testing.T) { d, err := qa.ResourceFixture{ Fixtures: []qa.HTTPFixture{ @@ -887,29 +968,29 @@ func TestResourceJobUpdate_FailNumWorkersZero(t *testing.T) { require.Equal(t, true, strings.Contains(err.Error(), "NumWorkers could be 0 only for SingleNode clusters")) } -func TestResourceJobDelete_Error(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/jobs/delete", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", +func TestJobsAPIList(t *testing.T) { + qa.HTTPFixturesApply(t, []qa.HTTPFixture{ + { + Method: "GET", + Resource: "/api/2.0/jobs/list", + Response: JobList{ + Jobs: []Job{ + { + JobID: 1, + }, }, - Status: 400, }, }, - ID: "789", - Delete: true, - Resource: ResourceJob(), - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "Internal error happened") - assert.Equal(t, "789", d.Id()) + }, func(ctx context.Context, client *common.DatabricksClient) { + a := NewJobsAPI(ctx, client) + l, err := a.List() + require.NoError(t, err) + assert.Len(t, l.Jobs, 1) + }) } func TestJobsAPIRunsList(t *testing.T) { - c, s, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{ + qa.HTTPFixturesApply(t, []qa.HTTPFixture{ { Method: "GET", Resource: "/api/2.0/jobs/runs/list?completed_only=true&job_id=234&limit=1", @@ -924,17 +1005,33 @@ func TestJobsAPIRunsList(t *testing.T) { 
}, }, }, + }, func(ctx context.Context, client *common.DatabricksClient) { + a := NewJobsAPI(ctx, client) + l, err := a.RunsList(JobRunsListRequest{ + JobID: 234, + CompletedOnly: true, + Limit: 1, + Offset: 0, + }) + require.NoError(t, err) + assert.Len(t, l.Runs, 1) }) - require.NoError(t, err) - defer s.Close() - - a := NewJobsAPI(context.Background(), c) - l, err := a.RunsList(JobRunsListRequest{ - JobID: 234, - CompletedOnly: true, - Limit: 1, - Offset: 0, - }) - require.NoError(t, err) - assert.Len(t, l.Runs, 1) +} + +func TestJobResourceCornerCases_HTTP(t *testing.T) { + qa.ResourceCornerCases(t, ResourceJob(), qa.CornerCaseID("10")) +} + +func TestJobResourceCornerCases_WrongID(t *testing.T) { + qa.ResourceCornerCases(t, ResourceJob(), + qa.CornerCaseID("x"), + qa.CornerCaseSkipCRUD("create"), + qa.CornerCaseExpectError(`strconv.ParseInt: parsing "x": invalid syntax`)) +} + +func TestJobResource_SparkConfDiffSuppress(t *testing.T) { + jr := ResourceJob() + scs := common.MustSchemaPath(jr.Schema, "new_cluster", "spark_conf") + assert.True(t, scs.DiffSuppressFunc("new_cluster.0.spark_conf.%", "1", "0", nil)) + assert.False(t, scs.DiffSuppressFunc("new_cluster.0.spark_conf.%", "1", "1", nil)) } diff --git a/docs/resources/job.md b/docs/resources/job.md index ff72fbef4e..dc43622c2a 100644 --- a/docs/resources/job.md +++ b/docs/resources/job.md @@ -47,6 +47,48 @@ output "job_url" { } ``` +## Jobs with Multiple Tasks + +-> **Note** In terraform configuration, you must define tasks in alphabetical order of their `task_key` arguments, so that you get consistent and readable diff. Whenever tasks are added or removed, or `task_key` is renamed, you'll observe a change in the majority of tasks. It's related to the fact that the current version of the provider treats `task` blocks as an ordered list. Alternatively, `task` block could have been an unordered set, though end-users would see the entire block replaced upon a change in single property of the task. 
+ +It is possible to create [jobs with multiple tasks](https://docs.databricks.com/data-engineering/jobs/jobs-user-guide.html) using the `task` blocks: + +```hcl +resource "databricks_job" "this" { + name = "Job with multiple tasks" + + task { + task_key = "a" + + new_cluster { + num_workers = 1 + spark_version = data.databricks_spark_version.latest.id + node_type_id = data.databricks_node_type.smallest.id + } + + notebook_task { + notebook_path = databricks_notebook.this.path + } + } + + task { + task_key = "b" + + depends_on { + task_key = "a" + } + + existing_cluster_id = databricks_cluster.shared.id + + spark_jar_task { + main_class_name = "com.acme.data.Main" + } + } +} +``` + +Every `task` block can have almost all available arguments with the addition of the `task_key` attribute and `depends_on` blocks to define cross-task dependencies. + ## Argument Reference The following arguments are required: diff --git a/qa/testing.go b/qa/testing.go index 87db02e38f..7de433b640 100644 --- a/qa/testing.go +++ b/qa/testing.go @@ -265,8 +265,29 @@ func (f ResourceFixture) ExpectError(t *testing.T, msg string) { assert.EqualError(t, err, msg) } +type CornerCase struct { + part string + value string +} + +func CornerCaseID(id string) CornerCase { + return CornerCase{"id", id} +} + +func CornerCaseExpectError(msg string) CornerCase { + return CornerCase{"expect_error", msg} +} + +func CornerCaseSkipCRUD(method string) CornerCase { + return CornerCase{"skip_crud", method} +} + // ResourceCornerCases checks for corner cases of error handling.
Optional field name used to create error -func ResourceCornerCases(t *testing.T, resource *schema.Resource, id ...string) { +func ResourceCornerCases(t *testing.T, resource *schema.Resource, cc ...CornerCase) { + config := map[string]string{ + "id": "x", + "expect_error": "I'm a teapot", + } teapot := "I'm a teapot" m := map[string]func(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics{ "create": resource.CreateContext, @@ -274,9 +295,11 @@ func ResourceCornerCases(t *testing.T, resource *schema.Resource, id ...string) "update": resource.UpdateContext, "delete": resource.DeleteContext, } - fakeID := "x" - if len(id) > 0 { - fakeID = id[0] + for _, corner := range cc { + if corner.part == "skip_crud" { + delete(m, corner.value) + } + config[corner.part] = corner.value } HTTPFixturesApply(t, []HTTPFixture{ { @@ -291,14 +314,14 @@ func ResourceCornerCases(t *testing.T, resource *schema.Resource, id ...string) }, }, func(ctx context.Context, client *common.DatabricksClient) { validData := resource.TestResourceData() - validData.SetId(fakeID) + validData.SetId(config["id"]) for n, v := range m { if v == nil { continue } diags := v(ctx, validData, client) if assert.Len(t, diags, 1) { - assert.Equalf(t, diags[0].Summary, teapot, + assert.Equalf(t, diags[0].Summary, config["expect_error"], "%s didn't handle correct error on valid data", n) } } diff --git a/qa/testing_test.go b/qa/testing_test.go index 8df0d01fcb..dfe1e95f57 100644 --- a/qa/testing_test.go +++ b/qa/testing_test.go @@ -281,3 +281,32 @@ func TestDiagsToString(t *testing.T) { }, })) } + +func TestResourceCornerCases(t *testing.T) { + type dummy struct{} + x := map[string]string{ + "foo": "bar", + } + ResourceCornerCases(t, common.Resource{ + Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + var b dummy + return c.Post(ctx, "/dummy", x, &b) + }, + Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + 
var b dummy + return c.Get(ctx, "/dummy", x, &b) + }, + Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + return c.Put(ctx, "/dummy", x) + }, + Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + return c.Delete(ctx, "/dummy", x) + }, + Schema: map[string]*schema.Schema{ + "foo": { + Type: schema.TypeInt, + Required: true, + }, + }, + }.ToResource(), CornerCaseID("x")) +} diff --git a/sqlanalytics/resource_sql_endpoint.go b/sqlanalytics/resource_sql_endpoint.go index ae4ab1e209..dae1468ed2 100644 --- a/sqlanalytics/resource_sql_endpoint.go +++ b/sqlanalytics/resource_sql_endpoint.go @@ -34,7 +34,7 @@ type SQLEndpoint struct { State string `json:"state,omitempty" tf:"computed"` JdbcURL string `json:"jdbc_url,omitempty" tf:"computed"` OdbcParams *OdbcParams `json:"odbc_params,omitempty" tf:"computed"` - Tags *Tags `json:"tags,omitempty"` + Tags *Tags `json:"tags,omitempty" tf:"suppress_diff"` SpotInstancePolicy string `json:"spot_instance_policy,omitempty"` // The data source ID is not part of the endpoint API response. 
@@ -191,7 +191,6 @@ func ResourceSQLEndpoint() *schema.Resource { m["num_clusters"].Default = 1 m["spot_instance_policy"].Default = "COST_OPTIMIZED" m["enable_photon"].Default = true - m["tags"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("tags.#") return m }) return common.Resource{ diff --git a/sqlanalytics/resource_visualization_test.go b/sqlanalytics/resource_visualization_test.go index 6be8ea80f5..22f20caf3a 100644 --- a/sqlanalytics/resource_visualization_test.go +++ b/sqlanalytics/resource_visualization_test.go @@ -274,5 +274,5 @@ func TestVisualizationDelete(t *testing.T) { } func TestResourceVisualizationCornerCases(t *testing.T) { - qa.ResourceCornerCases(t, ResourceVisualization(), "foo/bar") + qa.ResourceCornerCases(t, ResourceVisualization(), qa.CornerCaseID("foo/bar")) } diff --git a/sqlanalytics/resource_widget_test.go b/sqlanalytics/resource_widget_test.go index ee5a8980d3..6f10993f47 100644 --- a/sqlanalytics/resource_widget_test.go +++ b/sqlanalytics/resource_widget_test.go @@ -652,5 +652,5 @@ func TestWidgetDelete(t *testing.T) { } func TestResourceWidgetCornerCases(t *testing.T) { - qa.ResourceCornerCases(t, ResourceWidget(), "foo/bar") + qa.ResourceCornerCases(t, ResourceWidget(), qa.CornerCaseID("foo/bar")) }