Skip to content

Commit

Permalink
this to make sure queues are cleaned up on boot (#1812)
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy authored Oct 18, 2022
1 parent cf39279 commit bdcbd36
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func action(cli *cli.Context) error {
}
}()

if err := app.MarkBooted(provisionModule); err != nil {
if err := app.MarkBooted(serverName); err != nil {
log.Error().Err(err).Msg("failed to mark module as booted")
}

Expand Down
36 changes: 22 additions & 14 deletions pkg/provision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,9 @@ func GetSubstrate(ctx context.Context) substrate.Manager {
// one reservation at a time. On error, the engine will log the error. and
// continue to next reservation.
func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOption) (*NativeEngine, error) {
queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} })
if err != nil {
// if this happens it means data types has been changed in that case we need
// to clean up the queue and start over. unfortunately any un applied changes
os.RemoveAll(filepath.Join(root, "jobs"))
return nil, errors.Wrap(err, "failed to create job queue")
}
e := &NativeEngine{
storage: storage,
provisioner: provisioner,
queue: queue,
twins: &nullKeyGetter{},
admins: &nullKeyGetter{},
order: gridtypes.Types(),
Expand All @@ -296,6 +288,19 @@ func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOp
opt.apply(e)
}

if e.rerunAll {
os.RemoveAll(filepath.Join(root, "jobs"))
}

queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} })
if err != nil {
// if this happens it means data types has been changed in that case we need
// to clean up the queue and start over. unfortunately any un applied changes
os.RemoveAll(filepath.Join(root, "jobs"))
return nil, errors.Wrap(err, "failed to create job queue")
}

e.queue = queue
return e, nil
}

Expand Down Expand Up @@ -459,7 +464,6 @@ func (e *NativeEngine) Run(root context.Context) error {
}

for {

obj, err := e.queue.PeekBlock()
if err != nil {
log.Error().Err(err).Msg("failed to check job queue")
Expand All @@ -469,24 +473,28 @@ func (e *NativeEngine) Run(root context.Context) error {

job := obj.(*engineJob)
ctx := withDeployment(root, job.Target.TwinID, job.Target.ContractID)
l := log.With().
Uint32("twin", job.Target.TwinID).
Uint64("contract", job.Target.ContractID).
Logger()

// contract validation
// this should ONLY be done on provosion and update operation
if job.Op == opProvision || job.Op == opUpdate {
// otherwise, contract validation is needed
ctx, err = e.contract(ctx, &job.Target)
if err != nil {
log.Error().Err(err).Uint64("contract", job.Target.ContractID).Msg("contact validation fails")
l.Error().Err(err).Msg("contact validation fails")
//job.Target.SetError(err)
if err := e.storage.Error(job.Target.TwinID, job.Target.ContractID, err); err != nil {
log.Error().Err(err).Msg("failed to set deployment global error")
l.Error().Err(err).Msg("failed to set deployment global error")
}
_, _ = e.queue.Dequeue()

continue
}

log.Debug().Uint64("contract", job.Target.ContractID).Msg("contact validation pass")
l.Debug().Msg("contact validation pass")
}

switch job.Op {
Expand Down Expand Up @@ -515,15 +523,15 @@ func (e *NativeEngine) Run(root context.Context) error {
// and update to reflect the current result on those workloads.
update, err := job.Source.Upgrade(&job.Target)
if err != nil {
log.Error().Err(err).Uint32("twin", job.Target.TwinID).Uint64("id", job.Target.ContractID).Msg("failed to get update procedure")
l.Error().Err(err).Msg("failed to get update procedure")
break
}
e.updateDeployment(ctx, update)
}

_, err = e.queue.Dequeue()
if err != nil {
log.Error().Err(err).Msg("failed to dequeue job")
l.Error().Err(err).Msg("failed to dequeue job")
}

e.safeCallback(&job.Target, job.Op == opDeprovision)
Expand Down

0 comments on commit bdcbd36

Please sign in to comment.