Skip to content

Commit

Permalink
worker, install: move RunActions into the runner
Browse files Browse the repository at this point in the history
This de-duplicates the method that runs actions and makes it part of the
runner.
  • Loading branch information
joelrebel committed Apr 26, 2024
1 parent dc6bccd commit a8f63c8
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 184 deletions.
81 changes: 0 additions & 81 deletions internal/install/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,87 +68,6 @@ func (t *handler) Query(ctx context.Context) error {
func (t *handler) PlanActions(ctx context.Context) error {
t.ctx.Logger.Debug("create the plan")

actionSMs, actions, err := t.planInstallFile(ctx)
if err != nil {
return err
}

t.ctx.ActionStateMachines = actionSMs
t.ctx.Task.ActionsPlanned = actions

return nil
}

func (t *handler) listPlan(tctx *sm.HandlerContext) error {
tctx.Logger.WithField("plan.actions", len(tctx.ActionStateMachines)).Info("only listing the plan")
for _, actionSM := range tctx.ActionStateMachines {
for _, tx := range actionSM.TransitionOrder() {
fmt.Println(tx)
}
}

return nil
}

func (t *handler) RunActions(ctx context.Context) error {
if t.onlyPlan {
return t.listPlan(t.ctx)
}

t.ctx.Logger.WithField("plan.actions", len(t.ctx.ActionStateMachines)).Debug("running the plan")

// each actionSM (state machine) corresponds to a firmware to be installed
for _, actionSM := range t.ctx.ActionStateMachines {
// fetch action attributes from task
action := t.ctx.Task.ActionsPlanned.ByID(actionSM.ActionID())
if err := action.SetState(model.StateActive); err != nil {
return err
}

// return on context cancellation
if ctx.Err() != nil {
return ctx.Err()
}

t.ctx.Logger.WithFields(logrus.Fields{
"statemachineID": actionSM.ActionID(),
"final": action.Final,
}).Debug("action state machine start")

// run the action state machine
err := actionSM.Run(ctx, action, t.ctx)
if err != nil {
return errors.Wrap(
err,
"while running action to install firmware on component "+action.Firmware.Component,
)
}

t.ctx.Logger.WithFields(logrus.Fields{
"action": action.ID,
"condition": action.TaskID,
"component": action.Firmware.Component,
"version": action.Firmware.Version,
}).Info("action for component completed successfully")

if !action.Final {
continue
}

t.ctx.Logger.WithFields(logrus.Fields{
"statemachineID": actionSM.ActionID(),
}).Debug("state machine end")
}

t.ctx.Logger.Debug("plan finished")
return nil
}

func (t *handler) Publish() {
t.ctx.Publisher.Publish(t.ctx)
}

func (t *handler) planInstallFile(ctx context.Context) (sm.ActionStateMachines, model.Actions, error) {
firmware := &model.Firmware{
Component: t.fwComponent,
Version: t.fwVersion,
Expand Down
86 changes: 81 additions & 5 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ import (
"github.com/sirupsen/logrus"
)

// A Runner instance runs a single task, setting up and executing the actions required to install firmware
// on one or more server components.
// A Runner instance runs a single task, to install firmware on one or more server components.
type Runner struct {
logger *logrus.Entry
}

type Handler interface {
type TaskHandler interface {
Initialize(ctx context.Context) error
Query(ctx context.Context) error
PlanActions(ctx context.Context) error
RunActions(ctx context.Context) error
OnSuccess(ctx context.Context, task *model.Task)
OnFailure(ctx context.Context, task *model.Task)
Publish(ctx context.Context)
Expand Down Expand Up @@ -81,7 +79,6 @@ func (r *Runner) RunTask(ctx context.Context, task *model.Task, handler Handler)
{"Initialize", handler.Initialize},
{"Query", handler.Query},
{"PlanActions", handler.PlanActions},
{"RunActions", handler.RunActions},
}

taskFailed := func(err error) error {
Expand Down Expand Up @@ -131,9 +128,88 @@ func (r *Runner) RunTask(ctx context.Context, task *model.Task, handler Handler)
}
}

r.logger.WithField("planned.actions", len(task.ActionsPlanned)).Debug("start running planned actions")

if err := r.runActions(ctx, task, handler); err != nil {
return taskFailed(err)
}

r.logger.WithField("planned.actions", len(task.ActionsPlanned)).Debug("finished running planned actions")

return taskSuccess()
}

func (r *Runner) runActions(ctx context.Context, task *model.Task, handler TaskHandler) error {
registerMetric := func(startTS time.Time, action *model.Action, state rctypes.State) {
registerActionMetric(startTS, action, string(state))
}

// helper func to log and publish step status
publish := func(state rctypes.State, action *model.Action, stepName model.StepName, logger *logrus.Entry) {
logger.WithField("step", stepName).Debug("running step")
task.Status.Append(fmt.Sprintf(
"[%s] install version: %s, method: %s, state: %s, step %s",
action.Firmware.Component,
action.Firmware.Version,
action.Firmware.InstallMethod,
state,
stepName,
))

handler.Publish(ctx)
}

// each action corresponds to a firmware to be installed
for _, action := range task.ActionsPlanned {
startTS := time.Now()

actionLogger := r.logger.WithFields(logrus.Fields{
"action": action.ID,
"component": action.Firmware.Component,
"fwversion": action.Firmware.Version,
})

// fetch action attributes from task
action.SetState(model.StateActive)

// return on context cancellation
if ctx.Err() != nil {
registerMetric(startTS, action, rctypes.Failed)
return ctx.Err()
}

actionLogger.Info("running action steps for firmware install")
for _, step := range action.Steps {
publish(model.StateActive, action, step.Name, actionLogger)

// run step
if err := step.Handler(ctx); err != nil {
action.SetState(model.StateFailed)
publish(model.StateFailed, action, step.Name, actionLogger)

registerMetric(startTS, action, rctypes.Failed)
return errors.Wrap(
err,
fmt.Sprintf(
"error while running step=%s to install firmware on component=%s",
step.Name,
action.Firmware.Component,
),
)
}

// log and publish status
action.SetState(model.StateSucceeded)
publish(model.StateSucceeded, action, step.Name, actionLogger)
}

registerMetric(startTS, action, rctypes.Succeeded)
actionLogger.Info("action steps for component completed successfully")
}

return nil
}

// conditionalFault is invoked before each runner method to induce a fault if specified
func (r *Runner) conditionalFault(ctx context.Context, fname string, task *model.Task, handler Handler) error {
var errConditionFault = errors.New("condition induced fault")
Expand Down
50 changes: 21 additions & 29 deletions internal/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,34 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
mock "github.com/stretchr/testify/mock"
)

func TestRunTask(t *testing.T) {
logger := logrus.New()
logger.SetLevel(logrus.ErrorLevel)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockHandler := NewMockHandler(ctrl)

tests := []struct {
name string
setupMock func()
mockSetup func(*MockTaskHandler)
expectedState string
expectedError error
}{
{
name: "Successful execution",
setupMock: func() {
mockHandler.EXPECT().Initialize(gomock.Any()).Return(nil)
mockHandler.EXPECT().Query(gomock.Any()).Return(nil)
mockHandler.EXPECT().PlanActions(gomock.Any()).Return(nil)
mockHandler.EXPECT().RunActions(gomock.Any()).Return(nil)
mockHandler.EXPECT().OnSuccess(gomock.Any(), gomock.Any())
mockHandler.EXPECT().Publish().AnyTimes()
mockSetup: func(m *MockTaskHandler) {
m.On("Initialize", mock.Anything).Return(nil)
m.On("Query", mock.Anything).Return(nil)
m.On("PlanActions", mock.Anything).Return(nil)
m.On("OnSuccess", mock.Anything, mock.Anything).Once()
m.On("Publish", mock.Anything).Maybe()
},
expectedState: string(model.StateSucceeded),
expectedError: nil,
},
{
name: "Failure during Initialize",
setupMock: func() {
mockHandler.EXPECT().Initialize(gomock.Any()).Return(errors.New("Initialize failed"))
mockHandler.EXPECT().OnFailure(gomock.Any(), gomock.Any())
mockSetup: func(m *MockTaskHandler) {
m.On("Initialize", mock.Anything).Return(errors.New("Initialize failed"))
m.On("OnFailure", mock.Anything, mock.Anything).Once()
m.On("Publish", mock.Anything, mock.Anything).Twice()
},
expectedState: string(model.StateFailed),
expectedError: errors.New("Initialize failed"),
Expand All @@ -52,23 +44,23 @@ func TestRunTask(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setupMock()
mockHandler := new(MockTaskHandler)
tt.mockSetup(mockHandler) // Set up the mock expectations

r := New(logrus.NewEntry(logrus.New()))
task := &model.Task{}
err := r.RunTask(context.Background(), task, mockHandler)

if string(task.State()) != tt.expectedState {
t.Errorf("Expected task state %s, but got %s", tt.expectedState, task.State())
}
err := r.RunTask(context.Background(), task, mockHandler)

// Assert task state and error expectations
assert.Equal(t, tt.expectedState, string(task.State))
if tt.expectedError != nil {
assert.EqualError(t, tt.expectedError, err.Error())
assert.EqualError(t, err, tt.expectedError.Error())
} else {
assert.NoError(t, err)
}

if tt.expectedState == string(model.StateSucceeded) {
assert.Nil(t, err)
}
mockHandler.AssertExpectations(t)
})
}
}
71 changes: 2 additions & 69 deletions internal/worker/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,63 +316,6 @@ func (t *handler) removeFirmwareAlreadyAtDesiredVersion(fws []*model.Firmware) [
return toInstall
}

func (t *handler) RunActions(ctx context.Context) error {
t.ctx.Logger.WithField("plan.actions", len(t.ctx.ActionStateMachines)).Debug("running the plan")

// each actionSM (state machine) corresponds to a firmware to be installed
for _, actionSM := range t.ctx.ActionStateMachines {
startTS := time.Now()

// fetch action attributes from task
action := t.ctx.Task.ActionsPlanned.ByID(actionSM.ActionID())
if err := action.SetState(model.StateActive); err != nil {
return err
}

// return on context cancellation
if ctx.Err() != nil {
t.registerActionMetrics(startTS, action, string(rctypes.Failed))

return ctx.Err()
}

t.ctx.Logger.WithFields(logrus.Fields{
"statemachineID": actionSM.ActionID(),
"final": action.Final,
}).Debug("action state machine start")

// run the action state machine
err := actionSM.Run(ctx, action, t.ctx)
if err != nil {
t.registerActionMetrics(startTS, action, string(rctypes.Failed))

return errors.Wrap(
err,
"while running action to install firmware on component "+action.Firmware.Component,
)
}

t.ctx.Logger.WithFields(logrus.Fields{
"action": action.ID,
"condition": action.TaskID,
"component": action.Firmware.Component,
"version": action.Firmware.Version,
}).Info("action for component completed successfully")

if !action.Final {
continue
}

t.registerActionMetrics(startTS, action, string(rctypes.Succeeded))
t.ctx.Logger.WithFields(logrus.Fields{
"statemachineID": actionSM.ActionID(),
}).Debug("state machine end")
}

t.ctx.Logger.Debug("plan finished")
return nil
}

func (t *handler) OnSuccess(ctx context.Context, _ *model.Task) {
if t.ctx.DeviceQueryor == nil {
return
Expand All @@ -393,16 +336,6 @@ func (t *handler) OnFailure(ctx context.Context, _ *model.Task) {
}
}

func (t *handler) Publish() {
t.ctx.Publisher.Publish(t.ctx)
}

func (t *handler) registerActionMetrics(startTS time.Time, action *model.Action, state string) {
metrics.ActionRuntimeSummary.With(
prometheus.Labels{
"vendor": action.Firmware.Vendor,
"component": action.Firmware.Component,
"state": state,
},
).Observe(time.Since(startTS).Seconds())
func (t *handler) Publish(ctx context.Context) {
t.Publisher.Publish(ctx, t.Task)
}

0 comments on commit a8f63c8

Please sign in to comment.