Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add option to create streams which reference EXTERNAL TABLEs #661

Closed
wants to merge 12 commits into from
Closed
8 changes: 5 additions & 3 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ resource snowflake_stream stream {
schema = "schema"
name = "stream"

on_table = "table"
append_only = false
insert_only = false
on_external_table = true
on_table = "table"
append_only = false
insert_only = false

owner = "role1"
}
Expand All @@ -43,6 +44,7 @@ resource snowflake_stream stream {
- **comment** (String) Specifies a comment for the stream.
- **id** (String) The ID of this resource.
- **insert_only** (Boolean) Create an insert only stream type.
- **on_external_table** (Boolean) Specifies whether the table being monitored is an external table.
- **on_table** (String) Name of the table the stream will monitor.
- **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

Expand Down
7 changes: 4 additions & 3 deletions examples/resources/snowflake_stream/resource.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ resource snowflake_stream stream {
schema = "schema"
name = "stream"

on_table = "table"
append_only = false
insert_only = false
on_external_table = true
on_table = "table"
append_only = false
insert_only = false

owner = "role1"
}
8 changes: 8 additions & 0 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ var streamSchema = map[string]*schema.Schema{
ForceNew: true,
Description: "Name of the table the stream will monitor.",
},
"on_external_table": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Description: "Specifies whether the table being monitored is an external table.",
},
"append_only": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -175,6 +181,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
schema := d.Get("schema").(string)
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
onExternalTable := d.Get("on_external_table").(bool)
appendOnly := d.Get("append_only").(bool)
insertOnly := d.Get("insert_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)
Expand All @@ -186,6 +193,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
return err
}

builder.WithExternalTable(onExternalTable)
builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
Expand Down
79 changes: 79 additions & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

func TestAcc_Stream(t *testing.T) {
accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))
// Ref: https://github.com/chanzuckerberg/terraform-provider-snowflake/pull/661#discussion_r696067813
accNameExternalTable := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

resource.ParallelTest(t, resource.TestCase{
Providers: providers(),
Expand Down Expand Up @@ -40,6 +42,19 @@ func TestAcc_Stream(t *testing.T) {
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
Config: externalTableStreamConfig(accNameExternalTable),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accNameExternalTable),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accNameExternalTable, accNameExternalTable, "STREAM_ON_EXTERNAL_TABLE")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "on_external_table", true),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
},
})
}
Expand Down Expand Up @@ -90,3 +105,67 @@ resource "snowflake_stream" "test_stream" {
`
return fmt.Sprintf(s, name, name, name, append_only_config)
}

func externalTableStreamConfig(name string) string {
// Refer to external_table_acceptance_test.go for the original source on
// external table resources and dependents (modified slightly here).
locations := []string{"s3://com.example.bucket/prefix"}
s := `
resource "snowflake_database" "test" {
name = "%v"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test" {
name = "%v"
database = snowflake_database.test.name
comment = "Terraform acceptance test"
}

resource "snowflake_stage" "test" {
name = "%v"
url = "s3://com.example.bucket/prefix"
database = snowflake_database.test.name
schema = snowflake_schema.test.name
comment = "Terraform acceptance test"
storage_integration = snowflake_storage_integration.external_table_stream_integration.name
}

resource "snowflake_storage_integration" "external_table_stream_integration" {
name = "%v"
storage_allowed_locations = %q
storage_provider = "S3"
storage_aws_role_arn = "arn:aws:iam::000000000001:/role/test"
}

resource "snowflake_external_table" "test_external_stream_table" {
database = snowflake_database.test.name
schema = snowflake_schema.test.name
name = "STREAM_ON_EXTERNAL_TABLE"
comment = "Terraform acceptance test"
column {
name = "column1"
type = "STRING"
as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')"
}
column {
name = "column2"
type = "TIMESTAMP_NTZ(9)"
as = "($1:'CreatedDate'::timestamp)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's not happy with this... Is this written correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, should be; I grabbed it directly from the external_table_acceptance_test.go test. I can dig in a bit tonight hopefully, sorry for the many issues with this test

}
file_format = "TYPE = CSV"
location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}"
}

resource "snowflake_stream" "test_external_table_stream" {
database = snowflake_database.test.name
schema = snowflake_schema.test.name
name = "%s"
comment = "Terraform acceptance test"
on_external_table = true
on_table = "${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_external_table.test_external_stream_table.name}"
}
`

return fmt.Sprintf(s, name, name, name, name, locations, name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't want to reuse the config above, you'll have to create another random string to pass in so that the names are different. Just put
accNameExternalTable := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) below the definition ofaccName above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then replace the accNames with accNameExternalTable in your config above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see - thank you again, see the most recent commits!

}
25 changes: 25 additions & 0 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ func TestStreamCreate(t *testing.T) {
})
}

func TestStreamCreateOnExternalTable(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"on_external_table": true,
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON EXTERNAL TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
})
}

func expectStreamRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY")
mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN DATABASE "database_name"`).WillReturnRows(rows)
Expand Down
14 changes: 13 additions & 1 deletion pkg/snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type StreamBuilder struct {
name string
db string
schema string
externalTable bool
onTable string
appendOnly bool
insertOnly bool
Expand Down Expand Up @@ -53,6 +54,11 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde
return sb
}

func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder {
sb.externalTable = b
return sb
}

func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
sb.appendOnly = b
return sb
Expand Down Expand Up @@ -90,7 +96,13 @@ func (sb *StreamBuilder) Create() string {
q := strings.Builder{}
q.WriteString(fmt.Sprintf(`CREATE STREAM %v`, sb.QualifiedName()))

q.WriteString(fmt.Sprintf(` ON TABLE %v`, sb.onTable))
q.WriteString(` ON`)

if sb.externalTable {
q.WriteString(` EXTERNAL`)
}

q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable))

if sb.comment != "" {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment)))
Expand Down
3 changes: 3 additions & 0 deletions pkg/snowflake/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func TestStreamCreate(t *testing.T) {

s.WithInsertOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)

s.WithExternalTable(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON EXTERNAL TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)
}

func TestStreamChangeComment(t *testing.T) {
Expand Down