Skip to content

Commit

Permalink
rules: remove dependency on promql.Engine
Browse files Browse the repository at this point in the history
  • Loading branch information
fabxc committed Nov 24, 2017
1 parent f8fccc7 commit 2d0e374
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 165 deletions.
5 changes: 4 additions & 1 deletion cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func main() {
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
queryEngine: promql.EngineOptions{
Metrics: prometheus.DefaultRegisterer,
},
}

a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server")
Expand Down Expand Up @@ -234,7 +237,7 @@ func main() {
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Notifier: notifier,
QueryEngine: queryEngine,
Query: rules.EngineQueryFunc(queryEngine),
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Logger: log.With(logger, "component", "rule manager"),
Expand Down
10 changes: 3 additions & 7 deletions rules/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,8 @@ const resolvedRetention = 15 * time.Minute

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, externalURL *url.URL) (promql.Vector, error) {
query, err := engine.NewInstantQuery(r.vector.String(), ts)
if err != nil {
return nil, err
}
res, err := query.Exec(ctx).Vector()
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -213,7 +209,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En
"__alert_"+r.Name(),
tmplData,
model.Time(timestamp.FromTime(ts)),
engine,
template.QueryFunc(query),
externalURL,
)
result, err := tmpl.Expand()
Expand Down
37 changes: 33 additions & 4 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,42 @@ const (
ruleTypeRecording = "recording"
)

// QueryFunc processes PromQL queries.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)

// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scaler into vector results.
func EngineQueryFunc(engine *promql.Engine) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point: promql.Point(v),
Metric: labels.Labels{},
}}, nil
default:
return nil, fmt.Errorf("rule result is not a vector or scalar")
}
}
}

// A Rule encapsulates a vector expression which is evaluated at a specified
// interval and acted upon (currently either recorded or used for alerting).
type Rule interface {
Name() string
// eval evaluates the rule, including any associated recording or alerting actions.
Eval(context.Context, time.Time, *promql.Engine, *url.URL) (promql.Vector, error)
Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
// String returns a human-readable string representation of the rule.
String() string

Expand Down Expand Up @@ -220,7 +250,6 @@ func (g *Group) hash() uint64 {
labels.Label{"name", g.name},
labels.Label{"file", g.file},
)

return l.Hash()
}

Expand Down Expand Up @@ -319,7 +348,7 @@ func (g *Group) Eval(ts time.Time) {

evalTotal.WithLabelValues(rtyp).Inc()

vector, err := rule.Eval(g.opts.Context, ts, g.opts.QueryEngine, g.opts.ExternalURL)
vector, err := rule.Eval(g.opts.Context, ts, g.opts.Query, g.opts.ExternalURL)
if err != nil {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
Expand Down Expand Up @@ -439,7 +468,7 @@ type Appendable interface {
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryEngine *promql.Engine
Query QueryFunc
Context context.Context
Notifier *notifier.Notifier
Appendable Appendable
Expand Down
135 changes: 84 additions & 51 deletions rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"fmt"
"math"
"strings"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -55,75 +55,108 @@ func TestAlertingRule(t *testing.T) {
labels.FromStrings("severity", "{{\"c\"}}ritical"),
nil, nil,
)
result := promql.Vector{
{
Metric: labels.FromStrings(
"__name__", "ALERTS",
"alertname", "HTTPRequestRateLow",
"alertstate", "pending",
"group", "canary",
"instance", "0",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS",
"alertname", "HTTPRequestRateLow",
"alertstate", "pending",
"group", "canary",
"instance", "1",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS",
"alertname", "HTTPRequestRateLow",
"alertstate", "firing",
"group", "canary",
"instance", "0",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS",
"alertname", "HTTPRequestRateLow",
"alertstate", "firing",
"group", "canary",
"instance", "1",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
}

baseTime := time.Unix(0, 0)

var tests = []struct {
time time.Duration
result []string
result promql.Vector
}{
{
time: 0,
result: []string{
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`,
},
time: 0,
result: result[:2],
}, {
time: 5 * time.Minute,
result: []string{
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`,
},
time: 5 * time.Minute,
result: result[2:],
}, {
time: 10 * time.Minute,
result: []string{
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
},
time: 10 * time.Minute,
result: result[2:3],
},
{
time: 15 * time.Minute,
result: []string{},
result: nil,
},
{
time: 20 * time.Minute,
result: []string{},
result: nil,
},
{
time: 25 * time.Minute,
result: []string{
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
},
time: 25 * time.Minute,
result: result[:1],
},
{
time: 30 * time.Minute,
result: []string{
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
},
time: 30 * time.Minute,
result: result[2:3],
},
}

for i, test := range tests {
t.Logf("case %d", i)

evalTime := baseTime.Add(test.time)

res, err := rule.Eval(suite.Context(), evalTime, suite.QueryEngine(), nil)
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine()), nil)
testutil.Ok(t, err)

actual := strings.Split(res.String(), "\n")
expected := annotateWithTime(test.result, evalTime)
if actual[0] == "" {
actual = []string{}
}
testutil.Assert(t, len(expected) == len(actual), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(expected), len(actual))

for j, expectedSample := range expected {
found := false
for _, actualSample := range actual {
if actualSample == expectedSample {
found = true
}
}
testutil.Assert(t, found, "%d.%d. Couldn't find expected sample in output: '%v'", i, j, expectedSample)
for i := range test.result {
test.result[i].T = timestamp.FromTime(evalTime)
}
testutil.Assert(t, len(test.result) == len(res), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res))

sort.Slice(res, func(i, j int) bool {
return labels.Compare(res[i].Metric, res[j].Metric) < 0
})
testutil.Equals(t, test.result, res)

for _, aa := range rule.ActiveAlerts() {
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
Expand All @@ -144,10 +177,10 @@ func TestStaleness(t *testing.T) {
defer storage.Close()
engine := promql.NewEngine(storage, nil)
opts := &ManagerOptions{
QueryEngine: engine,
Appendable: storage,
Context: context.Background(),
Logger: log.NewNopLogger(),
Query: EngineQueryFunc(engine),
Appendable: storage,
Context: context.Background(),
Logger: log.NewNopLogger(),
}

expr, err := promql.ParseExpr("a + 1")
Expand Down Expand Up @@ -271,11 +304,11 @@ func TestApplyConfig(t *testing.T) {
conf, err := config.LoadFile("../config/testdata/conf.good.yml")
testutil.Ok(t, err)
ruleManager := NewManager(&ManagerOptions{
Appendable: nil,
Notifier: nil,
QueryEngine: nil,
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: nil,
Notifier: nil,
Query: nil,
Context: context.Background(),
Logger: log.NewNopLogger(),
})
ruleManager.Run()

Expand Down
25 changes: 2 additions & 23 deletions rules/recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,11 @@ func (rule *RecordingRule) Name() string {
}

// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, _ *url.URL) (promql.Vector, error) {
query, err := engine.NewInstantQuery(rule.vector.String(), ts)
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
vector, err := query(ctx, rule.vector.String(), ts)
if err != nil {
return nil, err
}

var (
result = query.Exec(ctx)
vector promql.Vector
)
if result.Err != nil {
return nil, err
}

switch v := result.Value.(type) {
case promql.Vector:
vector = v
case promql.Scalar:
vector = promql.Vector{promql.Sample{
Point: promql.Point(v),
Metric: labels.Labels{},
}}
default:
return nil, fmt.Errorf("rule result is not a vector or scalar")
}

// Override the metric name and labels.
for i := range vector {
sample := &vector[i]
Expand Down
2 changes: 1 addition & 1 deletion rules/recording_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRuleEval(t *testing.T) {

for _, test := range suite {
rule := NewRecordingRule(test.name, test.expr, test.labels)
result, err := rule.Eval(ctx, now, engine, nil)
result, err := rule.Eval(ctx, now, EngineQueryFunc(engine), nil)
testutil.Ok(t, err)
testutil.Equals(t, result, test.result)
}
Expand Down
Loading

0 comments on commit 2d0e374

Please sign in to comment.