Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: task with allow_overlapping_execution option #1291

Merged
merged 4 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/resources/snowflake_task/resource.tf
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,16 @@ resource snowflake_task serverless_task {
when = "foo AND bar"
enabled = true
}

resource snowflake_task test_task {
comment = "task with allow_overlapping_execution"

database = "db"
schema = "schema"

name = "test_task"
sql_statement = "select 1 as c;"

allow_overlapping_execution = true
enabled = true
}
41 changes: 36 additions & 5 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

const (
taskIDDelimiter = '|'
taskIDDelimiter = '|'
AllowOverlappingExecution = "allow_overlapping_execution"
)

var taskSchema = map[string]*schema.Schema{
Expand Down Expand Up @@ -106,6 +107,12 @@ var taskSchema = map[string]*schema.Schema{
Optional: true,
Description: "Specifies the name of the notification integration used for error notifications.",
},
AllowOverlappingExecution: {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: "By default, Snowflake ensures that only one instance of a particular DAG is allowed to run at a time, setting the parameter value to TRUE permits DAG runs to overlap.",
},
}

type taskID struct {
Expand All @@ -129,7 +136,7 @@ func (t *taskID) String() (string, error) {
return strTaskID, nil
}

// difference find keys in a but not in b.
// difference find keys in 'a' but not in 'b'.
func difference(a, b map[string]interface{}) map[string]interface{} {
diff := make(map[string]interface{})
for k := range a {
Expand Down Expand Up @@ -279,7 +286,7 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {
row := snowflake.QueryRow(db, q)
t, err := snowflake.ScanTask(row)
if err == sql.ErrNoRows {
// If not found, mark resource to be removed from statefile during apply or refresh
// If not found, mark resource to be removed from state file during apply or refresh
log.Printf("[DEBUG] task (%s) not found", d.Id())
d.SetId("")
return nil
Expand Down Expand Up @@ -323,6 +330,11 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set(AllowOverlappingExecution, t.AllowOverlappingExecution)
if err != nil {
return err
}

// The "DESCRIBE TASK ..." command returns the string "null" for error_integration
if t.ErrorIntegration.String == "null" {
t.ErrorIntegration.Valid = false
Expand Down Expand Up @@ -445,8 +457,12 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error {
builder.WithComment(v.(string))
}

if v, ok := d.GetOk(AllowOverlappingExecution); ok {
builder.WithAllowOverlappingExecution(v.(bool))
}

if v, ok := d.GetOk("error_integration"); ok {
builder.WithErrorIntegration((v.(string)))
builder.WithErrorIntegration(v.(string))
}

if v, ok := d.GetOk("after"); ok {
Expand Down Expand Up @@ -619,6 +635,21 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
}
}

if d.HasChange(AllowOverlappingExecution) {
var q string
n := d.Get(AllowOverlappingExecution)
mlorek marked this conversation as resolved.
Show resolved Hide resolved
flag := n.(bool)
if flag == false {
q = builder.UnsetAllowOverlappingExecution()
} else {
q = builder.ChangeAllowOverlappingExecution(flag)
mlorek marked this conversation as resolved.
Show resolved Hide resolved
}
err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating allow_overlapping_execution on task %v", d.Id())
}
}

if d.HasChange("after") {
var (
q string
Expand Down Expand Up @@ -695,7 +726,7 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
} else {
q = builder.Suspend()
// make sure defer doesn't enable task again
// when standalone or root task and status is supsended
// when standalone or root task and status is suspended
needResumeCurrentTask = false
if root != nil && builder.QualifiedName() == root.QualifiedName() {
root = root.SetDisabled() //nolint
Expand Down
67 changes: 49 additions & 18 deletions pkg/snowflake/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type TaskBuilder struct {
disabled bool
userTaskManagedInitialWarehouseSize string
errorIntegration string
allowOverlappingExecution bool
}

// GetFullName prepends db and schema to in parameter.
Expand Down Expand Up @@ -74,6 +75,12 @@ func (tb *TaskBuilder) WithComment(c string) *TaskBuilder {
return tb
}

// WithAllowOverlappingExecution set the ALLOW_OVERLAPPING_EXECUTION on the TaskBuilder.
func (tb *TaskBuilder) WithAllowOverlappingExecution(flag bool) *TaskBuilder {
tb.allowOverlappingExecution = flag
return tb
}

// WithTimeout adds a timeout to the TaskBuilder.
func (tb *TaskBuilder) WithTimeout(t int) *TaskBuilder {
tb.userTaskTimeoutMS = t
Expand All @@ -86,7 +93,7 @@ func (tb *TaskBuilder) WithDependency(after string) *TaskBuilder {
return tb
}

// WithCondition adds a when condition to the TaskBuilder.
// WithCondition adds a WHEN condition to the TaskBuilder.
func (tb *TaskBuilder) WithCondition(when string) *TaskBuilder {
tb.when = when
return tb
Expand All @@ -104,7 +111,7 @@ func (tb *TaskBuilder) WithInitialWarehouseSize(initialWarehouseSize string) *Ta
return tb
}

// / WithErrorIntegration adds ErrorIntegration specification to the TaskBuilder.
// WithErrorIntegration adds ErrorIntegration specification to the TaskBuilder.
func (tb *TaskBuilder) WithErrorIntegration(s string) *TaskBuilder {
tb.errorIntegration = s
return tb
Expand All @@ -124,7 +131,7 @@ func Task(name, db, schema string) *TaskBuilder {
name: name,
db: db,
schema: schema,
disabled: false, // helper for when started root or standalone task gets supspended
disabled: false, // helper for when started root or standalone task gets suspended
}
}

Expand Down Expand Up @@ -165,6 +172,10 @@ func (tb *TaskBuilder) Create() string {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment)))
}

if tb.allowOverlappingExecution {
q.WriteString(` ALLOW_OVERLAPPING_EXECUTION = TRUE`)
}

if tb.errorIntegration != "" {
q.WriteString(fmt.Sprintf(` ERROR_INTEGRATION = '%v'`, EscapeString(tb.errorIntegration)))
}
Expand Down Expand Up @@ -233,6 +244,16 @@ func (tb *TaskBuilder) RemoveComment() string {
return fmt.Sprintf(`ALTER TASK %v UNSET COMMENT`, tb.QualifiedName())
}

// ChangeAllowOverlappingExecution returns the sql that will change the ALLOW_OVERLAPPING_EXECUTION for the task.
func (tb *TaskBuilder) ChangeAllowOverlappingExecution(flag bool) string {
mlorek marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Sprintf(`ALTER TASK %v SET ALLOW_OVERLAPPING_EXECUTION = %v`, tb.QualifiedName(), flag)
}

// UnsetAllowOverlappingExecution returns the sql that will unset the ALLOW_OVERLAPPING_EXECUTION for the task.
func (tb *TaskBuilder) UnsetAllowOverlappingExecution() string {
return fmt.Sprintf(`ALTER TASK %v UNSET ALLOW_OVERLAPPING_EXECUTION`, tb.QualifiedName())
}

// AddDependency returns the sql that will add the after dependency for the task.
func (tb *TaskBuilder) AddDependency(after string) string {
return fmt.Sprintf(`ALTER TASK %v ADD AFTER %v`, tb.QualifiedName(), tb.GetFullName(after))
Expand Down Expand Up @@ -270,7 +291,7 @@ func (tb *TaskBuilder) RemoveSessionParameters(params map[string]interface{}) st
return fmt.Sprintf(`ALTER TASK %v UNSET %v`, tb.QualifiedName(), strings.Join(sortedKeys, ", "))
}

// ChangeCondition returns the sql that will update the when condition for the task.
// ChangeCondition returns the sql that will update the WHEN condition for the task.
func (tb *TaskBuilder) ChangeCondition(newCondition string) string {
return fmt.Sprintf(`ALTER TASK %v MODIFY WHEN %v`, tb.QualifiedName(), newCondition)
}
Expand Down Expand Up @@ -331,21 +352,31 @@ func (tb *TaskBuilder) RemoveErrorIntegration() string {
return fmt.Sprintf(`ALTER TASK %v UNSET ERROR_INTEGRATION`, tb.QualifiedName())
}

func (tb *TaskBuilder) SetAllowOverlappingExecution() *TaskBuilder {
tb.allowOverlappingExecution = true
return tb
}

func (tb *TaskBuilder) IsAllowOverlappingExecution() bool {
return tb.allowOverlappingExecution
}

type task struct {
ID string `db:"id"`
CreatedOn string `db:"created_on"`
Name string `db:"name"`
DatabaseName string `db:"database_name"`
SchemaName string `db:"schema_name"`
Owner string `db:"owner"`
Comment *string `db:"comment"`
Warehouse *string `db:"warehouse"`
Schedule *string `db:"schedule"`
Predecessors *string `db:"predecessors"`
State string `db:"state"`
Definition string `db:"definition"`
Condition *string `db:"condition"`
ErrorIntegration sql.NullString `db:"error_integration"`
ID string `db:"id"`
CreatedOn string `db:"created_on"`
Name string `db:"name"`
DatabaseName string `db:"database_name"`
SchemaName string `db:"schema_name"`
Owner string `db:"owner"`
Comment *string `db:"comment"`
Warehouse *string `db:"warehouse"`
Schedule *string `db:"schedule"`
Predecessors *string `db:"predecessors"`
State string `db:"state"`
Definition string `db:"definition"`
Condition *string `db:"condition"`
ErrorIntegration sql.NullString `db:"error_integration"`
AllowOverlappingExecution *bool `db:"allow_overlapping_execution"`
}

func (t *task) IsEnabled() bool {
Expand Down
9 changes: 9 additions & 0 deletions pkg/snowflake/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func TestTaskCreate(t *testing.T) {

st.WithStatement("SELECT * FROM table WHERE column = 'name'")
r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' USER_TASK_TIMEOUT_MS = 12 AFTER "test_db"."test_schema"."other_task" WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT * FROM table WHERE column = 'name'`)

st.WithAllowOverlappingExecution(true)
r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' ALLOW_OVERLAPPING_EXECUTION = TRUE USER_TASK_TIMEOUT_MS = 12 AFTER "test_db"."test_schema"."other_task" WHEN SYSTEM$STREAM_HAS_DATA('MYSTREAM') AS SELECT * FROM table WHERE column = 'name'`)
}

func TestChangeWarehouse(t *testing.T) {
Expand Down Expand Up @@ -163,3 +166,9 @@ func TestShow(t *testing.T) {
st := Task("test_task", "test_db", "test_schema")
r.Equal(st.Show(), `SHOW TASKS LIKE 'test_task' IN SCHEMA "test_db"."test_schema"`)
}

func TestChangeAllowOverlappingExecution(t *testing.T) {
r := require.New(t)
st := Task("test_task", "test_db", "test_schema")
r.Equal(st.ChangeAllowOverlappingExecution(true), `ALTER TASK "test_db"."test_schema"."test_task" SET ALLOW_OVERLAPPING_EXECUTION = TRUE`)
}