Skip to content

Commit

Permalink
feat: Add SHOW_INITIAL_ROWS to stream resource (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
alldoami authored Jun 17, 2021
1 parent 3954741 commit 3963193
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ resource snowflake_stream stream {
- **comment** (String) Specifies a comment for the stream.
- **id** (String) The ID of this resource.
- **on_table** (String) Name of the table the stream will monitor.
- **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

### Read-Only

Expand Down
31 changes: 31 additions & 0 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,23 @@ var streamSchema = map[string]*schema.Schema{
"on_table": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the table the stream will monitor.",
},
"append_only": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: false,
Description: "Type of the stream that will be created.",
},
"show_initial_rows": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: false,
Description: "Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.",
},
"owner": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -160,6 +169,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
appendOnly := d.Get("append_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)

builder := snowflake.Stream(name, database, schema)

Expand All @@ -170,6 +180,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {

builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
builder.WithAppendOnly(appendOnly)
builder.WithShowInitialRows(showInitialRows)

// Set optionals
if v, ok := d.GetOk("comment"); ok {
Expand Down Expand Up @@ -226,6 +237,26 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("on_table", stream.TableName.String)
if err != nil {
return err
}

err = d.Set("append_only", stream.AppendOnly)
if err != nil {
return err
}

err = d.Set("show_initial_rows", stream.ShowInitialRows)
if err != nil {
return err
}

err = d.Set("comment", stream.Comment.String)
if err != nil {
return err
}

err = d.Set("owner", stream.Owner.String)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ func TestAcc_Stream(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_TABLE")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", true),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
},
Expand Down Expand Up @@ -64,8 +65,6 @@ resource "snowflake_stream" "test_stream" {
name = "%s"
comment = "Terraform acceptance test"
on_table = "${snowflake_database.test_database.name}.${snowflake_schema.test_schema.name}.${snowflake_table.test_stream_on_table.name}"
append_only = true
}
`
return fmt.Sprintf(s, name, name, name)
Expand Down
5 changes: 2 additions & 3 deletions pkg/resources/stream_grant_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ resource "snowflake_stream" "test" {
name = "{{ .stream_name }}"
comment = "Terraform acceptance test"
on_table = "${snowflake_database.test.name}.${snowflake_schema.test.name}.${snowflake_table.test.name}"
append_only = true
}
resource "snowflake_stream_grant" "test" {
database_name = snowflake_database.test.name
database_name = snowflake_database.test.name
roles = [snowflake_role.test.name]
schema_name = snowflake_schema.test.name
stream_name = snowflake_stream.test.name
Expand Down Expand Up @@ -140,7 +139,7 @@ resource "snowflake_role" "test" {
}
resource "snowflake_stream_grant" "test" {
database_name = snowflake_database.test.name
database_name = snowflake_database.test.name
roles = [snowflake_role.test.name]
schema_name = snowflake_schema.test.name
on_future = true
Expand Down
19 changes: 10 additions & 9 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ func TestStreamCreate(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"append_only": true,
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"append_only": true,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true`).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
Expand All @@ -48,14 +49,14 @@ func expectStreamRead(mock sqlmock.Sqlmock) {
func TestStreamRead(t *testing.T) {
r := require.New(t)

d := stream(t, "database_name|schema_name|stream_name", map[string]interface{}{"name": "stream_name", "comment": "mock comment"})
d := stream(t, "database_name|schema_name|stream_name", map[string]interface{}{"name": "stream_name", "comment": "grand comment"})

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
expectStreamRead(mock)
err := resources.ReadStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
r.Equal("mock comment", d.Get("comment").(string))
r.Equal("grand comment", d.Get("comment").(string))

// Test when resource is not found, checking if state will be empty
r.NotEmpty(d.State())
Expand Down
47 changes: 26 additions & 21 deletions pkg/snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (

// StreamBuilder abstracts the creation of SQL queries for a Snowflake stream
type StreamBuilder struct {
name string
db string
schema string
onTable string
appendOnly bool
comment string
name string
db string
schema string
onTable string
appendOnly bool
showInitialRows bool
comment string
}

// QualifiedName prepends the db and schema if set and escapes everything nicely
Expand Down Expand Up @@ -50,12 +51,12 @@ func (sb *StreamBuilder) WithOnTable(d string, s string, t string) *StreamBuilde
}

func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
sb.appendOnly = false

if b {
sb.appendOnly = b
}
sb.appendOnly = b
return sb
}

func (sb *StreamBuilder) WithShowInitialRows(b bool) *StreamBuilder {
sb.showInitialRows = b
return sb
}

Expand Down Expand Up @@ -89,6 +90,8 @@ func (sb *StreamBuilder) Create() string {

q.WriteString(fmt.Sprintf(` APPEND_ONLY = %v`, sb.appendOnly))

q.WriteString(fmt.Sprintf(` SHOW_INITIAL_ROWS = %v`, sb.showInitialRows))

return q.String()
}

Expand All @@ -113,16 +116,18 @@ func (sb *StreamBuilder) Show() string {
}

type descStreamRow struct {
CreatedOn sql.NullString `db:"created_on"`
StreamName sql.NullString `db:"name"`
DatabaseName sql.NullString `db:"database_name"`
SchemaName sql.NullString `db:"schema_name"`
Owner sql.NullString `db:"owner"`
Comment sql.NullString `db:"comment"`
TableName sql.NullString `db:"table_name"`
Type sql.NullString `db:"type"`
Stale sql.NullString `db:"stale"`
Mode sql.NullString `db:"mode"`
CreatedOn sql.NullString `db:"created_on"`
StreamName sql.NullString `db:"name"`
DatabaseName sql.NullString `db:"database_name"`
SchemaName sql.NullString `db:"schema_name"`
Owner sql.NullString `db:"owner"`
Comment sql.NullString `db:"comment"`
AppendOnly bool `db:"append_only"`
ShowInitialRows bool `db:"show_initial_rows"`
TableName sql.NullString `db:"table_name"`
Type sql.NullString `db:"type"`
Stale sql.NullString `db:"stale"`
Mode sql.NullString `db:"mode"`
}

func ScanStream(row *sqlx.Row) (*descStreamRow, error) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/snowflake/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ func TestStreamCreate(t *testing.T) {
s := Stream("test_stream", "test_db", "test_schema")

s.WithOnTable("test_db", "test_schema", "test_target_table")
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)

s.WithComment("Test Comment")
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)

s.WithShowInitialRows(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = true`)

s.WithAppendOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`)
}

func TestStreamChangeComment(t *testing.T) {
Expand Down

0 comments on commit 3963193

Please sign in to comment.