Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/table clustering #548

Merged
merged 15 commits into from
May 21, 2021
20 changes: 14 additions & 6 deletions docs/resources/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ description: |-

```terraform
resource snowflake_table table {
database = "database"
schema = "schmea"
name = "table"
comment = "A table."
owner = "me"

database = "database"
schema = "schmea"
name = "table"
comment = "A table."
cluster_by = ["to_date(DATE)"]

owner = "me"

column {
name = "id"
type = "int"
Expand All @@ -29,6 +31,11 @@ resource snowflake_table table {
name = "data"
type = "text"
}

column {
name = "DATE"
type = "TIMESTAMP_NTZ(9)"
}
}
```

Expand All @@ -44,6 +51,7 @@ resource snowflake_table table {

### Optional

- **cluster_by** (List of String) A list of one of more table columns/expressions to be used as clustering key(s) for the table
- **comment** (String) Specifies a comment for the table.
- **id** (String) The ID of this resource.

Expand Down
19 changes: 13 additions & 6 deletions examples/resources/snowflake_table/resource.tf
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
resource snowflake_table table {
database = "database"
schema = "schmea"
name = "table"
comment = "A table."
owner = "me"

database = "database"
schema = "schmea"
name = "table"
comment = "A table."
cluster_by = ["to_date(DATE)"]

owner = "me"

column {
name = "id"
type = "int"
Expand All @@ -14,4 +16,9 @@ resource snowflake_table table {
name = "data"
type = "text"
}

column {
name = "DATE"
type = "TIMESTAMP_NTZ(9)"
}
}
40 changes: 34 additions & 6 deletions pkg/resources/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var tableSchema = map[string]*schema.Schema{
ForceNew: true,
Description: "The database in which to create the table.",
},
"cluster_by": {
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
Description: "A list of one of more table columns/expressions to be used as clustering key(s) for the table",
},
"column": {
Type: schema.TypeList,
Required: true,
Expand Down Expand Up @@ -213,6 +219,10 @@ func CreateTable(d *schema.ResourceData, meta interface{}) error {
builder.WithComment(v.(string))
}

if v, ok := d.GetOk("cluster_by"); ok {
builder.WithClustering(expandStringList(v.([]interface{})))
}

stmt := builder.Create()
err := snowflake.Exec(db, stmt)
if err != nil {
Expand Down Expand Up @@ -267,12 +277,13 @@ func ReadTable(d *schema.ResourceData, meta interface{}) error {

// Set the relevant data in the state
toSet := map[string]interface{}{
"name": table.TableName.String,
"owner": table.Owner.String,
"database": tableID.DatabaseName,
"schema": tableID.SchemaName,
"comment": table.Comment.String,
"column": snowflake.NewColumns(tableDescription).Flatten(),
"name": table.TableName.String,
"owner": table.Owner.String,
"database": tableID.DatabaseName,
"schema": tableID.SchemaName,
"comment": table.Comment.String,
"column": snowflake.NewColumns(tableDescription).Flatten(),
"cluster_by": snowflake.ClusterStatementToList(table.ClusterBy.String),
}

for key, val := range toSet {
Expand Down Expand Up @@ -306,6 +317,23 @@ func UpdateTable(d *schema.ResourceData, meta interface{}) error {
return errors.Wrapf(err, "error updating table comment on %v", d.Id())
}
}

if d.HasChange("cluster_by") {
cb := expandStringList(d.Get("cluster_by").([]interface{}))

var q string
if len(cb) != 0 {
builder.WithClustering(cb)
q = builder.ChangeClusterBy(builder.GetClusterKeyString())
} else {
q = builder.DropClustering()
}

err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating table clustering on %v", d.Id())
}
}
if d.HasChange("column") {
old, new := d.GetChange("column")
removed, added, changed := getColumns(old).diffs(getColumns(new))
Expand Down
145 changes: 145 additions & 0 deletions pkg/resources/table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
func TestAcc_Table(t *testing.T) {
accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

table2Name := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha))

resource.ParallelTest(t, resource.TestCase{
Providers: providers(),
Steps: []resource.TestStep{
Expand Down Expand Up @@ -41,6 +43,53 @@ func TestAcc_Table(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.name", "column3"),
resource.TestCheckResourceAttr("snowflake_table.test_table", "column.1.type", "FLOAT"),
resource.TestCheckNoResourceAttr("snowflake_table.test_table", "cluster_by"),
),
},
{
Config: tableConfig3(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.type", "FLOAT"),
),
},
{
Config: tableConfig4(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.1", "\"col2\""),
),
},
{
Config: tableConfig5(accName, table2Name),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_table.test_table2", "name", table2Name),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "database", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "schema", accName),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "comment", "Terraform acceptance test"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.name", "COL1"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.0.type", "VARCHAR(16777216)"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "column.1.name", "col2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.#", "2"),
resource.TestCheckResourceAttr("snowflake_table.test_table2", "cluster_by.0", "\"col2\""),
),
},
},
Expand Down Expand Up @@ -108,3 +157,99 @@ resource "snowflake_table" "test_table" {
`
return fmt.Sprintf(s, name, name, name)
}

func tableConfig3(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["COL1"]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}

func tableConfig4(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["COL1","\"col2\""]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}

func tableConfig5(name string, table2Name string) string {
s := `
resource "snowflake_database" "test_database" {
name = "%s"
comment = "Terraform acceptance test"
}

resource "snowflake_schema" "test_schema" {
name = "%s"
database = snowflake_database.test_database.name
comment = "Terraform acceptance test"
}

resource "snowflake_table" "test_table2" {
database = snowflake_database.test_database.name
schema = snowflake_schema.test_schema.name
name = "%s"
comment = "Terraform acceptance test"
cluster_by = ["\"col2\"","COL1"]
column {
name = "COL1"
type = "VARCHAR(16777216)"
}
column {
name = "col2"
type = "FLOAT"
}
}
`
return fmt.Sprintf(s, name, name, table2Name)
}
58 changes: 53 additions & 5 deletions pkg/snowflake/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ func (c Columns) getColumnDefinitions() string {

// TableBuilder abstracts the creation of SQL queries for a Snowflake schema
type TableBuilder struct {
name string
db string
schema string
columns Columns
comment string
name string
db string
schema string
columns Columns
comment string
clusterBy []string
}

// QualifiedName prepends the db and schema if set and escapes everything nicely
Expand Down Expand Up @@ -111,6 +112,37 @@ func (tb *TableBuilder) WithColumns(c Columns) *TableBuilder {
return tb
}

// WithClustering adds cluster keys/expressions to TableBuilder
func (tb *TableBuilder) WithClustering(c []string) *TableBuilder {
tb.clusterBy = c
return tb
}

//Function to get clustering definition
func (tb *TableBuilder) GetClusterKeyString() string {

return fmt.Sprint(strings.Join(tb.clusterBy[:], ", "))
}

//function to take the literal snowflake cluster statement returned from SHOW TABLES and convert it to a list of keys.
func ClusterStatementToList(clusterStatement string) []string {
if clusterStatement == "" {
return nil
}

cleanStatement := strings.TrimSuffix(strings.Replace(clusterStatement, "LINEAR(", "", 1), ")")
// remove cluster statement and trailing parenthesis

var clean []string

for _, s := range strings.Split(cleanStatement, ",") {
clean = append(clean, strings.TrimSpace(s))
}

return clean

}

// Table returns a pointer to a Builder that abstracts the DDL operations for a table.
//
// Supported DDL operations are:
Expand Down Expand Up @@ -152,9 +184,20 @@ func (tb *TableBuilder) Create() string {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment)))
}

if tb.clusterBy != nil {
//add optional clustering statement
q.WriteString(fmt.Sprintf(` CLUSTER BY LINEAR(%v)`, tb.GetClusterKeyString()))

}

return q.String()
}

// ChangeClusterBy returns the SQL query to change cluastering on table
func (tb *TableBuilder) ChangeClusterBy(cb string) string {
return fmt.Sprintf(`ALTER TABLE %v CLUSTER BY LINEAR(%v)`, tb.QualifiedName(), cb)
}

// ChangeComment returns the SQL query that will update the comment on the table.
func (tb *TableBuilder) ChangeComment(c string) string {
return fmt.Sprintf(`ALTER TABLE %v SET COMMENT = '%v'`, tb.QualifiedName(), EscapeString(c))
Expand Down Expand Up @@ -188,6 +231,11 @@ func (tb *TableBuilder) RemoveComment() string {
return fmt.Sprintf(`ALTER TABLE %v UNSET COMMENT`, tb.QualifiedName())
}

// RemoveClustering returns the SQL query that will remove data clustering from the table
func (tb *TableBuilder) DropClustering() string {
return fmt.Sprintf(`ALTER TABLE %v DROP CLUSTERING KEY`, tb.QualifiedName())
}

// Drop returns the SQL query that will drop a table.
func (tb *TableBuilder) Drop() string {
return fmt.Sprintf(`DROP TABLE %v`, tb.QualifiedName())
Expand Down
Loading