Skip to content

Commit

Permalink
fix: Handle external type changes in stream resources (#3164)
Browse files Browse the repository at this point in the history
<!-- Feel free to delete comments as you fill this in -->
- handle external type changes in stream resources
<!-- summary of changes -->

## Test Plan
<!-- detail ways in which this PR has been tested or needs to be tested
-->
* [x] acceptance tests
<!-- add more below if you think they are relevant -->
* [ ] …

## References
<!-- issues documentation links, etc  -->
  • Loading branch information
sfc-gh-jmichalak authored Oct 31, 2024
1 parent b18bf30 commit 9fd8f88
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/resources/stream_on_directory_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ resource "snowflake_stream_on_directory_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedatt--describe_output"></a>
### Nested Schema for `describe_output`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_external_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ resource "snowflake_stream_on_external_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ resource "snowflake_stream_on_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_view.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ resource "snowflake_stream_on_view" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions pkg/acceptance/helpers/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnT
return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) CreateOnViewWithRequest(t *testing.T, req *sdk.CreateOnViewStreamRequest) (*sdk.Stream, func()) {
t.Helper()
ctx := context.Background()

err := c.client().CreateOnView(ctx, req)
require.NoError(t, err)

stream, err := c.client().ShowByID(ctx, req.GetName())
require.NoError(t, err)

return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) {
t.Helper()
ctx := context.Background()
Expand Down
32 changes: 32 additions & 0 deletions pkg/resources/custom_diffs.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,38 @@ func RecreateWhenSecretTypeChangedExternally(secretType sdk.SecretType) schema.C
}
}

// RecreateWhenStreamTypeChangedExternally recreates a stream when argument streamType is different than in the state.
func RecreateWhenStreamTypeChangedExternally(streamType sdk.StreamSourceType) schema.CustomizeDiffFunc {
return RecreateWhenResourceTypeChangedExternally("stream_type", streamType, sdk.ToStreamSourceType)
}

// RecreateWhenResourceTypeChangedExternally recreates a resource when argument wantType is different than the value in typeField.
func RecreateWhenResourceTypeChangedExternally[T ~string](typeField string, wantType T, toType func(string) (T, error)) schema.CustomizeDiffFunc {
return func(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
if n := diff.Get(typeField); n != nil {
logging.DebugLogger.Printf("[DEBUG] new external value for %s: %s\n", typeField, n.(string))

gotTypeRaw := n.(string)
// if the type is empty, the state is empty - do not recreate
if gotTypeRaw == "" {
return nil
}

gotType, err := toType(gotTypeRaw)
if err != nil {
return fmt.Errorf("unknown type: %w", err)
}
if gotType != wantType {
// we have to set here a value instead of just SetNewComputed
// because with empty value (default snowflake behavior for type) ForceNew fails
// because there are no changes (at least from the SDKv2 point of view) for typeField
return errors.Join(diff.SetNew(typeField, "<changed externally>"), diff.ForceNew(typeField))
}
}
return nil
}
}

// RecreateWhenStreamIsStale detects when the stream is stale, and sets a `false` value for `stale` field.
// This means that the provider can detect that change in `stale` from `true` to `false`, where `false` is our desired state.
func RecreateWhenStreamIsStale() schema.CustomizeDiffFunc {
Expand Down
6 changes: 6 additions & 0 deletions pkg/resources/stream_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ var streamCommonSchema = map[string]*schema.Schema{
Optional: true,
Description: "Specifies a comment for the stream.",
},
"stream_type": {
Type: schema.TypeString,
Computed: true,
Description: "Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.",
},
ShowOutputAttributeName: {
Type: schema.TypeList,
Computed: true,
Expand Down Expand Up @@ -203,6 +208,7 @@ func handleStreamRead(d *schema.ResourceData,
) error {
return errors.Join(
d.Set("comment", stream.Comment),
d.Set("stream_type", stream.SourceType),
d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}),
d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}),
d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()),
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/stream_on_directory_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func StreamOnDirectoryTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, ShowOutputAttributeName, "stage", "comment"),
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, DescribeOutputAttributeName, "stage", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeStage),
),

Schema: streamOnDirectoryTableSchema,
Expand Down
55 changes: 55 additions & 0 deletions pkg/resources/stream_on_directory_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnDirectoryTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -459,3 +460,57 @@ func TestAcc_StreamOnDirectoryTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnDirectoryTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stage, cleanupStage := acc.TestClient().Stage.CreateStageWithDirectory(t)
t.Cleanup(cleanupStage)
model := model.StreamOnDirectoryTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), stage.ID().FullyQualifiedName())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func StreamOnExternalTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeExternalTable),
),

Schema: streamOnExternalTableSchema,
Expand Down
60 changes: 60 additions & 0 deletions pkg/resources/stream_on_external_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnExternalTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -890,3 +891,62 @@ func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnExternalTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName())
_, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID)
t.Cleanup(stageCleanup)

externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation)
t.Cleanup(externalTableCleanup)
model := model.StreamOnExternalTableBase("test", id, externalTable.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnTableSchema, ShowOutputAttributeName, "table", "append_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnTableSchema, DescribeOutputAttributeName, "table", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeTable),
),

Schema: streamOnTableSchema,
Expand Down
57 changes: 57 additions & 0 deletions pkg/resources/stream_on_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -835,3 +836,59 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)

model := model.StreamOnTableBase("test", id, table.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName())
view, cleanupView := acc.TestClient().View.CreateView(t, statement)
t.Cleanup(cleanupView)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnViewWithRequest(t, sdk.NewCreateOnViewStreamRequest(id, view.ID()))
t.Cleanup(cleanup)
require.Equal(t, sdk.StreamSourceTypeView, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnView() *schema.Resource {
ComputedIfAnyAttributeChanged(StreamOnViewSchema, ShowOutputAttributeName, "view", "append_only", "comment"),
ComputedIfAnyAttributeChanged(StreamOnViewSchema, DescribeOutputAttributeName, "view", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeView),
),

Schema: StreamOnViewSchema,
Expand Down
Loading

0 comments on commit 9fd8f88

Please sign in to comment.