From 7616ac10e6f5f76bce698954dc18f790c08ad2ed Mon Sep 17 00:00:00 2001 From: Satont Date: Sun, 17 Dec 2023 15:55:05 +0300 Subject: [PATCH] chore(timers): one time schedule all timers --- apps/timers/cmd/main.go | 2 ++ apps/timers/internal/s/s.go | 19 +++++++++++++++++++ apps/timers/internal/worker/worker.go | 2 +- 3 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 apps/timers/internal/s/s.go diff --git a/apps/timers/cmd/main.go b/apps/timers/cmd/main.go index d27827aa8..96a55e898 100644 --- a/apps/timers/cmd/main.go +++ b/apps/timers/cmd/main.go @@ -10,6 +10,7 @@ import ( "github.com/satont/twir/apps/timers/internal/repositories/channels" "github.com/satont/twir/apps/timers/internal/repositories/streams" "github.com/satont/twir/apps/timers/internal/repositories/timers" + "github.com/satont/twir/apps/timers/internal/s" "github.com/satont/twir/apps/timers/internal/worker" "github.com/satont/twir/apps/timers/internal/workflow" cfg "github.com/satont/twir/libs/config" @@ -43,6 +44,7 @@ func main() { ), fx.NopLogger, fx.Invoke( + s.New, worker.New, grpc_server.New, func(l logger.Logger) { diff --git a/apps/timers/internal/s/s.go b/apps/timers/internal/s/s.go new file mode 100644 index 000000000..f72b49db7 --- /dev/null +++ b/apps/timers/internal/s/s.go @@ -0,0 +1,19 @@ +package s + +import ( + "context" + + "github.com/satont/twir/apps/timers/internal/repositories/timers" + "github.com/satont/twir/apps/timers/internal/workflow" +) + +func New(repository timers.Repository, w *workflow.Workflow) { + timers, err := repository.GetAll() + if err != nil { + return + } + + for _, timer := range timers { + w.AddTimer(context.TODO(), timer.ID) + } +} diff --git a/apps/timers/internal/worker/worker.go b/apps/timers/internal/worker/worker.go index 425ed5f74..50900a26d 100644 --- a/apps/timers/internal/worker/worker.go +++ b/apps/timers/internal/worker/worker.go @@ -32,7 +32,7 @@ func New(opts Opts) error { if err != nil { return err } - + temporalWorker := worker.New(c, shared.TimersWorkerTaskQueueName, worker.Options{}) temporalWorker.RegisterWorkflow(opts.Workflow.Flow)