feat: improve performance of first_over_time and last_over_time queries by sharding them #11605

Merged
merged 39 commits into from
May 13, 2024
Changes from 8 commits
Commits
de536b6
Step 1
jeschkies Jan 11, 2024
235e386
Step 2
jeschkies Jan 11, 2024
689c161
Step 3
jeschkies Jan 11, 2024
09f6d66
Step 4
jeschkies Jan 11, 2024
f7463ae
Step 5
jeschkies Jan 11, 2024
750cec7
Step 6
jeschkies Jan 11, 2024
9281f44
Step 7
jeschkies Jan 11, 2024
8f589fe
Step 8
jeschkies Jan 11, 2024
810b07c
downstreaming of last over time expr
cstyan Jan 13, 2024
5e106bf
add expr type for merging of last over time
cstyan Jan 13, 2024
7b3cadc
add shardmapping for last over time
cstyan Jan 13, 2024
b10c1de
wire up new op type in ast
cstyan Jan 13, 2024
928ccfa
setup last iterator
cstyan Jan 13, 2024
f89ea65
add iterators functionality
cstyan Jan 13, 2024
5ddf9df
add evaluator
cstyan Jan 13, 2024
104892b
add test cases for last over time
cstyan Jan 13, 2024
67dd511
fix some things I misunderstood
cstyan Jan 13, 2024
8e22df0
the last timestamp iterator aggregator needs to select the last sample
cstyan Jan 15, 2024
6404308
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 18, 2024
0d420dc
Merge branch 'main' into karsten/first-over-time
jeschkies Jan 18, 2024
f8c9c2b
simplify evaluators next functions
cstyan Jan 18, 2024
2bfd0b5
Merge commit 'f8c9c2bdbf2013cc151e986330263bf08dbeeefc' into karsten/…
jeschkies Jan 23, 2024
af67edd
Correct vector check.
jeschkies Jan 23, 2024
361babc
Unify first and last
jeschkies Jan 23, 2024
2fb80d7
Increase relative error
jeschkies Jan 23, 2024
1b6cc02
Document some code
jeschkies Jan 23, 2024
1f04c45
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 23, 2024
d2b055b
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 29, 2024
87bbd75
Split up documentation
jeschkies Jan 29, 2024
307bfc1
Drop commented TS
jeschkies Jan 29, 2024
4dc234f
Merge branch 'main' into karsten/first-over-time
jeschkies Jan 30, 2024
995b982
Merge branch 'main' into karsten/first-over-time
jeschkies Mar 6, 2024
5231b6f
Merge branch 'main' into karsten/first-over-time
cstyan Apr 23, 2024
9e10b2c
fixes after merging in main
cstyan Apr 23, 2024
2325e85
Merge branch 'main' into karsten/first-over-time
cstyan May 3, 2024
69f504b
Return an empty result instead of nil
jeschkies May 7, 2024
27a5d66
fix other minor issues
cstyan May 8, 2024
1b09ad3
handle sharding noop case for first/last over time
cstyan May 8, 2024
bd7f275
fix formatting
cstyan May 10, 2024
66 changes: 66 additions & 0 deletions pkg/logql/downstream.go
@@ -200,6 +200,35 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
}
}

// Step 3
type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeFirstOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeFirstOverTime<%s>", sb.String())
}

func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type Shards []astmapper.ShardAnnotation

func (xs Shards) Encode() (encoded []string) {
@@ -394,6 +423,43 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil
// Step 2:
case *MergeFirstOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{*shard}.Encode(),
}
}
queries[i] = qry
}

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
Comment on lines +561 to +562
Contributor

is it possible that a shard which only retrieves data for a single relevant stream could return a vector? or would the vector for that single series always be wrapped in a matrix?

Contributor Author

It would be a matrix with one vector I believe. Only for instant queries we return a vector https://github.com/grafana/loki/blob/main/pkg/logql/engine.go#L388

}
}

return NewMergeFirstOverTimeStepEvaluator(params, xs), nil

default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
5 changes: 4 additions & 1 deletion pkg/logql/downstream_test.go
@@ -63,6 +63,9 @@ func TestMappingEquivalence(t *testing.T) {
`,
false,
},
// Step 1:
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
@@ -132,7 +135,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
query string
realtiveError float64
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03},
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
Contributor

did this need to change from 0.03 to 0.05?

Contributor Author

When I change the test series to have the values spread out this would fail.

Contributor

which change is that? the addition of nanos in the timestamp within test_utils.go?

Contributor Author

Yes.

{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
} {
q := NewMockQuerier(
11 changes: 6 additions & 5 deletions pkg/logql/engine.go
@@ -351,7 +351,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

next, ts, r := stepEvaluator.Next()
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
@@ -361,7 +361,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
case ProbabilisticQuantileVector:
return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
@@ -371,7 +371,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, nil
}

func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {

seriesIndex := map[uint64]*promql.Series{}

@@ -419,15 +419,16 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: ts,
//T: ts,
Contributor Author

I'm not sure we can do it like this. What do you think, @cstyan?

Contributor

I'm not sure what you mean?

Contributor Author

The current code on main overrides the timestamps in each vector.

Contributor

isn't the timestamp from the actual vector samples more accurate than the one returned by stepEvaluator.Next()?

Can you help me understand why it won't make sense?

Contributor Author

It does make sense for us. I'm just not sure some other component is relying on this overridden timestamp.

T: p.T,
F: p.F,
})
}
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
next, ts, r = stepEvaluator.Next()
next, _, r = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
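
The review thread above weighs two choices for a point's timestamp: the step timestamp returned by `stepEvaluator.Next()` (what main does, per the discussion) or the sample's own timestamp `p.T` (what this change does). A minimal sketch of the difference, assuming hypothetical `sample` and `stepResult` types that stand in for Loki's real ones and are not part of its API:

```go
package main

import "fmt"

// sample and stepResult are hypothetical stand-ins for promql.Sample and a
// step evaluator's output; they are not Loki types.
type sample struct {
	T int64 // sample timestamp in milliseconds
	F float64
}

type stepResult struct {
	TS      int64 // step timestamp in milliseconds
	Samples []sample
}

// join flattens step results into points. With overrideTS set, every point
// takes its step's timestamp; otherwise it keeps the sample's own timestamp.
func join(steps []stepResult, overrideTS bool) []sample {
	var out []sample
	for _, st := range steps {
		for _, s := range st.Samples {
			if overrideTS {
				s.T = st.TS
			}
			out = append(out, s)
		}
	}
	return out
}

func main() {
	steps := []stepResult{{TS: 1000, Samples: []sample{{T: 940, F: 2}}}}
	fmt.Println(join(steps, true)[0].T)  // step-aligned timestamp
	fmt.Println(join(steps, false)[0].T) // original sample timestamp
}
```

Keeping the sample's own timestamp is what lets a later merge compare candidates from different shards; whether other components depend on the step-aligned value is the open question raised in the thread.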
12 changes: 12 additions & 0 deletions pkg/logql/evaluator.go
@@ -556,6 +556,18 @@ func newRangeAggEvaluator(
return &QuantileSketchStepEvaluator{
iter: iter,
}, nil
// Step 6
case syntax.OpRangeTypeFirstWithTimestamp:
iter := newFirstWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
default:
iter, err := newRangeVectorIterator(
it, expr,
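
The evaluator hands the iterator nanosecond values (`Interval.Nanoseconds()`, `UnixNano()`), while the samples it later emits carry millisecond timestamps, so the diff divides by `time.Millisecond` in several places. A small sketch of that conversion, where `nanosToMillis` is a hypothetical helper name and not a function in the PR:

```go
package main

import (
	"fmt"
	"time"
)

// nanosToMillis converts a Unix timestamp in nanoseconds to milliseconds,
// truncating any sub-millisecond remainder. The name is hypothetical.
func nanosToMillis(ns int64) int64 {
	return ns / int64(time.Millisecond)
}

func main() {
	start := time.Unix(1, 500_000_000) // 1.5s after the epoch
	fmt.Println(nanosToMillis(start.UnixNano()))
}
```

Integer division truncates, so two samples less than a millisecond apart can end up with the same converted timestamp.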
13 changes: 13 additions & 0 deletions pkg/logql/explain.go
@@ -57,3 +57,16 @@ func (e *BinOpStepEvaluator) Explain(parent Node) {
func (i *VectorIterator) Explain(parent Node) {
parent.Childf("%f vectorIterator", i.val)
}

func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
b := parent.Child("QuantileSketchVector")
e.inner.Explain(b)
}

func (e *firstOverTimeStepEvaluator) Explain(parent Node) {
parent.Child("MergeFirstOverTime")
}

func (EmptyEvaluator) Explain(parent Node) {
parent.Child("Empty")
}
191 changes: 191 additions & 0 deletions pkg/logql/first_over_time.go
@@ -0,0 +1,191 @@
package logql

import (
"math"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/loki/pkg/iter"
)

func newFirstWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64) RangeVectorIterator {
inner := &batchRangeVectorIterator{
iter: it,
step: step,
end: end,
selRange: selRange,
metrics: map[string]labels.Labels{},
window: map[string]*promql.Series{},
agg: nil,
current: start - step, // first loop iteration will set it to start
Contributor

this does feel kind of brittle; is there a nice way we can have the iterator correctly retrieve the actual first step

Contributor Author

This is how it's in the other cases. I think this is super brittle but couldn't think of a nicer way.

Contributor

> This is how it's in the other cases. I think this is super brittle but couldn't think of a nicer way.

Okay, I'll create an issue for that as a follow-up. If this is following the existing pattern, then let's just move forward with this as we have it now.

Contributor Author

I'd rework the iterators. I think range over func golang/go#61405 might help here a lot.

offset: offset,
}
return &firstWithTimestampBatchRangeVectorIterator{
batchRangeVectorIterator: inner,
}
}

type firstWithTimestampBatchRangeVectorIterator struct {
*batchRangeVectorIterator
at []promql.Sample
}

// Step 7
func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) {
if r.at == nil {
r.at = make([]promql.Sample, 0, len(r.window))
}
r.at = r.at[:0]
// convert ts from nanoseconds to milliseconds, as the iterator works with nanoseconds
ts := r.current/1e+6 + r.offset/1e+6
for _, series := range r.window {
s := r.agg(series.Floats)
r.at = append(r.at, promql.Sample{
F: s.F,
T: s.T / int64(time.Millisecond),
Metric: series.Metric,
})
}
return ts, SampleVector(r.at)
}

func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint {
if len(samples) == 0 {
return promql.FPoint{F: math.NaN(), T: 0}
}
return samples[0]
}

// Step 8
type firstOverTimeStepEvaluator struct {
start, end, ts time.Time
step time.Duration
matrices []promql.Matrix
streamVec map[int64]int
}

func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
if len(m) == 0 {
return EmptyEvaluator{}
}

var (
start = params.Start()
end = params.End()
step = params.Step()
)

index := make(map[int64]int, 0)
for i, series := range m[0] {
index[int64(series.Metric.Hash())] = i
}

return &firstOverTimeStepEvaluator{
start: start,
end: end,
ts: start.Add(-step), // will be corrected on first Next() call
step: step,
matrices: m,
streamVec: index,
}
}

func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) {

var (
vec promql.Vector
ok bool
)

// TODO: build index metric to vec pos

e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
return false, 0, nil
}
ts := e.ts.UnixNano() / int64(time.Millisecond)

// Process first result
// len(e.matrices) >= 1 was checked during creation
for s, series := range e.matrices[0] {
if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}

vec = append(vec, promql.Sample{
Metric: series.Metric,
T: series.Floats[0].T,
F: series.Floats[0].F,
})

e.pop(0, s)
}

if len(e.matrices) == 1 {
return ok, ts, SampleVector(vec)
}

if len(vec) == 0 {
return e.hasNext(), ts, SampleVector(vec)
}

// Merge other results
for i, m := range e.matrices[1:] {
// TODO: verify length and same labels/metric
for j, series := range m {

if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}

// Merge
if vec[j].T > series.Floats[0].T {
vec[j].F = series.Floats[0].F
vec[j].T = series.Floats[0].T
}

// We've omitted the first matrix. That's why +1.
e.pop(i+1, j)
}
}

// Align vector timestamps with step
for i := range vec {
vec[i].T = ts
}

return true, ts, SampleVector(vec)
}

func (e *firstOverTimeStepEvaluator) pop(r, s int) {
if len(e.matrices[r][s].Floats) <= 1 {
e.matrices[r][s].Floats = nil
return
}
e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:]
}

func (e *firstOverTimeStepEvaluator) inRange(t, ts int64) bool {
previous := ts - e.step.Milliseconds()
return previous <= t && t < ts
}

func (e *firstOverTimeStepEvaluator) hasNext() bool {
for _, m := range e.matrices {
for _, s := range m {
if len(s.Floats) != 0 {
return true
}
}
}

return false
}

func (*firstOverTimeStepEvaluator) Close() error { return nil }

func (*firstOverTimeStepEvaluator) Error() error { return nil }
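
The merge in `Next()` above can be sketched in isolation: for each step, look at the head sample each shard reported, keep only those inside the step's window, and pick the one with the smallest timestamp. This is a simplified sketch, not the PR's implementation. The `point` type and the `inWindow` and `mergeFirst` helpers are hypothetical names, and unlike the diff the sketch does not realign the result to the step timestamp.

```go
package main

import "fmt"

// point is a hypothetical stand-in for promql.FPoint
// (millisecond timestamp and value).
type point struct {
	T int64
	F float64
}

// inWindow reports whether t falls in the half-open window [ts-step, ts),
// mirroring the inRange check in the diff.
func inWindow(t, ts, step int64) bool {
	return ts-step <= t && t < ts
}

// mergeFirst picks, among the head samples reported by each shard, the one
// with the smallest timestamp inside the window ending at ts. It returns
// false when no shard has a sample in the window.
func mergeFirst(heads []point, ts, step int64) (point, bool) {
	var (
		best  point
		found bool
	)
	for _, h := range heads {
		if !inWindow(h.T, ts, step) {
			continue
		}
		if !found || h.T < best.T {
			best, found = h, true
		}
	}
	return best, found
}

func main() {
	heads := []point{{T: 1500, F: 3}, {T: 1200, F: 7}, {T: 2500, F: 9}}
	first, ok := mergeFirst(heads, 2000, 1000)
	fmt.Println(first.T, first.F, ok)
}
```

A `last_over_time` merge would be the mirror image, selecting the largest timestamp in the window instead of the smallest.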
2 changes: 1 addition & 1 deletion pkg/logql/optimize.go
@@ -8,7 +8,7 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) {
// we skip sharding AST for now, it's not easy to clone them since they are not part of the language.
expr.Walk(func(e syntax.Expr) {
switch e.(type) {
case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr:
case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr:
skip = true
return
}
5 changes: 0 additions & 5 deletions pkg/logql/quantile_over_time_sketch.go
@@ -454,8 +454,3 @@ func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
func (*QuantileSketchVectorStepEvaluator) Close() error { return nil }

func (*QuantileSketchVectorStepEvaluator) Error() error { return nil }

func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
b := parent.Child("QuantileSketchVector")
e.inner.Explain(b)
}