From bdcbd3689315ccbdcfd9711ea55cc470630526c1 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Tue, 18 Oct 2022 09:45:32 +0200 Subject: [PATCH] this to make sure queues are cleaned up on boot (#1812) --- cmds/modules/provisiond/main.go | 2 +- pkg/provision/engine.go | 36 ++++++++++++++++++++------------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/cmds/modules/provisiond/main.go b/cmds/modules/provisiond/main.go index 43e2f1a2a..afb891148 100644 --- a/cmds/modules/provisiond/main.go +++ b/cmds/modules/provisiond/main.go @@ -329,7 +329,7 @@ func action(cli *cli.Context) error { } }() - if err := app.MarkBooted(provisionModule); err != nil { + if err := app.MarkBooted(serverName); err != nil { log.Error().Err(err).Msg("failed to mark module as booted") } diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go index 29612e84e..d31ee3dc9 100644 --- a/pkg/provision/engine.go +++ b/pkg/provision/engine.go @@ -275,17 +275,9 @@ func GetSubstrate(ctx context.Context) substrate.Manager { // one reservation at a time. On error, the engine will log the error. and // continue to next reservation. func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOption) (*NativeEngine, error) { - queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} }) - if err != nil { - // if this happens it means data types has been changed in that case we need - // to clean up the queue and start over. unfortunately any un applied changes - os.RemoveAll(filepath.Join(root, "jobs")) - return nil, errors.Wrap(err, "failed to create job queue") - } e := &NativeEngine{ storage: storage, provisioner: provisioner, - queue: queue, twins: &nullKeyGetter{}, admins: &nullKeyGetter{}, order: gridtypes.Types(), @@ -296,6 +288,19 @@ func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOp opt.apply(e) } + if e.rerunAll { + os.RemoveAll(filepath.Join(root, "jobs")) + } + + queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} }) + if err != nil { + // if this happens it means data types has been changed in that case we need + // to clean up the queue and start over. unfortunately any un applied changes + os.RemoveAll(filepath.Join(root, "jobs")) + return nil, errors.Wrap(err, "failed to create job queue") + } + + e.queue = queue return e, nil } @@ -459,7 +464,6 @@ func (e *NativeEngine) Run(root context.Context) error { } for { - obj, err := e.queue.PeekBlock() if err != nil { log.Error().Err(err).Msg("failed to check job queue") @@ -469,6 +473,10 @@ func (e *NativeEngine) Run(root context.Context) error { job := obj.(*engineJob) ctx := withDeployment(root, job.Target.TwinID, job.Target.ContractID) + l := log.With(). + Uint32("twin", job.Target.TwinID). + Uint64("contract", job.Target.ContractID). + Logger() // contract validation // this should ONLY be done on provosion and update operation @@ -476,17 +484,17 @@ func (e *NativeEngine) Run(root context.Context) error { // otherwise, contract validation is needed ctx, err = e.contract(ctx, &job.Target) if err != nil { - log.Error().Err(err).Uint64("contract", job.Target.ContractID).Msg("contact validation fails") + l.Error().Err(err).Msg("contact validation fails") //job.Target.SetError(err) if err := e.storage.Error(job.Target.TwinID, job.Target.ContractID, err); err != nil { - log.Error().Err(err).Msg("failed to set deployment global error") + l.Error().Err(err).Msg("failed to set deployment global error") } _, _ = e.queue.Dequeue() continue } - log.Debug().Uint64("contract", job.Target.ContractID).Msg("contact validation pass") + l.Debug().Msg("contact validation pass") } switch job.Op { @@ -515,7 +523,7 @@ func (e *NativeEngine) Run(root context.Context) error { // and update to reflect the current result on those workloads. update, err := job.Source.Upgrade(&job.Target) if err != nil { - log.Error().Err(err).Uint32("twin", job.Target.TwinID).Uint64("id", job.Target.ContractID).Msg("failed to get update procedure") + l.Error().Err(err).Msg("failed to get update procedure") break } e.updateDeployment(ctx, update) @@ -523,7 +531,7 @@ func (e *NativeEngine) Run(root context.Context) error { _, err = e.queue.Dequeue() if err != nil { - log.Error().Err(err).Msg("failed to dequeue job") + l.Error().Err(err).Msg("failed to dequeue job") } e.safeCallback(&job.Target, job.Op == opDeprovision)