Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Streams on views #1112

Merged
merged 4 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ resource "snowflake_stream" "stream" {
- `comment` (String) Specifies a comment for the stream.
- `insert_only` (Boolean) Create an insert only stream type.
- `on_table` (String) Name of the table the stream will monitor.
- `on_view` (String) Name of the view the stream will monitor.
- `show_initial_rows` (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

### Read-Only
Expand Down
12 changes: 6 additions & 6 deletions pkg/resources/external_table_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ func ExternalTestTableIDFromString(t *testing.T) {

// Bad ID -- not enough fields
id = "database"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("3 fields allowed"), err)

// Bad ID
id = "||"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.NoError(err)

// 0 lines
id = ""
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)

// 2 lines
id = `database_name|schema_name|table
database_name|schema_name|table`
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)
}

Expand Down Expand Up @@ -65,8 +65,8 @@ func ExternalTestTableStruct(t *testing.T) {
}
sID, err = table.String()
r.NoError(err)
newTable, err := streamOnTableIDFromString(sID)
newTable, err := streamOnObjectIDFromString(sID)
r.NoError(err)
r.Equal("database|name", newTable.DatabaseName)
r.Equal("table|name", newTable.OnTableName)
r.Equal("table|name", newTable.Name)
}
89 changes: 62 additions & 27 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const (
streamIDDelimiter = '|'
streamOnTableIDDelimiter = '.'
streamIDDelimiter = '|'
streamOnObjectIDDelimiter = '.'
)

var streamSchema = map[string]*schema.Schema{
Expand Down Expand Up @@ -43,10 +43,18 @@ var streamSchema = map[string]*schema.Schema{
Description: "Specifies a comment for the stream.",
},
"on_table": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the table the stream will monitor.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the table the stream will monitor.",
ExactlyOneOf: []string{"on_table", "on_view"},
},
"on_view": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Name of the view the stream will monitor.",
ExactlyOneOf: []string{"on_table", "on_view"},
},
"append_only": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -96,10 +104,10 @@ type streamID struct {
StreamName string
}

type streamOnTableID struct {
type streamOnObjectID struct {
DatabaseName string
SchemaName string
OnTableName string
Name string
}

//String() takes in a streamID object and returns a pipe-delimited string:
Expand Down Expand Up @@ -142,11 +150,11 @@ func streamIDFromString(stringID string) (*streamID, error) {
return streamResult, nil
}

// streamOnTableIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName
// and returns a streamOnTableID object
func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) {
// streamOnObjectIDFromString() takes in a dot-delimited string: DatabaseName.SchemaName.TableName
// and returns a streamOnObjectID object
func streamOnObjectIDFromString(stringID string) (*streamOnObjectID, error) {
reader := csv.NewReader(strings.NewReader(stringID))
reader.Comma = streamOnTableIDDelimiter
reader.Comma = streamOnObjectIDDelimiter
lines, err := reader.ReadAll()
if err != nil {
return nil, fmt.Errorf("Not CSV compatible")
Expand All @@ -160,10 +168,10 @@ func streamOnTableIDFromString(stringID string) (*streamOnTableID, error) {
return nil, fmt.Errorf("invalid format for on_table: %v , expected: <database_name.schema_name.target_table_name>", strings.Join(lines[0], "."))
}

streamOnTableResult := &streamOnTableID{
streamOnTableResult := &streamOnObjectID{
DatabaseName: lines[0][0],
SchemaName: lines[0][1],
OnTableName: lines[0][2],
Name: lines[0][2],
}
return streamOnTableResult, nil
}
Expand All @@ -174,28 +182,50 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
database := d.Get("database").(string)
schema := d.Get("schema").(string)
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
appendOnly := d.Get("append_only").(bool)
insertOnly := d.Get("insert_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)

builder := snowflake.Stream(name, database, schema)

resultOnTable, err := streamOnTableIDFromString(onTable)
if err != nil {
return err
}
onTable, onTableSet := d.GetOk("on_table")
onView, onViewSet := d.GetOk("on_view")

tq := snowflake.Table(resultOnTable.OnTableName, resultOnTable.DatabaseName, resultOnTable.SchemaName).Show()
tableRow := snowflake.QueryRow(db, tq)
if (onTableSet && onViewSet) || !(onTableSet || onViewSet) {
return fmt.Errorf("exactly one of 'on_table' or 'on_view' expected")
} else if onTableSet {
id, err := streamOnObjectIDFromString(onTable.(string))
if err != nil {
return err
}

t, err := snowflake.ScanTable(tableRow)
if err != nil {
return err
tq := snowflake.Table(id.Name, id.DatabaseName, id.SchemaName).Show()
tableRow := snowflake.QueryRow(db, tq)

t, err := snowflake.ScanTable(tableRow)
if err != nil {
return err
}

builder.WithExternalTable(t.IsExternal.String == "Y")
builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String)
} else if onViewSet {
id, err := streamOnObjectIDFromString(onView.(string))
if err != nil {
return err
}

tq := snowflake.View(id.Name).WithDB(id.DatabaseName).WithSchema(id.SchemaName).Show()
viewRow := snowflake.QueryRow(db, tq)

t, err := snowflake.ScanView(viewRow)
if err != nil {
return err
}

builder.WithOnView(t.DatabaseName.String, t.SchemaName.String, t.Name.String)
}

builder.WithExternalTable(t.IsExternal.String == "Y")
builder.WithOnTable(t.DatabaseName.String, t.SchemaName.String, t.TableName.String)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
builder.WithShowInitialRows(showInitialRows)
Expand All @@ -206,7 +236,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
}

stmt := builder.Create()
err = snowflake.Exec(db, stmt)
err := snowflake.Exec(db, stmt)
if err != nil {
return errors.Wrapf(err, "error creating stream %v", name)
}
Expand Down Expand Up @@ -270,6 +300,11 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("on_view", stream.ViewName.String)
if err != nil {
return err
}

err = d.Set("append_only", stream.Mode.String == "APPEND_ONLY")
if err != nil {
return err
Expand Down
68 changes: 68 additions & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ func TestAcc_Stream(t *testing.T) {
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
Config: viewStreamConfig(accName, false),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "name", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "database", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "schema", accName),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_view", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_VIEW")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "insert_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
{
ResourceName: "snowflake_stream.test_stream",
ImportState: true,
Expand Down Expand Up @@ -192,3 +205,58 @@ resource "snowflake_stream" "test_external_table_stream" {

return fmt.Sprintf(s, name, name, name, name, locations, name, insert_only_config)
}

func viewStreamConfig(name string, append_only bool) string {
append_only_config := ""
if append_only {
append_only_config = "append_only = true"
}

s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_stream_on_view" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "STREAM_ON_VIEW_TABLE"
comment = "Terraform acceptance test"
change_tracking = true

column {
name = "column1"
type = "VARIANT"
}
column {
name = "column2"
type = "VARCHAR(16777216)"
}
}

resource "snowflake_view" "test_stream_on_view" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "STREAM_ON_VIEW"

statement = "select * from ${snowflake_table.test_stream_on_view.name}"
}

resource "snowflake_stream" "test_stream" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
on_view = "${snowflake_database.test_database.name}.${snowflake_schema.test_schema.name}.${snowflake_view.test_stream_on_view.name}"
%s
}
`
return fmt.Sprintf(s, name, name, name, append_only_config)
}
12 changes: 6 additions & 6 deletions pkg/resources/stream_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,30 @@ func TestStreamOnTableIDFromString(t *testing.T) {
r := require.New(t)
// Vanilla
id := "database_name.schema_name.target_table_name"
streamOnTable, err := streamOnTableIDFromString(id)
streamOnTable, err := streamOnObjectIDFromString(id)
r.NoError(err)
r.Equal("database_name", streamOnTable.DatabaseName)
r.Equal("schema_name", streamOnTable.SchemaName)
r.Equal("target_table_name", streamOnTable.OnTableName)
r.Equal("target_table_name", streamOnTable.Name)

// Bad ID -- not enough fields
id = "database.schema"
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("invalid format for on_table: database.schema , expected: <database_name.schema_name.target_table_name>"), err)

// Bad ID
id = ".."
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.NoError(err)

// 0 lines
id = ""
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)

// 2 lines
id = `database_name.schema_name.target_table_name
database_name.schema_name.target_table_name`
_, err = streamOnTableIDFromString(id)
_, err = streamOnObjectIDFromString(id)
r.Equal(fmt.Errorf("1 line at a time"), err)
}
52 changes: 52 additions & 0 deletions pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,53 @@ func TestStreamCreateOnExternalTable(t *testing.T) {
})
}

func TestStreamCreateOnView(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_view": "target_db.target_schema.target_view",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON VIEW "target_db"."target_schema"."target_view" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
expectOnViewRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
r.Equal("stream_name", d.Get("name").(string))
})
}

func TestStreamOnTableOrView(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "stream_name",
"database": "database_name",
"schema": "schema_name",
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"on_view": "target_db.target_schema.target_view",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
err := resources.CreateStream(d, db)
r.ErrorContains(err, "exactly one of")
})
}

func expectStreamRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"name", "database_name", "schema_name", "owner", "comment", "table_name", "type", "stale", "mode"}).AddRow("stream_name", "database_name", "schema_name", "owner_name", "grand comment", "target_table", "DELTA", false, "APPEND_ONLY")
mock.ExpectQuery(`SHOW STREAMS LIKE 'stream_name' IN SCHEMA "database_name"."schema_name"`).WillReturnRows(rows)
Expand All @@ -83,6 +130,11 @@ func expectOnExternalTableRead(mock sqlmock.Sqlmock) {
mock.ExpectQuery(`SHOW TABLES LIKE 'target_table' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func expectOnViewRead(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{"created_on", "name", "database_name", "schema_name", "kind", "comment", "cluster_by", "row", "bytes", "owner", "retention_time", "automatic_clustering", "change_tracking", "is_external"}).AddRow("", "target_view", "target_db", "target_schema", "VIEW", "mock comment", "", "", "", "", 1, "OFF", "OFF", "Y")
mock.ExpectQuery(`SHOW VIEWS LIKE 'target_view' IN SCHEMA "target_db"."target_schema"`).WillReturnRows(rows)
}

func TestStreamRead(t *testing.T) {
r := require.New(t)

Expand Down
Loading