From ee4ab1b3946f04922a0ea8941b07581c9ccf5935 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 12 May 2022 08:25:05 +0200 Subject: [PATCH] Fix certain binary expression queries by optimizing through push down (#6132) * Add test cases that fail Instant queries of the following type fail and return an `unimplemented` error: ``` sum(count_over_time({foo="bar"} | logfmt | duration > 2s [3s])) / sum(count_over_time({foo="bar"} [3s])) ``` Signed-off-by: Christian Haudum * Fix certain binary expressions for instant queries If either the left hand side or the right hand side of a binary expression is a noop, we need to return the original expression so the whole expression is a noop as well, and thus not executed using the downstream engine. Otherwise, a binary expression that has a noop on either side, results in an `unimplemented` error when executed using the downstream engine. Signed-off-by: Christian Haudum * Optimize instant vector aggregations with log extraction stage This change optimizes queries that use a vector aggregation without grouping around a range aggregation with a label extraction stage such as `json` or `logfmt`. Since the vector aggregation can be pushed down to the downstream query, the downstream query does not create a massive amount of streams, even though it contains a generic label extraction stage. Signed-off-by: Christian Haudum * fixup! Add test cases that fail Signed-off-by: Christian Haudum * fixup! fixup! Add test cases that fail Signed-off-by: Christian Haudum --- pkg/logql/downstream_test.go | 8 ++ pkg/logql/rangemapper.go | 14 +++- pkg/logql/rangemapper_test.go | 154 ++++++++++++++++++++++++++-------- 3 files changed, 139 insertions(+), 37 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index b584d721b22c9..a845337e61c64 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -132,14 +132,19 @@ func TestRangeMappingEquivalence(t *testing.T) { // sum {`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(bytes_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(count_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(rate({a=~".+"}[2s]))`, time.Second}, {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -215,9 +220,11 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(rate({a=~".+"}[2s]))`, time.Second}, {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -237,6 +244,7 @@ func TestRangeMappingEquivalence(t *testing.T) { // Binary operations {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, {`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second}, + {`sum(count_over_time({a=~".+"} | logfmt | b > 2 [3s])) / sum(count_over_time({a=~".+"} [3s]))`, time.Second}, // Multi vector aggregator layer queries {`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second}, diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 86cf2d00274ed..88de6b9ce2174 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -128,10 +128,22 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect if err != nil { return nil, err } + // if left hand side is a noop, we need to return the original expression + // so the whole expression is a noop and thus not executed using the + // downstream engine + if e.SampleExpr.String() == lhsMapped.String() { + return e, nil + } rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder) if err != nil { return nil, err } + // if right hand side is a noop, we need to return the original expression + // so the whole expression is a noop and thus not executed using the + // downstream engine + if e.RHS.String() == rhsMapped.String() { + return e, nil + } e.SampleExpr = lhsMapped e.RHS = rhsMapped return e, nil @@ -331,7 +343,7 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, // We cannot execute downstream queries that would potentially produce a huge amount of series // and therefore would very likely fail. - if expr.Grouping == nil && hasLabelExtractionStage(expr) { + if expr.Grouping == nil && vectorAggrPushdown == nil && hasLabelExtractionStage(expr) { return expr } switch expr.Operation { diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index e3ed1ac726252..0f178d2393edc 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -932,6 +932,26 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, + { + `sum (count_over_time({app="foo"} | logfmt | duration > 10s [3m])) / sum (count_over_time({app="foo"} [3m]))`, + `( + sum ( + sum without ( + downstream 10s [1m] offset 2m0s)), shard=> + ++ downstream 10s [1m] offset 1m0s)), shard=> + ++ downstream 10s [1m])), shard=> + ) + ) + / + sum ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + ) + )`, + }, // Multi vector aggregator layer queries { @@ -959,6 +979,98 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, + + // outer vector aggregation is pushed down + { + `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, } { tc := tc t.Run(tc.expr, func(t *testing.T) { @@ -997,46 +1109,16 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { // should be noop if inner range aggregation includes a stage for label extraction such as `| json` or `| logfmt` // because otherwise the downstream queries would result in too many series - { - `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, { `max_over_time({app="foo"} | json | unwrap bar [3m])`, `max_over_time({app="foo"} | json | unwrap bar [3m])`, }, + + // if one side of a binary expression is a noop, the full query is a noop as well + { + `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, + `(sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m]))`, + }, } { tc := tc t.Run(tc.expr, func(t *testing.T) {