From c9f62aca9383e02cc27274c0dd81e443f3eb4643 Mon Sep 17 00:00:00 2001 From: Joel Rebello Date: Tue, 30 Jan 2024 17:43:46 +0100 Subject: [PATCH] worker: fix up to use the internal task runner --- internal/worker/task_handler.go | 398 ++++++++++++--------------- internal/worker/task_handler_test.go | 43 ++- internal/worker/worker.go | 96 ++++--- 3 files changed, 247 insertions(+), 290 deletions(-) diff --git a/internal/worker/task_handler.go b/internal/worker/task_handler.go index 8747448e..d8a7f6c5 100644 --- a/internal/worker/task_handler.go +++ b/internal/worker/task_handler.go @@ -1,22 +1,24 @@ package worker import ( + "context" "fmt" "sort" "strings" "time" "github.com/bmc-toolbox/common" - sw "github.com/filanov/stateswitch" "github.com/metal-toolbox/flasher/internal/metrics" "github.com/metal-toolbox/flasher/internal/model" "github.com/metal-toolbox/flasher/internal/outofband" + "github.com/metal-toolbox/flasher/internal/runner" sm "github.com/metal-toolbox/flasher/internal/statemachine" + "github.com/metal-toolbox/flasher/internal/store" + rctypes "github.com/metal-toolbox/rivets/condition" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - - rctypes "github.com/metal-toolbox/rivets/condition" + "go.hollow.sh/toolbox/events/registry" ) var ( @@ -26,50 +28,87 @@ var ( errTaskPlanActions = errors.New("error in task action planning") ) -// taskHandler implements the taskTransitionHandler methods -type taskHandler struct{} +// handler implements the Runner.Handler interface +// +// The handler is instantiated to run a single task +type handler struct { + ctx *sm.HandlerContext +} -func (h *taskHandler) Init(_ sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidtaskHandlerContext +func newHandler( + ctx context.Context, + dryrun bool, + workerID registry.ControllerID, + facilityCode string, + task *model.Task, + asset *model.Asset, + storage store.Repository, + publisher sm.Publisher, + logger *logrus.Entry, +) runner.Handler { + handlerCtx := &sm.HandlerContext{ + WorkerID: workerID, + Dryrun: dryrun || task.Parameters.DryRun, + Task: task, + Publisher: publisher, + Ctx: ctx, + Store: storage, + Data: make(map[string]string), + Asset: asset, + FacilityCode: facilityCode, + Logger: logger, } - if tctx.DeviceQueryor == nil { + return &handler{handlerCtx} +} + +func (t *handler) Initialize(ctx context.Context) error { + if t.ctx.DeviceQueryor == nil { // TODO(joel): DeviceQueryor is to be instantiated based on the method(s) for the firmwares to be installed // if its a mix of inband, out of band firmware to be installed, then both are to be queried and // so this DeviceQueryor would have to be extended // // For this to work with both inband and out of band, the firmware set data should include the install method. - tctx.DeviceQueryor = outofband.NewDeviceQueryor(tctx.Ctx, tctx.Asset, tctx.Logger) + t.ctx.DeviceQueryor = outofband.NewDeviceQueryor(ctx, t.ctx.Asset, t.ctx.Logger) } return nil } -// Query looks up the device component inventory and sets it in the task handler context. -func (h *taskHandler) Query(t sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidtaskHandlerContext +func (t *handler) Query(ctx context.Context) error { + t.ctx.Logger.Debug("run query step") + + t.ctx.Task.Status.Append("connecting to device BMC") + t.ctx.Publisher.Publish(t.ctx) + + if err := t.ctx.DeviceQueryor.Open(ctx); err != nil { + return err } - _, ok = t.(*model.Task) - if !ok { - return errors.Wrap(errTaskQueryInventory, ErrTaskTypeAssertion.Error()) + t.ctx.Task.Status.Append("collecting inventory from device BMC") + t.ctx.Publisher.Publish(t.ctx) + + deviceCommon, err := t.ctx.DeviceQueryor.Inventory(ctx) + if err != nil { + return errors.Wrap(errTaskQueryInventory, err.Error()) + } + + if t.ctx.Asset.Vendor == "" { + t.ctx.Asset.Vendor = deviceCommon.Vendor } - tctx.Logger.Debug("run query step") + if t.ctx.Asset.Model == "" { + t.ctx.Asset.Model = common.FormatProductName(deviceCommon.Model) + } - // attempt to fetch component inventory from the device - components, err := h.queryFromDevice(tctx) + components, err := model.NewComponentConverter().CommonDeviceToComponents(deviceCommon) if err != nil { return errors.Wrap(errTaskQueryInventory, err.Error()) } // component inventory was identified if len(components) > 0 { - tctx.Asset.Components = components + t.ctx.Asset.Components = components return nil } @@ -77,154 +116,20 @@ func (h *taskHandler) Query(t sw.StateSwitch, args sw.TransitionArgs) error { return errors.Wrap(errTaskQueryInventory, "failed to query device component inventory") } -func (h *taskHandler) Plan(t sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidtaskHandlerContext - } - - task, ok := t.(*model.Task) - if !ok { - return errors.Wrap(ErrSaveTask, ErrTaskTypeAssertion.Error()) - } - - tctx.Logger.Debug("create the plan") - - switch task.FirmwarePlanMethod { +func (t *handler) PlanActions(ctx context.Context) error { + switch t.ctx.Task.FirmwarePlanMethod { case model.FromFirmwareSet: - return h.planFromFirmwareSet(tctx, task) + return t.planFromFirmwareSet(ctx) case model.FromRequestedFirmware: return errors.Wrap(errTaskPlanActions, "firmware plan method not implemented"+string(model.FromRequestedFirmware)) default: - return errors.Wrap(errTaskPlanActions, "firmware plan method invalid: "+string(task.FirmwarePlanMethod)) - } -} - -func (h *taskHandler) registerActionMetrics(startTS time.Time, action *model.Action, state string) { - metrics.ActionRuntimeSummary.With( - prometheus.Labels{ - "vendor": action.Firmware.Vendor, - "component": action.Firmware.Component, - "state": state, - }, - ).Observe(time.Since(startTS).Seconds()) -} - -func (h *taskHandler) Run(t sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidTransitionHandler - } - - task, ok := t.(*model.Task) - if !ok { - return errors.Wrap(ErrSaveTask, ErrTaskTypeAssertion.Error()) - } - - tctx.Logger.WithField("plan.actions", len(tctx.ActionStateMachines)).Debug("running the plan") - - // each actionSM (state machine) corresponds to a firmware to be installed - for _, actionSM := range tctx.ActionStateMachines { - startTS := time.Now() - - // fetch action attributes from task - action := task.ActionsPlanned.ByID(actionSM.ActionID()) - if err := action.SetState(model.StateActive); err != nil { - return err - } - - // return on context cancellation - if tctx.Ctx.Err() != nil { - h.registerActionMetrics(startTS, action, string(rctypes.Failed)) - - return tctx.Ctx.Err() - } - - tctx.Logger.WithFields(logrus.Fields{ - "statemachineID": actionSM.ActionID(), - "final": action.Final, - }).Debug("action state machine start") - - // run the action state machine - err := actionSM.Run(tctx.Ctx, action, tctx) - if err != nil { - h.registerActionMetrics(startTS, action, string(rctypes.Failed)) - - return errors.Wrap( - err, - "while running action to install firmware on component "+action.Firmware.Component, - ) - } - - tctx.Logger.WithFields(logrus.Fields{ - "action": action.ID, - "condition": action.TaskID, - "component": action.Firmware.Component, - "version": action.Firmware.Version, - }).Info("action for component completed successfully") - - if !action.Final { - continue - } - - h.registerActionMetrics(startTS, action, string(rctypes.Succeeded)) - tctx.Logger.WithFields(logrus.Fields{ - "statemachineID": actionSM.ActionID(), - }).Debug("state machine end") - } - - tctx.Logger.Debug("plan finished") - return nil -} - -func (h *taskHandler) TaskFailed(_ sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidTransitionHandler - } - - tctx.Task.Status.Append("task failed") - - if tctx.DeviceQueryor != nil { - if err := tctx.DeviceQueryor.Close(tctx.Ctx); err != nil { - tctx.Logger.WithFields(logrus.Fields{"err": err.Error()}).Warn("device logout error") - } - } - - return nil -} - -func (h *taskHandler) TaskSuccessful(_ sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidTransitionHandler + return errors.Wrap(errTaskPlanActions, "firmware plan method invalid: "+string(t.ctx.Task.FirmwarePlanMethod)) } - - tctx.Task.Status.Append("task completed successfully") - - if tctx.DeviceQueryor != nil { - if err := tctx.DeviceQueryor.Close(tctx.Ctx); err != nil { - tctx.Logger.WithFields(logrus.Fields{"err": err.Error()}).Warn("device logout error") - } - } - - return nil -} - -func (h *taskHandler) PublishStatus(_ sw.StateSwitch, args sw.TransitionArgs) error { - tctx, ok := args.(*sm.HandlerContext) - if !ok { - return sm.ErrInvalidTransitionHandler - } - - tctx.Publisher.Publish(tctx) - - return nil } // planFromFirmwareSet -func (h *taskHandler) planFromFirmwareSet(tctx *sm.HandlerContext, task *model.Task) error { - applicable, err := tctx.Store.FirmwareSetByID(tctx.Ctx, task.Parameters.FirmwareSetID) +func (t *handler) planFromFirmwareSet(ctx context.Context) error { + applicable, err := t.ctx.Store.FirmwareSetByID(ctx, t.ctx.Task.Parameters.FirmwareSetID) if err != nil { return errors.Wrap(errTaskPlanActions, err.Error()) } @@ -235,7 +140,7 @@ func (h *taskHandler) planFromFirmwareSet(tctx *sm.HandlerContext, task *model.T } // plan actions based and update task action list - tctx.ActionStateMachines, task.ActionsPlanned, err = h.planInstall(tctx, task, applicable) + t.ctx.ActionStateMachines, t.ctx.Task.ActionsPlanned, err = t.planInstall(ctx, applicable) if err != nil { return err } @@ -243,74 +148,46 @@ func (h *taskHandler) planFromFirmwareSet(tctx *sm.HandlerContext, task *model.T return nil } -// query device components inventory from the device itself. -func (h *taskHandler) queryFromDevice(tctx *sm.HandlerContext) (model.Components, error) { - tctx.Task.Status.Append("connecting to device BMC") - tctx.Publisher.Publish(tctx) - - if err := tctx.DeviceQueryor.Open(tctx.Ctx); err != nil { - return nil, err - } - - tctx.Task.Status.Append("collecting inventory from device BMC") - tctx.Publisher.Publish(tctx) - - deviceCommon, err := tctx.DeviceQueryor.Inventory(tctx.Ctx) - if err != nil { - return nil, err - } - - if tctx.Asset.Vendor == "" { - tctx.Asset.Vendor = deviceCommon.Vendor - } - - if tctx.Asset.Model == "" { - tctx.Asset.Model = common.FormatProductName(deviceCommon.Model) - } - - return model.NewComponentConverter().CommonDeviceToComponents(deviceCommon) -} - // planInstall sets up the firmware install plan // // This returns a list of actions to added to the task and a list of action state machines for those actions. -func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, firmwares []*model.Firmware) (sm.ActionStateMachines, model.Actions, error) { +func (t *handler) planInstall(ctx context.Context, firmwares []*model.Firmware) (sm.ActionStateMachines, model.Actions, error) { actionMachines := make(sm.ActionStateMachines, 0) actions := make(model.Actions, 0) // final is set to true in the final action var final bool - hCtx.Logger.WithFields(logrus.Fields{ - "condition.id": task.ID, + t.ctx.Logger.WithFields(logrus.Fields{ + "condition.id": t.ctx.Task.ID, "requested.firmware.count": fmt.Sprintf("%d", len(firmwares)), }).Debug("checking against current inventory") toInstall := firmwares - if !task.Parameters.ForceInstall { - toInstall = removeFirmwareAlreadyAtDesiredVersion(hCtx, firmwares) + if !t.ctx.Task.Parameters.ForceInstall { + toInstall = t.removeFirmwareAlreadyAtDesiredVersion(firmwares) } if len(toInstall) == 0 { info := "no actions required for this task" - hCtx.Publisher.Publish(hCtx) - hCtx.Logger.Info(info) + t.ctx.Publisher.Publish(t.ctx) + t.ctx.Logger.Info(info) return actionMachines, actions, nil } - sortFirmwareByInstallOrder(toInstall) + t.sortFirmwareByInstallOrder(toInstall) // each firmware applicable results in an ActionPlan and an Action for idx, firmware := range toInstall { // set final bool when its the last firmware in the slice final = (idx == len(toInstall)-1) // generate an action ID - actionID := sm.ActionID(task.ID.String(), firmware.Component, idx) + actionID := sm.ActionID(t.ctx.Task.ID.String(), firmware.Component, idx) - steps, err := hCtx.DeviceQueryor.FirmwareInstallSteps(hCtx.Ctx, firmware.Component) + steps, err := t.ctx.DeviceQueryor.FirmwareInstallSteps(ctx, firmware.Component) if err != nil { return nil, nil, err } @@ -326,7 +203,7 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir // based on that the action plan is setup. // // For now this is hardcoded to outofband. - m, err := outofband.NewActionStateMachine(actionID, steps, task.Parameters.ResetBMCBeforeInstall) + m, err := outofband.NewActionStateMachine(actionID, steps, t.ctx.Task.Parameters.ResetBMCBeforeInstall) if err != nil { return nil, nil, err } @@ -336,7 +213,7 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir newAction := model.Action{ ID: actionID, - TaskID: task.ID.String(), + TaskID: t.ctx.Task.ID.String(), // TODO: The firmware is to define the preferred install method // based on that the action plan is setup. @@ -348,7 +225,7 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir Firmware: *firmware, // VerifyCurrentFirmware is disabled when ForceInstall is true. - VerifyCurrentFirmware: !task.Parameters.ForceInstall, + VerifyCurrentFirmware: !t.ctx.Task.Parameters.ForceInstall, // Final is set to true when its the last action in the list. Final: final, @@ -359,7 +236,7 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir // The BMC requires to be reset only on the first action if idx == 0 { - newAction.BMCResetPreInstall = task.Parameters.ResetBMCBeforeInstall + newAction.BMCResetPreInstall = t.ctx.Task.Parameters.ResetBMCBeforeInstall } //nolint:errcheck // SetState never returns an error @@ -372,7 +249,7 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir return actionMachines, actions, nil } -func sortFirmwareByInstallOrder(firmwares []*model.Firmware) { +func (t *handler) sortFirmwareByInstallOrder(firmwares []*model.Firmware) { sort.Slice(firmwares, func(i, j int) bool { slugi := strings.ToLower(firmwares[i].Component) slugj := strings.ToLower(firmwares[j].Component) @@ -381,11 +258,11 @@ func sortFirmwareByInstallOrder(firmwares []*model.Firmware) { } // returns a list of firmware applicable and a list of causes for firmwares that were removed from the install list. -func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model.Firmware) []*model.Firmware { +func (t *handler) removeFirmwareAlreadyAtDesiredVersion(fws []*model.Firmware) []*model.Firmware { var toInstall []*model.Firmware invMap := make(map[string]string) - for _, cmp := range hCtx.Asset.Components { + for _, cmp := range t.ctx.Asset.Components { invMap[strings.ToLower(cmp.Slug)] = cmp.FirmwareInstalled } @@ -407,23 +284,23 @@ func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model switch { case !ok: cause := "component not found in inventory" - hCtx.Logger.WithFields(logrus.Fields{ + t.ctx.Logger.WithFields(logrus.Fields{ "component": fw.Component, }).Warn(cause) - hCtx.Task.Status.Append(fmtCause(fw.Component, cause, "", "")) + t.ctx.Task.Status.Append(fmtCause(fw.Component, cause, "", "")) case strings.EqualFold(currentVersion, fw.Version): cause := "component firmware version equal" - hCtx.Logger.WithFields(logrus.Fields{ + t.ctx.Logger.WithFields(logrus.Fields{ "component": fw.Component, "version": fw.Version, }).Debug(cause) - hCtx.Task.Status.Append(fmtCause(fw.Component, cause, currentVersion, fw.Version)) + t.ctx.Task.Status.Append(fmtCause(fw.Component, cause, currentVersion, fw.Version)) default: - hCtx.Logger.WithFields(logrus.Fields{ + t.ctx.Logger.WithFields(logrus.Fields{ "component": fw.Component, "installed.version": currentVersion, "mandated.version": fw.Version, @@ -431,7 +308,7 @@ func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model toInstall = append(toInstall, fw) - hCtx.Task.Status.Append( + t.ctx.Task.Status.Append( fmtCause(fw.Component, "firmware queued for install", currentVersion, fw.Version), ) } @@ -439,3 +316,94 @@ func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model return toInstall } + +func (t *handler) RunActions(ctx context.Context) error { + t.ctx.Logger.WithField("plan.actions", len(t.ctx.ActionStateMachines)).Debug("running the plan") + + // each actionSM (state machine) corresponds to a firmware to be installed + for _, actionSM := range t.ctx.ActionStateMachines { + startTS := time.Now() + + // fetch action attributes from task + action := t.ctx.Task.ActionsPlanned.ByID(actionSM.ActionID()) + if err := action.SetState(model.StateActive); err != nil { + return err + } + + // return on context cancellation + if ctx.Err() != nil { + t.registerActionMetrics(startTS, action, string(rctypes.Failed)) + + return ctx.Err() + } + + t.ctx.Logger.WithFields(logrus.Fields{ + "statemachineID": actionSM.ActionID(), + "final": action.Final, + }).Debug("action state machine start") + + // run the action state machine + err := actionSM.Run(ctx, action, t.ctx) + if err != nil { + t.registerActionMetrics(startTS, action, string(rctypes.Failed)) + + return errors.Wrap( + err, + "while running action to install firmware on component "+action.Firmware.Component, + ) + } + + t.ctx.Logger.WithFields(logrus.Fields{ + "action": action.ID, + "condition": action.TaskID, + "component": action.Firmware.Component, + "version": action.Firmware.Version, + }).Info("action for component completed successfully") + + if !action.Final { + continue + } + + t.registerActionMetrics(startTS, action, string(rctypes.Succeeded)) + t.ctx.Logger.WithFields(logrus.Fields{ + "statemachineID": actionSM.ActionID(), + }).Debug("state machine end") + } + + t.ctx.Logger.Debug("plan finished") + return nil +} + +func (t *handler) OnSuccess(ctx context.Context, _ *model.Task) { + if t.ctx.DeviceQueryor == nil { + return + } + + if err := t.ctx.DeviceQueryor.Close(ctx); err != nil { + t.ctx.Logger.WithFields(logrus.Fields{"err": err.Error()}).Warn("device logout error") + } +} + +func (t *handler) OnFailure(ctx context.Context, _ *model.Task) { + if t.ctx.DeviceQueryor == nil { + return + } + + if err := t.ctx.DeviceQueryor.Close(ctx); err != nil { + t.ctx.Logger.WithFields(logrus.Fields{"err": err.Error()}).Warn("device logout error") + } +} + +func (t *handler) Publish() { + t.ctx.Publisher.Publish(t.ctx) +} + +func (t *handler) registerActionMetrics(startTS time.Time, action *model.Action, state string) { + metrics.ActionRuntimeSummary.With( + prometheus.Labels{ + "vendor": action.Firmware.Vendor, + "component": action.Firmware.Component, + "state": state, + }, + ).Observe(time.Since(startTS).Seconds()) +} diff --git a/internal/worker/task_handler_test.go b/internal/worker/task_handler_test.go index 9f9ec275..b3cef04b 100644 --- a/internal/worker/task_handler_test.go +++ b/internal/worker/task_handler_test.go @@ -1,6 +1,7 @@ package worker import ( + "context" "testing" "github.com/google/uuid" @@ -17,7 +18,7 @@ import ( rctypes "github.com/metal-toolbox/rivets/condition" ) -func Test_sortFirmwareByInstallOrde(t *testing.T) { +func TestSortFirmwareByInstallOrder(t *testing.T) { have := []*model.Firmware{ { Version: "DL6R", @@ -72,7 +73,8 @@ func Test_sortFirmwareByInstallOrde(t *testing.T) { }, } - sortFirmwareByInstallOrder(have) + h := handler{} + h.sortFirmwareByInstallOrder(have) assert.Equal(t, expected, have) } @@ -137,7 +139,8 @@ func TestRemoveFirmwareAlreadyAtDesiredVersion(t *testing.T) { }, } - got := removeFirmwareAlreadyAtDesiredVersion(ctx, fwSet) + h := handler{ctx} + got := h.removeFirmwareAlreadyAtDesiredVersion(fwSet) require.Equal(t, 3, len(ctx.Task.Status.StatusMsgs)) require.Equal(t, 1, len(got)) require.Equal(t, expected[0], got[0]) @@ -199,21 +202,15 @@ func TestPlanInstall1(t *testing.T) { }, Task: &model.Task{ ID: taskID, + Parameters: rctypes.FirmwareInstallTaskParameters{ + AssetID: serverID, + ResetBMCBeforeInstall: true, + }, }, WorkerID: registry.GetID("test-app"), DeviceQueryor: q, } - h := &taskHandler{} - - taskParam := &model.Task{ - ID: uuid.MustParse("95ccb1c5-d807-4078-bb22-facc3045a49a"), - Parameters: rctypes.FirmwareInstallTaskParameters{ - AssetID: serverID, - ResetBMCBeforeInstall: true, - }, - } - q.EXPECT().FirmwareInstallSteps(gomock.Any(), gomock.Any()). Times(2). Return([]bconsts.FirmwareInstallStep{ @@ -221,7 +218,8 @@ func TestPlanInstall1(t *testing.T) { bconsts.FirmwareInstallStepInstallStatus, }, nil) - sms, actions, err := h.planInstall(ctx, taskParam, fwSet) + h := &handler{ctx} + sms, actions, err := h.planInstall(context.Background(), fwSet) require.NoError(t, err, "no errors returned") require.Equal(t, 2, len(sms), "expect two action state machines") require.Equal(t, 2, len(actions), "expect two actions to be performed") @@ -290,21 +288,15 @@ func TestPlanInstall2(t *testing.T) { }, Task: &model.Task{ ID: taskID, + Parameters: rctypes.FirmwareInstallTaskParameters{ + AssetID: serverID, + ForceInstall: true, + }, }, WorkerID: registry.GetID("test-app"), DeviceQueryor: q, } - h := &taskHandler{} - - taskParam := &model.Task{ - ID: uuid.MustParse("95ccb1c5-d807-4078-bb22-facc3045a49a"), - Parameters: rctypes.FirmwareInstallTaskParameters{ - AssetID: serverID, - ForceInstall: true, - }, - } - q.EXPECT().FirmwareInstallSteps(gomock.Any(), gomock.Any()). Times(3). Return([]bconsts.FirmwareInstallStep{ @@ -313,7 +305,8 @@ func TestPlanInstall2(t *testing.T) { bconsts.FirmwareInstallStepInstallStatus, }, nil) - sms, actions, err := h.planInstall(ctx, taskParam, fwSet) + h := &handler{ctx} + sms, actions, err := h.planInstall(context.Background(), fwSet) require.NoError(t, err, "no errors returned") require.Equal(t, 3, len(sms), "expect three action state machines") require.Equal(t, 3, len(actions), "expect three actions to be performed") diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 360f28d9..78771596 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -3,6 +3,7 @@ package worker import ( "context" "encoding/json" + "fmt" "os" "strconv" "sync" @@ -12,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/metal-toolbox/flasher/internal/metrics" "github.com/metal-toolbox/flasher/internal/model" + "github.com/metal-toolbox/flasher/internal/runner" "github.com/metal-toolbox/flasher/internal/store" "github.com/metal-toolbox/flasher/internal/version" "github.com/nats-io/nats.go" @@ -360,7 +362,7 @@ func (o *Worker) runTaskWithMonitor(ctx context.Context, task *model.Task, asset // the runTask method is expected to close this channel to indicate its done doneCh := make(chan bool) - // monitor sends in progress ack's until the task statemachine returns. + // monitor sends in progress ack's until the task handler returns. monitor := func() { defer o.syncWG.Done() @@ -382,35 +384,7 @@ func (o *Worker) runTaskWithMonitor(ctx context.Context, task *model.Task, asset go monitor() - // setup state machine task handler - handler := &taskHandler{} - - // setup state machine task handler context - l := logrus.New() - l.Formatter = o.logger.Formatter - l.Level = o.logger.Level - - handlerCtx := &sm.HandlerContext{ - WorkerID: o.id, - Dryrun: o.dryrun || task.Parameters.DryRun, - Task: task, - Publisher: o.getStatusPublisher(), - Ctx: ctx, - Store: o.store, - Data: make(map[string]string), - Asset: asset, - FacilityCode: o.facilityCode, - Logger: l.WithFields( - logrus.Fields{ - "workerID": o.id.String(), - "conditionID": task.ID.String(), - "assetID": asset.ID.String(), - "bmc": asset.BmcAddress.String(), - }, - ), - } - - o.runTaskStatemachine(handler, handlerCtx, doneCh) + o.runTaskHandler(ctx, asset, task, doneCh) <-doneCh } @@ -435,46 +409,68 @@ func (o *Worker) registerConditionMetrics(startTS time.Time, state string) { ).Observe(time.Since(startTS).Seconds()) } -func (o *Worker) runTaskStatemachine(handler *taskHandler, handlerCtx *sm.HandlerContext, doneCh chan bool) { +func (o *Worker) runTaskHandler(ctx context.Context, asset *model.Asset, task *model.Task, doneCh chan bool) { defer close(doneCh) - startTS := time.Now() + // prepare logger + l := logrus.New() + l.Formatter = o.logger.Formatter + l.Level = o.logger.Level + hLogger := l.WithFields( + logrus.Fields{ + "workerID": o.id.String(), + "conditionID": task.ID.String(), + "assetID": asset.ID.String(), + "bmc": asset.BmcAddress.String(), + }, + ) - o.logger.WithFields(logrus.Fields{ - "deviceID": handlerCtx.Task.Parameters.AssetID.String(), - "conditionID": handlerCtx.Task.ID, - }).Info("running task for device") + t, _ := ctx.Deadline() + fmt.Printf(">>>> ctx " + t.String()) - // init state machine for task - stateMachine, err := sm.NewTaskStateMachine(handler) - if err != nil { - o.logger.Error(err) + // init handler + handler := newHandler( + ctx, + o.dryrun, + o.id, + o.facilityCode, + task, + asset, + o.store, + o.getStatusPublisher(), + hLogger, + ) - o.registerConditionMetrics(startTS, string(rctypes.Failed)) + fmt.Printf(">>>> handler ctx " + t.String()) - return - } + // init runner + r := runner.New(hLogger) + startTS := time.Now() - // run task state machine - if err := stateMachine.Run(handlerCtx.Task, handlerCtx); err != nil { + o.logger.WithFields(logrus.Fields{ + "deviceID": task.Parameters.AssetID.String(), + "conditionID": task.ID, + }).Info("running task for device") + + // run task handler + if err := r.RunTask(ctx, task, handler); err != nil { o.logger.WithFields( logrus.Fields{ - "deviceID": handlerCtx.Task.Parameters.AssetID, - "conditionID": handlerCtx.Task.ID.String(), + "deviceID": task.Parameters.AssetID, + "conditionID": task.ID.String(), "err": err.Error(), }, ).Warn("task for device failed") o.registerConditionMetrics(startTS, string(rctypes.Failed)) - return } o.registerConditionMetrics(startTS, string(rctypes.Succeeded)) o.logger.WithFields(logrus.Fields{ - "deviceID": handlerCtx.Task.Parameters.AssetID.String(), - "conditionID": handlerCtx.Task.ID, + "deviceID": task.Parameters.AssetID.String(), + "conditionID": task.ID, "elapsed": time.Since(startTS).String(), }).Info("task for device completed") }