Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

this to make sure queues are cleaned up on boot #1812

Merged
merged 1 commit into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func action(cli *cli.Context) error {
}
}()

if err := app.MarkBooted(provisionModule); err != nil {
if err := app.MarkBooted(serverName); err != nil {
log.Error().Err(err).Msg("failed to mark module as booted")
}

Expand Down
36 changes: 22 additions & 14 deletions pkg/provision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,9 @@ func GetSubstrate(ctx context.Context) substrate.Manager {
// one reservation at a time. On error, the engine will log the error. and
// continue to next reservation.
func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOption) (*NativeEngine, error) {
queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} })
if err != nil {
// if this happens it means data types has been changed in that case we need
// to clean up the queue and start over. unfortunately any un applied changes
os.RemoveAll(filepath.Join(root, "jobs"))
return nil, errors.Wrap(err, "failed to create job queue")
}
e := &NativeEngine{
storage: storage,
provisioner: provisioner,
queue: queue,
twins: &nullKeyGetter{},
admins: &nullKeyGetter{},
order: gridtypes.Types(),
Expand All @@ -296,6 +288,19 @@ func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOp
opt.apply(e)
}

if e.rerunAll {
os.RemoveAll(filepath.Join(root, "jobs"))
}

queue, err := dque.NewOrOpen("jobs", root, 512, func() interface{} { return &engineJob{} })
if err != nil {
// if this happens it means data types has been changed in that case we need
// to clean up the queue and start over. unfortunately any un applied changes
os.RemoveAll(filepath.Join(root, "jobs"))
return nil, errors.Wrap(err, "failed to create job queue")
}

e.queue = queue
return e, nil
}

Expand Down Expand Up @@ -459,7 +464,6 @@ func (e *NativeEngine) Run(root context.Context) error {
}

for {

obj, err := e.queue.PeekBlock()
if err != nil {
log.Error().Err(err).Msg("failed to check job queue")
Expand All @@ -469,24 +473,28 @@ func (e *NativeEngine) Run(root context.Context) error {

job := obj.(*engineJob)
ctx := withDeployment(root, job.Target.TwinID, job.Target.ContractID)
l := log.With().
Uint32("twin", job.Target.TwinID).
Uint64("contract", job.Target.ContractID).
Logger()

// contract validation
// this should ONLY be done on provosion and update operation
if job.Op == opProvision || job.Op == opUpdate {
// otherwise, contract validation is needed
ctx, err = e.contract(ctx, &job.Target)
if err != nil {
log.Error().Err(err).Uint64("contract", job.Target.ContractID).Msg("contact validation fails")
l.Error().Err(err).Msg("contact validation fails")
//job.Target.SetError(err)
if err := e.storage.Error(job.Target.TwinID, job.Target.ContractID, err); err != nil {
log.Error().Err(err).Msg("failed to set deployment global error")
l.Error().Err(err).Msg("failed to set deployment global error")
}
_, _ = e.queue.Dequeue()

continue
}

log.Debug().Uint64("contract", job.Target.ContractID).Msg("contact validation pass")
l.Debug().Msg("contact validation pass")
}

switch job.Op {
Expand Down Expand Up @@ -515,15 +523,15 @@ func (e *NativeEngine) Run(root context.Context) error {
// and update to reflect the current result on those workloads.
update, err := job.Source.Upgrade(&job.Target)
if err != nil {
log.Error().Err(err).Uint32("twin", job.Target.TwinID).Uint64("id", job.Target.ContractID).Msg("failed to get update procedure")
l.Error().Err(err).Msg("failed to get update procedure")
break
}
e.updateDeployment(ctx, update)
}

_, err = e.queue.Dequeue()
if err != nil {
log.Error().Err(err).Msg("failed to dequeue job")
l.Error().Err(err).Msg("failed to dequeue job")
}

e.safeCallback(&job.Target, job.Op == opDeprovision)
Expand Down