Skip to content

Commit

Permalink
Fix certain binary expression queries by optimizing through push down (
Browse files Browse the repository at this point in the history
…#6132)

* Add test cases that fail

Instant queries of the following type fail and return an `unimplemented`
error:

```
sum(count_over_time({foo="bar"} | logfmt | duration > 2s [3s]))
/
sum(count_over_time({foo="bar"} [3s]))
```

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix certain binary expressions for instant queries

If either the left hand side or the right hand side of a binary
expression is a noop, we need to return the original expression so the
whole expression is a noop as well, and thus not executed using the downstream
engine.
Otherwise, a binary expression that has a noop on either side, results
in an `unimplemented` error when executed using the downstream engine.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Optimize instant vector aggregations with log extraction stage

This change optimizes queries that use a vector aggregation without
grouping around a range aggregation with a label extraction stage such
as `json` or `logfmt`.

Since the vector aggregation can be pushed down to the downstream query,
the downstream query does not create a massive amount of streams, even
though it contains a generic label extraction stage.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Add test cases that fail

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! fixup! Add test cases that fail

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored May 12, 2022
1 parent 31ca108 commit ee4ab1b
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 37 deletions.
8 changes: 8 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,19 @@ func TestRangeMappingEquivalence(t *testing.T) {

// sum
{`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`sum(bytes_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second},
{`sum(count_over_time({a=~".+"}[2s]))`, time.Second},
{`sum(count_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second},
{`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

Expand Down Expand Up @@ -215,9 +220,11 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

Expand All @@ -237,6 +244,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
// Binary operations
{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
{`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second},
{`sum(count_over_time({a=~".+"} | logfmt | b > 2 [3s])) / sum(count_over_time({a=~".+"} [3s]))`, time.Second},

// Multi vector aggregator layer queries
{`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second},
Expand Down
14 changes: 13 additions & 1 deletion pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,22 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect
if err != nil {
return nil, err
}
// if left hand side is a noop, we need to return the original expression
// so the whole expression is a noop and thus not executed using the
// downstream engine
if e.SampleExpr.String() == lhsMapped.String() {
return e, nil
}
rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder)
if err != nil {
return nil, err
}
// if right hand side is a noop, we need to return the original expression
// so the whole expression is a noop and thus not executed using the
// downstream engine
if e.RHS.String() == rhsMapped.String() {
return e, nil
}
e.SampleExpr = lhsMapped
e.RHS = rhsMapped
return e, nil
Expand Down Expand Up @@ -331,7 +343,7 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

// We cannot execute downstream queries that would potentially produce a huge amount of series
// and therefore would very likely fail.
if expr.Grouping == nil && hasLabelExtractionStage(expr) {
if expr.Grouping == nil && vectorAggrPushdown == nil && hasLabelExtractionStage(expr) {
return expr
}
switch expr.Operation {
Expand Down
154 changes: 118 additions & 36 deletions pkg/logql/rangemapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,26 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},
{
`sum (count_over_time({app="foo"} | logfmt | duration > 10s [3m])) / sum (count_over_time({app="foo"} [3m]))`,
`(
sum (
sum without (
downstream<sum (count_over_time({app="foo"} | logfmt | duration > 10s [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum (count_over_time({app="foo"} | logfmt | duration > 10s [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum (count_over_time({app="foo"} | logfmt | duration > 10s [1m])), shard=<nil>>
)
)
/
sum (
sum without (
downstream<sum (count_over_time({app="foo"} [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum (count_over_time({app="foo"} [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum (count_over_time({app="foo"} [1m])), shard=<nil>>
)
)
)`,
},

// Multi vector aggregator layer queries
{
Expand Down Expand Up @@ -959,6 +979,98 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},

// outer vector aggregation is pushed down
{
`sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(
min without(
downstream<sum(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum(min_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(
max without(
downstream<sum(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum(max_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(
sum without(
downstream<sum(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum(sum_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(
min without(
downstream<min(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<min(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<min(min_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(
max without(
downstream<min(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<min(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<min(max_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(
sum without(
downstream<min(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<min(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<min(sum_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(
min without(
downstream<max(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<max(min_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<max(min_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(
max without(
downstream<max(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<max(max_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<max(max_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
{
`max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(
sum without(
downstream<max(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 2m0s)), shard=<nil>>
++ downstream<max(sum_over_time({app="foo"} | logfmt | unwrap bar [1m] offset 1m0s)), shard=<nil>>
++ downstream<max(sum_over_time({app="foo"} | logfmt | unwrap bar [1m])), shard=<nil>>
)
)`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
Expand Down Expand Up @@ -997,46 +1109,16 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {

// should be noop if inner range aggregation includes a stage for label extraction such as `| json` or `| logfmt`
// because otherwise the downstream queries would result in too many series
{
`sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
`max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`,
},
{
`max_over_time({app="foo"} | json | unwrap bar [3m])`,
`max_over_time({app="foo"} | json | unwrap bar [3m])`,
},

// if one side of a binary expression is a noop, the full query is a noop as well
{
`sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`,
`(sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m]))`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
Expand Down

0 comments on commit ee4ab1b

Please sign in to comment.