From 7effeec6424fcd670b8f7b2fe6ac19c7c78f44df Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 27 Mar 2020 10:28:29 -0400 Subject: [PATCH] Sharding optimizations I: AST mapping (#1846) * [wip] sharding evaluator/ast * [wip] continues experimenting with ast mapping * refactoring in preparation for binops * evaluators can pass state to other evaluators * compiler alignment * Evaluator method renamed to StepEvaluator * chained evaluator impl * tidying up sharding code * handling for ConcatSampleExpr * downstream iterator * structure for downstreaming asts * outlines sharding optimizations * work on sharding mapper * ast sharding optimizations * test for different logrange positions * shard mapper tests * stronger ast sharding & tests * shardmapper tests for string->string * removes sharding evaluator code * removes unused ctx arg * Update pkg/logql/evaluator.go Co-Authored-By: Cyril Tovena Co-authored-by: Cyril Tovena --- pkg/logql/ast.go | 42 +- pkg/logql/astmapper.go | 26 + pkg/logql/engine.go | 2 +- pkg/logql/evaluator.go | 91 ++-- pkg/logql/evaluator_test.go | 5 +- pkg/logql/parser_test.go | 38 ++ pkg/logql/sharding.go | 57 ++ pkg/logql/shardmapper.go | 224 ++++++++ pkg/logql/shardmapper_test.go | 944 ++++++++++++++++++++++++++++++++++ 9 files changed, 1379 insertions(+), 50 deletions(-) create mode 100644 pkg/logql/astmapper.go create mode 100644 pkg/logql/sharding.go create mode 100644 pkg/logql/shardmapper.go create mode 100644 pkg/logql/shardmapper_test.go diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index efd6c2f522223..73b5cea25bbab 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -182,15 +182,18 @@ func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string) } const ( - OpTypeSum = "sum" - OpTypeAvg = "avg" - OpTypeMax = "max" - OpTypeMin = "min" - OpTypeCount = "count" - OpTypeStddev = "stddev" - OpTypeStdvar = "stdvar" - OpTypeBottomK = "bottomk" - OpTypeTopK = "topk" + // vector ops + OpTypeSum = "sum" + OpTypeAvg = "avg" + OpTypeMax = "max" + OpTypeMin = "min" + OpTypeCount = "count" + OpTypeStddev = "stddev" + OpTypeStdvar = "stdvar" + OpTypeBottomK = "bottomk" + OpTypeTopK = "topk" + + // range vector ops OpTypeCountOverTime = "count_over_time" OpTypeRate = "rate" @@ -217,6 +220,8 @@ func IsLogicalBinOp(op string) bool { type SampleExpr interface { // Selector is the LogQL selector to apply when retrieving logs. Selector() LogSelectorExpr + // Operations returns the list of operations used in this SampleExpr + Operations() []string Expr } @@ -244,6 +249,11 @@ func (e *rangeAggregationExpr) String() string { return formatOperation(e.operation, nil, e.left.String()) } +// impl SampleExpr +func (e *rangeAggregationExpr) Operations() []string { + return []string{e.operation} +} + type grouping struct { groups []string without bool @@ -320,6 +330,11 @@ func (e *vectorAggregationExpr) String() string { return formatOperation(e.operation, e.grouping, params...) } +// impl SampleExpr +func (e *vectorAggregationExpr) Operations() []string { + return append(e.left.Operations(), e.operation) +} + type binOpExpr struct { SampleExpr RHS SampleExpr @@ -330,6 +345,12 @@ func (e *binOpExpr) String() string { return fmt.Sprintf("%s %s %s", e.SampleExpr.String(), e.op, e.RHS.String()) } +// impl SampleExpr +func (e *binOpExpr) Operations() []string { + ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...) + return append(ops, e.op) +} + func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr { left, ok := lhs.(SampleExpr) if !ok { @@ -386,7 +407,7 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr { // This is because literals need match all labels, which is currently difficult to encode into StepEvaluators. // Therefore, we ensure a binop can be reduced/simplified, maintaining the invariant that it does not have two literal legs. func reduceBinOp(op string, left, right *literalExpr) *literalExpr { - merged := (&defaultEvaluator{}).mergeBinOp( + merged := mergeBinOp( op, &promql.Sample{Point: promql.Point{V: left.value}}, &promql.Sample{Point: promql.Point{V: right.value}}, @@ -423,6 +444,7 @@ func (e *literalExpr) String() string { // to facilitate sum types. We'll be type switching when evaluating them anyways // and they will only be present in binary operation legs. func (e *literalExpr) Selector() LogSelectorExpr { return e } +func (e *literalExpr) Operations() []string { return nil } func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go new file mode 100644 index 0000000000000..6067fbeb93e94 --- /dev/null +++ b/pkg/logql/astmapper.go @@ -0,0 +1,26 @@ +package logql + +import ( + "fmt" + + "github.com/pkg/errors" +) + +// ASTMapper is the exported interface for mapping between multiple AST representations +type ASTMapper interface { + Map(Expr) (Expr, error) +} + +// CloneExpr is a helper function to clone a node. +func CloneExpr(expr Expr) (Expr, error) { + return ParseExpr(expr.String()) +} + +func badASTMapping(expected string, got Expr) error { + return fmt.Errorf("Bad AST mapping: expected one type (%s), but got (%T)", expected, got) +} + +// MapperUnsuportedType is a helper for signaling that an evaluator does not support an Expr type +func MapperUnsupportedType(expr Expr, m ASTMapper) error { + return errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) +} diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 0753173148368..39d2618abee40 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -212,7 +212,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr return ng.evalLiteral(ctx, lit, q) } - stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q) + stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q) if err != nil { return nil, err } diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 1846bad12168f..85659db9bef89 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -69,12 +69,18 @@ func GetRangeType(q Params) QueryRangeType { // Evaluator is an interface for iterating over data at different nodes in the AST type Evaluator interface { - // Evaluator returns a StepEvaluator for a given SampleExpr - Evaluator(context.Context, SampleExpr, Params) (StepEvaluator, error) + // StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible + // StepEvaluator implementations which can be composed. + StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error) // Iterator returns the iter.EntryIterator for a given LogSelectorExpr Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error) } +// EvaluatorUnsupportedType is a helper for signaling that an evaluator does not support an Expr type +func EvaluatorUnsupportedType(expr Expr, ev Evaluator) error { + return errors.Errorf("unexpected expr type (%T) for Evaluator type (%T) ", expr, ev) +} + type defaultEvaluator struct { maxLookBackPeriod time.Duration querier Querier @@ -99,21 +105,43 @@ func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, } -func (ev *defaultEvaluator) Evaluator(ctx context.Context, expr SampleExpr, q Params) (StepEvaluator, error) { +func (ev *defaultEvaluator) StepEvaluator( + ctx context.Context, + nextEv Evaluator, + expr SampleExpr, + q Params, +) (StepEvaluator, error) { switch e := expr.(type) { case *vectorAggregationExpr: - return ev.vectorAggEvaluator(ctx, e, q) + return vectorAggEvaluator(ctx, nextEv, e, q) case *rangeAggregationExpr: - return ev.rangeAggEvaluator(ctx, e, q) + entryIter, err := ev.querier.Select(ctx, SelectParams{ + &logproto.QueryRequest{ + Start: q.Start().Add(-e.left.interval), + End: q.End(), + Limit: 0, + Direction: logproto.FORWARD, + Selector: expr.Selector().String(), + }, + }) + if err != nil { + return nil, err + } + return rangeAggEvaluator(entryIter, e, q) case *binOpExpr: - return ev.binOpEvaluator(ctx, e, q) + return binOpStepEvaluator(ctx, nextEv, e, q) default: - return nil, errors.Errorf("unexpected type (%T): %v", e, e) + return nil, EvaluatorUnsupportedType(e, ev) } } -func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vectorAggregationExpr, q Params) (StepEvaluator, error) { - nextEvaluator, err := ev.Evaluator(ctx, expr.left, q) +func vectorAggEvaluator( + ctx context.Context, + ev Evaluator, + expr *vectorAggregationExpr, + q Params, +) (StepEvaluator, error) { + nextEvaluator, err := ev.StepEvaluator(ctx, ev, expr.left, q) if err != nil { return nil, err } @@ -302,21 +330,11 @@ func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vector }, nextEvaluator.Close) } -func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAggregationExpr, q Params) (StepEvaluator, error) { - entryIter, err := ev.querier.Select(ctx, SelectParams{ - &logproto.QueryRequest{ - Start: q.Start().Add(-expr.left.interval), - End: q.End(), - Limit: 0, - Direction: logproto.FORWARD, - Selector: expr.Selector().String(), - }, - }) - - if err != nil { - return nil, err - } - +func rangeAggEvaluator( + entryIter iter.EntryIterator, + expr *rangeAggregationExpr, + q Params, +) (StepEvaluator, error) { vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(), q.Start().UnixNano(), q.End().UnixNano()) @@ -341,8 +359,9 @@ func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAg // binOpExpr explicly does not handle when both legs are literals as // it makes the type system simpler and these are reduced in mustNewBinOpExpr -func (ev *defaultEvaluator) binOpEvaluator( +func binOpStepEvaluator( ctx context.Context, + ev Evaluator, expr *binOpExpr, q Params, ) (StepEvaluator, error) { @@ -352,26 +371,26 @@ func (ev *defaultEvaluator) binOpEvaluator( // match a literal expr with all labels in the other leg if lOk { - rhs, err := ev.Evaluator(ctx, expr.RHS, q) + rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } - return ev.literalEvaluator(expr.op, leftLit, rhs, false) + return literalStepEvaluator(expr.op, leftLit, rhs, false) } if rOk { - lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q) + lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } - return ev.literalEvaluator(expr.op, rightLit, lhs, true) + return literalStepEvaluator(expr.op, rightLit, lhs, true) } // we have two non literal legs - lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q) + lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } - rhs, err := ev.Evaluator(ctx, expr.RHS, q) + rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } @@ -409,7 +428,7 @@ func (ev *defaultEvaluator) binOpEvaluator( for _, pair := range pairs { // merge - if merged := ev.mergeBinOp(expr.op, pair[0], pair[1]); merged != nil { + if merged := mergeBinOp(expr.op, pair[0], pair[1]); merged != nil { results = append(results, *merged) } } @@ -425,7 +444,7 @@ func (ev *defaultEvaluator) binOpEvaluator( }) } -func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *promql.Sample { +func mergeBinOp(op string, left, right *promql.Sample) *promql.Sample { var merger func(left, right *promql.Sample) *promql.Sample switch op { @@ -554,9 +573,9 @@ func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *p } -// literalEvaluator merges a literal with a StepEvaluator. Since order matters in +// literalStepEvaluator merges a literal with a StepEvaluator. Since order matters in // non commutative operations, inverted should be true when the literalExpr is not the left argument. -func (ev *defaultEvaluator) literalEvaluator( +func literalStepEvaluator( op string, lit *literalExpr, eval StepEvaluator, @@ -578,7 +597,7 @@ func (ev *defaultEvaluator) literalEvaluator( left, right = right, left } - if merged := ev.mergeBinOp( + if merged := mergeBinOp( op, left, right, diff --git a/pkg/logql/evaluator_test.go b/pkg/logql/evaluator_test.go index 23ef3d2f71c94..a25a72e6ae85e 100644 --- a/pkg/logql/evaluator_test.go +++ b/pkg/logql/evaluator_test.go @@ -9,9 +9,8 @@ import ( ) func TestDefaultEvaluator_DivideByZero(t *testing.T) { - ev := &defaultEvaluator{} - require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeDiv, + require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeDiv, &promql.Sample{ Point: promql.Point{T: 1, V: 1}, }, @@ -20,7 +19,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) { }, ).Point.V)) - require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeMod, + require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeMod, &promql.Sample{ Point: promql.Point{T: 1, V: 1}, }, diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index 13a38536fb99c..2687249513c2d 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -20,6 +20,44 @@ func TestParse(t *testing.T) { exp Expr err error }{ + { + // test [12h] before filter expr + in: `count_over_time({foo="bar"}[12h] |= "error")`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: &filterExpr{ + ty: labels.MatchEqual, + match: "error", + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + interval: 12 * time.Hour, + }, + }, + }, + { + // test [12h] after filter expr + in: `count_over_time({foo="bar"} |= "error" [12h])`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: &filterExpr{ + ty: labels.MatchEqual, + match: "error", + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + interval: 12 * time.Hour, + }, + }, + }, { in: `{foo="bar"}`, exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go new file mode 100644 index 0000000000000..5141a160d3a36 --- /dev/null +++ b/pkg/logql/sharding.go @@ -0,0 +1,57 @@ +package logql + +import ( + "fmt" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" +) + +// DownstreamSampleExpr is a SampleExpr which signals downstream computation +type DownstreamSampleExpr struct { + shard *astmapper.ShardAnnotation + SampleExpr +} + +func (d DownstreamSampleExpr) String() string { + return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard) +} + +// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation +type DownstreamLogSelectorExpr struct { + shard *astmapper.ShardAnnotation + LogSelectorExpr +} + +func (d DownstreamLogSelectorExpr) String() string { + return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard) +} + +// ConcatSampleExpr is an expr for concatenating multiple SampleExpr +// Contract: The embedded SampleExprs within a linked list of ConcatSampleExprs must be of the +// same structure. This makes special implementations of SampleExpr.Associative() unnecessary. +type ConcatSampleExpr struct { + SampleExpr + next *ConcatSampleExpr +} + +func (c ConcatSampleExpr) String() string { + if c.next == nil { + return c.SampleExpr.String() + } + + return fmt.Sprintf("%s ++ %s", c.SampleExpr.String(), c.next.String()) +} + +// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr +type ConcatLogSelectorExpr struct { + LogSelectorExpr + next *ConcatLogSelectorExpr +} + +func (c ConcatLogSelectorExpr) String() string { + if c.next == nil { + return c.LogSelectorExpr.String() + } + + return fmt.Sprintf("%s ++ %s", c.LogSelectorExpr.String(), c.next.String()) +} diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go new file mode 100644 index 0000000000000..79303365a96d0 --- /dev/null +++ b/pkg/logql/shardmapper.go @@ -0,0 +1,224 @@ +package logql + +import ( + "fmt" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" +) + +func NewShardMapper(shards int) (ShardMapper, error) { + if shards < 2 { + return ShardMapper{}, fmt.Errorf("Cannot create ShardMapper with <2 shards. Received %d", shards) + } + return ShardMapper{shards}, nil +} + +type ShardMapper struct { + shards int +} + +func (m ShardMapper) Map(expr Expr) (Expr, error) { + switch e := expr.(type) { + case *literalExpr: + return e, nil + case *matchersExpr, *filterExpr: + return m.mapLogSelectorExpr(e.(LogSelectorExpr)), nil + case *vectorAggregationExpr: + return m.mapVectorAggregationExpr(e) + case *rangeAggregationExpr: + return m.mapRangeAggregationExpr(e), nil + case *binOpExpr: + lhsMapped, err := m.Map(e.SampleExpr) + if err != nil { + return nil, err + } + rhsMapped, err := m.Map(e.RHS) + if err != nil { + return nil, err + } + lhsSampleExpr, ok := lhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", lhsMapped) + } + rhsSampleExpr, ok := rhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", rhsMapped) + } + e.SampleExpr = lhsSampleExpr + e.RHS = rhsSampleExpr + return e, nil + default: + return nil, MapperUnsupportedType(expr, m) + } +} + +func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { + var head *ConcatLogSelectorExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, + LogSelectorExpr: expr, + }, + next: head, + } + } + + return head +} + +func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr { + var head *ConcatSampleExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, + SampleExpr: expr, + }, + next: head, + } + } + + return head +} + +// technically, std{dev,var} are also parallelizable if there is no cross-shard merging +// in descendent nodes in the AST. This optimization is currently avoided for simplicity. +func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) { + + // if this AST contains unshardable operations, don't shard this at this level, + // but attempt to shard a child node. + if shardable := isShardable(expr.Operations()); !shardable { + subMapped, err := m.Map(expr.left) + if err != nil { + return nil, err + } + sampleExpr, ok := subMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", subMapped) + } + + return &vectorAggregationExpr{ + left: sampleExpr, + grouping: expr.grouping, + params: expr.params, + operation: expr.operation, + }, nil + + } + + switch expr.operation { + case OpTypeSum: + // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) + return &vectorAggregationExpr{ + left: m.mapSampleExpr(expr), + grouping: expr.grouping, + params: expr.params, + operation: expr.operation, + }, nil + + case OpTypeAvg: + // avg(x) -> sum(x)/count(x) + lhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeSum, + }) + if err != nil { + return nil, err + } + rhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeCount, + }) + if err != nil { + return nil, err + } + + return &binOpExpr{ + SampleExpr: lhs, + RHS: rhs, + op: OpTypeDiv, + }, nil + + case OpTypeCount: + // count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) + sharded := m.mapSampleExpr(expr) + return &vectorAggregationExpr{ + left: sharded, + grouping: expr.grouping, + operation: OpTypeSum, + }, nil + default: + // this should not be reachable. If an operation is shardable it should + // have an optimization listed. + level.Warn(util.Logger).Log( + "msg", "unexpected operation which appears shardable, ignoring", + "operation", expr.operation, + ) + return expr, nil + } +} + +func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr { + switch expr.operation { + case OpTypeCountOverTime, OpTypeRate: + // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... + // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... + return m.mapSampleExpr(expr) + default: + return expr + } +} + +// isShardable returns false if any of the listed operation types are not shardable and true otherwise +func isShardable(ops []string) bool { + for _, op := range ops { + if shardable := shardableOps[op]; !shardable { + return false + } + } + return true +} + +// shardableOps lists the operations which may be sharded. +// topk, botk, max, & min all must be concatenated and then evaluated in order to avoid +// potential data loss due to series distribution across shards. +// For example, grouping by `cluster` for a `max` operation may yield +// 2 results on the first shard and 10 results on the second. If we prematurely +// calculated `max`s on each shard, the shard/label combination with `2` may be +// discarded and some other combination with `11` may be reported falsely as the max. +// +// Explanation: this is my (owen-d) best understanding. +// +// For an operation to be shardable, first the sample-operation itself must be associative like (+, *) but not (%, /, ^). +// Secondly, if the operation is part of a vector aggregation expression or utilizes logical/set binary ops, +// the vector operation must be distributive over the sample-operation. +// This ensures that the vector merging operation can be applied repeatedly to data in different shards. +// references: +// https://en.wikipedia.org/wiki/Associative_property +// https://en.wikipedia.org/wiki/Distributive_property +var shardableOps = map[string]bool{ + // vector ops + OpTypeSum: true, + // avg is only marked as shardable because we remap it into sum/count. + OpTypeAvg: true, + OpTypeCount: true, + + // range vector ops + OpTypeCountOverTime: true, + OpTypeRate: true, + + // binops - arith + OpTypeAdd: true, + OpTypeMul: true, +} diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go new file mode 100644 index 0000000000000..f02107b7adfe0 --- /dev/null +++ b/pkg/logql/shardmapper_test.go @@ -0,0 +1,944 @@ +package logql + +import ( + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" +) + +func TestStringer(t *testing.T) { + for _, tc := range []struct { + in Expr + out string + }{ + { + in: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: nil, + }, + }, + out: `downstream<{foo="bar"}, shard=0_of_2> ++ downstream<{foo="bar"}, shard=1_of_2>`, + }, + } { + t.Run(tc.out, func(t *testing.T) { + require.Equal(t, tc.out, tc.in.String()) + }) + } +} + +func TestMapSampleExpr(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + + for _, tc := range []struct { + in SampleExpr + out SampleExpr + }{ + { + in: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + out: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + } { + t.Run(tc.in.String(), func(t *testing.T) { + require.Equal(t, tc.out, m.mapSampleExpr(tc.in)) + }) + + } +} + +func TestMappingStrings(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + for _, tc := range []struct { + in string + out string + }{ + { + in: `sum(rate({foo="bar"}[1m]))`, + out: `sum(downstream ++ downstream)`, + }, + { + in: `max(count(rate({foo="bar"}[5m]))) / 2`, + out: `max(sum(downstream ++ downstream)) / 2.000000`, + }, + { + in: `topk(3, rate({foo="bar"}[5m]))`, + out: `topk(3,downstream ++ downstream)`, + }, + { + in: `sum(max(rate({foo="bar"}[5m])))`, + out: `sum(max(downstream ++ downstream))`, + }, + { + in: `{foo="bar"} |= "id=123"`, + out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`, + }, + { + in: `sum by (cluster) (rate({foo="bar"} |= "id=123" [5m]))`, + out: `sum by(cluster)(downstream ++ downstream)`, + }, + } { + t.Run(tc.in, func(t *testing.T) { + ast, err := ParseExpr(tc.in) + require.Nil(t, err) + + mapped, err := m.Map(ast) + require.Nil(t, err) + + require.Equal(t, tc.out, mapped.String()) + + }) + } +} + +func TestMapping(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + + for _, tc := range []struct { + in string + expr Expr + err error + }{ + { + in: `{foo="bar"}`, + expr: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `{foo="bar"} |= "error"`, + expr: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &filterExpr{ + match: "error", + ty: labels.MatchEqual, + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &filterExpr{ + match: "error", + ty: labels.MatchEqual, + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `rate({foo="bar"}[5m])`, + expr: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `count_over_time({foo="bar"}[5m])`, + expr: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeCountOverTime, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeCountOverTime, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `sum(rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `topk(3, rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + params: 3, + operation: OpTypeTopK, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `max without (env) (rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{ + without: true, + groups: []string{"env"}, + }, + operation: OpTypeMax, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `count(rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `avg(rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeDiv, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + RHS: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + { + in: `1 + sum by (cluster) (rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeAdd, + SampleExpr: &literalExpr{1}, + RHS: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + // sum(max) should not shard the maxes + { + in: `sum(max(rate({foo="bar"}[5m])))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeMax, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + // max(count) should shard the count, but not the max + { + in: `max(count(rate({foo="bar"}[5m])))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeMax, + left: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + { + in: `max(sum by (cluster) (rate({foo="bar"}[5m]))) / count(rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeDiv, + SampleExpr: &vectorAggregationExpr{ + operation: OpTypeMax, + grouping: &grouping{}, + left: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + RHS: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + } { + t.Run(tc.in, func(t *testing.T) { + ast, err := ParseExpr(tc.in) + require.Equal(t, tc.err, err) + + mapped, err := m.Map(ast) + + require.Equal(t, tc.err, err) + require.Equal(t, tc.expr.String(), mapped.String()) + require.Equal(t, tc.expr, mapped) + }) + } +}