diff --git a/docs/resources/stream.md b/docs/resources/stream.md index c32d9b11ca..2848dc970e 100644 --- a/docs/resources/stream.md +++ b/docs/resources/stream.md @@ -43,6 +43,7 @@ resource "snowflake_stream" "stream" { - `comment` (String) Specifies a comment for the stream. - `insert_only` (Boolean) Create an insert only stream type. - `on_table` (String) Name of the table the stream will monitor. +- `on_view` (String) Name of the view the stream will monitor. - `show_initial_rows` (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed. ### Read-Only diff --git a/pkg/resources/external_table_internal_test.go b/pkg/resources/external_table_internal_test.go index 5374abfe98..57b6a9dab5 100644 --- a/pkg/resources/external_table_internal_test.go +++ b/pkg/resources/external_table_internal_test.go @@ -19,23 +19,23 @@ func ExternalTestTableIDFromString(t *testing.T) { // Bad ID -- not enough fields id = "database" - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("3 fields allowed"), err) // Bad ID id = "||" - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.NoError(err) // 0 lines id = "" - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("1 line at a time"), err) // 2 lines id = `database_name|schema_name|table database_name|schema_name|table` - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("1 line at a time"), err) } @@ -65,8 +65,8 @@ func ExternalTestTableStruct(t *testing.T) { } sID, err = table.String() r.NoError(err) - newTable, err := streamOnTableIDFromString(sID) + newTable, err := streamOnObjectIDFromString(sID) r.NoError(err) r.Equal("database|name", newTable.DatabaseName) - r.Equal("table|name", newTable.OnTableName) + r.Equal("table|name", newTable.Name) } diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go index df804a542f..abee8d722a 100644 --- a/pkg/resources/stream.go +++ b/pkg/resources/stream.go @@ -14,8 +14,8 @@ import ( ) const ( - streamIDDelimiter = '|' - streamOnTableIDDelimiter = '.' + streamIDDelimiter = '|' + streamOnObjectIDDelimiter = '.' ) var streamSchema = map[string]*schema.Schema{ @@ -43,10 +43,18 @@ var streamSchema = map[string]*schema.Schema{ Description: "Specifies a comment for the stream.", }, "on_table": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - Description: "Name of the table the stream will monitor.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: "Name of the table the stream will monitor.", + ExactlyOneOf: []string{"on_table", "on_view"}, + }, + "on_view": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: "Name of the view the stream will monitor.", + ExactlyOneOf: []string{"on_table", "on_view"}, }, "append_only": { Type: schema.TypeBool, @@ -96,10 +104,10 @@ type streamID struct { StreamName string } -type streamOnTableID struct { +type streamOnObjectID struct { DatabaseName string SchemaName string - OnTableName string + Name string } //String() takes in a streamID object and returns a pipe-delimited string: @@ -142,11 +150,11 @@ func streamIDFromString(stringID string) (*streamID, error) { return streamResult, nil } -// streamOnTableIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName -// and returns a streamOnTableID object -func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) { +// streamOnObjectIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName +// and returns a streamOnObjectID object +func streamOnObjectIDFromString(stringID string) (*streamOnObjectID, error) { reader := csv.NewReader(strings.NewReader(stringID)) - reader.Comma = streamOnTableIDDelimiter + reader.Comma = streamOnObjectIDDelimiter lines, err := reader.ReadAll() if err != nil { return nil, fmt.Errorf("Not CSV compatible") @@ -160,10 +168,10 @@ func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) { return nil, fmt.Errorf("invalid format for on_table: %v , expected: ", strings.Join(lines[0], ".")) } - streamOnTableResult := &streamOnTableID{ + streamOnTableResult := &streamOnObjectID{ DatabaseName: lines[0][0], SchemaName: lines[0][1], - OnTableName: lines[0][2], + Name: lines[0][2], } return streamOnTableResult, nil } @@ -174,28 +182,50 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { database := d.Get("database").(string) schema := d.Get("schema").(string) name := d.Get("name").(string) - onTable := d.Get("on_table").(string) appendOnly := d.Get("append_only").(bool) insertOnly := d.Get("insert_only").(bool) showInitialRows := d.Get("show_initial_rows").(bool) builder := snowflake.Stream(name, database, schema) - resultOnTable, err := streamOnTableIDFromString(onTable) - if err != nil { - return err - } + onTable, onTableSet := d.GetOk("on_table") + onView, onViewSet := d.GetOk("on_view") - tq := snowflake.Table(resultOnTable.OnTableName, resultOnTable.DatabaseName, resultOnTable.SchemaName).Show() - tableRow := snowflake.QueryRow(db, tq) + if (onTableSet && onViewSet) || !(onTableSet || onViewSet) { + return fmt.Errorf("exactly one of 'on_table' or 'on_view' expected") + } else if onTableSet { + id, err := streamOnObjectIDFromString(onTable.(string)) + if err != nil { + return err + } - t, err := snowflake.ScanTable(tableRow) - if err != nil { - return err + tq := snowflake.Table(id.Name, id.DatabaseName, id.SchemaName).Show() + tableRow := snowflake.QueryRow(db, tq) + + t, err := snowflake.ScanTable(tableRow) + if err != nil { + return err + } + + builder.WithExternalTable(t.IsExternal.String == "Y") + builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String) + } else if onViewSet { + id, err := streamOnObjectIDFromString(onView.(string)) + if err != nil { + return err + } + + tq := snowflake.View(id.Name).WithDB(id.DatabaseName).WithSchema(id.SchemaName).Show() + viewRow := snowflake.QueryRow(db, tq) + + t, err := snowflake.ScanView(viewRow) + if err != nil { + return err + } + + builder.WithOnView(t.DatabaseName.String, t.SchemaName.String, t.Name.String) } - builder.WithExternalTable(t.IsExternal.String == "Y") - builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String) builder.WithAppendOnly(appendOnly) builder.WithInsertOnly(insertOnly) builder.WithShowInitialRows(showInitialRows) @@ -206,7 +236,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { } stmt := builder.Create() - err = snowflake.Exec(db, stmt) + err := snowflake.Exec(db, stmt) if err != nil { return errors.Wrapf(err, "error creating stream %v", name) } @@ -270,6 +300,11 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error { return err } + err = d.Set("on_view", stream.ViewName.String) + if err != nil { + return err + } + err = d.Set("append_only", stream.Mode.String == "APPEND_ONLY") if err != nil { return err diff --git a/pkg/resources/stream_acceptance_test.go b/pkg/resources/stream_acceptance_test.go index af64fe1815..74c24fbc8f 100644 --- a/pkg/resources/stream_acceptance_test.go +++ b/pkg/resources/stream_acceptance_test.go @@ -73,6 +73,19 @@ func TestAcc_Stream(t *testing.T) { checkBool("snowflake_stream.test_stream", "show_initial_rows", false), ), }, + { + Config: viewStreamConfig(accName, false), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accName), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accName), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accName), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_view", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_VIEW")), + resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"), + checkBool("snowflake_stream.test_stream", "append_only", false), + checkBool("snowflake_stream.test_stream", "insert_only", false), + checkBool("snowflake_stream.test_stream", "show_initial_rows", false), + ), + }, { ResourceName: "snowflake_stream.test_stream", ImportState: true, @@ -192,3 +205,58 @@ resource "snowflake_stream" "test_external_table_stream" { return fmt.Sprintf(s, name, name, name, name, locations, name, insert_only_config) } + +func viewStreamConfig(name string, append_only bool) string { + append_only_config := "" + if append_only { + append_only_config = "append_only = true" + } + + s := ` +resource "snowflake_database" "test_database" { + name = "%s" + comment = "Terraform acceptance test" +} + +resource "snowflake_schema" "test_schema" { + name = "%s" + database = snowflake_database.test_database.name + comment = "Terraform acceptance test" +} + +resource "snowflake_table" "test_stream_on_view" { + database = snowflake_database.test_database.name + schema = snowflake_schema.test_schema.name + name = "STREAM_ON_VIEW_TABLE" + comment = "Terraform acceptance test" + change_tracking = true + + column { + name = "column1" + type = "VARIANT" + } + column { + name = "column2" + type = "VARCHAR(16777216)" + } +} + +resource "snowflake_view" "test_stream_on_view" { + database = snowflake_database.test_database.name + schema = snowflake_schema.test_schema.name + name = "STREAM_ON_VIEW" + + statement = "select * from ${snowflake_table.test_stream_on_view.name}" +} + +resource "snowflake_stream" "test_stream" { + database = snowflake_database.test_database.name + schema = snowflake_schema.test_schema.name + name = "%s" + comment = "Terraform acceptance test" + on_view = "${snowflake_database.test_database.name}.${snowflake_schema.test_schema.name}.${snowflake_view.test_stream_on_view.name}" + %s +} +` + return fmt.Sprintf(s, name, name, name, append_only_config) +} diff --git a/pkg/resources/stream_internal_test.go b/pkg/resources/stream_internal_test.go index 5d76a56d2a..0b7925cf6d 100644 --- a/pkg/resources/stream_internal_test.go +++ b/pkg/resources/stream_internal_test.go @@ -75,30 +75,30 @@ func TestStreamOnTableIDFromString(t *testing.T) { r := require.New(t) // Vanilla id := "database_name.schema_name.target_table_name" - streamOnTable, err := streamOnTableIDFromString(id) + streamOnTable, err := streamOnObjectIDFromString(id) r.NoError(err) r.Equal("database_name", streamOnTable.DatabaseName) r.Equal("schema_name", streamOnTable.SchemaName) - r.Equal("target_table_name", streamOnTable.OnTableName) + r.Equal("target_table_name", streamOnTable.Name) // Bad ID -- not enough fields id = "database.schema" - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("invalid format for on_table: database.schema , expected: "), err) // Bad ID id = ".." - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.NoError(err) // 0 lines id = "" - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("1 line at a time"), err) // 2 lines id = `database_name.schema_name.target_table_name database_name.schema_name.target_table_name` - _, err = streamOnTableIDFromString(id) + _, err = streamOnObjectIDFromString(id) r.Equal(fmt.Errorf("1 line at a time"), err) } diff --git a/pkg/resources/stream_test.go b/pkg/resources/stream_test.go index 34e5d8be6e..7ea7d23d30 100644 --- a/pkg/resources/stream_test.go +++ b/pkg/resources/stream_test.go @@ -68,6 +68,53 @@ func TestStreamCreateOnExternalTable(t *testing.T) { }) } +func TestStreamCreateOnView(t *testing.T) { + r := require.New(t) + + in := map[string]interface{}{ + "name": "stream_name", + "database": "database_name", + "schema": "schema_name", + "comment": "great comment", + "on_view": "target_db.target_schema.target_view", + "append_only": true, + "insert_only": false, + "show_initial_rows": true, + } + d := stream(t, "database_name|schema_name|stream_name", in) + + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON VIEW "target_db"."target_schema"."target_view" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1)) + expectStreamRead(mock) + expectOnViewRead(mock) + err := resources.CreateStream(d, db) + r.NoError(err) + r.Equal("stream_name", d.Get("name").(string)) + }) +} + +func TestStreamOnTableOrView(t *testing.T) { + r := require.New(t) + + in := map[string]interface{}{ + "name": "stream_name", + "database": "database_name", + "schema": "schema_name", + "comment": "great comment", + "on_table": "target_db.target_schema.target_table", + "on_view": "target_db.target_schema.target_view", + "append_only": true, + "insert_only": false, + "show_initial_rows": true, + } + d := stream(t, "database_name|schema_name|stream_name", in) + + WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { + err := resources.CreateStream(d, db) + r.ErrorContains(err, "exactly one of") + }) +} + func expectStreamRead(mock sqlmock.Sqlmock) { rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY") mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN SCHEMA "database_name"."schema_name"`).WillReturnRows(rows) @@ -83,6 +130,11 @@ func expectOnExternalTableRead(mock sqlmock.Sqlmock) { mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows) } +func expectOnViewRead(mock sqlmock.Sqlmock) { + rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_view", "target_db", "target_schema", "VIEW", "mock comment", "", "", "", "", 1, "OFF", "OFF", "Y") + mock.ExpectQuery(`SHOW VIEWS LIKE 'target_view' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows) +} + func TestStreamRead(t *testing.T) { r := require.New(t) diff --git a/pkg/snowflake/stream.go b/pkg/snowflake/stream.go index 59728aa2fd..28d6cef31e 100644 --- a/pkg/snowflake/stream.go +++ b/pkg/snowflake/stream.go @@ -17,6 +17,7 @@ type StreamBuilder struct { schema string externalTable bool onTable string + onView string appendOnly bool insertOnly bool showInitialRows bool @@ -59,6 +60,11 @@ func (sb *StreamBuilder) WithExternalTable(b bool) *StreamBuilder { return sb } +func (sb *StreamBuilder) WithOnView(d string, s string, t string) *StreamBuilder { + sb.onView = fmt.Sprintf(`"%v"."%v"."%v"`, d, s, t) + return sb +} + func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder { sb.appendOnly = b return sb @@ -98,11 +104,15 @@ func (sb *StreamBuilder) Create() string { q.WriteString(` ON`) - if sb.externalTable { - q.WriteString(` EXTERNAL`) - } + if sb.onTable != "" { + if sb.externalTable { + q.WriteString(` EXTERNAL`) + } - q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable)) + q.WriteString(fmt.Sprintf(` TABLE %v`, sb.onTable)) + } else if sb.onView != "" { + q.WriteString(fmt.Sprintf(` VIEW %v`, sb.onView)) + } if sb.comment != "" { q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(sb.comment))) @@ -146,6 +156,7 @@ type descStreamRow struct { Comment sql.NullString `db:"comment"` ShowInitialRows bool `db:"show_initial_rows"` TableName sql.NullString `db:"table_name"` + ViewName sql.NullString `db:"view_name"` Type sql.NullString `db:"type"` Stale sql.NullString `db:"stale"` Mode sql.NullString `db:"mode"`