feat: Task resource v1 readiness part 2 #3170

Merged: 23 commits, Nov 14, 2024 (showing changes from 3 commits)
33 changes: 33 additions & 0 deletions MIGRATION_GUIDE.md
@@ -7,6 +7,39 @@ across different versions.
> [!TIP]
> We highly recommend upgrading the versions one by one instead of bulk upgrades.
## v0.98.0 ➞ v0.99.0

### snowflake_task resource changes
Changes:
- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values))
- `shedule` field changed from single value to nested object that allows for specifying either minutes or cron
Collaborator:

nit (maybe for next PRs): add instructions for the user on what they need to do and what will be handled automatically

Collaborator (Author):

Yeah, probably in the next PR, because there's no state upgrader yet.

Collaborator:

Also, a typo: schedule


Before:
```terraform
resource "snowflake_task" "example" {
# ...
schedule = "5 MINUTES"
# or
schedule = "USING SCHEDULE * * * * * UTC"
# ...
}
```
After:
```terraform
resource "snowflake_task" "example" {
# ...
schedule {
minutes = 5
# or
using_cron = "* * * * * UTC"
}
# ...
}
```
- All task parameters defined in [the Snowflake documentation](https://docs.snowflake.com/en/sql-reference/parameters) were added to the top-level schema, and the `session_parameters` map was removed.
- `show_output` and `parameters` fields were added to hold the SHOW and SHOW PARAMETERS output (see [raw Snowflake output](./v1-preparations/CHANGES_BEFORE_V1.md#raw-snowflake-output)).
- Added support for finalizer tasks with the `finalize` field. It conflicts with `after` and `schedule` (see [finalizer tasks](https://docs.snowflake.com/en/user-guide/tasks-graphs#release-and-cleanup-of-task-graphs)).
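Since the guide only shows a before/after for the schedule change, here is a hedged sketch of the `enabled` ➞ `started` migration as well (field names taken from the changelog entry above; the exact upgrade behavior may differ until the state upgrader mentioned in the review lands):

```terraform
# Before (v0.98.0): plain boolean
resource "snowflake_task" "example" {
  # ...
  enabled = true
}

# After (v0.99.0): boolean-valued string, per the "empty" values convention
resource "snowflake_task" "example" {
  # ...
  started = "true"
}
```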

## v0.97.0 ➞ v0.98.0

### snowflake_streams data source changes
6 changes: 3 additions & 3 deletions docs/resources/task.md
@@ -121,7 +121,7 @@ resource "snowflake_task" "test_task" {
- `quoted_identifiers_ignore_case` (Boolean) Specifies whether letters in double-quoted object identifiers are stored and resolved as uppercase letters. By default, Snowflake preserves the case of alphabetic characters when storing and resolving double-quoted identifiers (see [Identifier resolution](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html#label-identifier-casing)). You can use this parameter in situations in which [third-party applications always use double quotes around identifiers](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html#label-identifier-casing-parameter). For more information, check [QUOTED_IDENTIFIERS_IGNORE_CASE docs](https://docs.snowflake.com/en/sql-reference/parameters#quoted-identifiers-ignore-case).
- `rows_per_resultset` (Number) Specifies the maximum number of rows returned in a result set. A value of 0 specifies no maximum. For more information, check [ROWS_PER_RESULTSET docs](https://docs.snowflake.com/en/sql-reference/parameters#rows-per-resultset).
- `s3_stage_vpce_dns_name` (String) Specifies the DNS name of an Amazon S3 interface endpoint. Requests sent to the internal stage of an account via [AWS PrivateLink for Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/privatelink-interface-endpoints.html) use this endpoint to connect. For more information, see [Accessing Internal stages with dedicated interface endpoints](https://docs.snowflake.com/en/user-guide/private-internal-stages-aws.html#label-aws-privatelink-internal-stage-network-isolation). For more information, check [S3_STAGE_VPCE_DNS_NAME docs](https://docs.snowflake.com/en/sql-reference/parameters#s3-stage-vpce-dns-name).
-- `schedule` (Block List, Max: 1) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after) (see [below for nested schema](#nestedblock--schedule))
+- `schedule` (Block List, Max: 1) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after; when set, one of the sub-fields `minutes` or `using_cron` should be set) (see [below for nested schema](#nestedblock--schedule))
- `search_path` (String) Specifies the path to search to resolve unqualified object names in queries. For more information, see [Name resolution in queries](https://docs.snowflake.com/en/sql-reference/name-resolution.html#label-object-name-resolution-search-path). Comma-separated list of identifiers. An identifier can be a fully or partially qualified schema name. For more information, check [SEARCH_PATH docs](https://docs.snowflake.com/en/sql-reference/parameters#search-path).
- `statement_queued_timeout_in_seconds` (Number) Amount of time, in seconds, a SQL statement (query, DDL, DML, etc.) remains queued for a warehouse before it is canceled by the system. This parameter can be used in conjunction with the [MAX_CONCURRENCY_LEVEL](https://docs.snowflake.com/en/sql-reference/parameters#label-max-concurrency-level) parameter to ensure a warehouse is never backlogged. For more information, check [STATEMENT_QUEUED_TIMEOUT_IN_SECONDS docs](https://docs.snowflake.com/en/sql-reference/parameters#statement-queued-timeout-in-seconds).
- `statement_timeout_in_seconds` (Number) Amount of time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system. For more information, check [STATEMENT_TIMEOUT_IN_SECONDS docs](https://docs.snowflake.com/en/sql-reference/parameters#statement-timeout-in-seconds).
@@ -164,8 +164,8 @@ resource "snowflake_task" "test_task" {

Optional:

-- `minutes` (Number) Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only.
-- `using_cron` (String) Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax.
+- `minutes` (Number) Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only. (conflicts with `using_cron`)
+- `using_cron` (String) Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax. (conflicts with `minutes`)


<a id="nestedatt--parameters"></a>
Expand Down

14 changes: 1 addition & 13 deletions pkg/acceptance/bettertestspoc/config/config.go
@@ -133,19 +133,7 @@ func ConfigVariablesFromModels(t *testing.T, variableName string, models ...Reso
	t.Helper()
	allVariables := make([]tfconfig.Variable, 0)
	for _, model := range models {
-		rType := reflect.TypeOf(model).Elem()
-		rValue := reflect.ValueOf(model).Elem()
-		variables := make(tfconfig.Variables)
-		for i := 0; i < rType.NumField(); i++ {
-			field := rType.Field(i)
-			if jsonTag, ok := field.Tag.Lookup("json"); ok {
-				name := strings.Split(jsonTag, ",")[0]
-				if fieldValue, ok := rValue.Field(i).Interface().(tfconfig.Variable); ok {
-					variables[name] = fieldValue
-				}
-			}
-		}
-		allVariables = append(allVariables, tfconfig.ObjectVariable(variables))
+		allVariables = append(allVariables, tfconfig.ObjectVariable(ConfigVariablesFromModel(t, model)))
	}
	return tfconfig.Variables{
		variableName: tfconfig.ListVariable(allVariables...),
4 changes: 3 additions & 1 deletion pkg/datasources/tasks_acceptance_test.go
@@ -60,7 +60,9 @@ func tasks(databaseName string, schemaName string, taskName string) string {
	warehouse     = snowflake_warehouse.test.name
	sql_statement = "SHOW FUNCTIONS"
	started       = true
-	schedule      = "15 MINUTES"
+	schedule {
+		minutes = 15
+	}
}
data snowflake_tasks "t" {
15 changes: 6 additions & 9 deletions pkg/resources/resource_helpers_read.go
@@ -50,19 +50,16 @@ func setBooleanStringFromBoolProperty(d *schema.ResourceData, key string, proper
return nil
}

-func attributeMappedValueReadIfNotEmptyElse[T, R any](d *schema.ResourceData, key string, value *T, mapper func(*T) (R, error), defaultValue any) error {
+func attributeMappedValueReadOrDefault[T, R any](d *schema.ResourceData, key string, value *T, mapper func(*T) (R, error), defaultValue *R) error {
	if value != nil {
		mappedValue, err := mapper(value)
		if err != nil {
			return err
		}
-		if err := d.Set(key, mappedValue); err != nil {
-			return err
-		}
-	} else {
-		if err := d.Set(key, defaultValue); err != nil {
-			return err
-		}
+		return d.Set(key, mappedValue)
	}
-	return nil
+	if defaultValue != nil {
+		return d.Set(key, *defaultValue)
+	}
+	return d.Set(key, nil)
}
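The refactor collapses the if/else into early returns and tightens `defaultValue` from `any` to `*R`. Below is a self-contained analogue of the pattern without the Terraform SDK (the `readOrDefault` name and stand-in types are mine, not the provider's; `d.Set` is replaced by a plain return):

```go
package main

import "fmt"

// readOrDefault maps value when present; otherwise it falls back to a typed
// default, or nil when no default is given — the same shape as the refactored
// helper above.
func readOrDefault[T, R any](value *T, mapper func(*T) (R, error), defaultValue *R) (any, error) {
	if value != nil {
		return mapper(value)
	}
	if defaultValue != nil {
		return *defaultValue, nil
	}
	return nil, nil
}

func main() {
	name := "MY_DB.MY_SCHEMA.ROOT_TASK"
	v, _ := readOrDefault(&name, func(s *string) (string, error) { return *s, nil }, nil)
	fmt.Println(v) // MY_DB.MY_SCHEMA.ROOT_TASK

	def := "unset"
	v, _ = readOrDefault[string, string](nil, nil, &def)
	fmt.Println(v) // unset
}
```

Typing `defaultValue` as `*R` (instead of `any`) lets the compiler reject defaults of the wrong type at each call site.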
38 changes: 18 additions & 20 deletions pkg/resources/task.go
@@ -71,23 +71,23 @@ var taskSchema = map[string]*schema.Schema{
		Type:          schema.TypeList,
		Optional:      true,
		MaxItems:      1,
-		Description:   "The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after)",
+		Description:   "The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after; when set, one of the sub-fields `minutes` or `using_cron` should be set)",
		ConflictsWith: []string{"finalize", "after"},
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"minutes": {
					Type:             schema.TypeInt,
					Optional:         true,
-					Description:      "Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only.",
+					Description:      "Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only. (conflicts with `using_cron`)",
					ValidateDiagFunc: validation.ToDiagFunc(validation.IntAtLeast(1)),
-					ConflictsWith:    []string{"schedule.0.using_cron"},
+					ExactlyOneOf:     []string{"schedule.0.minutes", "schedule.0.using_cron"},
				},
				"using_cron": {
					Type:             schema.TypeString,
					Optional:         true,
-					Description:      "Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax.",
+					Description:      "Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax. (conflicts with `minutes`)",
					DiffSuppressFunc: ignoreCaseSuppressFunc,
-					ConflictsWith:    []string{"schedule.0.minutes"},
+					ExactlyOneOf:     []string{"schedule.0.minutes", "schedule.0.using_cron"},
				},
			},
		},
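The switch from `ConflictsWith` to `ExactlyOneOf` is stricter than the "(conflicts with ...)" wording in the descriptions suggests: `ConflictsWith` only forbade setting both sub-fields, so an empty `schedule` block passed plan-time validation; `ExactlyOneOf` requires exactly one. A hedged sketch of a config that the new schema rejects:

```terraform
resource "snowflake_task" "example" {
  # ...
  # With the old ConflictsWith rules this empty block slipped through
  # validation; with ExactlyOneOf the plan now fails because neither
  # minutes nor using_cron is set.
  schedule {}
}
```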
@@ -329,12 +329,7 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di

	defer func() {
		if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil {
-			log.Printf("[WARN] failed to resume tasks in create: %s", err)
-			diags = append(diags, diag.Diagnostic{
-				Severity: diag.Warning,
-				Summary:  fmt.Sprintf("Failed to resume tasks when creating %s", id.FullyQualifiedName()),
-				Detail:   fmt.Sprintf("Failed to resume some of the tasks with the following errors (tasks can be resumed by applying the same configuration again): %v", err),
-			})
+			diags = append(diags, resumeTaskErrorDiag(id, "create", err))
		}
	}()

@@ -581,13 +576,13 @@ func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc {
}

	if errs := errors.Join(
-		attributeMappedValueReadIfNotEmptyElse(d, "finalize", task.TaskRelations.FinalizedRootTask, func(finalizedRootTask *sdk.SchemaObjectIdentifier) (string, error) {
+		attributeMappedValueReadOrDefault(d, "finalize", task.TaskRelations.FinalizedRootTask, func(finalizedRootTask *sdk.SchemaObjectIdentifier) (string, error) {
			return finalizedRootTask.FullyQualifiedName(), nil
		}, nil),
-		attributeMappedValueReadIfNotEmptyElse(d, "error_integration", task.ErrorIntegration, func(errorIntegration *sdk.AccountObjectIdentifier) (string, error) {
+		attributeMappedValueReadOrDefault(d, "error_integration", task.ErrorIntegration, func(errorIntegration *sdk.AccountObjectIdentifier) (string, error) {
			return errorIntegration.Name(), nil
		}, nil),
-		attributeMappedValueReadIfNotEmptyElse(d, "warehouse", task.Warehouse, func(warehouse *sdk.AccountObjectIdentifier) (string, error) {
+		attributeMappedValueReadOrDefault(d, "warehouse", task.Warehouse, func(warehouse *sdk.AccountObjectIdentifier) (string, error) {
			return warehouse.Name(), nil
		}, nil),
func() error {
@@ -649,12 +644,7 @@ func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di
	tasksToResume, err := client.Tasks.SuspendRootTasks(ctx, id, id)
	defer func() {
		if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil {
-			log.Printf("[WARN] failed to resume tasks in delete: %s", err)
-			diags = append(diags, diag.Diagnostic{
-				Severity: diag.Warning,
-				Summary:  fmt.Sprintf("Failed to resume tasks when deleting %s", id.FullyQualifiedName()),
-				Detail:   fmt.Sprintf("Failed to resume some of the tasks with the following errors (tasks can be resumed by applying the same configuration again): %v", err),
-			})
+			diags = append(diags, resumeTaskErrorDiag(id, "delete", err))
		}
	}()
if err != nil {
@@ -670,6 +660,14 @@ func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di
return nil
}

+func resumeTaskErrorDiag(id sdk.SchemaObjectIdentifier, operation string, originalErr error) diag.Diagnostic {
+	return diag.Diagnostic{
+		Severity: diag.Warning,
+		Summary:  fmt.Sprintf("Failed to resume tasks in %s operation (id=%s)", operation, id.FullyQualifiedName()),
+		Detail:   fmt.Sprintf("Failed to resume some of the tasks with the following errors (tasks can be resumed by applying the same configuration again): %v", originalErr),
+	}
+}

func waitForTaskStart(ctx context.Context, client *sdk.Client, id sdk.SchemaObjectIdentifier) error {
err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(id).WithResume(true))
if err != nil {
Expand Down