Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add option to create streams which reference EXTERNAL TABLEs #661

Closed
wants to merge 12 commits into from
Closed
8 changes: 5 additions & 3 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ resource snowflake_stream stream {
schema = "schema"
name = "stream"

on_table = "table"
append_only = false
insert_only = false
on_external_table = true
on_table = "table"
append_only = false
insert_only = false

owner = "role1"
}
Expand All @@ -43,6 +44,7 @@ resource snowflake_stream stream {
- **comment** (String) Specifies a comment for the stream.
- **id** (String) The ID of this resource.
- **insert_only** (Boolean) Create an insert only stream type.
- **on_external_table** (Boolean) Specifies whether the table being monitored is an external table.
- **on_table** (String) Name of the table the stream will monitor.
- **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

Expand Down
7 changes: 4 additions & 3 deletions examples/resources/snowflake_stream/resource.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ resource snowflake_stream stream {
schema = "schema"
name = "stream"

on_table = "table"
append_only = false
insert_only = false
on_external_table = true
on_table = "table"
append_only = false
insert_only = false

owner = "role1"
}
8 changes: 8 additions & 0 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ var streamSchema = map[string]*schema.Schema{
ForceNew: true,
Description: "Name of the table the stream will monitor.",
},
"on_external_table": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Description: "Specifies whether the table being monitored is an external table.",
},
"append_only": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -175,6 +181,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
schema := d.Get("schema").(string)
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
onExternalTable := d.Get("on_external_table").(bool)
appendOnly := d.Get("append_only").(bool)
insertOnly := d.Get("insert_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)
Expand All @@ -186,6 +193,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
return err
}

builder.WithExternalTable(onExternalTable)
builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
Expand Down
76 changes: 76 additions & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ func TestAcc_Stream(t *testing.T) {
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
Config: externalTableStreamConfig(accName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_TABLE")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to be on_external_table?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think the name is supposed be accName, as well... not STREAM_ON_TABLE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @alldoami on_external_table is a bool, and creating a stream on an external table without ... EXTERNAL TABLE will raise an error; so, the acceptance test is simply ensuring that the table referenced by on_table (which is an external table created via externalTableStreamConfig) exists.

That said, I should definitely have added a check that on_external_table is true - see the most recent commits, which also resolve the STREAM_ON_TABLE reference - thanks! I'll have to set up an environment to run these tests locally which will reduce impact on you via these PRs, sorry.

resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
},
})
}
Expand Down Expand Up @@ -90,3 +102,67 @@ resource "snowflake_stream" "test_stream" {
`
return fmt.Sprintf(s, name, name, name, append_only_config)
}

func externalTableStreamConfig(name string) string {
// Refer to external_table_acceptance_test.go for the original source on
// external table resources and dependents (modified slightly here).
locations := []string{"s3://com.example.bucket/prefix"}
s := `
resource "snowflake_database" "test" {
name = "%v"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test" {
name = "%v"
database = snowflake_database.test.name
comment = "Terraform acceptance test"
}

resource "snowflake_stage" "test" {
name = "%v"
url = "s3://com.example.bucket/prefix"
database = snowflake_database.test.name
schema = snowflake_schema.test.name
comment = "Terraform acceptance test"
storage_integration = snowflake_storage_integration.external_table_stream_integration.name
}

resource "snowflake_storage_integration" "external_table_stream_integration" {
name = "%v"
storage_allowed_locations = %q
storage_provider = "S3"
storage_aws_role_arn = "arn:aws:iam::000000000001:/role/test"
}

resource "snowflake_external_table" "test_external_stream_table" {
database = snowflake_database.test.name
schema = snowflake_schema.test.name
name = "%v"
comment = "Terraform acceptance test"
column {
name = "column1"
type = "STRING"
as = "TO_VARCHAR(TO_TIMESTAMP_NTZ(value:unix_timestamp_property::NUMBER, 3), 'yyyy-mm-dd-hh')"
}
column {
name = "column2"
type = "TIMESTAMP_NTZ(9)"
as = "($1:'CreatedDate'::timestamp)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's not happy with this... Is this written correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, should be; I grabbed it directly from the external_table_acceptance_test.go test. I can dig in a bit tonight hopefully, sorry for the many issues with this test

}
file_format = "TYPE = CSV"
location = "@${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_stage.test.name}"
}

resource "snowflake_stream" "test_external_table_stream" {
database = snowflake_database.test_database.name
momer marked this conversation as resolved.
Show resolved Hide resolved
schema = snowflake_schema.test_schema.name
momer marked this conversation as resolved.
Show resolved Hide resolved
name = "%s"
comment = "Terraform acceptance test"
on_external_table = true
on_table = "${snowflake_database.test_database.name}.${snowflake_schema.test_schema.name}.${snowflake_external_table.test_external_stream_table.name}"
momer marked this conversation as resolved.
Show resolved Hide resolved
}
`

return fmt.Sprintf(s, name, name, name, name, locations, name, name)
}
25 changes: 25 additions & 0 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ func TestStreamCreate(t *testing.T) {
})
}

func TestStreamCreateOnExternalTable(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"on_external_table": true,
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON EXTERNAL TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
})
}

func expectStreamRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY")
mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN DATABASE "database_name"`).WillReturnRows(rows)
Expand Down
14 changes: 13 additions & 1 deletion pkg/snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type StreamBuilder struct {
name string
db string
schema string
externalTable bool
onTable string
appendOnly bool
insertOnly bool
Expand Down Expand Up @@ -53,6 +54,11 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde
return sb
}

func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder {
sb.externalTable = b
return sb
}

func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
sb.appendOnly = b
return sb
Expand Down Expand Up @@ -90,7 +96,13 @@ func (sb *StreamBuilder) Create() string {
q := strings.Builder{}
q.WriteString(fmt.Sprintf(`CREATE STREAM %v`, sb.QualifiedName()))

q.WriteString(fmt.Sprintf(` ON TABLE %v`, sb.onTable))
q.WriteString(` ON`)

if sb.externalTable {
q.WriteString(` EXTERNAL`)
}

q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable))

if sb.comment != "" {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment)))
Expand Down
3 changes: 3 additions & 0 deletions pkg/snowflake/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func TestStreamCreate(t *testing.T) {

s.WithInsertOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)

s.WithExternalTable(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON EXTERNAL TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)
}

func TestStreamChangeComment(t *testing.T) {
Expand Down