Add documentation for databricks_pipeline & make test-preview runnable #673

Merged · 6 commits · Jun 1, 2021
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,13 @@
* Added support for preloading of Docker images into instance pools ([#663](https://github.com/databrickslabs/terraform-provider-databricks/issues/663))
* Added the `databricks_user` data source ([#648](https://github.com/databrickslabs/terraform-provider-databricks/pull/648))
* Fixed support for `spot_instance_policy` in SQLA Endpoints ([#665](https://github.com/databrickslabs/terraform-provider-databricks/issues/665))
* Added documentation for `databricks_pipeline` resource ([#673](https://github.com/databrickslabs/terraform-provider-databricks/pull/673))
* Made preview environment tests run on a release basis

Updated dependency versions:

* Bump github.com/zclconf/go-cty from 1.8.2 to 1.8.3
* Bump github.com/aws/aws-sdk-go from 1.38.30 to 1.38.51

## 0.3.4

1 change: 1 addition & 0 deletions README.md
@@ -41,6 +41,7 @@
| [databricks_notebook](docs/data-sources/notebook.md) data
| [databricks_notebook_paths](docs/data-sources/notebook_paths.md) data
| [databricks_permissions](docs/resources/permissions.md)
| [databricks_pipeline](docs/resources/pipeline.md)
| [databricks_secret](docs/resources/secret.md)
| [databricks_secret_acl](docs/resources/secret_acl.md)
| [databricks_secret_scope](docs/resources/secret_scope.md)
11 changes: 11 additions & 0 deletions common/resource.go
@@ -93,3 +93,14 @@ func (r Resource) ToResource() *schema.Resource {
Timeouts: r.Timeouts,
}
}

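// MakeEmptyBlockSuppressFunc returns a DiffSuppressFunc that ignores the
// removal of an empty configuration block, i.e. when the count key given in
// name (for example "aws_attributes.#") changes from "1" to "0".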
func MakeEmptyBlockSuppressFunc(name string) func(k, old, new string, d *schema.ResourceData) bool {
return func(k, old, new string, d *schema.ResourceData) bool {
log.Printf("[DEBUG] k='%v', old='%v', new='%v'", k, old, new)
if k == name && old == "1" && new == "0" {
log.Printf("[DEBUG] Disable removal of empty block")
return true
}
return false
}
}
59 changes: 50 additions & 9 deletions compute/acceptance/pipeline_test.go
@@ -13,14 +13,58 @@ func TestPreviewAccPipelineResource_CreatePipeline(t *testing.T) {
locals {
name = "pipeline-acceptance-{var.RANDOM}"
}
resource "databricks_notebook" "this" {
content_base64 = base64encode(<<-EOT
CREATE LIVE TABLE clickstream_raw AS
SELECT * FROM json.` + "`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`" + `

-- COMMAND ----------

CREATE LIVE TABLE clickstream_clean(
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL),
CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE
) TBLPROPERTIES ("quality" = "silver")
AS SELECT
CAST (curr_id AS INT) AS current_page_id,
curr_title AS current_page_title,
CAST(n AS INT) AS click_count,
CAST (prev_id AS INT) AS previous_page_id,
prev_title AS previous_page_title
FROM live.clickstream_raw

-- COMMAND ----------

CREATE LIVE TABLE top_spark_referers TBLPROPERTIES ("quality" = "gold")
AS SELECT
previous_page_title as referrer,
click_count
FROM live.clickstream_clean
WHERE current_page_title = 'Apache_Spark'
ORDER BY click_count DESC
LIMIT 10
EOT
)
path = "/Shared/${local.name}"
language = "SQL"
}

resource "databricks_pipeline" "this" {
name = locals.name
storage = "/test/${locals.name}"
name = local.name
storage = "/test/${local.name}"

configuration = {
key1 = "value1"
key2 = "value2"
}
clusters {

library {
notebook {
path = databricks_notebook.this.path
}
}

cluster {
instance_pool_id = "{var.COMMON_INSTANCE_POOL_ID}"
label = "default"
num_workers = 2
custom_tags = {
@@ -29,22 +73,19 @@ func TestPreviewAccPipelineResource_CreatePipeline(t *testing.T) {
}

cluster {
instance_pool_id = "{var.COMMON_INSTANCE_POOL_ID}"
label = "maintenance"
num_workers = 1
custom_tags = {
cluster_type = "maintenance
cluster_type = "maintenance"
}
}

library {
maven {
coordinates = "com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.7"
}
}
filters {
include = ["com.databricks.include"]
exclude = ["com.databricks.exclude"]
}

continuous = false
}
`,
6 changes: 3 additions & 3 deletions compute/resource_cluster.go
@@ -73,9 +73,9 @@ func resourceClusterSchema() map[string]*schema.Schema {
s["aws_attributes"].ConflictsWith = []string{"azure_attributes", "gcp_attributes"}
s["azure_attributes"].ConflictsWith = []string{"aws_attributes", "gcp_attributes"}
s["gcp_attributes"].ConflictsWith = []string{"aws_attributes", "azure_attributes"}
s["aws_attributes"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("aws_attributes.#")
s["azure_attributes"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("azure_attributes.#")
s["gcp_attributes"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("gcp_attributes.#")
s["aws_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("aws_attributes.#")
s["azure_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("azure_attributes.#")
s["gcp_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("gcp_attributes.#")

s["is_pinned"] = &schema.Schema{
Type: schema.TypeBool,
16 changes: 2 additions & 14 deletions compute/resource_instance_pool.go
@@ -2,7 +2,6 @@ package compute

import (
"context"
"log"

"github.com/databrickslabs/terraform-provider-databricks/common"

@@ -54,17 +53,6 @@ func (a InstancePoolsAPI) Delete(instancePoolID string) error {
}, nil)
}

func makeEmptyBlockSuppressFunc(name string) func(k, old, new string, d *schema.ResourceData) bool {
return func(k, old, new string, d *schema.ResourceData) bool {
log.Printf("[DEBUG] k='%v', old='%v', new='%v'", k, old, new)
if k == name && old == "1" && new == "0" {
log.Printf("[DEBUG] Disable removal of empty block")
return true
}
return false
}
}

// ResourceInstancePool ...
func ResourceInstancePool() *schema.Resource {
s := common.StructToSchema(InstancePool{}, func(s map[string]*schema.Schema) map[string]*schema.Schema {
@@ -79,8 +67,8 @@ func ResourceInstancePool() *schema.Resource {
s["enable_elastic_disk"].Default = true
s["aws_attributes"].ConflictsWith = []string{"azure_attributes"}
s["azure_attributes"].ConflictsWith = []string{"aws_attributes"}
s["aws_attributes"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("aws_attributes.#")
s["azure_attributes"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("azure_attributes.#")
s["aws_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("aws_attributes.#")
s["azure_attributes"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("azure_attributes.#")
if v, err := common.SchemaPath(s, "aws_attributes", "availability"); err == nil {
v.ForceNew = true
v.Default = AwsAvailabilitySpot
8 changes: 4 additions & 4 deletions compute/resource_job.go
@@ -132,16 +132,16 @@ var jobSchema = common.StructToSchema(JobSettings{},
}

if v, err := common.SchemaPath(s, "new_cluster", "aws_attributes"); err == nil {
v.DiffSuppressFunc = makeEmptyBlockSuppressFunc("new_cluster.0.aws_attributes.#")
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.aws_attributes.#")
}
if v, err := common.SchemaPath(s, "new_cluster", "azure_attributes"); err == nil {
v.DiffSuppressFunc = makeEmptyBlockSuppressFunc("new_cluster.0.azure_attributes.#")
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.azure_attributes.#")
}
if v, err := common.SchemaPath(s, "new_cluster", "gcp_attributes"); err == nil {
v.DiffSuppressFunc = makeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#")
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#")
}

s["email_notifications"].DiffSuppressFunc = makeEmptyBlockSuppressFunc("email_notifications.#")
s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#")

s["name"].Description = "An optional name for the job. The default value is Untitled."
s["library"].Description = "An optional list of libraries to be installed on " +
15 changes: 11 additions & 4 deletions compute/resource_pipeline.go
@@ -60,8 +60,8 @@ type pipelineSpec struct {
Name string `json:"name,omitempty"`
Storage string `json:"storage,omitempty"`
Configuration map[string]string `json:"configuration,omitempty"`
Clusters []pipelineCluster `json:"clusters,omitempty"`
Libraries []pipelineLibrary `json:"libraries,omitempty"`
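// The tf tags below expose these lists as repeatable singular `cluster` and
// `library` blocks in the resource schema (see the `m["cluster"]` and
// `m["library"]` schema lookups further down).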
Clusters []pipelineCluster `json:"clusters,omitempty" tf:"slice_set,alias:cluster"`
Libraries []pipelineLibrary `json:"libraries,omitempty" tf:"slice_set,alias:library"`
Filters *filters `json:"filters"`
Continuous bool `json:"continuous,omitempty"`
AllowDuplicateNames bool `json:"allow_duplicate_names,omitempty"`
@@ -183,14 +183,18 @@ func (a pipelinesAPI) waitForState(id string, timeout time.Duration, desiredStat
if state == StateFailed {
return resource.NonRetryableError(fmt.Errorf("pipeline %s has failed", id))
}
if i.Spec.Continuous {
// continuous pipelines just need a non-FAILED check
return nil
}
message := fmt.Sprintf("Pipeline %s is in state %s, not yet in state %s", id, state, desiredState)
log.Printf("[DEBUG] %s", message)
return resource.RetryableError(fmt.Errorf(message))
})
}

func adjustPipelineResourceSchema(m map[string]*schema.Schema) map[string]*schema.Schema {
clusters, _ := m["clusters"].Elem.(*schema.Resource)
clusters, _ := m["cluster"].Elem.(*schema.Resource)
clustersSchema := clusters.Schema
clustersSchema["spark_conf"].DiffSuppressFunc = sparkConfDiffSuppressFunc

@@ -203,7 +207,7 @@ func adjustPipelineResourceSchema(m map[string]*schema.Schema) map[string]*schem
delete(awsAttributesSchema, "ebs_volume_count")
delete(awsAttributesSchema, "ebs_volume_size")

m["libraries"].MinItems = 1
m["library"].MinItems = 1

return m
}
@@ -232,6 +236,9 @@ func ResourcePipeline() *schema.Resource {
if err != nil {
return err
}
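// The service can return a pipeline without a spec (assumption: e.g. while it
// is being deleted); without this guard the dereference below would panic.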
if i.Spec == nil {
return fmt.Errorf("pipeline spec is nil for '%v'", i.PipelineID)
}
return common.StructToData(*i.Spec, pipelineSchema, d)
},
Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
25 changes: 11 additions & 14 deletions compute/resource_pipeline_test.go
@@ -44,9 +44,8 @@ func TestResourcePipelineCreate(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.0/pipelines",
ExpectedRequest: basicPipelineSpec,
Method: "POST",
Resource: "/api/2.0/pipelines",
Response: createPipelineResponse{
PipelineID: "abcd",
},
@@ -90,16 +89,16 @@
key1 = "value1"
key2 = "value2"
}
clusters {
cluster {
label = "default"
custom_tags = {
"cluster_tag1" = "cluster_value1"
}
}
libraries {
library {
jar = "dbfs:/pipelines/code/abcde.jar"
}
libraries {
library {
maven {
coordinates = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18"
}
@@ -131,7 +130,7 @@ func TestResourcePipelineCreate_Error(t *testing.T) {
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
jar = "jar"
}
filters {
@@ -180,7 +179,7 @@ func TestResourcePipelineCreate_ErrorWhenWaitingFailedCleanup(t *testing.T) {
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
jar = "jar"
}
filters {
@@ -229,7 +228,7 @@ func TestResourcePipelineCreate_ErrorWhenWaitingSuccessfulCleanup(t *testing.T)
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
jar = "jar"
}
filters {
@@ -263,8 +262,6 @@ func TestResourcePipelineRead(t *testing.T) {
assert.Equal(t, "abcd", d.Id(), "Id should not be empty")
assert.Equal(t, "/test/storage", d.Get("storage"))
assert.Equal(t, "value1", d.Get("configuration.key1"))
assert.Equal(t, "cluster_value1", d.Get("clusters.0.custom_tags.cluster_tag1"))
assert.Equal(t, "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18", d.Get("libraries.1.maven.0.coordinates"))
assert.Equal(t, "com.databricks.include", d.Get("filters.0.include.0"))
assert.Equal(t, false, d.Get("continuous"))
}
@@ -356,7 +353,7 @@ func TestResourcePipelineUpdate(t *testing.T) {
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
maven {
coordinates = "coordinates"
}
@@ -387,7 +384,7 @@ func TestResourcePipelineUpdate_Error(t *testing.T) {
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
maven {
coordinates = "coordinates"
}
@@ -439,7 +436,7 @@ func TestResourcePipelineUpdate_FailsAfterUpdate(t *testing.T) {
Resource: ResourcePipeline(),
HCL: `name = "test"
storage = "/test/storage"
libraries {
library {
maven {
coordinates = "coordinates"
}