From d837341c2d18b6fbb4657ad3a1837190a8ee77d8 Mon Sep 17 00:00:00 2001 From: Jakub Michalak Date: Thu, 10 Oct 2024 15:23:02 +0200 Subject: [PATCH] feat: Stream on external table resource (#3122) - add snowflake_stream_on_external_table resource - adjust copy grants documentation - adjust external table helper client - move copy grants handling to `resource_helpers_create.go` - move baseModel to `ext` - move common stream code to `stream_common.go` - fix using empty columns in views recreation ## Test Plan * [x] acceptance tests ## References https://docs.snowflake.com/en/sql-reference/sql/create-stream #3073 ## TODO - add remaining resources (stage, view) - rework data source --- MIGRATION_GUIDE.md | 7 +- docs/resources/stream_on_external_table.md | 164 +++++ docs/resources/stream_on_table.md | 4 +- .../import.sh | 1 + .../resource.tf | 45 ++ .../resourceassert/gen/resource_schema_def.go | 4 + .../stream_on_external_table_resource_gen.go | 137 ++++ .../stream_on_external_table_model_ext.go | 9 + .../stream_on_external_table_model_gen.go | 160 +++++ .../config/model/stream_on_table_model_ext.go | 9 + pkg/acceptance/check_destroy.go | 3 + .../helpers/external_table_client.go | 11 +- pkg/provider/provider.go | 1 + pkg/provider/resources/resources.go | 1 + pkg/resources/resource_helpers_create.go | 10 + pkg/resources/stream_common.go | 172 +++-- pkg/resources/stream_on_external_table.go | 223 ++++++ ...tream_on_external_table_acceptance_test.go | 654 ++++++++++++++++++ pkg/resources/stream_on_table.go | 149 ++-- .../stream_on_table_acceptance_test.go | 33 +- .../TestAcc_StreamOnExternalTable/at/test.tf | 18 + .../at/variables.tf | 31 + .../before/test.tf | 18 + .../before/variables.tf | 31 + pkg/resources/view.go | 5 +- pkg/resources/view_acceptance_test.go | 57 ++ pkg/sdk/streams_gen.go | 4 + .../testint/streams_gen_integration_test.go | 2 +- .../stream_on_external_table.md.tmpl | 37 + templates/resources/stream_on_table.md.tmpl | 2 + 30 files changed, 1822 insertions(+), 180 deletions(-) create mode 100644 docs/resources/stream_on_external_table.md create mode 100644 examples/resources/snowflake_stream_on_external_table/import.sh create mode 100644 examples/resources/snowflake_stream_on_external_table/resource.tf create mode 100644 pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go create mode 100644 pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_ext.go create mode 100644 pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_gen.go create mode 100644 pkg/acceptance/bettertestspoc/config/model/stream_on_table_model_ext.go create mode 100644 pkg/resources/stream_on_external_table.go create mode 100644 pkg/resources/stream_on_external_table_acceptance_test.go create mode 100644 pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/test.tf create mode 100644 pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/variables.tf create mode 100644 pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/test.tf create mode 100644 pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/variables.tf create mode 100644 templates/resources/stream_on_external_table.md.tmpl diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index 460c40bf92..48b7946ad3 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -9,12 +9,15 @@ across different versions. ## v0.96.0 ➞ v0.97.0 -### *(new feature)* snowflake_stream_on_table resource +### *(new feature)* snowflake_stream_on_table, snowflake_stream_on_external_table resource -To enhance clarity and functionality, the new resource `snowflake_stream_on_table` has been introduced to replace the previous `snowflake_stream`. Recognizing that the old resource carried multiple responsibilities within a single entity, we opted to divide it into more specialized resources. +To enhance clarity and functionality, the new resources `snowflake_stream_on_table` and `snowflake_stream_on_external_table` have been introduced to replace the previous `snowflake_stream`. Recognizing that the old resource carried multiple responsibilities within a single entity, we opted to divide it into more specialized resources. The newly introduced resources are aligned with the latest Snowflake documentation at the time of implementation, and adhere to our [new conventions](#general-changes). This segregation was based on the object on which the stream is created. The mapping between SQL statements and the resources is the following: - `ON TABLE ` -> `snowflake_stream_on_table` +- `ON EXTERNAL TABLE ` -> `snowflake_stream_on_external_table` (this was previously not supported) + +The resources for streams on directory tables and streams on views will be implemented in the future releases. To use the new `stream_on_table`, change the old `stream` from ```terraform diff --git a/docs/resources/stream_on_external_table.md b/docs/resources/stream_on_external_table.md new file mode 100644 index 0000000000..2899aa7a8e --- /dev/null +++ b/docs/resources/stream_on_external_table.md @@ -0,0 +1,164 @@ +--- +page_title: "snowflake_stream_on_external_table Resource - terraform-provider-snowflake" +subcategory: "" +description: |- + Resource used to manage streams on external tables. For more information, check stream documentation https://docs.snowflake.com/en/sql-reference/sql/create-stream. +--- + +!> **V1 release candidate** This resource was reworked and is a release candidate for the V1. We do not expect significant changes in it before the V1. We will welcome any feedback and adjust the resource if needed. Any errors reported will be resolved with a higher priority. We encourage checking this resource out before the V1 release. Please follow the [migration guide](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/MIGRATION_GUIDE.md#v0960--v0970) to use it. + +!> **Note about copy_grants** Fields like `external_table`, `insert_only`, `at`, `before` can not be ALTERed on Snowflake side (check [docs](https://docs.snowflake.com/en/sql-reference/sql/alter-stream)), and a change means recreation of the resource. ForceNew can not be used because it does not preserve grants from `copy_grants`. Beware that even though a change is marked as update, the resource is recreated. + +# snowflake_stream_on_external_table (Resource) + +Resource used to manage streams on external tables. For more information, check [stream documentation](https://docs.snowflake.com/en/sql-reference/sql/create-stream). + +## Example Usage + +```terraform +resource "snowflake_external_table" "external_table" { + database = "db" + schema = "schema" + name = "external_table" + comment = "External table" + file_format = "TYPE = CSV FIELD_DELIMITER = '|'" + location = "@stage/directory/" + + column { + name = "id" + type = "int" + } + + column { + name = "data" + type = "text" + } +} + +# basic resource +resource "snowflake_stream_on_external_table" "stream" { + name = "stream" + schema = "schema" + database = "database" + + external_table = snowflake_external_table.external_table.fully_qualified_name +} + + +# resource with additional fields +resource "snowflake_stream_on_external_table" "stream" { + name = "stream" + schema = "schema" + database = "database" + + copy_grants = true + external_table = snowflake_external_table.external_table.fully_qualified_name + insert_only = "true" + + at { + statement = "8e5d0ca9-005e-44e6-b858-a8f5b37c5726" + } + + comment = "A stream." +} +``` +-> **Note** Instead of using fully_qualified_name, you can reference objects managed outside Terraform by constructing a correct ID, consult [identifiers guide](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/guides/identifiers#new-computed-fully-qualified-name-field-in-resources). + + + +## Schema + +### Required + +- `database` (String) The database in which to create the stream. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `external_table` (String) Specifies an identifier for the external table the stream will monitor. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `name` (String) Specifies the identifier for the stream; must be unique for the database and schema in which the stream is created. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `schema` (String) The schema in which to create the stream. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` + +### Optional + +- `at` (Block List, Max: 1) This field specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter. Due to Snowflake limitations, the provider does not detect external changes on this field. External changes for this field won't be detected. In case you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--at)) +- `before` (Block List, Max: 1) This field specifies that the request refers to a point immediately preceding the specified parameter. This point in time is just before the statement, identified by its query ID, is completed. Due to Snowflake limitations, the provider does not detect external changes on this field. External changes for this field won't be detected. In case you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--before)) +- `comment` (String) Specifies a comment for the stream. +- `copy_grants` (Boolean) Retains the access permissions from the original stream when a stream is recreated using the OR REPLACE clause. That is sometimes used when the provider detects changes for fields that can not be changed by ALTER. This value will not have any effect when creating a new stream. +- `insert_only` (String) Specifies whether this is an insert-only stream. Available options are: "true" or "false". When the value is not set in the configuration the provider will put "default" there which means to use the Snowflake default for this value. + +### Read-Only + +- `describe_output` (List of Object) Outputs the result of `DESCRIBE STREAM` for the given stream. (see [below for nested schema](#nestedatt--describe_output)) +- `fully_qualified_name` (String) Fully qualified name of the resource. For more information, see [object name resolution](https://docs.snowflake.com/en/sql-reference/name-resolution). +- `id` (String) The ID of this resource. +- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output)) + + +### Nested Schema for `at` + +Optional: + +- `offset` (String) Specifies the difference in seconds from the current time to use for Time Travel, in the form -N where N can be an integer or arithmetic expression (e.g. -120 is 120 seconds, -30*60 is 1800 seconds or 30 minutes). +- `statement` (String) Specifies the query ID of a statement to use as the reference point for Time Travel. This parameter supports any statement of one of the following types: DML (e.g. INSERT, UPDATE, DELETE), TCL (BEGIN, COMMIT transaction), SELECT. +- `stream` (String) Specifies the identifier (i.e. name) for an existing stream on the queried table or view. The current offset in the stream is used as the AT point in time for returning change data for the source object. +- `timestamp` (String) Specifies an exact date and time to use for Time Travel. The value must be explicitly cast to a TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, or TIMESTAMP_TZ data type. + + + +### Nested Schema for `before` + +Optional: + +- `offset` (String) Specifies the difference in seconds from the current time to use for Time Travel, in the form -N where N can be an integer or arithmetic expression (e.g. -120 is 120 seconds, -30*60 is 1800 seconds or 30 minutes). +- `statement` (String) Specifies the query ID of a statement to use as the reference point for Time Travel. This parameter supports any statement of one of the following types: DML (e.g. INSERT, UPDATE, DELETE), TCL (BEGIN, COMMIT transaction), SELECT. +- `stream` (String) Specifies the identifier (i.e. name) for an existing stream on the queried table or view. The current offset in the stream is used as the AT point in time for returning change data for the source object. +- `timestamp` (String) Specifies an exact date and time to use for Time Travel. The value must be explicitly cast to a TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, or TIMESTAMP_TZ data type. + + + +### Nested Schema for `describe_output` + +Read-Only: + +- `base_tables` (List of String) +- `comment` (String) +- `created_on` (String) +- `database_name` (String) +- `invalid_reason` (String) +- `mode` (String) +- `name` (String) +- `owner` (String) +- `owner_role_type` (String) +- `schema_name` (String) +- `source_type` (String) +- `stale` (String) +- `stale_after` (String) +- `table_name` (String) +- `type` (String) + + + +### Nested Schema for `show_output` + +Read-Only: + +- `base_tables` (List of String) +- `comment` (String) +- `created_on` (String) +- `database_name` (String) +- `invalid_reason` (String) +- `mode` (String) +- `name` (String) +- `owner` (String) +- `owner_role_type` (String) +- `schema_name` (String) +- `source_type` (String) +- `stale` (String) +- `stale_after` (String) +- `table_name` (String) +- `type` (String) + +## Import + +Import is supported using the following syntax: + +```shell +terraform import snowflake_stream_on_external_table.example '""."".""' +``` diff --git a/docs/resources/stream_on_table.md b/docs/resources/stream_on_table.md index 6de9fe8563..3f999386b8 100644 --- a/docs/resources/stream_on_table.md +++ b/docs/resources/stream_on_table.md @@ -7,6 +7,8 @@ description: |- !> **V1 release candidate** This resource was reworked and is a release candidate for the V1. We do not expect significant changes in it before the V1. We will welcome any feedback and adjust the resource if needed. Any errors reported will be resolved with a higher priority. We encourage checking this resource out before the V1 release. Please follow the [migration guide](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/MIGRATION_GUIDE.md#v0960--v0970) to use it. +!> **Note about copy_grants** Fields like `table`, `append_only`, `at`, `before`, `show_initial_rows` can not be ALTERed on Snowflake side (check [docs](https://docs.snowflake.com/en/sql-reference/sql/alter-stream)), and a change means recreation of the resource. ForceNew can not be used because it does not preserve grants from `copy_grants`. Beware that even though a change is marked as update, the resource is recreated. + # snowflake_stream_on_table (Resource) Resource used to manage streams on tables. For more information, check [stream documentation](https://docs.snowflake.com/en/sql-reference/sql/create-stream). @@ -63,7 +65,7 @@ resource "snowflake_stream_on_table" "stream" { - `at` (Block List, Max: 1) This field specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter. Due to Snowflake limitations, the provider does not detect external changes on this field. External changes for this field won't be detected. In case you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--at)) - `before` (Block List, Max: 1) This field specifies that the request refers to a point immediately preceding the specified parameter. This point in time is just before the statement, identified by its query ID, is completed. Due to Snowflake limitations, the provider does not detect external changes on this field. External changes for this field won't be detected. In case you want to apply external changes, you can re-create the resource manually using "terraform taint". (see [below for nested schema](#nestedblock--before)) - `comment` (String) Specifies a comment for the stream. -- `copy_grants` (Boolean) Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause. Use only if the resource is already managed by Terraform. Otherwise, this field is skipped. +- `copy_grants` (Boolean) Retains the access permissions from the original stream when a stream is recreated using the OR REPLACE clause. That is sometimes used when the provider detects changes for fields that can not be changed by ALTER. This value will not have any effect when creating a new stream. - `show_initial_rows` (String) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed. Available options are: "true" or "false". When the value is not set in the configuration the provider will put "default" there which means to use the Snowflake default for this value. External changes for this field won't be detected. In case you want to apply external changes, you can re-create the resource manually using "terraform taint". ### Read-Only diff --git a/examples/resources/snowflake_stream_on_external_table/import.sh b/examples/resources/snowflake_stream_on_external_table/import.sh new file mode 100644 index 0000000000..0d04d91e8f --- /dev/null +++ b/examples/resources/snowflake_stream_on_external_table/import.sh @@ -0,0 +1 @@ +terraform import snowflake_stream_on_external_table.example '""."".""' diff --git a/examples/resources/snowflake_stream_on_external_table/resource.tf b/examples/resources/snowflake_stream_on_external_table/resource.tf new file mode 100644 index 0000000000..964cb0f342 --- /dev/null +++ b/examples/resources/snowflake_stream_on_external_table/resource.tf @@ -0,0 +1,45 @@ +resource "snowflake_external_table" "external_table" { + database = "db" + schema = "schema" + name = "external_table" + comment = "External table" + file_format = "TYPE = CSV FIELD_DELIMITER = '|'" + location = "@stage/directory/" + + column { + name = "id" + type = "int" + } + + column { + name = "data" + type = "text" + } +} + +# basic resource +resource "snowflake_stream_on_external_table" "stream" { + name = "stream" + schema = "schema" + database = "database" + + external_table = snowflake_external_table.external_table.fully_qualified_name +} + + +# resource with additional fields +resource "snowflake_stream_on_external_table" "stream" { + name = "stream" + schema = "schema" + database = "database" + + copy_grants = true + external_table = snowflake_external_table.external_table.fully_qualified_name + insert_only = "true" + + at { + statement = "8e5d0ca9-005e-44e6-b858-a8f5b37c5726" + } + + comment = "A stream." +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/gen/resource_schema_def.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/gen/resource_schema_def.go index abeb9018b8..4f618ddfa5 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/gen/resource_schema_def.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/gen/resource_schema_def.go @@ -61,4 +61,8 @@ var allResourceSchemaDefs = []ResourceSchemaDef{ name: "StreamOnTable", schema: resources.StreamOnTable().Schema, }, + { + name: "StreamOnExternalTable", + schema: resources.StreamOnExternalTable().Schema, + }, } diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go new file mode 100644 index 0000000000..f8e6f4f749 --- /dev/null +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go @@ -0,0 +1,137 @@ +// Code generated by assertions generator; DO NOT EDIT. + +package resourceassert + +import ( + "testing" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" +) + +type StreamOnExternalTableResourceAssert struct { + *assert.ResourceAssert +} + +func StreamOnExternalTableResource(t *testing.T, name string) *StreamOnExternalTableResourceAssert { + t.Helper() + + return &StreamOnExternalTableResourceAssert{ + ResourceAssert: assert.NewResourceAssert(name, "resource"), + } +} + +func ImportedStreamOnExternalTableResource(t *testing.T, id string) *StreamOnExternalTableResourceAssert { + t.Helper() + + return &StreamOnExternalTableResourceAssert{ + ResourceAssert: assert.NewImportedResourceAssert(id, "imported resource"), + } +} + +/////////////////////////////////// +// Attribute value string checks // +/////////////////////////////////// + +func (s *StreamOnExternalTableResourceAssert) HasAtString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("at", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasBeforeString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("before", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasCommentString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("comment", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasCopyGrantsString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("copy_grants", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasDatabaseString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("database", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasExternalTableString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("external_table", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasFullyQualifiedNameString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("fully_qualified_name", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasInsertOnlyString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("insert_only", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNameString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("name", expected)) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasSchemaString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("schema", expected)) + return s +} + +//////////////////////////// +// Attribute empty checks // +//////////////////////////// + +func (s *StreamOnExternalTableResourceAssert) HasNoAt() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("at")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoBefore() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("before")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoComment() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("comment")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoCopyGrants() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("copy_grants")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoDatabase() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("database")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoExternalTable() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("external_table")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoFullyQualifiedName() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("fully_qualified_name")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoInsertOnly() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("insert_only")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoName() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("name")) + return s +} + +func (s *StreamOnExternalTableResourceAssert) HasNoSchema() *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueNotSet("schema")) + return s +} diff --git a/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_ext.go b/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_ext.go new file mode 100644 index 0000000000..91255a9cde --- /dev/null +++ b/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_ext.go @@ -0,0 +1,9 @@ +package model + +import ( + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +func StreamOnExternalTableBase(resourceName string, id, externalTableId sdk.SchemaObjectIdentifier) *StreamOnExternalTableModel { + return StreamOnExternalTable(resourceName, id.DatabaseName(), externalTableId.FullyQualifiedName(), id.Name(), id.SchemaName()).WithInsertOnly("true") +} diff --git a/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_gen.go b/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_gen.go new file mode 100644 index 0000000000..09c87c5e23 --- /dev/null +++ b/pkg/acceptance/bettertestspoc/config/model/stream_on_external_table_model_gen.go @@ -0,0 +1,160 @@ +// Code generated by config model builder generator; DO NOT EDIT. + +package model + +import ( + tfconfig "github.com/hashicorp/terraform-plugin-testing/config" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider/resources" +) + +type StreamOnExternalTableModel struct { + At tfconfig.Variable `json:"at,omitempty"` + Before tfconfig.Variable `json:"before,omitempty"` + Comment tfconfig.Variable `json:"comment,omitempty"` + CopyGrants tfconfig.Variable `json:"copy_grants,omitempty"` + Database tfconfig.Variable `json:"database,omitempty"` + ExternalTable tfconfig.Variable `json:"external_table,omitempty"` + FullyQualifiedName tfconfig.Variable `json:"fully_qualified_name,omitempty"` + InsertOnly tfconfig.Variable `json:"insert_only,omitempty"` + Name tfconfig.Variable `json:"name,omitempty"` + Schema tfconfig.Variable `json:"schema,omitempty"` + + *config.ResourceModelMeta +} + +///////////////////////////////////////////////// +// Basic builders (resource name and required) // +///////////////////////////////////////////////// + +func StreamOnExternalTable( + resourceName string, + database string, + externalTable string, + name string, + schema string, +) *StreamOnExternalTableModel { + s := &StreamOnExternalTableModel{ResourceModelMeta: config.Meta(resourceName, resources.StreamOnExternalTable)} + s.WithDatabase(database) + s.WithExternalTable(externalTable) + s.WithName(name) + s.WithSchema(schema) + return s +} + +func StreamOnExternalTableWithDefaultMeta( + database string, + externalTable string, + name string, + schema string, +) *StreamOnExternalTableModel { + s := &StreamOnExternalTableModel{ResourceModelMeta: config.DefaultMeta(resources.StreamOnExternalTable)} + s.WithDatabase(database) + s.WithExternalTable(externalTable) + s.WithName(name) + s.WithSchema(schema) + return s +} + +///////////////////////////////// +// below all the proper values // +///////////////////////////////// + +// at attribute type is not yet supported, so WithAt can't be generated + +// before attribute type is not yet supported, so WithBefore can't be generated + +func (s *StreamOnExternalTableModel) WithComment(comment string) *StreamOnExternalTableModel { + s.Comment = tfconfig.StringVariable(comment) + return s +} + +func (s *StreamOnExternalTableModel) WithCopyGrants(copyGrants bool) *StreamOnExternalTableModel { + s.CopyGrants = tfconfig.BoolVariable(copyGrants) + return s +} + +func (s *StreamOnExternalTableModel) WithDatabase(database string) *StreamOnExternalTableModel { + s.Database = tfconfig.StringVariable(database) + return s +} + +func (s *StreamOnExternalTableModel) WithExternalTable(externalTable string) *StreamOnExternalTableModel { + s.ExternalTable = tfconfig.StringVariable(externalTable) + return s +} + +func (s *StreamOnExternalTableModel) WithFullyQualifiedName(fullyQualifiedName string) *StreamOnExternalTableModel { + s.FullyQualifiedName = tfconfig.StringVariable(fullyQualifiedName) + return s +} + +func (s *StreamOnExternalTableModel) WithInsertOnly(insertOnly string) *StreamOnExternalTableModel { + s.InsertOnly = tfconfig.StringVariable(insertOnly) + return s +} + +func (s *StreamOnExternalTableModel) WithName(name string) *StreamOnExternalTableModel { + s.Name = tfconfig.StringVariable(name) + return s +} + +func (s *StreamOnExternalTableModel) WithSchema(schema string) *StreamOnExternalTableModel { + s.Schema = tfconfig.StringVariable(schema) + return s +} + +////////////////////////////////////////// +// below it's possible to set any value // +////////////////////////////////////////// + +func (s *StreamOnExternalTableModel) WithAtValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.At = value + return s +} + +func (s *StreamOnExternalTableModel) WithBeforeValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.Before = value + return s +} + +func (s *StreamOnExternalTableModel) WithCommentValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.Comment = value + return s +} + +func (s *StreamOnExternalTableModel) WithCopyGrantsValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.CopyGrants = value + return s +} + +func (s *StreamOnExternalTableModel) WithDatabaseValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.Database = value + return s +} + +func (s *StreamOnExternalTableModel) WithExternalTableValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.ExternalTable = value + return s +} + +func (s *StreamOnExternalTableModel) WithFullyQualifiedNameValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.FullyQualifiedName = value + return s +} + +func (s *StreamOnExternalTableModel) WithInsertOnlyValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.InsertOnly = value + return s +} + +func (s *StreamOnExternalTableModel) WithNameValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.Name = value + return s +} + +func (s *StreamOnExternalTableModel) WithSchemaValue(value tfconfig.Variable) *StreamOnExternalTableModel { + s.Schema = value + return s +} diff --git a/pkg/acceptance/bettertestspoc/config/model/stream_on_table_model_ext.go b/pkg/acceptance/bettertestspoc/config/model/stream_on_table_model_ext.go new file mode 100644 index 0000000000..b28d8b0364 --- /dev/null +++ b/pkg/acceptance/bettertestspoc/config/model/stream_on_table_model_ext.go @@ -0,0 +1,9 @@ +package model + +import ( + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +func StreamOnTableBase(resourceName string, id, tableId sdk.SchemaObjectIdentifier) *StreamOnTableModel { + return StreamOnTable(resourceName, id.DatabaseName(), id.Name(), id.SchemaName(), tableId.FullyQualifiedName()) +} diff --git a/pkg/acceptance/check_destroy.go b/pkg/acceptance/check_destroy.go index fcde35e9bf..2ee96796d1 100644 --- a/pkg/acceptance/check_destroy.go +++ b/pkg/acceptance/check_destroy.go @@ -200,6 +200,9 @@ var showByIdFunctions = map[resources.Resource]showByIdFunc{ resources.Stream: func(ctx context.Context, client *sdk.Client, id sdk.ObjectIdentifier) error { return runShowById(ctx, id, client.Streams.ShowByID) }, + resources.StreamOnExternalTable: func(ctx context.Context, client *sdk.Client, id sdk.ObjectIdentifier) error { + return runShowById(ctx, id, client.Streams.ShowByID) + }, resources.StreamOnTable: func(ctx context.Context, client *sdk.Client, id sdk.ObjectIdentifier) error { return runShowById(ctx, id, client.Streams.ShowByID) }, diff --git a/pkg/acceptance/helpers/external_table_client.go b/pkg/acceptance/helpers/external_table_client.go index 3766a360d5..8c5396f014 100644 --- a/pkg/acceptance/helpers/external_table_client.go +++ b/pkg/acceptance/helpers/external_table_client.go @@ -33,7 +33,16 @@ func (c *ExternalTableClient) PublishDataToStage(t *testing.T, stageId sdk.Schem require.NoError(t, err) } -func (c *ExternalTableClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateExternalTableRequest) (*sdk.ExternalTable, func()) { +func (c *ExternalTableClient) CreateWithLocation(t *testing.T, location string) (*sdk.ExternalTable, func()) { + t.Helper() + + externalTableId := c.ids.RandomSchemaObjectIdentifier() + req := sdk.NewCreateExternalTableRequest(externalTableId, location).WithFileFormat(*sdk.NewExternalTableFileFormatRequest().WithFileFormatType(sdk.ExternalTableFileFormatTypeJSON)).WithColumns([]*sdk.ExternalTableColumnRequest{sdk.NewExternalTableColumnRequest("id", sdk.DataTypeNumber, "value:time::int")}) + + return c.CreateWithRequest(t, req) +} + +func (c *ExternalTableClient) CreateWithRequest(t *testing.T, req *sdk.CreateExternalTableRequest) (*sdk.ExternalTable, func()) { t.Helper() ctx := context.Background() diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 2c3abf9ce4..6e96b66904 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -480,6 +480,7 @@ func getResources() map[string]*schema.Resource { "snowflake_stage": resources.Stage(), "snowflake_storage_integration": resources.StorageIntegration(), "snowflake_stream": resources.Stream(), + "snowflake_stream_on_external_table": resources.StreamOnExternalTable(), "snowflake_stream_on_table": resources.StreamOnTable(), "snowflake_streamlit": resources.Streamlit(), "snowflake_table": resources.Table(), diff --git a/pkg/provider/resources/resources.go b/pkg/provider/resources/resources.go index bf13a42ba9..59f0bea661 100644 --- a/pkg/provider/resources/resources.go +++ b/pkg/provider/resources/resources.go @@ -48,6 +48,7 @@ const ( Stage resource = "snowflake_stage" StorageIntegration resource = "snowflake_storage_integration" Stream resource = "snowflake_stream" + StreamOnExternalTable resource = "snowflake_stream_on_external_table" StreamOnTable resource = "snowflake_stream_on_table" Streamlit resource = "snowflake_streamlit" Table resource = "snowflake_table" diff --git a/pkg/resources/resource_helpers_create.go b/pkg/resources/resource_helpers_create.go index c97d694998..e1d12cbc17 100644 --- a/pkg/resources/resource_helpers_create.go +++ b/pkg/resources/resource_helpers_create.go @@ -61,3 +61,13 @@ func attributeDirectValueCreate[T any](d *schema.ResourceData, key string, creat } return nil } + +func copyGrantsAttributeCreate(d *schema.ResourceData, isOrReplace bool, orReplaceField, copyGrantsField **bool) error { + if isOrReplace { + *orReplaceField = sdk.Bool(true) + if d.Get("copy_grants").(bool) { + *copyGrantsField = sdk.Bool(true) + } + } + return nil +} diff --git a/pkg/resources/stream_common.go b/pkg/resources/stream_common.go index 52dedf2a6b..93257bfe93 100644 --- a/pkg/resources/stream_common.go +++ b/pkg/resources/stream_common.go @@ -5,58 +5,65 @@ import ( "fmt" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/schemas" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) -func handleStreamTimeTravel(d *schema.ResourceData) *sdk.OnStreamRequest { - if v := d.Get(AtAttributeName).([]any); len(v) > 0 { - return sdk.NewOnStreamRequest().WithAt(true).WithStatement(handleStreamTimeTravelStatement(v[0].(map[string]any))) - } - if v := d.Get(BeforeAttributeName).([]any); len(v) > 0 { - return sdk.NewOnStreamRequest().WithBefore(true).WithStatement(handleStreamTimeTravelStatement(v[0].(map[string]any))) - } - return nil -} - -func handleStreamTimeTravelStatement(timeTravelConfig map[string]any) sdk.OnStreamStatementRequest { - statement := sdk.OnStreamStatementRequest{} - if v := timeTravelConfig["timestamp"].(string); len(v) > 0 { - statement.WithTimestamp(v) - } - if v := timeTravelConfig["offset"].(string); len(v) > 0 { - statement.WithOffset(v) - } - if v := timeTravelConfig["statement"].(string); len(v) > 0 { - statement.WithStatement(v) - } - if v := timeTravelConfig["stream"].(string); len(v) > 0 { - statement.WithStream(v) - } - return statement -} - -func DeleteStreamContext(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { - client := meta.(*provider.Context).Client - id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) - if err != nil { - return diag.FromErr(err) - } - - err = client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id).WithIfExists(true)) - if err != nil { - return diag.Diagnostics{ - diag.Diagnostic{ - Severity: diag.Error, - Summary: "Error deleting stream", - Detail: fmt.Sprintf("id %v err = %v", id.Name(), err), - }, - } - } - - d.SetId("") - return nil +var streamCommonSchema = map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: blocklistedCharactersFieldDescription("Specifies the identifier for the stream; must be unique for the database and schema in which the stream is created."), + DiffSuppressFunc: suppressIdentifierQuoting, + }, + "schema": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: blocklistedCharactersFieldDescription("The schema in which to create the stream."), + DiffSuppressFunc: suppressIdentifierQuoting, + }, + "database": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: blocklistedCharactersFieldDescription("The database in which to create the stream."), + DiffSuppressFunc: suppressIdentifierQuoting, + }, + "copy_grants": { + Type: schema.TypeBool, + Optional: true, + Default: false, + Description: "Retains the access permissions from the original stream when a stream is recreated using the OR REPLACE clause. That is sometimes used when the provider detects changes for fields that can not be changed by ALTER. This value will not have any effect when creating a new stream.", + DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool { + return oldValue != "" && oldValue != newValue + }, + }, + "comment": { + Type: schema.TypeString, + Optional: true, + Description: "Specifies a comment for the stream.", + }, + ShowOutputAttributeName: { + Type: schema.TypeList, + Computed: true, + Description: "Outputs the result of `SHOW STREAMS` for the given stream.", + Elem: &schema.Resource{ + Schema: schemas.ShowStreamSchema, + }, + }, + DescribeOutputAttributeName: { + Type: schema.TypeList, + Computed: true, + Description: "Outputs the result of `DESCRIBE STREAM` for the given stream.", + Elem: &schema.Resource{ + Schema: schemas.DescribeStreamSchema, + }, + }, + FullyQualifiedNameAttributeName: schemas.FullyQualifiedNameSchema, } var atSchema = &schema.Schema{ @@ -134,3 +141,74 @@ var beforeSchema = &schema.Schema{ }, ConflictsWith: []string{"at"}, } + +func handleStreamTimeTravel(d *schema.ResourceData) *sdk.OnStreamRequest { + if v := d.Get(AtAttributeName).([]any); len(v) > 0 { + return sdk.NewOnStreamRequest().WithAt(true).WithStatement(handleStreamTimeTravelStatement(v[0].(map[string]any))) + } + if v := d.Get(BeforeAttributeName).([]any); len(v) > 0 { + return sdk.NewOnStreamRequest().WithBefore(true).WithStatement(handleStreamTimeTravelStatement(v[0].(map[string]any))) + } + return nil +} + +func handleStreamTimeTravelStatement(timeTravelConfig map[string]any) sdk.OnStreamStatementRequest { + statement := sdk.OnStreamStatementRequest{} + if v := timeTravelConfig["timestamp"].(string); len(v) > 0 { + statement.WithTimestamp(v) + } + if v := timeTravelConfig["offset"].(string); len(v) > 0 { + statement.WithOffset(v) + } + if v := timeTravelConfig["statement"].(string); len(v) > 0 { + statement.WithStatement(v) + } + if v := timeTravelConfig["stream"].(string); len(v) > 0 { + statement.WithStream(v) + } + return statement +} + +func DeleteStreamContext(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + client := meta.(*provider.Context).Client + id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) + if err != nil { + return diag.FromErr(err) + } + + err = client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id).WithIfExists(true)) + if err != nil { + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Error deleting stream", + Detail: fmt.Sprintf("id %v err = %v", id.Name(), err), + }, + } + } + + d.SetId("") + return nil +} + +func handleStreamRead(d *schema.ResourceData, + id sdk.SchemaObjectIdentifier, + stream *sdk.Stream, + streamDescription *sdk.Stream, +) error { + if err := d.Set("comment", stream.Comment); err != nil { + return err + } + + if err := d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}); err != nil { + return err + } + if err := d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}); err != nil { + return err + } + if err := d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()); err != nil { + return err + } + + return nil +} diff --git a/pkg/resources/stream_on_external_table.go b/pkg/resources/stream_on_external_table.go new file mode 100644 index 0000000000..e43a9a7c15 --- /dev/null +++ b/pkg/resources/stream_on_external_table.go @@ -0,0 +1,223 @@ +package resources + +import ( + "context" + "errors" + "fmt" + "log" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +var streamOnExternalTableSchema = func() map[string]*schema.Schema { + streamOnExternalTable := map[string]*schema.Schema{ + "external_table": { + Type: schema.TypeString, + Required: true, + Description: blocklistedCharactersFieldDescription("Specifies an identifier for the external table the stream will monitor."), + DiffSuppressFunc: SuppressIfAny(suppressIdentifierQuoting, IgnoreChangeToCurrentSnowflakeValueInShow("table_name")), + ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](), + }, + "insert_only": { + Type: schema.TypeString, + Optional: true, + Default: BooleanDefault, + ValidateDiagFunc: validateBooleanString, + DiffSuppressFunc: IgnoreChangeToCurrentSnowflakeValueInShowWithMapping("mode", func(x any) any { + return x.(string) == string(sdk.StreamModeInsertOnly) + }), + Description: booleanStringFieldDescription("Specifies whether this is an insert-only stream."), + }, + AtAttributeName: atSchema, + BeforeAttributeName: beforeSchema, + } + return helpers.MergeMaps(streamCommonSchema, streamOnExternalTable) +}() + +func StreamOnExternalTable() *schema.Resource { + return &schema.Resource{ + CreateContext: CreateStreamOnExternalTable(false), + ReadContext: ReadStreamOnExternalTable(true), + UpdateContext: UpdateStreamOnExternalTable, + DeleteContext: DeleteStreamContext, + Description: "Resource used to manage streams on external tables. For more information, check [stream documentation](https://docs.snowflake.com/en/sql-reference/sql/create-stream).", + + CustomizeDiff: customdiff.All( + ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"), + ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"), + ), + + Schema: streamOnExternalTableSchema, + + Importer: &schema.ResourceImporter{ + StateContext: ImportStreamOnExternalTable, + }, + } +} + +func ImportStreamOnExternalTable(ctx context.Context, d *schema.ResourceData, meta any) ([]*schema.ResourceData, error) { + log.Printf("[DEBUG] Starting stream import") + client := meta.(*provider.Context).Client + id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) + if err != nil { + return nil, err + } + + v, err := client.Streams.ShowByID(ctx, id) + if err != nil { + return nil, err + } + if err := d.Set("name", id.Name()); err != nil { + return nil, err + } + if err := d.Set("database", id.DatabaseName()); err != nil { + return nil, err + } + if err := d.Set("schema", id.SchemaName()); err != nil { + return nil, err + } + if err := d.Set("insert_only", booleanStringFromBool(v.IsInsertOnly())); err != nil { + return nil, err + } + return []*schema.ResourceData{d}, nil +} + +func CreateStreamOnExternalTable(orReplace bool) schema.CreateContextFunc { + return func(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + client := meta.(*provider.Context).Client + databaseName := d.Get("database").(string) + schemaName := d.Get("schema").(string) + name := d.Get("name").(string) + id := sdk.NewSchemaObjectIdentifier(databaseName, schemaName, name) + + externalTableIdRaw := d.Get("external_table").(string) + externalTableId, err := sdk.ParseSchemaObjectIdentifier(externalTableIdRaw) + if err != nil { + return diag.FromErr(err) + } + + req := sdk.NewCreateOnExternalTableStreamRequest(id, externalTableId) + + errs := errors.Join( + copyGrantsAttributeCreate(d, orReplace, &req.OrReplace, &req.CopyGrants), + booleanStringAttributeCreate(d, "insert_only", &req.InsertOnly), + stringAttributeCreate(d, "comment", &req.Comment), + ) + if errs != nil { + return diag.FromErr(errs) + } + + streamTimeTravelReq := handleStreamTimeTravel(d) + if streamTimeTravelReq != nil { + req.WithOn(*streamTimeTravelReq) + } + + err = client.Streams.CreateOnExternalTable(ctx, req) + if err != nil { + return diag.FromErr(err) + } + d.SetId(helpers.EncodeResourceIdentifier(id)) + + return ReadStreamOnExternalTable(false)(ctx, d, meta) + } +} + +func ReadStreamOnExternalTable(withExternalChangesMarking bool) schema.ReadContextFunc { + return func(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + client := meta.(*provider.Context).Client + id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) + if err != nil { + return diag.FromErr(err) + } + stream, err := client.Streams.ShowByID(ctx, id) + if err != nil { + if errors.Is(err, sdk.ErrObjectNotFound) { + d.SetId("") + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Warning, + Summary: "Failed to query stream. Marking the resource as removed.", + Detail: fmt.Sprintf("stream name: %s, Err: %s", id.FullyQualifiedName(), err), + }, + } + } + return diag.FromErr(err) + } + externalTableId, err := sdk.ParseSchemaObjectIdentifier(*stream.TableName) + if err != nil { + return diag.Diagnostics{ + diag.Diagnostic{ + Severity: diag.Error, + Summary: "Failed to parse table ID in Read.", + Detail: fmt.Sprintf("stream name: %s, Err: %s", id.FullyQualifiedName(), err), + }, + } + } + if err := d.Set("external_table", externalTableId.FullyQualifiedName()); err != nil { + return diag.FromErr(err) + } + streamDescription, err := client.Streams.Describe(ctx, id) + if err != nil { + return diag.FromErr(err) + } + if err := handleStreamRead(d, id, stream, streamDescription); err != nil { + return diag.FromErr(err) + } + if withExternalChangesMarking { + var mode sdk.StreamMode + if stream.Mode != nil { + mode = *stream.Mode + } + if err = handleExternalChangesToObjectInShow(d, + showMapping{"mode", "insert_only", string(mode), booleanStringFromBool(stream.IsInsertOnly()), nil}, + ); err != nil { + return diag.FromErr(err) + } + } + + if err = setStateToValuesFromConfig(d, streamOnExternalTableSchema, []string{ + "insert_only", + }); err != nil { + return diag.FromErr(err) + } + + return nil + } +} + +func UpdateStreamOnExternalTable(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + client := meta.(*provider.Context).Client + id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) + if err != nil { + return diag.FromErr(err) + } + + // change on these fields can not be ForceNew because then the object is dropped explicitly and copying grants does not have effect + if keys := changedKeys(d, "external_table", "insert_only", "at", "before"); len(keys) > 0 { + log.Printf("[DEBUG] Detected change on %q, recreating...", keys) + return CreateStreamOnExternalTable(true)(ctx, d, meta) + } + + if d.HasChange("comment") { + comment := d.Get("comment").(string) + if comment == "" { + err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithUnsetComment(true)) + if err != nil { + return diag.FromErr(err) + } + } else { + err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithSetComment(comment)) + if err != nil { + return diag.FromErr(err) + } + } + } + + return ReadStreamOnExternalTable(false)(ctx, d, meta) +} diff --git a/pkg/resources/stream_on_external_table_acceptance_test.go b/pkg/resources/stream_on_external_table_acceptance_test.go new file mode 100644 index 0000000000..2611ffb69f --- /dev/null +++ b/pkg/resources/stream_on_external_table_acceptance_test.go @@ -0,0 +1,654 @@ +package resources_test + +import ( + "fmt" + "regexp" + "testing" + + acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceassert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config" + tfconfig "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config/model" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/testenvs" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/snowflakeroles" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider/resources" + r "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" + pluginconfig "github.com/hashicorp/terraform-plugin-testing/config" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/plancheck" + "github.com/hashicorp/terraform-plugin-testing/tfversion" +) + +func TestAcc_StreamOnExternalTable_Basic(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + resourceId := helpers.EncodeResourceIdentifier(id) + resourceName := "snowflake_stream_on_external_table.test" + + stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName()) + _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID) + t.Cleanup(stageCleanup) + + externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation) + t.Cleanup(externalTableCleanup) + + var createdOn string + + baseModel := model.StreamOnExternalTableBase("test", id, externalTable.ID()) + + modelWithExtraFields := model.StreamOnExternalTableBase("test", id, externalTable.ID()). + WithCopyGrants(true). + WithComment("foo"). + WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + })) + + modelWithExtraFieldsModified := model.StreamOnExternalTableBase("test", id, externalTable.ID()). + WithCopyGrants(true). + WithComment("bar"). + WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + })) + + modelWithExtraFieldsModifiedCauseRecreation := model.StreamOnExternalTableBase("test", id, externalTable.ID()). + WithCopyGrants(true). + WithComment("bar"). + WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + })) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnExternalTable), + Steps: []resource.TestStep{ + // without optionals + { + Config: config.FromModel(t, baseModel), + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + assert.Check(resource.TestCheckResourceAttrWith(resourceName, "show_output.0.created_on", func(value string) error { + createdOn = value + return nil + })), + ), + }, + // import without optionals + { + Config: config.FromModel(t, baseModel), + ResourceName: resourceName, + ImportState: true, + ImportStateCheck: assert.AssertThatImport(t, + resourceassert.ImportedStreamOnExternalTableResource(t, resourceId). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + ), + }, + // set all fields + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithExtraFields), + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasComment("foo"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "foo")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + ), + }, + // external change + { + PreConfig: func() { + acc.TestClient().Stream.Alter(t, sdk.NewAlterStreamRequest(id).WithSetComment("bar")) + }, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithExtraFields), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(resourceName, plancheck.ResourceActionUpdate), + }, + }, + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasComment("foo"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "foo")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + ), + }, + // update fields + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithExtraFieldsModified), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(resourceName, plancheck.ResourceActionUpdate), + }, + }, + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasComment("bar"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "bar")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + ), + }, + // update fields to force recreation + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/before"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithExtraFieldsModifiedCauseRecreation), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(resourceName, plancheck.ResourceActionUpdate), + }, + }, + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasComment("bar"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "bar")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + assert.Check(resource.TestCheckResourceAttrWith(resourceName, "show_output.0.created_on", func(value string) error { + if value == createdOn { + return fmt.Errorf("view was not recreated") + } + return nil + })), + ), + }, + // import + { + Config: config.FromModel(t, modelWithExtraFieldsModified), + ResourceName: resourceName, + ImportState: true, + ImportStateCheck: assert.AssertThatImport(t, + resourceassert.ImportedStreamOnExternalTableResource(t, resourceId). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasExternalTableString(externalTable.ID().FullyQualifiedName()). + HasCommentString("bar"), + ), + }, + }, + }) +} + +func TestAcc_StreamOnExternalTable_CopyGrants(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + resourceName := "snowflake_stream_on_external_table.test" + + var createdOn string + + stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName()) + _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID) + t.Cleanup(stageCleanup) + + externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation) + t.Cleanup(externalTableCleanup) + + model := model.StreamOnExternalTable("test", id.DatabaseName(), externalTable.ID().FullyQualifiedName(), id.Name(), id.SchemaName()).WithInsertOnly(r.BooleanTrue) + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnTable), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, model.WithCopyGrants(true)), + Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). + HasNameString(id.Name()), + assert.Check(resource.TestCheckResourceAttrWith(resourceName, "show_output.0.created_on", func(value string) error { + createdOn = value + return nil + })), + ), + }, + { + Config: config.FromModel(t, model.WithCopyGrants(false)), + Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). + HasNameString(id.Name()), + assert.Check(resource.TestCheckResourceAttrWith(resourceName, "show_output.0.created_on", func(value string) error { + if value != createdOn { + return fmt.Errorf("view was recreated") + } + return nil + })), + ), + }, + { + Config: config.FromModel(t, model.WithCopyGrants(true)), + Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). + HasNameString(id.Name()), + assert.Check(resource.TestCheckResourceAttrWith(resourceName, "show_output.0.created_on", func(value string) error { + if value != createdOn { + return fmt.Errorf("view was recreated") + } + return nil + })), + ), + }, + }, + }) +} + +// There is no way to check at/before fields in show and describe. That's why we try creating with these values, but do not assert them. +func TestAcc_StreamOnExternalTable_At(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + resourceName := "snowflake_stream_on_external_table.test" + + stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName()) + _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID) + t.Cleanup(stageCleanup) + + externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation) + t.Cleanup(externalTableCleanup) + + commonModel := func() *model.StreamOnExternalTableModel { + return model.StreamOnExternalTableBase("test", id, externalTable.ID()). + WithComment("foo"). + WithInsertOnly(r.BooleanTrue). + WithCopyGrants(false) + } + + modelWithOffset := commonModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + })) + modelWithStream := commonModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "stream": pluginconfig.StringVariable(id.FullyQualifiedName()), + })) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnExternalTable), + Steps: []resource.TestStep{ + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithOffset), + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasExternalTableString(externalTable.ID().FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasCommentString("foo"), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasComment("foo"). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "foo")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + ), + }, + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithStream), + Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). + HasNameString(id.Name()), + ), + }, + // TODO(SNOW-1689111): test timestamps and statements + }, + }) +} + +// There is no way to check at/before fields in show and describe. That's why we try creating with these values, but do not assert them. +func TestAcc_StreamOnExternalTable_Before(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + resourceName := "snowflake_stream_on_external_table.test" + + stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName()) + _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID) + t.Cleanup(stageCleanup) + + externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation) + t.Cleanup(externalTableCleanup) + + commonModel := func() *model.StreamOnExternalTableModel { + return model.StreamOnExternalTableBase("test", id, externalTable.ID()). + WithComment("foo"). + WithInsertOnly(r.BooleanTrue). + WithCopyGrants(false) + } + + modelWithOffset := commonModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + })) + modelWithStream := commonModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "stream": pluginconfig.StringVariable(id.FullyQualifiedName()), + })) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnExternalTable), + Steps: []resource.TestStep{ + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/before"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithOffset), + Check: assert.AssertThat(t, resourceassert.StreamOnExternalTableResource(t, resourceName). + HasNameString(id.Name()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasExternalTableString(externalTable.ID().FullyQualifiedName()). + HasInsertOnlyString(r.BooleanTrue). + HasCommentString("foo"), + resourceshowoutputassert.StreamShowOutput(t, resourceName). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(snowflakeroles.Accountadmin.Name()). + HasComment("foo"). + HasTableName(externalTable.ID().FullyQualifiedName()). + HasSourceType(sdk.StreamSourceTypeExternalTable). + HasBaseTables([]sdk.SchemaObjectIdentifier{externalTable.ID()}). + HasType("DELTA"). + HasStale("false"). + HasMode(sdk.StreamModeInsertOnly). + HasStaleAfterNotEmpty(). + HasInvalidReason("N/A"). + HasOwnerRoleType("ROLE"), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.created_on")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.name", id.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.database_name", id.DatabaseName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.schema_name", id.SchemaName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner", snowflakeroles.Accountadmin.Name())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.comment", "foo")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.table_name", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.source_type", string(sdk.StreamSourceTypeExternalTable))), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.#", "1")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.base_tables.0", externalTable.ID().FullyQualifiedName())), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.type", "DELTA")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.stale", "false")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.mode", string(sdk.StreamModeInsertOnly))), + assert.Check(resource.TestCheckResourceAttrSet(resourceName, "describe_output.0.stale_after")), + assert.Check(resource.TestCheckResourceAttr(resourceName, "describe_output.0.owner_role_type", "ROLE")), + ), + }, + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/before"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithStream), + Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). + HasNameString(id.Name()), + ), + }, + // TODO(SNOW-1689111): test timestamps and statements + }, + }) +} + +func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) { + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + + modelWithInvalidExternalTableId := model.StreamOnExternalTable("test", id.DatabaseName(), "invalid", id.Name(), id.SchemaName()) + + modelWithBefore := model.StreamOnExternalTable("test", id.DatabaseName(), "foo.bar.hoge", id.Name(), id.SchemaName()). + WithComment("foo"). + WithCopyGrants(false). + WithInsertOnly(r.BooleanTrue). + WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + "timestamp": pluginconfig.StringVariable("0"), + "statement": pluginconfig.StringVariable("0"), + "stream": pluginconfig.StringVariable("0"), + })) + + modelWithAt := model.StreamOnExternalTable("test", id.DatabaseName(), "foo.bar.hoge", id.Name(), id.SchemaName()). + WithComment("foo"). + WithCopyGrants(false). + WithInsertOnly(r.BooleanTrue). + WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + "offset": pluginconfig.StringVariable("0"), + "timestamp": pluginconfig.StringVariable("0"), + "statement": pluginconfig.StringVariable("0"), + "stream": pluginconfig.StringVariable("0"), + })) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + Steps: []resource.TestStep{ + // multiple excluding options - before + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/before"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithBefore), + ExpectError: regexp.MustCompile("Error: Invalid combination of arguments"), + }, + // multiple excluding options - at + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_StreamOnExternalTable/at"), + ConfigVariables: tfconfig.ConfigVariablesFromModel(t, modelWithAt), + ExpectError: regexp.MustCompile("Error: Invalid combination of arguments"), + }, + // invalid table id + { + Config: config.FromModel(t, modelWithInvalidExternalTableId), + ExpectError: regexp.MustCompile("Error: Invalid identifier type"), + }, + }, + }) +} diff --git a/pkg/resources/stream_on_table.go b/pkg/resources/stream_on_table.go index f80e20f6eb..e9eacd028f 100644 --- a/pkg/resources/stream_on_table.go +++ b/pkg/resources/stream_on_table.go @@ -7,7 +7,6 @@ import ( "log" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/schemas" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" @@ -16,86 +15,37 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) -var streamOnTableSchema = map[string]*schema.Schema{ - "name": { - Type: schema.TypeString, - Required: true, - ForceNew: true, - Description: blocklistedCharactersFieldDescription("Specifies the identifier for the stream; must be unique for the database and schema in which the stream is created."), - DiffSuppressFunc: suppressIdentifierQuoting, - }, - "schema": { - Type: schema.TypeString, - Required: true, - ForceNew: true, - Description: blocklistedCharactersFieldDescription("The schema in which to create the stream."), - DiffSuppressFunc: suppressIdentifierQuoting, - }, - "database": { - Type: schema.TypeString, - Required: true, - ForceNew: true, - Description: blocklistedCharactersFieldDescription("The database in which to create the stream."), - DiffSuppressFunc: suppressIdentifierQuoting, - }, - "copy_grants": { - Type: schema.TypeBool, - Optional: true, - Default: false, - Description: "Retains the access permissions from the original stream when a new stream is created using the OR REPLACE clause. Use only if the resource is already managed by Terraform. Otherwise, this field is skipped.", - DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool { - return oldValue != "" && oldValue != newValue +var streamOnTableSchema = func() map[string]*schema.Schema { + streamOnTable := map[string]*schema.Schema{ + "table": { + Type: schema.TypeString, + Required: true, + Description: blocklistedCharactersFieldDescription("Specifies an identifier for the table the stream will monitor."), + DiffSuppressFunc: SuppressIfAny(suppressIdentifierQuoting, IgnoreChangeToCurrentSnowflakeValueInShow("table_name")), + ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](), }, - }, - "table": { - Type: schema.TypeString, - Required: true, - Description: blocklistedCharactersFieldDescription("Specifies an identifier for the table the stream will monitor."), - DiffSuppressFunc: SuppressIfAny(suppressIdentifierQuoting, IgnoreChangeToCurrentSnowflakeValueInShow("table_name")), - ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](), - }, - "append_only": { - Type: schema.TypeString, - Optional: true, - Default: BooleanDefault, - ValidateDiagFunc: validateBooleanString, - DiffSuppressFunc: IgnoreChangeToCurrentSnowflakeValueInShowWithMapping("mode", func(x any) any { - return x.(string) == string(sdk.StreamModeAppendOnly) - }), - Description: booleanStringFieldDescription("Specifies whether this is an append-only stream."), - }, - "show_initial_rows": { - Type: schema.TypeString, - Optional: true, - Default: BooleanDefault, - ValidateDiagFunc: validateBooleanString, - Description: externalChangesNotDetectedFieldDescription(booleanStringFieldDescription("Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.")), - }, - "comment": { - Type: schema.TypeString, - Optional: true, - Description: "Specifies a comment for the stream.", - }, - AtAttributeName: atSchema, - BeforeAttributeName: beforeSchema, - ShowOutputAttributeName: { - Type: schema.TypeList, - Computed: true, - Description: "Outputs the result of `SHOW STREAMS` for the given stream.", - Elem: &schema.Resource{ - Schema: schemas.ShowStreamSchema, + "append_only": { + Type: schema.TypeString, + Optional: true, + Default: BooleanDefault, + ValidateDiagFunc: validateBooleanString, + DiffSuppressFunc: IgnoreChangeToCurrentSnowflakeValueInShowWithMapping("mode", func(x any) any { + return x.(string) == string(sdk.StreamModeAppendOnly) + }), + Description: booleanStringFieldDescription("Specifies whether this is an append-only stream."), }, - }, - DescribeOutputAttributeName: { - Type: schema.TypeList, - Computed: true, - Description: "Outputs the result of `DESCRIBE STREAM` for the given stream.", - Elem: &schema.Resource{ - Schema: schemas.DescribeStreamSchema, + "show_initial_rows": { + Type: schema.TypeString, + Optional: true, + Default: BooleanDefault, + ValidateDiagFunc: validateBooleanString, + Description: externalChangesNotDetectedFieldDescription(booleanStringFieldDescription("Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.")), }, - }, - FullyQualifiedNameAttributeName: schemas.FullyQualifiedNameSchema, -} + AtAttributeName: atSchema, + BeforeAttributeName: beforeSchema, + } + return helpers.MergeMaps(streamCommonSchema, streamOnTable) +}() func StreamOnTable() *schema.Resource { return &schema.Resource{ @@ -160,21 +110,15 @@ func CreateStreamOnTable(orReplace bool) schema.CreateContextFunc { } req := sdk.NewCreateOnTableStreamRequest(id, tableId) - if orReplace { - req.WithOrReplace(true) - if d.Get("copy_grants").(bool) { - req.WithCopyGrants(true) - } - } - err = booleanStringAttributeCreate(d, "append_only", &req.AppendOnly) - if err != nil { - return diag.FromErr(err) - } - - err = booleanStringAttributeCreate(d, "show_initial_rows", &req.ShowInitialRows) - if err != nil { - return diag.FromErr(err) + errs := errors.Join( + copyGrantsAttributeCreate(d, orReplace, &req.OrReplace, &req.CopyGrants), + booleanStringAttributeCreate(d, "append_only", &req.AppendOnly), + booleanStringAttributeCreate(d, "show_initial_rows", &req.ShowInitialRows), + stringAttributeCreate(d, "comment", &req.Comment), + ) + if errs != nil { + return diag.FromErr(errs) } streamTimeTravelReq := handleStreamTimeTravel(d) @@ -182,10 +126,6 @@ func CreateStreamOnTable(orReplace bool) schema.CreateContextFunc { req.WithOn(*streamTimeTravelReq) } - if v, ok := d.GetOk("comment"); ok { - req.WithComment(v.(string)) - } - err = client.Streams.CreateOnTable(ctx, req) if err != nil { return diag.FromErr(err) @@ -210,16 +150,13 @@ func ReadStreamOnTable(withExternalChangesMarking bool) schema.ReadContextFunc { return diag.Diagnostics{ diag.Diagnostic{ Severity: diag.Warning, - Summary: "Failed to stream. Marking the resource as removed.", + Summary: "Failed to query stream. Marking the resource as removed.", Detail: fmt.Sprintf("stream name: %s, Err: %s", id.FullyQualifiedName(), err), }, } } return diag.FromErr(err) } - if err := d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()); err != nil { - return diag.FromErr(err) - } tableId, err := sdk.ParseSchemaObjectIdentifier(*stream.TableName) if err != nil { return diag.Diagnostics{ @@ -233,13 +170,13 @@ func ReadStreamOnTable(withExternalChangesMarking bool) schema.ReadContextFunc { if err := d.Set("table", tableId.FullyQualifiedName()); err != nil { return diag.FromErr(err) } - if err := d.Set("comment", *stream.Comment); err != nil { - return diag.FromErr(err) - } streamDescription, err := client.Streams.Describe(ctx, id) if err != nil { return diag.FromErr(err) } + if err := handleStreamRead(d, id, stream, streamDescription); err != nil { + return diag.FromErr(err) + } if withExternalChangesMarking { var mode sdk.StreamMode if stream.Mode != nil { @@ -258,12 +195,6 @@ func ReadStreamOnTable(withExternalChangesMarking bool) schema.ReadContextFunc { return diag.FromErr(err) } - if err = d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}); err != nil { - return diag.FromErr(err) - } - if err = d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}); err != nil { - return diag.FromErr(err) - } return nil } } diff --git a/pkg/resources/stream_on_table_acceptance_test.go b/pkg/resources/stream_on_table_acceptance_test.go index 99604e330c..bfd5edafa3 100644 --- a/pkg/resources/stream_on_table_acceptance_test.go +++ b/pkg/resources/stream_on_table_acceptance_test.go @@ -34,11 +34,9 @@ func TestAcc_StreamOnTable_Basic(t *testing.T) { table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) t.Cleanup(cleanupTable) - baseModel := func() *model.StreamOnTableModel { - return model.StreamOnTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), table.ID().FullyQualifiedName()) - } + baseModel := model.StreamOnTableBase("test", id, table.ID()) - modelWithExtraFields := baseModel(). + modelWithExtraFields := model.StreamOnTableBase("test", id, table.ID()). WithCopyGrants(false). WithComment("foo"). WithAppendOnly(r.BooleanTrue). @@ -47,7 +45,7 @@ func TestAcc_StreamOnTable_Basic(t *testing.T) { "offset": pluginconfig.StringVariable("0"), })) - modelWithExtraFieldsDefaultMode := baseModel(). + modelWithExtraFieldsDefaultMode := model.StreamOnTableBase("test", id, table.ID()). WithCopyGrants(false). WithComment("foo"). WithAppendOnly(r.BooleanFalse). @@ -65,7 +63,7 @@ func TestAcc_StreamOnTable_Basic(t *testing.T) { Steps: []resource.TestStep{ // without optionals { - Config: config.FromModel(t, baseModel()), + Config: config.FromModel(t, baseModel), Check: assert.AssertThat(t, resourceassert.StreamOnTableResource(t, resourceName). HasNameString(id.Name()). HasDatabaseString(id.DatabaseName()). @@ -107,7 +105,7 @@ func TestAcc_StreamOnTable_Basic(t *testing.T) { }, // import without optionals { - Config: config.FromModel(t, baseModel()), + Config: config.FromModel(t, baseModel), ResourceName: resourceName, ImportState: true, ImportStateCheck: assert.AssertThatImport(t, @@ -354,21 +352,21 @@ func TestAcc_StreamOnTable_At(t *testing.T) { lastQueryId := acc.TestClient().Context.LastQueryId(t) - baseModel := func() *model.StreamOnTableModel { - return model.StreamOnTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), table.ID().FullyQualifiedName()). + commonModel := func() *model.StreamOnTableModel { + return model.StreamOnTableBase("test", id, table.ID()). WithComment("foo"). WithAppendOnly(r.BooleanTrue). WithShowInitialRows(r.BooleanTrue). WithCopyGrants(false) } - modelWithOffset := baseModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithOffset := commonModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "offset": pluginconfig.StringVariable("0"), })) - modelWithStream := baseModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithStream := commonModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "stream": pluginconfig.StringVariable(id.FullyQualifiedName()), })) - modelWithStatement := baseModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithStatement := commonModel().WithAtValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "statement": pluginconfig.StringVariable(lastQueryId), })) @@ -471,21 +469,21 @@ func TestAcc_StreamOnTable_Before(t *testing.T) { lastQueryId := acc.TestClient().Context.LastQueryId(t) - baseModel := func() *model.StreamOnTableModel { - return model.StreamOnTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), table.ID().FullyQualifiedName()). + commonModel := func() *model.StreamOnTableModel { + return model.StreamOnTableBase("test", id, table.ID()). WithComment("foo"). WithAppendOnly(r.BooleanTrue). WithShowInitialRows(r.BooleanTrue). WithCopyGrants(false) } - modelWithOffset := baseModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithOffset := commonModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "offset": pluginconfig.StringVariable("0"), })) - modelWithStream := baseModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithStream := commonModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "stream": pluginconfig.StringVariable(id.FullyQualifiedName()), })) - modelWithStatement := baseModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ + modelWithStatement := commonModel().WithBeforeValue(pluginconfig.MapVariable(map[string]pluginconfig.Variable{ "statement": pluginconfig.StringVariable(lastQueryId), })) @@ -595,7 +593,6 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) { TerraformVersionChecks: []tfversion.TerraformVersionCheck{ tfversion.RequireAbove(tfversion.Version1_5_0), }, - CheckDestroy: acc.CheckDestroy(t, resources.Saml2SecurityIntegration), Steps: []resource.TestStep{ // multiple excluding options - before { diff --git a/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/test.tf b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/test.tf new file mode 100644 index 0000000000..7e2820c23b --- /dev/null +++ b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/test.tf @@ -0,0 +1,18 @@ +resource "snowflake_stream_on_external_table" "test" { + name = var.name + database = var.database + schema = var.schema + + copy_grants = var.copy_grants + external_table = var.external_table + insert_only = var.insert_only + + at { + timestamp = try(var.at["timestamp"], null) + offset = try(var.at["offset"], null) + stream = try(var.at["stream"], null) + statement = try(var.at["statement"], null) + } + + comment = var.comment +} diff --git a/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/variables.tf b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/variables.tf new file mode 100644 index 0000000000..72babf610c --- /dev/null +++ b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/at/variables.tf @@ -0,0 +1,31 @@ +variable "name" { + type = string +} + +variable "database" { + type = string +} + +variable "schema" { + type = string +} + +variable "external_table" { + type = string +} + +variable "copy_grants" { + type = bool +} + +variable "insert_only" { + type = bool +} + +variable "at" { + type = map(string) +} + +variable "comment" { + type = string +} diff --git a/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/test.tf b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/test.tf new file mode 100644 index 0000000000..08f3ba72b6 --- /dev/null +++ b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/test.tf @@ -0,0 +1,18 @@ +resource "snowflake_stream_on_external_table" "test" { + name = var.name + database = var.database + schema = var.schema + + copy_grants = var.copy_grants + external_table = var.external_table + insert_only = var.insert_only + + before { + timestamp = try(var.before["timestamp"], null) + offset = try(var.before["offset"], null) + stream = try(var.before["stream"], null) + statement = try(var.before["statement"], null) + } + + comment = var.comment +} diff --git a/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/variables.tf b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/variables.tf new file mode 100644 index 0000000000..e54e572e35 --- /dev/null +++ b/pkg/resources/testdata/TestAcc_StreamOnExternalTable/before/variables.tf @@ -0,0 +1,31 @@ +variable "name" { + type = string +} + +variable "database" { + type = string +} + +variable "schema" { + type = string +} + +variable "external_table" { + type = string +} + +variable "copy_grants" { + type = bool +} + +variable "insert_only" { + type = bool +} + +variable "before" { + type = map(string) +} + +variable "comment" { + type = string +} diff --git a/pkg/resources/view.go b/pkg/resources/view.go index a5fb1dbcb8..cfdd757257 100644 --- a/pkg/resources/view.go +++ b/pkg/resources/view.go @@ -390,7 +390,10 @@ func CreateView(orReplace bool) schema.CreateContextFunc { req.WithComment(v) } - if v := d.Get("column"); len(v.([]any)) > 0 { + // Read directly from the config. Otherwise, when recreating the resource with columns already in the state, + // the column would get populated with old values. This could cause errors when columns are not set in the config, + // but are changed in `statement`. + if v := d.Get("column"); len(d.GetRawConfig().AsValueMap()["column"].AsValueSlice()) > 0 { columns, err := extractColumns(v) if err != nil { return diag.FromErr(err) diff --git a/pkg/resources/view_acceptance_test.go b/pkg/resources/view_acceptance_test.go index 52b0780ccb..016add5fac 100644 --- a/pkg/resources/view_acceptance_test.go +++ b/pkg/resources/view_acceptance_test.go @@ -923,6 +923,63 @@ func TestAcc_View_Issue3073(t *testing.T) { }) } +func TestAcc_View_IncorrectColumnsWithOrReplace(t *testing.T) { + t.Setenv(string(testenvs.ConfigureClientOnce), "") + statement := `SELECT ROLE_NAME as "role_name", ROLE_OWNER as "role_owner" FROM INFORMATION_SCHEMA.APPLICABLE_ROLES` + statementUnquotedColumns := `SELECT ROLE_NAME as role_name, ROLE_OWNER as role_owner FROM INFORMATION_SCHEMA.APPLICABLE_ROLES` + statementUnquotedColumns3 := `SELECT ROLE_NAME as role_name, ROLE_OWNER as role_owner, IS_GRANTABLE as is_grantable FROM INFORMATION_SCHEMA.APPLICABLE_ROLES` + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + viewModel := model.View("test", id.DatabaseName(), id.Name(), id.SchemaName(), statement) + viewLowercaseStatementModel := model.View("test", id.DatabaseName(), id.Name(), id.SchemaName(), statementUnquotedColumns) + viewLowercaseStatementModel3 := model.View("test", id.DatabaseName(), id.Name(), id.SchemaName(), statementUnquotedColumns3) + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.View), + Steps: []resource.TestStep{ + { + Config: accconfig.FromModel(t, viewModel), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PostApplyPostRefresh: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction("snowflake_view.test", plancheck.ResourceActionNoop), + }, + }, + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_view.test", "name", id.Name()), + resource.TestCheckResourceAttr("snowflake_view.test", "column.#", "2"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.0.column_name", "role_name"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.1.column_name", "role_owner"), + ), + }, + // use columns without quotes in the statement + { + Config: accconfig.FromModel(t, viewLowercaseStatementModel), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_view.test", "name", id.Name()), + resource.TestCheckResourceAttr("snowflake_view.test", "column.#", "2"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.0.column_name", "ROLE_NAME"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.1.column_name", "ROLE_OWNER"), + ), + }, + // add a new column to the statement + { + Config: accconfig.FromModel(t, viewLowercaseStatementModel3), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_view.test", "name", id.Name()), + resource.TestCheckResourceAttr("snowflake_view.test", "column.#", "3"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.0.column_name", "ROLE_NAME"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.1.column_name", "ROLE_OWNER"), + resource.TestCheckResourceAttr("snowflake_view.test", "column.2.column_name", "IS_GRANTABLE"), + ), + }, + }, + }) +} + func TestAcc_ViewChangeCopyGrants(t *testing.T) { t.Setenv(string(testenvs.ConfigureClientOnce), "") id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() diff --git a/pkg/sdk/streams_gen.go b/pkg/sdk/streams_gen.go index 7ca207b1e9..41e2eef921 100644 --- a/pkg/sdk/streams_gen.go +++ b/pkg/sdk/streams_gen.go @@ -181,6 +181,10 @@ func (v *Stream) IsAppendOnly() bool { return v != nil && v.Mode != nil && *v.Mode == StreamModeAppendOnly } +func (v *Stream) IsInsertOnly() bool { + return v != nil && v.Mode != nil && *v.Mode == StreamModeInsertOnly +} + // DescribeStreamOptions is based on https://docs.snowflake.com/en/sql-reference/sql/desc-stream. type DescribeStreamOptions struct { describe bool `ddl:"static" sql:"DESCRIBE"` diff --git a/pkg/sdk/testint/streams_gen_integration_test.go b/pkg/sdk/testint/streams_gen_integration_test.go index f1900073c7..79f30b23c6 100644 --- a/pkg/sdk/testint/streams_gen_integration_test.go +++ b/pkg/sdk/testint/streams_gen_integration_test.go @@ -116,7 +116,7 @@ func TestInt_Streams(t *testing.T) { externalTableId := testClientHelper().Ids.RandomSchemaObjectIdentifier() externalTableReq := sdk.NewCreateExternalTableRequest(externalTableId, stageLocation).WithFileFormat(*sdk.NewExternalTableFileFormatRequest().WithFileFormatType(sdk.ExternalTableFileFormatTypeJSON)) - _, externalTableCleanup := testClientHelper().ExternalTable.CreateOnTableWithRequest(t, externalTableReq) + _, externalTableCleanup := testClientHelper().ExternalTable.CreateWithRequest(t, externalTableReq) t.Cleanup(externalTableCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() diff --git a/templates/resources/stream_on_external_table.md.tmpl b/templates/resources/stream_on_external_table.md.tmpl new file mode 100644 index 0000000000..c82f23be97 --- /dev/null +++ b/templates/resources/stream_on_external_table.md.tmpl @@ -0,0 +1,37 @@ +--- +page_title: "{{.Name}} {{.Type}} - {{.ProviderName}}" +subcategory: "" +description: |- +{{ if gt (len (split .Description "")) 1 -}} +{{ index (split .Description "") 1 | plainmarkdown | trimspace | prefixlines " " }} +{{- else -}} +{{ .Description | plainmarkdown | trimspace | prefixlines " " }} +{{- end }} +--- + +!> **V1 release candidate** This resource was reworked and is a release candidate for the V1. We do not expect significant changes in it before the V1. We will welcome any feedback and adjust the resource if needed. Any errors reported will be resolved with a higher priority. We encourage checking this resource out before the V1 release. Please follow the [migration guide](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/MIGRATION_GUIDE.md#v0960--v0970) to use it. + +!> **Note about copy_grants** Fields like `external_table`, `insert_only`, `at`, `before` can not be ALTERed on Snowflake side (check [docs](https://docs.snowflake.com/en/sql-reference/sql/alter-stream)), and a change means recreation of the resource. ForceNew can not be used because it does not preserve grants from `copy_grants`. Beware that even though a change is marked as update, the resource is recreated. + +# {{.Name}} ({{.Type}}) + +{{ .Description | trimspace }} + +{{ if .HasExample -}} +## Example Usage + +{{ tffile (printf "examples/resources/%s/resource.tf" .Name)}} +-> **Note** Instead of using fully_qualified_name, you can reference objects managed outside Terraform by constructing a correct ID, consult [identifiers guide](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/guides/identifiers#new-computed-fully-qualified-name-field-in-resources). + + +{{- end }} + +{{ .SchemaMarkdown | trimspace }} +{{- if .HasImport }} + +## Import + +Import is supported using the following syntax: + +{{ codefile "shell" (printf "examples/resources/%s/import.sh" .Name)}} +{{- end }} diff --git a/templates/resources/stream_on_table.md.tmpl b/templates/resources/stream_on_table.md.tmpl index 20eda79f70..53dd2b9daf 100644 --- a/templates/resources/stream_on_table.md.tmpl +++ b/templates/resources/stream_on_table.md.tmpl @@ -11,6 +11,8 @@ description: |- !> **V1 release candidate** This resource was reworked and is a release candidate for the V1. We do not expect significant changes in it before the V1. We will welcome any feedback and adjust the resource if needed. Any errors reported will be resolved with a higher priority. We encourage checking this resource out before the V1 release. Please follow the [migration guide](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/MIGRATION_GUIDE.md#v0960--v0970) to use it. +!> **Note about copy_grants** Fields like `table`, `append_only`, `at`, `before`, `show_initial_rows` can not be ALTERed on Snowflake side (check [docs](https://docs.snowflake.com/en/sql-reference/sql/alter-stream)), and a change means recreation of the resource. ForceNew can not be used because it does not preserve grants from `copy_grants`. Beware that even though a change is marked as update, the resource is recreated. + # {{.Name}} ({{.Type}}) {{ .Description | trimspace }}