Skip to content

Commit

Permalink
feat(tasks): use influxdb errors for executor metrics (#14926)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlirieGray authored Sep 10, 2019
1 parent cc84a43 commit 645df57
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
12 changes: 6 additions & 6 deletions task/backend/executor/executor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type ExecutorMetrics struct {
activeRuns prometheus.Collector
queueDelta *prometheus.SummaryVec
runDuration *prometheus.SummaryVec
errorsCounter prometheus.Counter
errorsCounter *prometheus.CounterVec
manualRunsCounter *prometheus.CounterVec
resumeRunsCounter *prometheus.CounterVec
}
Expand Down Expand Up @@ -55,12 +55,12 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"taskID"}),

errorsCounter: prometheus.NewCounter(prometheus.CounterOpts{
errorsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "errors_counter",
Help: "The number of errors thrown by the executor.",
}),
Help: "The number of errors thrown by the executor with the type of error (ex. Flux compile, query, etc).",
}, []string{"errorType"}),

manualRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -131,8 +131,8 @@ func (em *ExecutorMetrics) FinishRun(taskID influxdb.ID, status backend.RunStatu
}

// LogError increments the count of errors.
func (em *ExecutorMetrics) LogError() {
em.errorsCounter.Inc()
func (em *ExecutorMetrics) LogError(err *influxdb.Error) {
em.errorsCounter.WithLabelValues(err.Code)
}

// Describe returns all descriptions associated with the run collector.
Expand Down
14 changes: 7 additions & 7 deletions task/backend/executor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (w *worker) start(p *Promise) {
w.te.metrics.StartRun(p.task.ID, time.Since(p.createdAt))
}

func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
func (w *worker) finish(p *Promise, rs backend.RunStatus, err *influxdb.Error) {
// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()
Expand All @@ -332,9 +332,9 @@ func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
w.te.metrics.FinishRun(p.task.ID, rs, rd)

// log error
if err != nil {
if err.Err != nil {
w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String()))
w.te.metrics.LogError()
w.te.metrics.LogError(err)
p.err = err
} else {
w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String()))
Expand All @@ -350,13 +350,13 @@ func (w *worker) executeQuery(p *Promise) {

pkg, err := flux.Parse(p.task.Flux)
if err != nil {
w.finish(p, backend.RunFail, err)
w.finish(p, backend.RunFail, influxdb.ErrFluxParseError(err))
return
}

sf, err := p.run.ScheduledForTime()
if err != nil {
w.finish(p, backend.RunFail, err)
w.finish(p, backend.RunFail, influxdb.ErrTaskTimeParse(err))
return
}

Expand All @@ -372,7 +372,7 @@ func (w *worker) executeQuery(p *Promise) {
it, err := w.te.qs.Query(ctx, req)
if err != nil {
// Assume the error should not be part of the runResult.
w.finish(p, backend.RunFail, err)
w.finish(p, backend.RunFail, influxdb.ErrQueryError(err))
return
}

Expand Down Expand Up @@ -400,7 +400,7 @@ func (w *worker) executeQuery(p *Promise) {
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b))
}

w.finish(p, backend.RunSuccess, runErr)
w.finish(p, backend.RunSuccess, influxdb.ErrResultIteratorError(runErr))
}

// RunsActive returns the current number of workers, which is equivalent to
Expand Down
3 changes: 2 additions & 1 deletion task/backend/executor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func testLimitFunc(t *testing.T) {
t.Fatal(err)
}
forcedErr := errors.New("forced")
forcedQueryErr := influxdb.ErrQueryError(forcedErr)
tes.svc.FailNextQuery(forcedErr)

count := 0
Expand All @@ -285,7 +286,7 @@ func testLimitFunc(t *testing.T) {

<-promise.Done()

if got := promise.Error(); got != forcedErr {
if got := promise.Error(); got.Error() != forcedQueryErr.Error() {
t.Fatal("failed to get failure from forced error")
}

Expand Down
30 changes: 30 additions & 0 deletions task_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,36 @@ var (
}
)

// ErrFluxParseError is returned when an error is thrown by Flux.Parse in the task executor
func ErrFluxParseError(err error) *Error {
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf("could not parse Flux script; Err: %v", err),
Op: "kv/taskExecutor",
Err: err,
}
}

// ErrQueryError is returned when an error is thrown by Query service in the task executor
func ErrQueryError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("unexpected error from queryd; Err: %v", err),
Op: "kv/taskExecutor",
Err: err,
}
}

// ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor
func ErrResultIteratorError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("Error exhausting result iterator; Err: %v", err),
Op: "kv/taskExecutor",
Err: err,
}
}

func ErrInternalTaskServiceError(err error) *Error {
return &Error{
Code: EInternal,
Expand Down

0 comments on commit 645df57

Please sign in to comment.