From 104010b76ef7a6ed8beefcc8c5d7b000c10ea962 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Wed, 3 Jan 2024 12:35:34 -0500 Subject: [PATCH] fix: go-sdk improvements and set default timeouts --- api/v1/server/oas/transformers/workflow.go | 6 ++++++ internal/services/jobscontroller/controller.go | 13 +++---------- internal/services/shared/defaults/timeout.go | 6 ++++++ pkg/worker/worker.go | 8 +++++--- pkg/worker/workflow.go | 15 ++++++++++++++- 5 files changed, 34 insertions(+), 14 deletions(-) create mode 100644 internal/services/shared/defaults/timeout.go diff --git a/api/v1/server/oas/transformers/workflow.go b/api/v1/server/oas/transformers/workflow.go index 6a4bb8204..d3e6d4f04 100644 --- a/api/v1/server/oas/transformers/workflow.go +++ b/api/v1/server/oas/transformers/workflow.go @@ -7,8 +7,10 @@ import ( "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/iter" + "github.com/hatchet-dev/hatchet/internal/repository" "github.com/hatchet-dev/hatchet/internal/repository/prisma/db" "github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc" + "github.com/hatchet-dev/hatchet/internal/services/shared/defaults" "github.com/hatchet-dev/hatchet/pkg/client/types" ) @@ -228,6 +230,8 @@ func ToJob(job *db.JobModel) (*gen.Job, error) { if timeout, ok := job.Timeout(); ok { res.Timeout = &timeout + } else { + res.Timeout = repository.StringPtr(defaults.DefaultJobRunTimeout) } if steps := job.Steps(); steps != nil { @@ -269,6 +273,8 @@ func ToStep(step *db.StepModel) *gen.Step { if timeout, ok := step.Timeout(); ok { res.Timeout = &timeout + } else { + res.Timeout = repository.StringPtr(defaults.DefaultStepRunTimeout) } if next, ok := step.NextID(); ok { diff --git a/internal/services/jobscontroller/controller.go b/internal/services/jobscontroller/controller.go index 087aeabc1..68301c331 100644 --- a/internal/services/jobscontroller/controller.go +++ b/internal/services/jobscontroller/controller.go @@ -12,6 +12,7 @@ import ( "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/repository" "github.com/hatchet-dev/hatchet/internal/repository/prisma/db" + "github.com/hatchet-dev/hatchet/internal/services/shared/defaults" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" "github.com/hatchet-dev/hatchet/internal/taskqueue" "github.com/rs/zerolog" @@ -1016,16 +1017,12 @@ func stepRunAssignedTask(tenantId, stepRunId string, worker *db.WorkerModel) *ta } func scheduleStepRunTimeoutTask(ticker *db.TickerModel, stepRun *db.StepRunModel) (*taskqueue.Task, error) { - var durationStr string + durationStr := defaults.DefaultStepRunTimeout if timeout, ok := stepRun.Step().Timeout(); ok { durationStr = timeout } - if durationStr == "" { - durationStr = "300s" - } - // get a duration duration, err := time.ParseDuration(durationStr) @@ -1054,16 +1051,12 @@ func scheduleStepRunTimeoutTask(ticker *db.TickerModel, stepRun *db.StepRunModel } func scheduleJobRunTimeoutTask(ticker *db.TickerModel, jobRun *db.JobRunModel) (*taskqueue.Task, error) { - var durationStr string + durationStr := defaults.DefaultJobRunTimeout if timeout, ok := jobRun.Job().Timeout(); ok { durationStr = timeout } - if durationStr == "" { - durationStr = "300s" - } - // get a duration duration, err := time.ParseDuration(durationStr) diff --git a/internal/services/shared/defaults/timeout.go b/internal/services/shared/defaults/timeout.go new file mode 100644 index 000000000..e11ec70f7 --- /dev/null +++ b/internal/services/shared/defaults/timeout.go @@ -0,0 +1,6 @@ +package defaults + +const ( + DefaultJobRunTimeout = "300s" + DefaultStepRunTimeout = "300s" +) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 66965d232..68950d6d2 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -185,9 +185,11 @@ func (w *Worker) registerAction(name string, method any) error { return fmt.Errorf("could not get function from method: %w", err) } - // ensure action has not been registered - if _, ok := w.actions[name]; ok { - return fmt.Errorf("action %s already registered", name) + // if action has already been registered, ensure that the method is the same + if currMethod, ok := w.actions[name]; ok { + if reflect.ValueOf(currMethod).Pointer() != reflect.ValueOf(method).Pointer() { + return fmt.Errorf("action %s is already registered with function %s", name, getFnName(currMethod.MethodFn())) + } } w.actions[name] = &actionImpl{ diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index 155b4f58d..0182cc05d 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -64,6 +64,8 @@ type Workflow struct { type workflowFn struct { Function any + + name string } func Fn(f any) workflowFn { @@ -72,12 +74,23 @@ func Fn(f any) workflowFn { } } +func (w workflowFn) SetName(name string) workflowFn { + w.name = name + return w +} + func (w workflowFn) ToWorkflow(svcName string) types.Workflow { + jobName := w.name + + if jobName == "" { + jobName = getFnName(w.Function) + } workflowJob := &WorkflowJob{ - Name: getFnName(w.Function), + Name: jobName, Steps: []WorkflowStep{ { Function: w.Function, + ID: w.name, }, }, }