diff --git a/docs/resources/stream.md b/docs/resources/stream.md index 70edbb486b..c32d9b11ca 100644 --- a/docs/resources/stream.md +++ b/docs/resources/stream.md @@ -13,7 +13,7 @@ description: |- ## Example Usage ```terraform -resource snowflake_stream stream { +resource "snowflake_stream" "stream" { comment = "A stream." database = "db" diff --git a/examples/resources/snowflake_stream/resource.tf b/examples/resources/snowflake_stream/resource.tf index d490d3433a..3e579e3773 100644 --- a/examples/resources/snowflake_stream/resource.tf +++ b/examples/resources/snowflake_stream/resource.tf @@ -1,4 +1,4 @@ -resource snowflake_stream stream { +resource "snowflake_stream" "stream" { comment = "A stream." database = "db" diff --git a/pkg/resources/external_table_acceptance_test.go b/pkg/resources/external_table_acceptance_test.go index afd302134d..8e2e2ef529 100644 --- a/pkg/resources/external_table_acceptance_test.go +++ b/pkg/resources/external_table_acceptance_test.go @@ -70,12 +70,12 @@ resource "snowflake_external_table" "test_table" { column { name = "column1" type = "STRING" - as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')" + as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')" } column { name = "column2" type = "TIMESTAMP_NTZ(9)" - as = "($1:'CreatedDate'::timestamp)" + as = "($1:\"CreatedDate\"::timestamp)" } file_format = "TYPE = CSV" location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}" diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go index 9a635d8a94..db96c49832 100644 --- a/pkg/resources/stream.go +++ b/pkg/resources/stream.go @@ -186,7 +186,16 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { return err } - builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName) + tq := snowflake.Table(resultOnTable.OnTableName, resultOnTable.DatabaseName, resultOnTable.SchemaName).Show() + tableRow := snowflake.QueryRow(db, tq) + + t, err := snowflake.ScanTable(tableRow) + if err != nil { + return err + } + + builder.WithExternalTable(t.IsExternal.String == "Y") + builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String) builder.WithAppendOnly(appendOnly) builder.WithInsertOnly(insertOnly) builder.WithShowInitialRows(showInitialRows) diff --git a/pkg/resources/stream_acceptance_test.go b/pkg/resources/stream_acceptance_test.go index 2a335d0474..78aba2df46 100644 --- a/pkg/resources/stream_acceptance_test.go +++ b/pkg/resources/stream_acceptance_test.go @@ -2,6 +2,7 @@ package resources_test import ( "fmt" + "os" "strings" "testing" @@ -10,7 +11,11 @@ import ( ) func TestAcc_Stream(t *testing.T) { + if _, ok := os.LookupEnv("SKIP_EXTERNAL_TABLE_TESTS"); ok { + t.Skip("Skipping TestAccStream") + } accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + accNameExternalTable := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ Providers: providers(), @@ -41,6 +46,18 @@ func TestAcc_Stream(t *testing.T) { checkBool("snowflake_stream.test_stream", "show_initial_rows", false), ), }, + { + Config: externalTableStreamConfig(accNameExternalTable), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accNameExternalTable), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accNameExternalTable, accNameExternalTable, "STREAM_ON_EXTERNAL_TABLE")), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"), + checkBool("snowflake_stream.test_stream", "append_only", false), + checkBool("snowflake_stream.test_stream", "show_initial_rows", false), + ), + }, { ResourceName: "snowflake_stream.test_stream", ImportState: true, @@ -96,3 +113,61 @@ resource "snowflake_stream" "test_stream" { ` return fmt.Sprintf(s, name, name, name, append_only_config) } + +func externalTableStreamConfig(name string) string { + // Refer to external_table_acceptance_test.go for the original source on + // external table resources and dependents (modified slightly here). + locations := []string{"s3://com.example.bucket/prefix"} + s := ` +resource "snowflake_database" "test" { + name = "%v" + comment = "Terraform acceptance test" +} +resource "snowflake_schema" "test" { + name = "%v" + database = snowflake_database.test.name + comment = "Terraform acceptance test" +} +resource "snowflake_stage" "test" { + name = "%v" + url = "s3://com.example.bucket/prefix" + database = snowflake_database.test.name + schema = snowflake_schema.test.name + comment = "Terraform acceptance test" + storage_integration = snowflake_storage_integration.external_table_stream_integration.name +} +resource "snowflake_storage_integration" "external_table_stream_integration" { + name = "%v" + storage_allowed_locations = %q + storage_provider = "S3" + storage_aws_role_arn = "arn:aws:iam::000000000001:/role/test" +} +resource "snowflake_external_table" "test_external_stream_table" { + database = snowflake_database.test.name + schema = snowflake_schema.test.name + name = "STREAM_ON_EXTERNAL_TABLE" + comment = "Terraform acceptance test" + column { + name = "column1" + type = "STRING" + as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')" + } + column { + name = "column2" + type = "TIMESTAMP_NTZ(9)" + as = "($1:\"CreatedDate\"::timestamp)" + } + file_format = "TYPE = CSV" + location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}" +} +resource "snowflake_stream" "test_external_table_stream" { + database = snowflake_database.test.name + schema = snowflake_schema.test.name + name = "%s" + comment = "Terraform acceptance test" + on_table = "${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_external_table.test_external_stream_table.name}" +} +` + + return fmt.Sprintf(s, name, name, name, name, locations, name) +} diff --git a/pkg/resources/stream_test.go b/pkg/resources/stream_test.go index 0964971690..903d17cdf4 100644 --- a/pkg/resources/stream_test.go +++ b/pkg/resources/stream_test.go @@ -36,6 +36,32 @@ func TestStreamCreate(t *testing.T) { WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1)) expectStreamRead(mock) + expectOnTableRead(mock) + err := resources.CreateStream(d, db) + r.NoError(err) + r.Equal("stream_name", d.Get("name").(string)) + }) +} + +func TestStreamCreateOnExternalTable(t *testing.T) { + r := require.New(t) + + in := map[string]interface{}{ + "name": "stream_name", + "database": "database_name", + "schema": "schema_name", + "comment": "great comment", + "on_table": "target_db.target_schema.target_table", + "append_only": true, + "insert_only": false, + "show_initial_rows": true, + } + d := stream(t, "database_name|schema_name|stream_name", in) + + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON EXTERNAL TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1)) + expectStreamRead(mock) + expectOnExternalTableRead(mock) err := resources.CreateStream(d, db) r.NoError(err) r.Equal("stream_name", d.Get("name").(string)) @@ -47,6 +73,16 @@ func expectStreamRead(mock sqlmock.Sqlmock) { mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN SCHEMA "database_name"."schema_name"`).WillReturnRows(rows) } +func expectOnTableRead(mock sqlmock.Sqlmock) { + rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_table", "target_db", "target_schema", "TABLE", "mock comment", "", "", "", "", 1, "OFF", "OFF", "N") + mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows) +} + +func expectOnExternalTableRead(mock sqlmock.Sqlmock) { + rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_table", "target_db", "target_schema", "TABLE", "mock comment", "", "", "", "", 1, "OFF", "OFF", "Y") + mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows) +} + func TestStreamRead(t *testing.T) { r := require.New(t) diff --git a/pkg/snowflake/stream.go b/pkg/snowflake/stream.go index dc48cde236..644973b0c3 100644 --- a/pkg/snowflake/stream.go +++ b/pkg/snowflake/stream.go @@ -15,6 +15,7 @@ type StreamBuilder struct { name string db string schema string + externalTable bool onTable string appendOnly bool insertOnly bool @@ -53,6 +54,11 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde return sb } +func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder { + sb.externalTable = b + return sb +} + func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder { sb.appendOnly = b return sb @@ -90,7 +96,13 @@ func (sb *StreamBuilder) Create() string { q := strings.Builder{} q.WriteString(fmt.Sprintf(`CREATE STREAM %v`, sb.QualifiedName())) - q.WriteString(fmt.Sprintf(` ON TABLE %v`, sb.onTable)) + q.WriteString(` ON`) + + if sb.externalTable { + q.WriteString(` EXTERNAL`) + } + + q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable)) if sb.comment != "" { q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment))) diff --git a/pkg/snowflake/stream_test.go b/pkg/snowflake/stream_test.go index fa4d54c053..412e6bcef7 100644 --- a/pkg/snowflake/stream_test.go +++ b/pkg/snowflake/stream_test.go @@ -24,6 +24,9 @@ func TestStreamCreate(t *testing.T) { s.WithInsertOnly(true) r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`) + + s.WithExternalTable(true) + r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON EXTERNAL TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`) } func TestStreamChangeComment(t *testing.T) {