From 7aaca566ebf81c2710e0e02ae02e57f45f01db83 Mon Sep 17 00:00:00 2001 From: OmarElawady Date: Wed, 13 Oct 2021 15:33:44 +0200 Subject: [PATCH 1/3] rework workload update order --- pkg/gridtypes/deployment.go | 46 ++++--- pkg/provision/engine.go | 243 ++++++++++++++++++------------------ 2 files changed, 152 insertions(+), 137 deletions(-) diff --git a/pkg/gridtypes/deployment.go b/pkg/gridtypes/deployment.go index 6e3119d40..dc0a6e8f1 100644 --- a/pkg/gridtypes/deployment.go +++ b/pkg/gridtypes/deployment.go @@ -399,7 +399,7 @@ func (d *Deployment) ByType(typ WorkloadType) []*WorkloadWithID { // Upgrade validates n as an updated version of d, and return an Upgrade description // for the steps that the node needs to take to move from d to n. unchanged workloads results // will be set on n as is -func (d *Deployment) Upgrade(n *Deployment) (*Upgrade, error) { +func (d *Deployment) Upgrade(n *Deployment) ([]UpgradeOp, error) { if err := n.Valid(); err != nil { return nil, errors.Wrap(err, "new deployment is invalid") } @@ -420,10 +420,7 @@ func (d *Deployment) Upgrade(n *Deployment) (*Upgrade, error) { current[wl.Name] = wl } - update := make([]*WorkloadWithID, 0) - add := make([]*WorkloadWithID, 0) - remove := make([]*WorkloadWithID, 0) - + ops := make([]UpgradeOp, 0) for i := range n.Workloads { l := &n.Workloads[i] id, err := NewWorkloadID(n.TwinID, n.ContractID, l.Name) @@ -439,7 +436,10 @@ func (d *Deployment) Upgrade(n *Deployment) (*Upgrade, error) { if !ok { if wl.Version == expected { // newly added workload - add = append(add, wl) + ops = append(ops, UpgradeOp{ + wl, + OpAdd, + }) } else { return nil, fmt.Errorf("invalid version number for workload '%s' expected '%d'", wl.Name, expected) } @@ -454,7 +454,11 @@ func (d *Deployment) Upgrade(n *Deployment) (*Upgrade, error) { // so if wl.Version == expected { // should added to 'update' pile - update = append(update, wl) + ops = append(ops, UpgradeOp{ + wl, + OpUpdate, + }) + } // other wise. 
we leave it untouched } @@ -466,18 +470,26 @@ func (d *Deployment) Upgrade(n *Deployment) (*Upgrade, error) { for _, wl := range current { id, _ := NewWorkloadID(d.TwinID, d.ContractID, wl.Name) - remove = append(remove, &WorkloadWithID{ - Workload: wl, - ID: id, + ops = append(ops, UpgradeOp{ + &WorkloadWithID{ + Workload: wl, + ID: id, + }, + OpRemove, }) } - - return &Upgrade{ToAdd: add, ToUpdate: update, ToRemove: remove}, nil + return ops, nil } -// Upgrade procedure structure -type Upgrade struct { - ToAdd []*WorkloadWithID - ToUpdate []*WorkloadWithID - ToRemove []*WorkloadWithID +type UpgradeOp struct { + WlID *WorkloadWithID + Op jobOperation } + +type jobOperation int + +const ( + OpRemove jobOperation = iota + OpAdd + OpUpdate +) diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go index d77a0a555..d624f3d60 100644 --- a/pkg/provision/engine.go +++ b/pkg/provision/engine.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "time" "github.com/joncrlsn/dque" @@ -95,6 +96,7 @@ type NativeEngine struct { twins Twins admins Twins order []gridtypes.WorkloadType + revOrder map[gridtypes.WorkloadType]int rerunAll bool //substrate specific attributes nodeID uint32 @@ -155,10 +157,12 @@ func (w *withStartupOrder) apply(e *NativeEngine) { } delete(all, typ) ordered = append(ordered, typ) + e.revOrder[typ] = len(ordered) } // now move everything else for typ := range all { ordered = append(ordered, typ) + e.revOrder[typ] = len(ordered) } e.order = ordered @@ -237,6 +241,7 @@ func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOp twins: &nullKeyGetter{}, admins: &nullKeyGetter{}, order: gridtypes.Types(), + revOrder: make(map[gridtypes.WorkloadType]int), } for _, opt := range opts { @@ -397,11 +402,7 @@ func (e *NativeEngine) Run(root context.Context) error { log.Error().Err(err).Uint32("twin", job.Target.TwinID).Uint64("id", job.Target.ContractID).Msg("failed to get update procedure") break } - - 
e.uninstallDeployment(ctx, workloads(update.ToRemove), "deleted by an update") - e.updateDeployment(ctx, workloads(update.ToUpdate)) - e.installDeployment(ctx, workloads(update.ToAdd)) - + e.updateDeployment(ctx, update) if err := e.storage.Set(job.Target); err != nil { log.Error().Err(err).Msg("failed to set workload result") } @@ -486,6 +487,20 @@ func (e *NativeEngine) boot(root context.Context) error { } func (e *NativeEngine) uninstallWorkload(ctx context.Context, wl *gridtypes.WorkloadWithID, reason string) error { + twin, deployment, name, _ := wl.ID.Parts() + log := log.With(). + Uint32("twin", twin). + Uint64("deployment", deployment). + Str("name", name). + Str("type", wl.Type.String()). + Logger() + + log.Debug().Str("workload", string(wl.Name)).Msg("de-provisioning") + if wl.Result.State == gridtypes.StateDeleted { + //nothing to do! + return nil + } + err := e.provisioner.Decommission(ctx, wl) result := &gridtypes.Result{ Error: reason, @@ -503,84 +518,96 @@ func (e *NativeEngine) uninstallWorkload(ctx context.Context, wl *gridtypes.Work return err } -func (e *NativeEngine) uninstallDeployment(ctx context.Context, getter gridtypes.WorkloadByTypeGetter, reason string) { - for i := len(e.order) - 1; i >= 0; i-- { - typ := e.order[i] +func (e *NativeEngine) installWorkload(ctx context.Context, wl *gridtypes.WorkloadWithID) bool { + // this workload is already deleted or in error state + // we don't try again + if wl.Result.State == gridtypes.StateDeleted || + wl.Result.State == gridtypes.StateError { + //nothing to do! + return false + } - workloads := getter.ByType(typ) - for _, wl := range workloads { - twin, deployment, name, _ := wl.ID.Parts() - log := log.With(). - Uint32("twin", twin). - Uint64("deployment", deployment). - Str("name", name). - Str("type", wl.Type.String()). - Logger() - - log.Debug().Str("workload", string(wl.Name)).Msg("de-provisioning") - if wl.Result.State == gridtypes.StateDeleted { - //nothing to do! 
- continue - } + twin, deployment, name, _ := wl.ID.Parts() + log := log.With(). + Uint32("twin", twin). + Uint64("deployment", deployment). + Str("name", name). + Str("type", wl.Type.String()). + Logger() - _ = e.uninstallWorkload(ctx, wl, reason) + log.Debug().Msg("provisioning") + result, err := e.provisioner.Provision(ctx, wl) + if errors.Is(err, ErrDidNotChange) { + log.Debug().Msg("result did not change") + return false + } + + if err != nil { + log.Error().Err(err).Msg("failed to deploy workload") + result = &gridtypes.Result{ + Error: err.Error(), + State: gridtypes.StateError, } } -} -func (e *NativeEngine) updateDeployment(ctx context.Context, getter gridtypes.WorkloadByTypeGetter) (changed bool) { - for _, typ := range e.order { - workloads := getter.ByType(typ) + if result.State == gridtypes.StateError { + log.Error().Str("error", result.Error).Msg("failed to deploy workload") + } - for _, wl := range workloads { - // support redeployment by version update - // if wl.Result.State == gridtypes.StateDeleted || - // wl.Result.State == gridtypes.StateError { - // //nothing to do! - // continue - // } - - twin, deployment, name, _ := wl.ID.Parts() - log := log.With(). - Uint32("twin", twin). - Uint64("deployment", deployment). - Str("name", name). - Str("type", wl.Type.String()). - Logger() - - log.Debug().Msg("provisioning") - - var result *gridtypes.Result - var err error - if e.provisioner.CanUpdate(ctx, wl.Type) { - result, err = e.provisioner.Update(ctx, wl) - } else { - if err := e.provisioner.Decommission(ctx, wl); err != nil { - log.Error().Err(err).Msg("failed to decomission workload") - } + result.Created = gridtypes.Timestamp(time.Now().Unix()) + wl.Result = *result + return true +} + +func (e *NativeEngine) updateWorkload(ctx context.Context, wl *gridtypes.WorkloadWithID) bool { + twin, deployment, name, _ := wl.ID.Parts() + log := log.With(). + Uint32("twin", twin). + Uint64("deployment", deployment). + Str("name", name). 
+ Str("type", wl.Type.String()). + Logger() + + log.Debug().Msg("provisioning") + + var result *gridtypes.Result + var err error + if e.provisioner.CanUpdate(ctx, wl.Type) { + result, err = e.provisioner.Update(ctx, wl) + } else { + if err := e.provisioner.Decommission(ctx, wl); err != nil { + log.Error().Err(err).Msg("failed to decomission workload") + } - result, err = e.provisioner.Provision(ctx, wl) - } + result, err = e.provisioner.Provision(ctx, wl) + } - if err != nil { - log.Error().Err(err).Msg("failed to deploy workload") - result = &gridtypes.Result{ - Error: err.Error(), - State: gridtypes.StateError, - } - } + if err != nil { + log.Error().Err(err).Msg("failed to deploy workload") + result = &gridtypes.Result{ + Error: err.Error(), + State: gridtypes.StateError, + } + } - if result.State == gridtypes.StateError { - log.Error().Str("error", result.Error).Msg("failed to deploy workload") - } + if result.State == gridtypes.StateError { + log.Error().Str("error", result.Error).Msg("failed to deploy workload") + } + + result.Created = gridtypes.Timestamp(time.Now().Unix()) + wl.Result = *result + return true +} - result.Created = gridtypes.Timestamp(time.Now().Unix()) - wl.Result = *result - changed = true +func (e *NativeEngine) uninstallDeployment(ctx context.Context, getter gridtypes.WorkloadByTypeGetter, reason string) { + for i := len(e.order) - 1; i >= 0; i-- { + typ := e.order[i] + + workloads := getter.ByType(typ) + for _, wl := range workloads { + _ = e.uninstallWorkload(ctx, wl, reason) } } - - return } func (e *NativeEngine) installDeployment(ctx context.Context, getter gridtypes.WorkloadByTypeGetter) (changed bool) { @@ -588,47 +615,36 @@ func (e *NativeEngine) installDeployment(ctx context.Context, getter gridtypes.W workloads := getter.ByType(typ) for _, wl := range workloads { - // this workload is already deleted or in error state - // we don't try again - if wl.Result.State == gridtypes.StateDeleted || - wl.Result.State == 
gridtypes.StateError { - //nothing to do! - continue - } - - twin, deployment, name, _ := wl.ID.Parts() - log := log.With(). - Uint32("twin", twin). - Uint64("deployment", deployment). - Str("name", name). - Str("type", wl.Type.String()). - Logger() - - log.Debug().Msg("provisioning") - result, err := e.provisioner.Provision(ctx, wl) - if errors.Is(err, ErrDidNotChange) { - log.Debug().Msg("result did not change") - continue - } + changed = e.installWorkload(ctx, wl) || changed + } + } - if err != nil { - log.Error().Err(err).Msg("failed to deploy workload") - result = &gridtypes.Result{ - Error: err.Error(), - State: gridtypes.StateError, - } - } + return +} - if result.State == gridtypes.StateError { - log.Error().Str("error", result.Error).Msg("failed to deploy workload") +func (e *NativeEngine) updateDeployment(ctx context.Context, ops []gridtypes.UpgradeOp) (changed bool) { + sort.SliceStable(ops, func(i, j int) bool { + if ops[i].WlID.Type != ops[j].WlID.Type { + if ops[i].Op == gridtypes.OpRemove && ops[j].Op == gridtypes.OpRemove { + // deletes are in reverse order (delete the zmachine before deleting its disk) + return e.revOrder[ops[i].WlID.Type] > e.revOrder[ops[j].WlID.Type] + } else { + return e.revOrder[ops[i].WlID.Type] < e.revOrder[ops[j].WlID.Type] } - - result.Created = gridtypes.Timestamp(time.Now().Unix()) - wl.Result = *result - changed = true + } + // same workload type (shouldn't matter) + return ops[i].Op < ops[j].Op + }) + for _, op := range ops { + switch op.Op { + case gridtypes.OpRemove: + _ = e.uninstallWorkload(ctx, op.WlID, "deleted by an update") + case gridtypes.OpAdd: + changed = e.installWorkload(ctx, op.WlID) || changed + case gridtypes.OpUpdate: + changed = e.updateWorkload(ctx, op.WlID) || changed } } - return } @@ -687,16 +703,3 @@ func (e *NativeEngine) DecommissionCached(id string, reason string) error { return err } - -type workloads []*gridtypes.WorkloadWithID - -func (l workloads) ByType(typ gridtypes.WorkloadType) 
[]*gridtypes.WorkloadWithID { - var results []*gridtypes.WorkloadWithID - for _, wl := range l { - if wl.Type == typ { - results = append(results, wl) - } - } - - return results -} From 755769f98375bdf06ea32364f788ad775e932cb2 Mon Sep 17 00:00:00 2001 From: OmarElawady Date: Wed, 13 Oct 2021 15:49:01 +0200 Subject: [PATCH 2/3] fix sorting order --- pkg/provision/engine.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go index d624f3d60..935383989 100644 --- a/pkg/provision/engine.go +++ b/pkg/provision/engine.go @@ -623,17 +623,15 @@ func (e *NativeEngine) installDeployment(ctx context.Context, getter gridtypes.W } func (e *NativeEngine) updateDeployment(ctx context.Context, ops []gridtypes.UpgradeOp) (changed bool) { - sort.SliceStable(ops, func(i, j int) bool { - if ops[i].WlID.Type != ops[j].WlID.Type { - if ops[i].Op == gridtypes.OpRemove && ops[j].Op == gridtypes.OpRemove { - // deletes are in reverse order (delete the zmachine before deleting its disk) - return e.revOrder[ops[i].WlID.Type] > e.revOrder[ops[j].WlID.Type] - } else { - return e.revOrder[ops[i].WlID.Type] < e.revOrder[ops[j].WlID.Type] - } + opMap := func(op gridtypes.UpgradeOp) int { + if op.Op == gridtypes.OpRemove { + return -e.revOrder[op.WlID.Type] + } else { + return e.revOrder[op.WlID.Type] } - // same workload type (shouldn't matter) - return ops[i].Op < ops[j].Op + } + sort.SliceStable(ops, func(i, j int) bool { + return opMap(ops[i]) < opMap(ops[j]) }) for _, op := range ops { switch op.Op { From e93b4135bf349e5623de997abfbcf7e88d3b111c Mon Sep 17 00:00:00 2001 From: OmarElawady Date: Thu, 14 Oct 2021 11:41:04 +0200 Subject: [PATCH 3/3] rename revOrder->typeIndex, extract operations sorting --- pkg/provision/engine.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go index 935383989..6b001c60f 100644 
--- a/pkg/provision/engine.go +++ b/pkg/provision/engine.go @@ -93,11 +93,11 @@ type NativeEngine struct { //options // janitor Janitor - twins Twins - admins Twins - order []gridtypes.WorkloadType - revOrder map[gridtypes.WorkloadType]int - rerunAll bool + twins Twins + admins Twins + order []gridtypes.WorkloadType + typeIndex map[gridtypes.WorkloadType]int + rerunAll bool //substrate specific attributes nodeID uint32 sub *substrate.Substrate @@ -157,12 +157,12 @@ func (w *withStartupOrder) apply(e *NativeEngine) { } delete(all, typ) ordered = append(ordered, typ) - e.revOrder[typ] = len(ordered) + e.typeIndex[typ] = len(ordered) } // now move everything else for typ := range all { ordered = append(ordered, typ) - e.revOrder[typ] = len(ordered) + e.typeIndex[typ] = len(ordered) } e.order = ordered @@ -241,7 +241,7 @@ func New(storage Storage, provisioner Provisioner, root string, opts ...EngineOp twins: &nullKeyGetter{}, admins: &nullKeyGetter{}, order: gridtypes.Types(), - revOrder: make(map[gridtypes.WorkloadType]int), + typeIndex: make(map[gridtypes.WorkloadType]int), } for _, opt := range opts { @@ -622,17 +622,26 @@ func (e *NativeEngine) installDeployment(ctx context.Context, getter gridtypes.W return } -func (e *NativeEngine) updateDeployment(ctx context.Context, ops []gridtypes.UpgradeOp) (changed bool) { +// sortOperations sorts the operations, removes first in reverse type order, then upgrades/creates in type order +func (e *NativeEngine) sortOperations(ops []gridtypes.UpgradeOp) { + // maps an operation to an integer, less comes first in sorting opMap := func(op gridtypes.UpgradeOp) int { if op.Op == gridtypes.OpRemove { - return -e.revOrder[op.WlID.Type] + // removes are negative (typeIndex starts from 1) so they are always before creations/updates + // negated to apply in reverse order + return -e.typeIndex[op.WlID.Type] } else { - return e.revOrder[op.WlID.Type] + // updates/creates are considered the same + return e.typeIndex[op.WlID.Type] } } 
sort.SliceStable(ops, func(i, j int) bool { return opMap(ops[i]) < opMap(ops[j]) }) +} + +func (e *NativeEngine) updateDeployment(ctx context.Context, ops []gridtypes.UpgradeOp) (changed bool) { + e.sortOperations(ops) for _, op := range ops { switch op.Op { case gridtypes.OpRemove: