diff --git a/docs/resources/stream.md b/docs/resources/stream.md index 88440de244..2e193711a9 100644 --- a/docs/resources/stream.md +++ b/docs/resources/stream.md @@ -20,9 +20,10 @@ resource snowflake_stream stream { schema = "schema" name = "stream" - on_table = "table" - append_only = false - insert_only = false + on_external_table = true + on_table = "table" + append_only = false + insert_only = false owner = "role1" } @@ -43,6 +44,7 @@ resource snowflake_stream stream { - **comment** (String) Specifies a comment for the stream. - **id** (String) The ID of this resource. - **insert_only** (Boolean) Create an insert only stream type. +- **on_external_table** (Boolean) Specifies whether the table being monitored is an external table. - **on_table** (String) Name of the table the stream will monitor. - **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed. diff --git a/examples/resources/snowflake_stream/resource.tf b/examples/resources/snowflake_stream/resource.tf index d490d3433a..1530d51f04 100644 --- a/examples/resources/snowflake_stream/resource.tf +++ b/examples/resources/snowflake_stream/resource.tf @@ -5,9 +5,10 @@ resource snowflake_stream stream { schema = "schema" name = "stream" - on_table = "table" - append_only = false - insert_only = false + on_external_table = true + on_table = "table" + append_only = false + insert_only = false owner = "role1" } diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go index f8bc7aaf91..177cfa41f6 100644 --- a/pkg/resources/stream.go +++ b/pkg/resources/stream.go @@ -48,6 +48,12 @@ var streamSchema = map[string]*schema.Schema{ ForceNew: true, Description: "Name of the table the stream will monitor.", }, + "on_external_table": { + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + Description: "Specifies whether the table being monitored is an external table.", + }, "append_only": { Type: schema.TypeBool, Optional: true, @@ -175,6 +181,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { schema := d.Get("schema").(string) name := d.Get("name").(string) onTable := d.Get("on_table").(string) + onExternalTable := d.Get("on_external_table").(bool) appendOnly := d.Get("append_only").(bool) insertOnly := d.Get("insert_only").(bool) showInitialRows := d.Get("show_initial_rows").(bool) @@ -186,6 +193,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { return err } + builder.WithExternalTable(onExternalTable) builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName) builder.WithAppendOnly(appendOnly) builder.WithInsertOnly(insertOnly) diff --git a/pkg/resources/stream_acceptance_test.go b/pkg/resources/stream_acceptance_test.go index d271c377bf..896e5f5f0a 100644 --- a/pkg/resources/stream_acceptance_test.go +++ b/pkg/resources/stream_acceptance_test.go @@ -11,6 +11,8 @@ import ( func TestAcc_Stream(t *testing.T) { accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + // Ref: https://github.com/chanzuckerberg/terraform-provider-snowflake/pull/661#discussion_r696067813 + accNameExternalTable := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ Providers: providers(), @@ -40,6 +42,19 @@ func TestAcc_Stream(t *testing.T) { checkBool("snowflake_stream.test_stream", "show_initial_rows", false), ), }, + { + Config: externalTableStreamConfig(accNameExternalTable), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accNameExternalTable, accNameExternalTable, "STREAM_ON_EXTERNAL_TABLE")), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"), + checkBool("snowflake_stream.test_stream", "on_external_table", true), + checkBool("snowflake_stream.test_stream", "append_only", false), + checkBool("snowflake_stream.test_stream", "show_initial_rows", false), + ), + }, }, }) } @@ -90,3 +105,67 @@ resource "snowflake_stream" "test_stream" { ` return fmt.Sprintf(s, name, name, name, append_only_config) } + +func externalTableStreamConfig(name string) string { + // Refer to external_table_acceptance_test.go for the original source on + // external table resources and dependents (modified slightly here). + locations := []string{"s3://com.example.bucket/prefix"} + s := ` +resource "snowflake_database" "test" { + name = "%v" + comment = "Terraform acceptance test" +} + +resource "snowflake_schema" "test" { + name = "%v" + database = snowflake_database.test.name + comment = "Terraform acceptance test" +} + +resource "snowflake_stage" "test" { + name = "%v" + url = "s3://com.example.bucket/prefix" + database = snowflake_database.test.name + schema = snowflake_schema.test.name + comment = "Terraform acceptance test" + storage_integration = snowflake_storage_integration.external_table_stream_integration.name +} + +resource "snowflake_storage_integration" "external_table_stream_integration" { + name = "%v" + storage_allowed_locations = %q + storage_provider = "S3" + storage_aws_role_arn = "arn:aws:iam::000000000001:/role/test" +} + +resource "snowflake_external_table" "test_external_stream_table" { + database = snowflake_database.test.name + schema = snowflake_schema.test.name + name = "STREAM_ON_EXTERNAL_TABLE" + comment = "Terraform acceptance test" + column { + name = "column1" + type = "STRING" + as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')" + } + column { + name = "column2" + type = "TIMESTAMP_NTZ(9)" + as = "($1:'CreatedDate'::timestamp)" + } + file_format = "TYPE = CSV" + location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}" +} + +resource "snowflake_stream" "test_external_table_stream" { + database = snowflake_database.test.name + schema = snowflake_schema.test.name + name = "%s" + comment = "Terraform acceptance test" + on_external_table = true + on_table = "${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_external_table.test_external_stream_table.name}" +} +` + + return fmt.Sprintf(s, name, name, name, name, locations, name) +} diff --git a/pkg/resources/stream_test.go b/pkg/resources/stream_test.go index 7a532049fc..621f8d45fd 100644 --- a/pkg/resources/stream_test.go +++ b/pkg/resources/stream_test.go @@ -42,6 +42,31 @@ func TestStreamCreate(t *testing.T) { }) } +func TestStreamCreateOnExternalTable(t *testing.T) { + r := require.New(t) + + in := map[string]interface{}{ + "name": "stream_name", + "database": "database_name", + "schema": "schema_name", + "comment": "great comment", + "on_table": "target_db.target_schema.target_table", + "on_external_table": true, + "append_only": true, + "insert_only": false, + "show_initial_rows": true, + } + d := stream(t, "database_name|schema_name|stream_name", in) + + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON EXTERNAL TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1)) + expectStreamRead(mock) + err := resources.CreateStream(d, db) + r.NoError(err) + r.Equal("stream_name", d.Get("name").(string)) + }) +} + func expectStreamRead(mock sqlmock.Sqlmock) { rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY") mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN DATABASE "database_name"`).WillReturnRows(rows) diff --git a/pkg/snowflake/stream.go b/pkg/snowflake/stream.go index 429d8de473..f20d03b599 100644 --- a/pkg/snowflake/stream.go +++ b/pkg/snowflake/stream.go @@ -15,6 +15,7 @@ type StreamBuilder struct { name string db string schema string + externalTable bool onTable string appendOnly bool insertOnly bool @@ -53,6 +54,11 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde return sb } +func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder { + sb.externalTable = b + return sb +} + func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder { sb.appendOnly = b return sb @@ -90,7 +96,13 @@ func (sb *StreamBuilder) Create() string { q := strings.Builder{} q.WriteString(fmt.Sprintf(`CREATE STREAM %v`, sb.QualifiedName())) - q.WriteString(fmt.Sprintf(` ON TABLE %v`, sb.onTable)) + q.WriteString(` ON`) + + if sb.externalTable { + q.WriteString(` EXTERNAL`) + } + + q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable)) if sb.comment != "" { q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment))) diff --git a/pkg/snowflake/stream_test.go b/pkg/snowflake/stream_test.go index b8a5ad86c2..db61199aff 100644 --- a/pkg/snowflake/stream_test.go +++ b/pkg/snowflake/stream_test.go @@ -24,6 +24,9 @@ func TestStreamCreate(t *testing.T) { s.WithInsertOnly(true) r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`) + + s.WithExternalTable(true) + r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON EXTERNAL TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`) } func TestStreamChangeComment(t *testing.T) {