Skip to content

Commit

Permalink
instrument methods to collect metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
joelrebel committed Jun 28, 2023
1 parent 348500d commit 0e65cfb
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 5 deletions.
27 changes: 26 additions & 1 deletion internal/outofband/action_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (

sw "github.com/filanov/stateswitch"
"github.com/hashicorp/go-multierror"
"github.com/metal-toolbox/flasher/internal/metrics"
"github.com/metal-toolbox/flasher/internal/model"
sm "github.com/metal-toolbox/flasher/internal/statemachine"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -227,10 +229,22 @@ func (h *actionHandler) downloadFirmware(a sw.StateSwitch, c sw.TransitionArgs)
file := filepath.Join(dir, action.Firmware.FileName)

// download firmware file
if err := download(tctx.Ctx, action.Firmware.URL, file); err != nil {
err = download(tctx.Ctx, action.Firmware.URL, file)
if err != nil {
return err
}

// collect download metrics
fileInfo, err := os.Stat(file)
if err == nil {
metrics.DownloadBytes.With(
prometheus.Labels{
"component": action.Firmware.Component,
"vendor": action.Firmware.Vendor,
},
).Add(float64(fileInfo.Size()))
}

// validate checksum
//
// This assumes the checksum is of type MD5
Expand Down Expand Up @@ -286,6 +300,17 @@ func (h *actionHandler) initiateInstallFirmware(a sw.StateSwitch, c sw.Transitio
return err
}

// collect upload metrics
fileInfo, err := os.Stat(action.FirmwareTempFile)
if err == nil {
metrics.UploadBytes.With(
prometheus.Labels{
"component": action.Firmware.Component,
"vendor": action.Firmware.Vendor,
},
).Add(float64(fileInfo.Size()))
}

// returned bmcTaskID corresponds to a redfish task ID on BMCs that support redfish
// for the rest we track the bmcTaskID as the action.ID
if bmcTaskID == "" {
Expand Down
30 changes: 30 additions & 0 deletions internal/statemachine/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"fmt"
"strconv"
"time"

sw "github.com/filanov/stateswitch"
"github.com/metal-toolbox/flasher/internal/metrics"
"github.com/metal-toolbox/flasher/internal/model"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -146,9 +149,31 @@ func (a *ActionStateMachine) TransitionSuccess(action *model.Action, hctx *Handl
return a.sm.Run(TransitionTypeActionSuccess, action, hctx)
}

// registerTransitionMetrics records the outcome of a single action transition.
//
// It increments metrics.ActionHandlerCounter and then observes the number of
// seconds elapsed since startTS on metrics.ActionHandlerRunTimeSummary. Both
// metrics carry the same label set: the firmware vendor and component taken
// from the action, the transition type and the given state.
func (a *ActionStateMachine) registerTransitionMetrics(startTS time.Time, action *model.Action, transitionType, state string) {
	// One label set is shared by the counter and the run time summary.
	labels := prometheus.Labels{
		"vendor":     action.Firmware.Vendor,
		"component":  action.Firmware.Component,
		"transition": transitionType,
		"state":      state,
	}

	metrics.ActionHandlerCounter.With(labels).Inc()
	metrics.ActionHandlerRunTimeSummary.With(labels).Observe(time.Since(startTS).Seconds())
}

// Run executes the transitions in the action statemachine while handling errors returned from any failed actions.
func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx *HandlerContext) error {
for _, transitionType := range a.transitions {
startTS := time.Now()

// publish task action running
tctx.Task.Status = fmt.Sprintf(
"component: %s, running action: %s ",
Expand All @@ -160,11 +185,15 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx

// return on context cancellation
if ctx.Err() != nil {
a.registerTransitionMetrics(startTS, action, string(transitionType), "failed")

return ctx.Err()
}

err := a.sm.Run(transitionType, action, tctx)
if err != nil {
a.registerTransitionMetrics(startTS, action, string(transitionType), "failed")

// When the condition returns false, run the next transition
if errors.Is(err, sw.NoConditionPassedToRunTransaction) {
return newErrAction(
Expand Down Expand Up @@ -194,6 +223,7 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
}

a.transitionsCompleted = append(a.transitionsCompleted, transitionType)
a.registerTransitionMetrics(startTS, action, string(transitionType), "succeeded")

// publish task action completion
if action.Final {
Expand Down
19 changes: 19 additions & 0 deletions internal/store/serverservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/coreos/go-oidc"
"github.com/google/uuid"
"github.com/hashicorp/go-retryablehttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"

"github.com/metal-toolbox/flasher/internal/app"
"github.com/metal-toolbox/flasher/internal/metrics"
"github.com/metal-toolbox/flasher/internal/model"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -147,6 +149,15 @@ func newClientWithOAuth(ctx context.Context, cfg *app.ServerserviceOptions, logg
)
}

// registerMetric increments metrics.StoreQueryErrorCount for the given query
// kind. The storeKind label is always set to "serverservice".
func (s *Serverservice) registerMetric(queryKind string) {
	labels := prometheus.Labels{
		"storeKind": "serverservice",
		"queryKind": queryKind,
	}

	metrics.StoreQueryErrorCount.With(labels).Inc()
}

// AssetByID returns an Asset object with various attributes populated.
func (s *Serverservice) AssetByID(ctx context.Context, id string) (*model.Asset, error) {
deviceUUID, err := uuid.Parse(id)
Expand All @@ -159,6 +170,8 @@ func (s *Serverservice) AssetByID(ctx context.Context, id string) (*model.Asset,
// query credentials
credential, _, err := s.client.GetCredential(ctx, deviceUUID, sservice.ServerCredentialTypeBMC)
if err != nil {
s.registerMetric("GetCredential")

return nil, errors.Wrap(ErrServerserviceQuery, err.Error())
}

Expand All @@ -168,6 +181,8 @@ func (s *Serverservice) AssetByID(ctx context.Context, id string) (*model.Asset,
// query the server object
srv, _, err := s.client.Get(ctx, deviceUUID)
if err != nil {
s.registerMetric("GetServer")

return nil, errors.Wrap(ErrServerserviceQuery, err.Error())
}

Expand Down Expand Up @@ -195,6 +210,8 @@ func (s *Serverservice) AssetByID(ctx context.Context, id string) (*model.Asset,
// include all required information
components, _, err := s.client.GetComponents(ctx, deviceUUID, nil)
if err != nil {
s.registerMetric("GetComponents")

s.logger.WithError(err).Warn(errors.Wrap(ErrServerserviceQuery, "component information query failed"))

return asset, nil
Expand All @@ -210,6 +227,8 @@ func (s *Serverservice) AssetByID(ctx context.Context, id string) (*model.Asset,
func (s *Serverservice) FirmwareSetByID(ctx context.Context, id uuid.UUID) ([]*model.Firmware, error) {
firmwareset, _, err := s.client.GetServerComponentFirmwareSet(ctx, id)
if err != nil {
s.registerMetric("GetFirmwareSet")

return nil, errors.Wrap(ErrServerserviceQuery, err.Error())
}

Expand Down
39 changes: 35 additions & 4 deletions internal/worker/task_handler.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package worker

import (
"time"

"github.com/bmc-toolbox/common"
sw "github.com/filanov/stateswitch"
cptypes "github.com/metal-toolbox/conditionorc/pkg/types"
"github.com/metal-toolbox/flasher/internal/metrics"
"github.com/metal-toolbox/flasher/internal/model"
"github.com/metal-toolbox/flasher/internal/outofband"
sm "github.com/metal-toolbox/flasher/internal/statemachine"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -103,6 +108,24 @@ func (h *taskHandler) ValidatePlan(t sw.StateSwitch, args sw.TransitionArgs) (bo
return true, nil
}

// registerActionMetrics records the outcome of one firmware install action.
//
// It increments metrics.ActionCounter and then observes the number of seconds
// elapsed since startTS on metrics.ActionRuntimeSummary. Both metrics are
// labelled with the firmware vendor and component taken from the action, and
// with the given state.
func (h *taskHandler) registerActionMetrics(startTS time.Time, action *model.Action, state string) {
	// One label set is shared by the counter and the runtime summary.
	labels := prometheus.Labels{
		"vendor":    action.Firmware.Vendor,
		"component": action.Firmware.Component,
		"state":     state,
	}

	metrics.ActionCounter.With(labels).Inc()
	metrics.ActionRuntimeSummary.With(labels).Observe(time.Since(startTS).Seconds())
}

func (h *taskHandler) Run(t sw.StateSwitch, args sw.TransitionArgs) error {
tctx, ok := args.(*sm.HandlerContext)
if !ok {
Expand All @@ -116,25 +139,33 @@ func (h *taskHandler) Run(t sw.StateSwitch, args sw.TransitionArgs) error {

// each actionSM (state machine) corresponds to a firmware to be installed
for _, actionSM := range tctx.ActionStateMachines {
// return on context cancellation
if tctx.Ctx.Err() != nil {
return tctx.Ctx.Err()
}
startTS := time.Now()

// fetch action attributes from task
action := task.ActionsPlanned.ByID(actionSM.ActionID())
if err := action.SetState(model.StateActive); err != nil {
return err
}

// return on context cancellation
if tctx.Ctx.Err() != nil {
h.registerActionMetrics(startTS, action, string(cptypes.Failed))

return tctx.Ctx.Err()
}

// run the action state machine
err := actionSM.Run(tctx.Ctx, action, tctx)
if err != nil {
h.registerActionMetrics(startTS, action, string(cptypes.Failed))

return errors.Wrap(
err,
"while running action to install firmware on component "+action.Firmware.Component,
)
}

h.registerActionMetrics(startTS, action, string(cptypes.Succeeded))
}

return nil
Expand Down
39 changes: 39 additions & 0 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"fmt"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/metal-toolbox/flasher/internal/metrics"
"github.com/metal-toolbox/flasher/internal/model"
"github.com/metal-toolbox/flasher/internal/store"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.hollow.sh/toolbox/events"
"go.hollow.sh/toolbox/events/pkg/kv"
Expand Down Expand Up @@ -217,13 +220,23 @@ func newTask(conditionID uuid.UUID, params *model.TaskParameters) (model.Task, e
return task, errors.Wrap(errTaskFirmwareParam, "no firmware list or firmwareSetID specified")
}

// registerEventCounter increments metrics.EventsCounter for one event.
//
// The "valid" label is the string form of the valid flag ("true" or "false")
// and the "response" label is the given response string.
func (o *Worker) registerEventCounter(valid bool, response string) {
	labels := prometheus.Labels{
		"valid":    strconv.FormatBool(valid),
		"response": response,
	}

	metrics.EventsCounter.With(labels).Inc()
}

func (o *Worker) processSingleEvent(ctx context.Context, e events.Message) {
condition, err := conditionFromEvent(e)
if err != nil {
o.logger.WithError(err).WithField(
"subject", e.Subject()).Warn("unable to retrieve condition from message")

o.registerEventCounter(false, "ack")
o.eventAckComplete(e)

return
}

Expand Down Expand Up @@ -255,7 +268,9 @@ func (o *Worker) processSingleEvent(ctx context.Context, e events.Message) {
if err != nil {
o.logger.WithError(err).Warn("error initializing task from condition")

o.registerEventCounter(false, "ack")
o.eventAckComplete(e)

return
}

Expand All @@ -268,6 +283,7 @@ func (o *Worker) processSingleEvent(ctx context.Context, e events.Message) {
"err": err.Error(),
}).Warn("asset lookup error")

o.registerEventCounter(true, "nack")
o.eventNak(e) // have the message bus re-deliver the message

return
Expand All @@ -276,6 +292,7 @@ func (o *Worker) processSingleEvent(ctx context.Context, e events.Message) {
taskCtx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

defer o.registerEventCounter(true, "ack")
defer o.eventAckComplete(e)

o.runTaskWithMonitor(taskCtx, task, asset, e)
Expand Down Expand Up @@ -336,6 +353,7 @@ func (o *Worker) runTaskWithMonitor(ctx context.Context, task *model.Task, asset
}

o.runTaskStatemachine(handler, handlerCtx, doneCh)

<-doneCh
}

Expand All @@ -350,6 +368,21 @@ func (o *Worker) getStatusPublisher() sm.Publisher {
return &statusEmitter{o.stream, o.logger}
}

// registerConditionMetrics records the outcome of a firmware install condition.
//
// It increments metrics.ConditionCounter and then observes the number of
// seconds elapsed since startTS on metrics.ConditionRunTimeSummary. Both
// metrics are labelled with the cptypes.FirmwareInstall condition kind and
// the given state.
func (o *Worker) registerConditionMetrics(startTS time.Time, state string) {
	// One label set is shared by the counter and the run time summary.
	labels := prometheus.Labels{
		"condition": string(cptypes.FirmwareInstall),
		"state":     state,
	}

	metrics.ConditionCounter.With(labels).Inc()
	metrics.ConditionRunTimeSummary.With(labels).Observe(time.Since(startTS).Seconds())
}

func (o *Worker) runTaskStatemachine(handler *taskHandler, handlerCtx *sm.HandlerContext, doneCh chan bool) {
defer close(doneCh)

Expand All @@ -365,6 +398,8 @@ func (o *Worker) runTaskStatemachine(handler *taskHandler, handlerCtx *sm.Handle
if err != nil {
o.logger.Error(err)

o.registerConditionMetrics(startTS, string(cptypes.Failed))

return
}

Expand All @@ -378,9 +413,13 @@ func (o *Worker) runTaskStatemachine(handler *taskHandler, handlerCtx *sm.Handle
},
).Warn("task for device failed")

o.registerConditionMetrics(startTS, string(cptypes.Failed))

return
}

o.registerConditionMetrics(startTS, string(cptypes.Succeeded))

o.logger.WithFields(logrus.Fields{
"deviceID": handlerCtx.Task.Parameters.AssetID.String(),
"conditionID": handlerCtx.Task.ID,
Expand Down

0 comments on commit 0e65cfb

Please sign in to comment.