From 4c82363b86cc9c93697c23fc75eb2a21987d75a8 Mon Sep 17 00:00:00 2001 From: Miguel Lorek Date: Thu, 20 Oct 2022 10:36:28 +0100 Subject: [PATCH 1/2] task with allow_overlapping_execution option --- examples/resources/snowflake_task/resource.tf | 13 ++++ pkg/resources/task.go | 41 ++++++++++-- pkg/snowflake/task.go | 67 ++++++++++++++----- pkg/snowflake/task_test.go | 9 +++ 4 files changed, 107 insertions(+), 23 deletions(-) diff --git a/examples/resources/snowflake_task/resource.tf b/examples/resources/snowflake_task/resource.tf index bf249a9a49..5356b89047 100644 --- a/examples/resources/snowflake_task/resource.tf +++ b/examples/resources/snowflake_task/resource.tf @@ -39,3 +39,16 @@ resource snowflake_task serverless_task { when = "foo AND bar" enabled = true } + +resource snowflake_task test_task { + comment = "task with allow_overlapping_execution" + + database = "db" + schema = "schema" + + name = "test_task" + sql_statement = "select 1 as c;" + + allow_overlapping_execution = true + enabled = true +} diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 96bf6f554b..d44bc963c3 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -16,7 +16,8 @@ import ( ) const ( - taskIDDelimiter = '|' + taskIDDelimiter = '|' + AllowOverlappingExecution = "allow_overlapping_execution" ) var taskSchema = map[string]*schema.Schema{ @@ -106,6 +107,12 @@ var taskSchema = map[string]*schema.Schema{ Optional: true, Description: "Specifies the name of the notification integration used for error notifications.", }, + AllowOverlappingExecution: { + Type: schema.TypeBool, + Optional: true, + Default: false, + Description: "By default, Snowflake ensures that only one instance of a particular DAG is allowed to run at a time, setting the parameter value to TRUE permits DAG runs to overlap.", + }, } type taskID struct { @@ -129,7 +136,7 @@ func (t *taskID) String() (string, error) { return strTaskID, nil } -// difference find keys in a but not in b. +// difference find keys in 'a' but not in 'b'. func difference(a, b map[string]interface{}) map[string]interface{} { diff := make(map[string]interface{}) for k := range a { @@ -279,7 +286,7 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error { row := snowflake.QueryRow(db, q) t, err := snowflake.ScanTask(row) if err == sql.ErrNoRows { - // If not found, mark resource to be removed from statefile during apply or refresh + // If not found, mark resource to be removed from state file during apply or refresh log.Printf("[DEBUG] task (%s) not found", d.Id()) d.SetId("") return nil @@ -323,6 +330,11 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error { return err } + err = d.Set(AllowOverlappingExecution, t.AllowOverlappingExecution) + if err != nil { + return err + } + // The "DESCRIBE TASK ..." command returns the string "null" for error_integration if t.ErrorIntegration.String == "null" { t.ErrorIntegration.Valid = false @@ -445,8 +457,12 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { builder.WithComment(v.(string)) } + if v, ok := d.GetOk(AllowOverlappingExecution); ok { + builder.WithAllowOverlappingExecution(v.(bool)) + } + if v, ok := d.GetOk("error_integration"); ok { - builder.WithErrorIntegration((v.(string))) + builder.WithErrorIntegration(v.(string)) } if v, ok := d.GetOk("after"); ok { @@ -619,6 +635,21 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } + if d.HasChange(AllowOverlappingExecution) { + var q string + n := d.Get(AllowOverlappingExecution) + flag := n.(bool) + if flag == false { + q = builder.UnsetAllowOverlappingExecution() + } else { + q = builder.ChangeAllowOverlappingExecution(flag) + } + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error updating allow_overlapping_execution on task %v", d.Id()) + } + } + if d.HasChange("after") { var ( q string @@ -695,7 +726,7 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } else { q = builder.Suspend() // make sure defer doesn't enable task again - // when standalone or root task and status is supsended + // when standalone or root task and status is suspended needResumeCurrentTask = false if root != nil && builder.QualifiedName() == root.QualifiedName() { root = root.SetDisabled() //nolint diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index a26b54302f..45518047f6 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -29,6 +29,7 @@ type TaskBuilder struct { disabled bool userTaskManagedInitialWarehouseSize string errorIntegration string + allowOverlappingExecution bool } // GetFullName prepends db and schema to in parameter. @@ -74,6 +75,12 @@ func (tb *TaskBuilder) WithComment(c string) *TaskBuilder { return tb } +// WithAllowOverlappingExecution set the ALLOW_OVERLAPPING_EXECUTION on the TaskBuilder. +func (tb *TaskBuilder) WithAllowOverlappingExecution(flag bool) *TaskBuilder { + tb.allowOverlappingExecution = flag + return tb +} + // WithTimeout adds a timeout to the TaskBuilder. func (tb *TaskBuilder) WithTimeout(t int) *TaskBuilder { tb.userTaskTimeoutMS = t @@ -86,7 +93,7 @@ func (tb *TaskBuilder) WithDependency(after string) *TaskBuilder { return tb } -// WithCondition adds a when condition to the TaskBuilder. +// WithCondition adds a WHEN condition to the TaskBuilder. func (tb *TaskBuilder) WithCondition(when string) *TaskBuilder { tb.when = when return tb @@ -104,7 +111,7 @@ func (tb *TaskBuilder) WithInitialWarehouseSize(initialWarehouseSize string) *Ta return tb } -// / WithErrorIntegration adds ErrorIntegration specification to the TaskBuilder. +// WithErrorIntegration adds ErrorIntegration specification to the TaskBuilder. func (tb *TaskBuilder) WithErrorIntegration(s string) *TaskBuilder { tb.errorIntegration = s return tb @@ -124,7 +131,7 @@ func Task(name, db, schema string) *TaskBuilder { name: name, db: db, schema: schema, - disabled: false, // helper for when started root or standalone task gets supspended + disabled: false, // helper for when started root or standalone task gets suspended } } @@ -165,6 +172,10 @@ func (tb *TaskBuilder) Create() string { q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment))) } + if tb.allowOverlappingExecution { + q.WriteString(` ALLOW_OVERLAPPING_EXECUTION = TRUE`) + } + if tb.errorIntegration != "" { q.WriteString(fmt.Sprintf(` ERROR_INTEGRATION = '%v'`, EscapeString(tb.errorIntegration))) } @@ -233,6 +244,16 @@ func (tb *TaskBuilder) RemoveComment() string { return fmt.Sprintf(`ALTER TASK %v UNSET COMMENT`, tb.QualifiedName()) } +// ChangeAllowOverlappingExecution returns the sql that will change the ALLOW_OVERLAPPING_EXECUTION for the task. +func (tb *TaskBuilder) ChangeAllowOverlappingExecution(flag bool) string { + return fmt.Sprintf(`ALTER TASK %v SET ALLOW_OVERLAPPING_EXECUTION = %v`, tb.QualifiedName(), flag) +} + +// UnsetAllowOverlappingExecution returns the sql that will unset the ALLOW_OVERLAPPING_EXECUTION for the task. +func (tb *TaskBuilder) UnsetAllowOverlappingExecution() string { + return fmt.Sprintf(`ALTER TASK %v UNSET ALLOW_OVERLAPPING_EXECUTION`, tb.QualifiedName()) +} + // AddDependency returns the sql that will add the after dependency for the task. func (tb *TaskBuilder) AddDependency(after string) string { return fmt.Sprintf(`ALTER TASK %v ADD AFTER %v`, tb.QualifiedName(), tb.GetFullName(after)) @@ -270,7 +291,7 @@ func (tb *TaskBuilder) RemoveSessionParameters(params map[string]interface{}) st return fmt.Sprintf(`ALTER TASK %v UNSET %v`, tb.QualifiedName(), strings.Join(sortedKeys, ", ")) } -// ChangeCondition returns the sql that will update the when condition for the task. +// ChangeCondition returns the sql that will update the WHEN condition for the task. func (tb *TaskBuilder) ChangeCondition(newCondition string) string { return fmt.Sprintf(`ALTER TASK %v MODIFY WHEN %v`, tb.QualifiedName(), newCondition) } @@ -331,21 +352,31 @@ func (tb *TaskBuilder) RemoveErrorIntegration() string { return fmt.Sprintf(`ALTER TASK %v UNSET ERROR_INTEGRATION`, tb.QualifiedName()) } +func (tb *TaskBuilder) SetAllowOverlappingExecution() *TaskBuilder { + tb.allowOverlappingExecution = true + return tb +} + +func (tb *TaskBuilder) IsAllowOverlappingExecution() bool { + return tb.allowOverlappingExecution +} + type task struct { - ID string `db:"id"` - CreatedOn string `db:"created_on"` - Name string `db:"name"` - DatabaseName string `db:"database_name"` - SchemaName string `db:"schema_name"` - Owner string `db:"owner"` - Comment *string `db:"comment"` - Warehouse *string `db:"warehouse"` - Schedule *string `db:"schedule"` - Predecessors *string `db:"predecessors"` - State string `db:"state"` - Definition string `db:"definition"` - Condition *string `db:"condition"` - ErrorIntegration sql.NullString `db:"error_integration"` + ID string `db:"id"` + CreatedOn string `db:"created_on"` + Name string `db:"name"` + DatabaseName string `db:"database_name"` + SchemaName string `db:"schema_name"` + Owner string `db:"owner"` + Comment *string `db:"comment"` + Warehouse *string `db:"warehouse"` + Schedule *string `db:"schedule"` + Predecessors *string `db:"predecessors"` + State string `db:"state"` + Definition string `db:"definition"` + Condition *string `db:"condition"` + ErrorIntegration sql.NullString `db:"error_integration"` + AllowOverlappingExecution *bool `db:"allow_overlapping_execution"` } func (t *task) IsEnabled() bool { diff --git a/pkg/snowflake/task_test.go b/pkg/snowflake/task_test.go index 56879707e6..b3e7c77619 100644 --- a/pkg/snowflake/task_test.go +++ b/pkg/snowflake/task_test.go @@ -34,6 +34,9 @@ func TestTaskCreate(t *testing.T) { st.WithStatement("SELECT * FROM table WHERE column = 'name'") r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' USER_TASK_TIMEOUT_MS = 12 AFTER "test_db"."test_schema"."other_task" WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT * FROM table WHERE column = 'name'`) + + st.WithAllowOverlappingExecution(true) + r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' ALLOW_OVERLAPPING_EXECUTION = TRUE USER_TASK_TIMEOUT_MS = 12 AFTER "test_db"."test_schema"."other_task" WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT * FROM table WHERE column = 'name'`) } func TestChangeWarehouse(t *testing.T) { @@ -163,3 +166,9 @@ func TestShow(t *testing.T) { st := Task("test_task", "test_db", "test_schema") r.Equal(st.Show(), `SHOW TASKS LIKE 'test_task' IN SCHEMA "test_db"."test_schema"`) } + +func TestChangeAllowOverlappingExecution(t *testing.T) { + r := require.New(t) + st := Task("test_task", "test_db", "test_schema") + r.Equal(st.ChangeAllowOverlappingExecution(true), `ALTER TASK "test_db"."test_schema"."test_task" SET ALLOW_OVERLAPPING_EXECUTION = TRUE`) +} From 01e6931574c7bb7084cfba8a949bce78fcdc6c76 Mon Sep 17 00:00:00 2001 From: Miguel Lorek Date: Fri, 21 Oct 2022 08:57:46 +0100 Subject: [PATCH 2/2] comments --- pkg/resources/task.go | 12 ++++++------ pkg/snowflake/task.go | 10 +++++----- pkg/snowflake/task_test.go | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index d44bc963c3..3842242fa2 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -637,16 +637,16 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if d.HasChange(AllowOverlappingExecution) { var q string - n := d.Get(AllowOverlappingExecution) - flag := n.(bool) - if flag == false { - q = builder.UnsetAllowOverlappingExecution() + _, new := d.GetChange(AllowOverlappingExecution) + flag := new.(bool) + if flag { + q = builder.SetAllowOverlappingExecutionParameter() } else { - q = builder.ChangeAllowOverlappingExecution(flag) + q = builder.UnsetAllowOverlappingExecutionParameter() } err := snowflake.Exec(db, q) if err != nil { - return errors.Wrapf(err, "error updating allow_overlapping_execution on task %v", d.Id()) + return errors.Wrapf(err, "error updating %s on task %v", AllowOverlappingExecution, d.Id()) } } diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index 45518047f6..50f089c326 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -244,13 +244,13 @@ func (tb *TaskBuilder) RemoveComment() string { return fmt.Sprintf(`ALTER TASK %v UNSET COMMENT`, tb.QualifiedName()) } -// ChangeAllowOverlappingExecution returns the sql that will change the ALLOW_OVERLAPPING_EXECUTION for the task. -func (tb *TaskBuilder) ChangeAllowOverlappingExecution(flag bool) string { - return fmt.Sprintf(`ALTER TASK %v SET ALLOW_OVERLAPPING_EXECUTION = %v`, tb.QualifiedName(), flag) +// SetAllowOverlappingExecutionParameter returns the sql that will change the ALLOW_OVERLAPPING_EXECUTION for the task. +func (tb *TaskBuilder) SetAllowOverlappingExecutionParameter() string { + return fmt.Sprintf(`ALTER TASK %v SET ALLOW_OVERLAPPING_EXECUTION = TRUE`, tb.QualifiedName()) } -// UnsetAllowOverlappingExecution returns the sql that will unset the ALLOW_OVERLAPPING_EXECUTION for the task. -func (tb *TaskBuilder) UnsetAllowOverlappingExecution() string { +// UnsetAllowOverlappingExecutionParameter returns the sql that will unset the ALLOW_OVERLAPPING_EXECUTION for the task. +func (tb *TaskBuilder) UnsetAllowOverlappingExecutionParameter() string { return fmt.Sprintf(`ALTER TASK %v UNSET ALLOW_OVERLAPPING_EXECUTION`, tb.QualifiedName()) } diff --git a/pkg/snowflake/task_test.go b/pkg/snowflake/task_test.go index b3e7c77619..5b44527511 100644 --- a/pkg/snowflake/task_test.go +++ b/pkg/snowflake/task_test.go @@ -167,8 +167,8 @@ func TestShow(t *testing.T) { r.Equal(st.Show(), `SHOW TASKS LIKE 'test_task' IN SCHEMA "test_db"."test_schema"`) } -func TestChangeAllowOverlappingExecution(t *testing.T) { +func TestSetAllowOverlappingExecution(t *testing.T) { r := require.New(t) st := Task("test_task", "test_db", "test_schema") - r.Equal(st.ChangeAllowOverlappingExecution(true), `ALTER TASK "test_db"."test_schema"."test_task" SET ALLOW_OVERLAPPING_EXECUTION = TRUE`) + r.Equal(st.SetAllowOverlappingExecutionParameter(), `ALTER TASK "test_db"."test_schema"."test_task" SET ALLOW_OVERLAPPING_EXECUTION = TRUE`) }