From de536b6dfcc16489d65a0d3fe279f0704e5e2dfa Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:39:46 +0100 Subject: [PATCH 01/30] Step 1 --- pkg/logql/downstream_test.go | 5 ++++- pkg/logql/test_utils.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 426722a55459..7a6360d74733 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -63,6 +63,9 @@ func TestMappingEquivalence(t *testing.T) { `, false, }, + // Step 1: + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. @@ -132,7 +135,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { query string realtiveError float64 }{ - {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03}, + {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02}, } { q := NewMockQuerier( diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 82442e09bf60..12eb97d30eb4 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -265,8 +265,9 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string, valueFi if valueField { line = fmt.Sprintf("%s value=%f", line, r.Float64()*100.0) } + nanos := r.Int63n(time.Second.Nanoseconds()) stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Unix(0, int64(j*int(time.Second))), + Timestamp: time.Unix(0, int64(j*int(time.Second))+nanos), Line: line, }) } From 235e386646735305631f96bda1ca0e32500daebd Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:41:36 +0100 Subject: [PATCH 02/30] Step 2 --- pkg/logql/downstream.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 76594dc040c2..d33cd588a2b8 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -394,6 +394,43 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( } inner := NewQuantileSketchMatrixStepEvaluator(matrix, params) return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil + // Step 2: + case *MergeFirstOverTimeExpr: + queries := make([]DownstreamQuery, len(e.downstreams)) + + for i, d := range e.downstreams { + qry := DownstreamQuery{ + Params: ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: d.SampleExpr, + }, + } + if shard := d.shard; shard != nil { + qry.Params = ParamsWithShardsOverride{ + Params: qry.Params, + ShardsOverride: Shards{*shard}.Encode(), + } + } + queries[i] = qry + } + + results, err := ev.Downstream(ctx, queries) + if err != nil { + return nil, err + } + + xs := make([]promql.Matrix, 0, len(queries)) + for _, res := range results { + + switch data := res.Data.(type) { + case promql.Matrix: + xs = append(xs, data) + default: + return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) + } + } + + return NewMergeFirstOverTimeStepEvaluator(params, xs), nil default: return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params) From 689c1614dd150724fa55d5520a8d3aa806793dbf Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:42:39 +0100 Subject: [PATCH 03/30] Step 3 --- pkg/logql/downstream.go | 29 ++++++++++++++++++++++++++ pkg/logql/explain.go | 13 ++++++++++++ pkg/logql/quantile_over_time_sketch.go | 5 ----- 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index d33cd588a2b8..48abb2815368 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -200,6 +200,35 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) { } } +// Step 3 +type MergeFirstOverTimeExpr struct { + syntax.SampleExpr + downstreams []DownstreamSampleExpr +} + +func (e MergeFirstOverTimeExpr) String() string { + var sb strings.Builder + for i, d := range e.downstreams { + if i >= defaultMaxDepth { + break + } + + if i > 0 { + sb.WriteString(" ++ ") + } + + sb.WriteString(d.String()) + } + return fmt.Sprintf("MergeFirstOverTime<%s>", sb.String()) +} + +func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) { + f(e) + for _, d := range e.downstreams { + d.Walk(f) + } +} + type Shards []astmapper.ShardAnnotation func (xs Shards) Encode() (encoded []string) { diff --git a/pkg/logql/explain.go b/pkg/logql/explain.go index 4890d150f0a6..54eeb482fa6a 100644 --- a/pkg/logql/explain.go +++ b/pkg/logql/explain.go @@ -57,3 +57,16 @@ func (e *BinOpStepEvaluator) Explain(parent Node) { func (i *VectorIterator) Explain(parent Node) { parent.Childf("%f vectorIterator", i.val) } + +func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) { + b := parent.Child("QuantileSketchVector") + e.inner.Explain(b) +} + +func (e *firstOverTimeStepEvaluator) Explain(parent Node) { + parent.Child("MergeFirstOverTime") +} + +func (EmptyEvaluator) Explain(parent Node) { + parent.Child("Empty") +} diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index f9f05f99c997..05ad98cdb335 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -454,8 +454,3 @@ func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) { func (*QuantileSketchVectorStepEvaluator) Close() error { return nil } func (*QuantileSketchVectorStepEvaluator) Error() error { return nil } - -func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) { - b := parent.Child("QuantileSketchVector") - e.inner.Explain(b) -} From 09f6d660e0d897140f4f3151dbcfde0582998b80 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:43:29 +0100 Subject: [PATCH 04/30] Step 4 --- pkg/logql/optimize.go | 2 +- pkg/logql/shardmapper.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/logql/optimize.go b/pkg/logql/optimize.go index 2f9c80a64f91..b6eeb2f7f986 100644 --- a/pkg/logql/optimize.go +++ b/pkg/logql/optimize.go @@ -8,7 +8,7 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) { // we skip sharding AST for now, it's not easy to clone them since they are not part of the language. expr.Walk(func(e syntax.Expr) { switch e.(type) { - case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr: + case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr: skip = true return } diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 4a06b5f804e8..bdd701ebaef4 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -487,6 +487,35 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, quantile: expr.Params, }, bytesPerShard, nil + // Step 4 + case syntax.OpRangeTypeFirst: + potentialConflict := syntax.ReducesLabels(expr) + if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) { + return m.mapSampleExpr(expr, r) + } + + shards, bytesPerShard, err := m.shards.Shards(expr) + if err != nil { + return nil, 0, err + } + + downstreams := make([]DownstreamSampleExpr, 0, shards) + // This is the magic. We send a custom operation + expr.Operation = syntax.OpRangeTypeFirstWithTimestamp + for shard := shards - 1; shard >= 0; shard-- { + downstreams = append(downstreams, DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: shard, + Of: shards, + }, + SampleExpr: expr, + }) + } + + return &MergeFirstOverTimeExpr{ + downstreams: downstreams, + }, bytesPerShard, nil + default: // don't shard if there's not an appropriate optimization return noOp(expr, m.shards) From f7463ae93d3f9bc75f45a660e50cc063a8dfef60 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:43:47 +0100 Subject: [PATCH 05/30] Step 5 --- pkg/logql/syntax/ast.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index 900802207c50..fa6815d15341 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -1171,6 +1171,8 @@ const ( // evaluate expressions differently resulting in intermediate formats // that are not consumable by LogQL clients but are used for sharding. OpRangeTypeQuantileSketch = "__quantile_sketch_over_time__" + // Step 5 + OpRangeTypeFirstWithTimestamp = "__first_over_time_ts__" ) func IsComparisonOperator(op string) bool { @@ -1274,7 +1276,9 @@ func (e *RangeAggregationExpr) MatcherGroups() ([]MatcherRange, error) { func (e RangeAggregationExpr) validate() error { if e.Grouping != nil { switch e.Operation { - case OpRangeTypeAvg, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeQuantileSketch, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeFirst, OpRangeTypeLast: + case OpRangeTypeAvg, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, + OpRangeTypeQuantileSketch, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeFirst, + OpRangeTypeLast, OpRangeTypeFirstWithTimestamp: default: return fmt.Errorf("grouping not allowed for %s aggregation", e.Operation) } @@ -1283,7 +1287,8 @@ func (e RangeAggregationExpr) validate() error { switch e.Operation { case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeRate, OpRangeTypeRateCounter, - OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast, OpRangeTypeQuantileSketch: + OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast, OpRangeTypeQuantileSketch, + OpRangeTypeFirstWithTimestamp: return nil default: return fmt.Errorf("invalid aggregation %s with unwrap", e.Operation) @@ -2137,6 +2142,7 @@ var shardableOps = map[string]bool{ // range vector ops OpRangeTypeAvg: true, OpRangeTypeCount: true, + OpRangeTypeFirst: true, OpRangeTypeRate: true, OpRangeTypeBytes: true, OpRangeTypeBytesRate: true, From 750cec747b15e66bd77ced87f4a0e9742cf8c1f1 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:44:28 +0100 Subject: [PATCH 06/30] Step 6 --- pkg/logql/engine.go | 11 ++++++----- pkg/logql/evaluator.go | 12 ++++++++++++ pkg/logql/first_over_time.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 5 deletions(-) create mode 100644 pkg/logql/first_over_time.go diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 8d951ad64c94..21bc28f410c6 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -351,7 +351,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ } defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close) - next, ts, r := stepEvaluator.Next() + next, _, r := stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } @@ -361,7 +361,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ case SampleVector: maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) } maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) - return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries) + return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries) case ProbabilisticQuantileVector: return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params) default: @@ -371,7 +371,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, nil } -func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { +func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { seriesIndex := map[uint64]*promql.Series{} @@ -419,7 +419,8 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato seriesIndex[hash] = series } series.Floats = append(series.Floats, promql.FPoint{ - T: ts, + //T: ts, + T: p.T, F: p.F, }) } @@ -427,7 +428,7 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato if len(seriesIndex) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) } - next, ts, r = stepEvaluator.Next() + next, _, r = stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 2d6837ef6a78..a6b2bade1c1b 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -556,6 +556,18 @@ func newRangeAggEvaluator( return &QuantileSketchStepEvaluator{ iter: iter, }, nil + // Step 6 + case syntax.OpRangeTypeFirstWithTimestamp: + iter := newFirstWithTimestampIterator( + it, + expr.Left.Interval.Nanoseconds(), + q.Step().Nanoseconds(), + q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(), + ) + + return &RangeVectorEvaluator{ + iter: iter, + }, nil default: iter, err := newRangeVectorIterator( it, expr, diff --git a/pkg/logql/first_over_time.go b/pkg/logql/first_over_time.go new file mode 100644 index 000000000000..b441cd89b154 --- /dev/null +++ b/pkg/logql/first_over_time.go @@ -0,0 +1,35 @@ +package logql + +import ( + "math" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + + "github.com/grafana/loki/pkg/iter" +) + +func newFirstWithTimestampIterator( + it iter.PeekingSampleIterator, + selRange, step, start, end, offset int64) RangeVectorIterator { + inner := &batchRangeVectorIterator{ + iter: it, + step: step, + end: end, + selRange: selRange, + metrics: map[string]labels.Labels{}, + window: map[string]*promql.Series{}, + agg: nil, + current: start - step, // first loop iteration will set it to start + offset: offset, + } + return &firstWithTimestampBatchRangeVectorIterator{ + batchRangeVectorIterator: inner, + } +} + +type firstWithTimestampBatchRangeVectorIterator struct { + *batchRangeVectorIterator + at []promql.Sample +} From 9281f4479ef5db068b5d42aa37949fdcc9448dc1 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:44:49 +0100 Subject: [PATCH 07/30] Step 7 --- pkg/logql/first_over_time.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/logql/first_over_time.go b/pkg/logql/first_over_time.go index b441cd89b154..fa7e82d0798b 100644 --- a/pkg/logql/first_over_time.go +++ b/pkg/logql/first_over_time.go @@ -33,3 +33,29 @@ type firstWithTimestampBatchRangeVectorIterator struct { *batchRangeVectorIterator at []promql.Sample } + +// Step 7 +func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { + if r.at == nil { + r.at = make([]promql.Sample, 0, len(r.window)) + } + r.at = r.at[:0] + // convert ts from nano to milli seconds as the iterator work with nanoseconds + ts := r.current/1e+6 + r.offset/1e+6 + for _, series := range r.window { + s := r.agg(series.Floats) + r.at = append(r.at, promql.Sample{ + F: s.F, + T: s.T / int64(time.Millisecond), + Metric: series.Metric, + }) + } + return ts, SampleVector(r.at) +} + +func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { + if len(samples) == 0 { + return promql.FPoint{F: math.NaN(), T: 0} + } + return samples[0] +} From 8f589fef47a5cf7d2988c33adcd686ff34a61e93 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jan 2024 10:45:01 +0100 Subject: [PATCH 08/30] Step 8 --- pkg/logql/first_over_time.go | 130 +++++++++++++++++++++++++++++++++++ pkg/logql/step_evaluator.go | 15 ++++ 2 files changed, 145 insertions(+) diff --git a/pkg/logql/first_over_time.go b/pkg/logql/first_over_time.go index fa7e82d0798b..e9e6080ccce7 100644 --- a/pkg/logql/first_over_time.go +++ b/pkg/logql/first_over_time.go @@ -59,3 +59,133 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint } return samples[0] } + +// Step 8 +type firstOverTimeStepEvaluator struct { + start, end, ts time.Time + step time.Duration + matrices []promql.Matrix + streamVec map[int64]int +} + +func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { + if len(m) == 0 { + return EmptyEvaluator{} + } + + var ( + start = params.Start() + end = params.End() + step = params.Step() + ) + + index := make(map[int64]int, 0) + for i, series := range m[1] { + index[int64(series.Metric.Hash())] = i + } + + return &firstOverTimeStepEvaluator{ + start: start, + end: end, + ts: start.Add(-step), // will be corrected on first Next() call + step: step, + matrices: m, + streamVec: index, + } +} + +func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { + + var ( + vec promql.Vector + ok bool + ) + + // TODO: build index metric to vec pos + + e.ts = e.ts.Add(e.step) + if e.ts.After(e.end) { + return false, 0, nil + } + ts := e.ts.UnixNano() / int64(time.Millisecond) + + // Process first result + // len(e.matrices) >= 1 was check during creation + for s, series := range e.matrices[0] { + if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { + continue + } + + vec = append(vec, promql.Sample{ + Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, + }) + + e.pop(0, s) + } + + if len(e.matrices) == 1 { + return ok, ts, SampleVector(vec) + } + + if len(vec) == 0 { + return e.hasNext(), ts, SampleVector(vec) + } + + // Merge other results + for i, m := range e.matrices[1:] { + // TODO: verify length and same labels/metric + for j, series := range m { + + if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { + continue + } + + // Merge + if vec[j].T > series.Floats[0].T { + vec[j].F = series.Floats[0].F + vec[j].T = series.Floats[0].T + } + + // We've omitted the first matrix. That's why +1. + e.pop(i+1, j) + } + } + + // Align vector timestamps with step + for i := range vec { + vec[i].T = ts + } + + return true, ts, SampleVector(vec) +} + +func (e *firstOverTimeStepEvaluator) pop(r, s int) { + if len(e.matrices[r][s].Floats) <= 1 { + e.matrices[r][s].Floats = nil + return + } + e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] +} + +func (e *firstOverTimeStepEvaluator) inRange(t, ts int64) bool { + previous := ts - e.step.Milliseconds() + return previous <= t && t < ts +} + +func (e *firstOverTimeStepEvaluator) hasNext() bool { + for _, m := range e.matrices { + for _, s := range m { + if len(s.Floats) != 0 { + return true + } + } + } + + return false +} + +func (*firstOverTimeStepEvaluator) Close() error { return nil } + +func (*firstOverTimeStepEvaluator) Error() error { return nil } diff --git a/pkg/logql/step_evaluator.go b/pkg/logql/step_evaluator.go index 955f9e2b97f8..3d4ffa192c22 100644 --- a/pkg/logql/step_evaluator.go +++ b/pkg/logql/step_evaluator.go @@ -32,3 +32,18 @@ type StepEvaluator interface { // Explain returns a print of the step evaluation tree Explain(Node) } + +type EmptyEvaluator struct{} + +var _ StepEvaluator = EmptyEvaluator{} + +// Close implements StepEvaluator. +func (EmptyEvaluator) Close() error { return nil } + +// Error implements StepEvaluator. +func (EmptyEvaluator) Error() error { return nil } + +// Next implements StepEvaluator. +func (EmptyEvaluator) Next() (ok bool, ts int64, r StepResult) { + return false, 0, nil +} From 810b07c50f4b15df8ba16181e1fa3da879cc8440 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:00:26 -0800 Subject: [PATCH 09/30] downstreaming of last over time expr Signed-off-by: Callum Styan --- pkg/logql/downstream.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 48abb2815368..f4f381ca24aa 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -460,7 +460,42 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( } return NewMergeFirstOverTimeStepEvaluator(params, xs), nil + case *MergeLastOverTimeExpr: + queries := make([]DownstreamQuery, len(e.downstreams)) + + for i, d := range e.downstreams { + qry := DownstreamQuery{ + Params: ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: d.SampleExpr, + }, + } + if shard := d.shard; shard != nil { + qry.Params = ParamsWithShardsOverride{ + Params: qry.Params, + ShardsOverride: Shards{*shard}.Encode(), + } + } + queries[i] = qry + } + + results, err := ev.Downstream(ctx, queries) + if err != nil { + return nil, err + } + + xs := make([]promql.Matrix, 0, len(queries)) + for _, res := range results { + + switch data := res.Data.(type) { + case promql.Matrix: + xs = append(xs, data) + default: + return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) + } + } + return NewMergeLastOverTimeStepEvaluator(params, xs), nil default: return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params) } From 5e106bf0950c8a48dc5899e6cd341a5c476599e4 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:05:21 -0800 Subject: [PATCH 10/30] add expr type for merging of last over time Signed-off-by: Callum Styan --- pkg/logql/downstream.go | 28 ++++++++++++++++++++++++++++ pkg/logql/explain.go | 4 ++++ 2 files changed, 32 insertions(+) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index f4f381ca24aa..69e1e3518c70 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -229,6 +229,34 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) { } } +type MergeLastOverTimeExpr struct { + syntax.SampleExpr + downstreams []DownstreamSampleExpr +} + +func (e MergeLastOverTimeExpr) String() string { + var sb strings.Builder + for i, d := range e.downstreams { + if i >= defaultMaxDepth { + break + } + + if i > 0 { + sb.WriteString(" ++ ") + } + + sb.WriteString(d.String()) + } + return fmt.Sprintf("MergeLastOverTime<%s>", sb.String()) +} + +func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) { + f(e) + for _, d := range e.downstreams { + d.Walk(f) + } +} + type Shards []astmapper.ShardAnnotation func (xs Shards) Encode() (encoded []string) { diff --git a/pkg/logql/explain.go b/pkg/logql/explain.go index 54eeb482fa6a..b010605c2728 100644 --- a/pkg/logql/explain.go +++ b/pkg/logql/explain.go @@ -67,6 +67,10 @@ func (e *firstOverTimeStepEvaluator) Explain(parent Node) { parent.Child("MergeFirstOverTime") } +func (e *lastOverTimeStepEvaluator) Explain(parent Node) { + parent.Child("MergeLastOverTime") +} + func (EmptyEvaluator) Explain(parent Node) { parent.Child("Empty") } From 7b3cadc2b4ff3913d7b2fcafd68649339c4507ed Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:07:36 -0800 Subject: [PATCH 11/30] add shardmapping for last over time Signed-off-by: Callum Styan --- pkg/logql/optimize.go | 2 +- pkg/logql/shardmapper.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/logql/optimize.go b/pkg/logql/optimize.go index b6eeb2f7f986..867f2691d46c 100644 --- a/pkg/logql/optimize.go +++ b/pkg/logql/optimize.go @@ -8,7 +8,7 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) { // we skip sharding AST for now, it's not easy to clone them since they are not part of the language. expr.Walk(func(e syntax.Expr) { switch e.(type) { - case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr: + case *ConcatSampleExpr, *DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr, *MergeLastOverTimeExpr: skip = true return } diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index bdd701ebaef4..2748b85d6a69 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -515,7 +515,32 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return &MergeFirstOverTimeExpr{ downstreams: downstreams, }, bytesPerShard, nil + case syntax.OpRangeTypeLast: + potentialConflict := syntax.ReducesLabels(expr) + if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) { + return m.mapSampleExpr(expr, r) + } + + shards, bytesPerShard, err := m.shards.Shards(expr) + if err != nil { + return nil, 0, err + } + downstreams := make([]DownstreamSampleExpr, 0, shards) + expr.Operation = syntax.OpRangeTypeFirstWithTimestamp + for shard := shards - 1; shard >= 0; shard-- { + downstreams = append(downstreams, DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: shard, + Of: shards, + }, + SampleExpr: expr, + }) + } + + return &MergeLastOverTimeExpr{ + downstreams: downstreams, + }, bytesPerShard, nil default: // don't shard if there's not an appropriate optimization return noOp(expr, m.shards) From b10c1dec7fa21c5f27fd559b1d133dd936433c39 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:13:56 -0800 Subject: [PATCH 12/30] wire up new op type in ast Signed-off-by: Callum Styan --- pkg/logql/syntax/ast.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index fa6815d15341..7249ba85e358 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -1173,6 +1173,7 @@ const ( OpRangeTypeQuantileSketch = "__quantile_sketch_over_time__" // Step 5 OpRangeTypeFirstWithTimestamp = "__first_over_time_ts__" + OpRangeTypeLastWithTimestamp = "__last_over_time_ts__" ) func IsComparisonOperator(op string) bool { @@ -1278,7 +1279,7 @@ func (e RangeAggregationExpr) validate() error { switch e.Operation { case OpRangeTypeAvg, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeQuantileSketch, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeFirst, - OpRangeTypeLast, OpRangeTypeFirstWithTimestamp: + OpRangeTypeLast, OpRangeTypeFirstWithTimestamp, OpRangeTypeLastWithTimestamp: default: return fmt.Errorf("grouping not allowed for %s aggregation", e.Operation) } @@ -1288,7 +1289,7 @@ func (e RangeAggregationExpr) validate() error { case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeRate, OpRangeTypeRateCounter, OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast, OpRangeTypeQuantileSketch, - OpRangeTypeFirstWithTimestamp: + OpRangeTypeFirstWithTimestamp, OpRangeTypeLastWithTimestamp: return nil default: return fmt.Errorf("invalid aggregation %s with unwrap", e.Operation) @@ -2143,6 +2144,7 @@ var shardableOps = map[string]bool{ OpRangeTypeAvg: true, OpRangeTypeCount: true, OpRangeTypeFirst: true, + OpRangeTypeLast: true, OpRangeTypeRate: true, OpRangeTypeBytes: true, OpRangeTypeBytesRate: true, From 928ccfa9c0abdd7a96aaed2f3d98cdd467ca539e Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:20:17 -0800 Subject: [PATCH 13/30] setup last iterator Signed-off-by: Callum Styan --- pkg/logql/evaluator.go | 11 +++++++++ ...t_over_time.go => first_last_over_time.go} | 24 +++++++++++++++++++ 2 files changed, 35 insertions(+) rename pkg/logql/{first_over_time.go => first_last_over_time.go} (86%) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index a6b2bade1c1b..523c04feb0cb 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -565,6 +565,17 @@ func newRangeAggEvaluator( q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(), ) + return &RangeVectorEvaluator{ + iter: iter, + }, nil + case syntax.OpRangeTypeLastWithTimestamp: + iter := newLastWithTimestampIterator( + it, + expr.Left.Interval.Nanoseconds(), + q.Step().Nanoseconds(), + q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(), + ) + return &RangeVectorEvaluator{ iter: iter, }, nil diff --git a/pkg/logql/first_over_time.go b/pkg/logql/first_last_over_time.go similarity index 86% rename from pkg/logql/first_over_time.go rename to pkg/logql/first_last_over_time.go index e9e6080ccce7..fb28d87019fc 100644 --- a/pkg/logql/first_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -189,3 +189,27 @@ func (e *firstOverTimeStepEvaluator) hasNext() bool { func (*firstOverTimeStepEvaluator) Close() error { return nil } func (*firstOverTimeStepEvaluator) Error() error { return nil } + +func newLastWithTimestampIterator( + it iter.PeekingSampleIterator, + selRange, step, start, end, offset int64) RangeVectorIterator { + inner := &batchRangeVectorIterator{ + iter: it, + step: step, + end: end, + selRange: selRange, + metrics: map[string]labels.Labels{}, + window: map[string]*promql.Series{}, + agg: nil, + current: end + step, // first loop iteration will set it to end + offset: offset, + } + return &lastWithTimestampBatchRangeVectorIterator{ + batchRangeVectorIterator: inner, + } +} + +type lastWithTimestampBatchRangeVectorIterator struct { + *batchRangeVectorIterator + at []promql.Sample +} From f89ea658c2ff0c2ca0ca6c861772d42e1080abd8 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:21:54 -0800 Subject: [PATCH 14/30] add iterators functionality Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index fb28d87019fc..16e16bbce32d 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -213,3 +213,28 @@ type lastWithTimestampBatchRangeVectorIterator struct { *batchRangeVectorIterator at []promql.Sample } + +func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { + if r.at == nil { + r.at = make([]promql.Sample, 0, len(r.window)) + } + r.at = r.at[:0] + // convert ts from nano to milli seconds as the iterator work with nanoseconds + ts := r.current/1e+6 + r.offset/1e+6 + for _, series := range r.window { + s := r.agg(series.Floats) + r.at = append(r.at, promql.Sample{ + F: s.F, + T: s.T / int64(time.Millisecond), + Metric: series.Metric, + }) + } + return ts, SampleVector(r.at) +} + +func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { + if len(samples) == 0 { + return promql.FPoint{F: math.NaN(), T: 0} + } + return samples[0] +} From 5ddf9dff9506f090aa4d23347293f369657b8ed9 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:25:58 -0800 Subject: [PATCH 15/30] add evaluator Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 199 ++++++++++++++++++++++++------ 1 file changed, 164 insertions(+), 35 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 16e16bbce32d..617d421c4e99 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -60,6 +60,55 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint return samples[0] } +func newLastWithTimestampIterator( + it iter.PeekingSampleIterator, + selRange, step, start, end, offset int64) RangeVectorIterator { + inner := &batchRangeVectorIterator{ + iter: it, + step: step, + end: end, + selRange: selRange, + metrics: map[string]labels.Labels{}, + window: map[string]*promql.Series{}, + agg: nil, + current: end + step, // first loop iteration will set it to end + offset: offset, + } + return &lastWithTimestampBatchRangeVectorIterator{ + batchRangeVectorIterator: inner, + } +} + +type lastWithTimestampBatchRangeVectorIterator struct { + *batchRangeVectorIterator + at []promql.Sample +} + +func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { + if r.at == nil { + r.at = make([]promql.Sample, 0, len(r.window)) + } + r.at = r.at[:0] + // convert ts from nano to milli seconds as the iterator work with nanoseconds + ts := r.current/1e+6 + r.offset/1e+6 + for _, series := range r.window { + s := r.agg(series.Floats) + r.at = append(r.at, promql.Sample{ + F: s.F, + T: s.T / int64(time.Millisecond), + Metric: series.Metric, + }) + } + return ts, SampleVector(r.at) +} + +func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { + if len(samples) == 0 { + return promql.FPoint{F: math.NaN(), T: 0} + } + return samples[0] +} + // Step 8 type firstOverTimeStepEvaluator struct { start, end, ts time.Time @@ -190,51 +239,131 @@ func (*firstOverTimeStepEvaluator) Close() error { return nil } func (*firstOverTimeStepEvaluator) Error() error { return nil } -func newLastWithTimestampIterator( - it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { - inner := &batchRangeVectorIterator{ - iter: it, - step: step, - end: end, - selRange: selRange, - metrics: map[string]labels.Labels{}, - window: map[string]*promql.Series{}, - agg: nil, - current: end + step, // first loop iteration will set it to end - offset: offset, +type lastOverTimeStepEvaluator struct { + start, end, ts time.Time + step time.Duration + matrices []promql.Matrix + streamVec map[int64]int +} + +func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { + if len(m) == 0 { + return EmptyEvaluator{} } - return &lastWithTimestampBatchRangeVectorIterator{ - batchRangeVectorIterator: inner, + + var ( + start = params.Start() + end = params.End() + step = params.Step() + ) + + index := make(map[int64]int, 0) + for i, series := range m[1] { + index[int64(series.Metric.Hash())] = i } -} -type lastWithTimestampBatchRangeVectorIterator struct { - *batchRangeVectorIterator - at []promql.Sample + return &lastOverTimeStepEvaluator{ + start: start, + end: end, + ts: start.Add(-step), // will be corrected on first Next() call + step: step, + matrices: m, + streamVec: index, + } } -func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { - if r.at == nil { - r.at = make([]promql.Sample, 0, len(r.window)) +func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { + + var ( + vec promql.Vector + ok bool + ) + + // TODO: build index metric to vec pos + + e.ts = e.ts.Add(e.step) + if e.ts.After(e.end) { + return false, 0, nil } - r.at = r.at[:0] - // convert ts from nano to milli seconds as the iterator work with nanoseconds - ts := r.current/1e+6 + r.offset/1e+6 - for _, series := range r.window { - s := r.agg(series.Floats) - r.at = append(r.at, promql.Sample{ - F: s.F, - T: s.T / int64(time.Millisecond), + ts := e.ts.UnixNano() / int64(time.Millisecond) + + // Process first result + // len(e.matrices) >= 1 was check during creation + for s, series := range e.matrices[0] { + if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { + continue + } + + vec = append(vec, promql.Sample{ Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, }) + + e.pop(0, s) } - return ts, SampleVector(r.at) + + if len(e.matrices) == 1 { + return ok, ts, SampleVector(vec) + } + + if len(vec) == 0 { + return e.hasNext(), ts, SampleVector(vec) + } + + // Merge other results + for i, m := range e.matrices[1:] { + // TODO: verify length and same labels/metric + for j, series := range m { + + if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { + continue + } + + // Merge + if vec[j].T > series.Floats[0].T { + vec[j].F = series.Floats[0].F + vec[j].T = series.Floats[0].T + } + + // We've omitted the first matrix. That's why +1. + e.pop(i+1, j) + } + } + + // Align vector timestamps with step + for i := range vec { + vec[i].T = ts + } + + return true, ts, SampleVector(vec) } -func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { - if len(samples) == 0 { - return promql.FPoint{F: math.NaN(), T: 0} +func (e *lastOverTimeStepEvaluator) pop(r, s int) { + if len(e.matrices[r][s].Floats) <= 1 { + e.matrices[r][s].Floats = nil + return } - return samples[0] + e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] } + +func (e *lastOverTimeStepEvaluator) inRange(t, ts int64) bool { + previous := ts - e.step.Milliseconds() + return previous <= t && t < ts +} + +func (e *lastOverTimeStepEvaluator) hasNext() bool { + for _, m := range e.matrices { + for _, s := range m { + if len(s.Floats) != 0 { + return true + } + } + } + + return false +} + +func (*lastOverTimeStepEvaluator) Close() error { return nil } + +func (*lastOverTimeStepEvaluator) Error() error { return nil } From 104892b66808ea576aca0d9774d22eec42c92a45 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 17:26:06 -0800 Subject: [PATCH 16/30] add test cases for last over time Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 7a6360d74733..af1b19149970 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -66,6 +66,8 @@ func TestMappingEquivalence(t *testing.T) { // Step 1: {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. From 67dd51187cbb63a1e815b4c8f44568f3f17a676a Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 12 Jan 2024 19:22:15 -0800 Subject: [PATCH 17/30] fix some things I misunderstood Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 4 ++-- pkg/logql/shardmapper.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 617d421c4e99..4d138729a9e5 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -71,7 +71,7 @@ func newLastWithTimestampIterator( metrics: map[string]labels.Labels{}, window: map[string]*promql.Series{}, agg: nil, - current: end + step, // first loop iteration will set it to end + current: start - step, // first loop iteration will set it to start offset: offset, } return &lastWithTimestampBatchRangeVectorIterator{ @@ -321,7 +321,7 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } // Merge - if vec[j].T > series.Floats[0].T { + if vec[j].T < series.Floats[0].T { vec[j].F = series.Floats[0].F vec[j].T = series.Floats[0].T } diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 2748b85d6a69..4222e820539a 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -527,7 +527,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, } downstreams := make([]DownstreamSampleExpr, 0, shards) - expr.Operation = syntax.OpRangeTypeFirstWithTimestamp + expr.Operation = syntax.OpRangeTypeLastWithTimestamp for shard := shards - 1; shard >= 0; shard-- { downstreams = append(downstreams, DownstreamSampleExpr{ shard: &astmapper.ShardAnnotation{ From 8e22df0e28fda37c1b76af47f388ccfff919d8c1 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Mon, 15 Jan 2024 14:10:34 -0800 Subject: [PATCH 18/30] the last timestamp iterator aggregator needs to select the last sample in each vector, not the first Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 4d138729a9e5..8c88ad9228d3 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -106,7 +106,7 @@ func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) if len(samples) == 0 { return promql.FPoint{F: math.NaN(), T: 0} } - return samples[0] + return samples[len(samples)-1] } // Step 8 From f8c9c2bdbf2013cc151e986330263bf08dbeeefc Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 18 Jan 2024 15:25:00 -0800 Subject: [PATCH 19/30] simplify evaluators next functions Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 118 ++++++++++++------------------ 1 file changed, 46 insertions(+), 72 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 8c88ad9228d3..9ca5d2b15bb2 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -114,7 +114,6 @@ type firstOverTimeStepEvaluator struct { start, end, ts time.Time step time.Duration matrices []promql.Matrix - streamVec map[int64]int } func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { @@ -128,18 +127,12 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv step = params.Step() ) - index := make(map[int64]int, 0) - for i, series := range m[1] { - index[int64(series.Metric.Hash())] = i - } - return &firstOverTimeStepEvaluator{ - start: start, - end: end, - ts: start.Add(-step), // will be corrected on first Next() call - step: step, - matrices: m, - streamVec: index, + start: start, + end: end, + ts: start.Add(-step), // will be corrected on first Next() call + step: step, + matrices: m, } } @@ -160,30 +153,9 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { // Process first result // len(e.matrices) >= 1 was check during creation - for s, series := range e.matrices[0] { - if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { - continue - } - - vec = append(vec, promql.Sample{ - Metric: series.Metric, - T: series.Floats[0].T, - F: series.Floats[0].F, - }) - - e.pop(0, s) - } - - if len(e.matrices) == 1 { - return ok, ts, SampleVector(vec) - } - - if len(vec) == 0 { - return e.hasNext(), ts, SampleVector(vec) - } // Merge other results - for i, m := range e.matrices[1:] { + for i, m := range e.matrices { // TODO: verify length and same labels/metric for j, series := range m { @@ -192,13 +164,19 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } // Merge - if vec[j].T > series.Floats[0].T { + if len(vec) < len(e.matrices) { + vec = append(vec, promql.Sample{ + Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, + }) + } else if vec[j].T > series.Floats[0].T { vec[j].F = series.Floats[0].F vec[j].T = series.Floats[0].T } // We've omitted the first matrix. That's why +1. - e.pop(i+1, j) + e.pop(i, j) } } @@ -207,6 +185,16 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } + if len(e.matrices) == 1 { + //fmt.Println("just one matrix!!!") + //fmt.Println("length of vec: ", len(vec)) + return ok, ts, SampleVector(vec) + } + + if len(vec) == 0 { + return e.hasNext(), ts, SampleVector(vec) + } + return true, ts, SampleVector(vec) } @@ -243,7 +231,6 @@ type lastOverTimeStepEvaluator struct { start, end, ts time.Time step time.Duration matrices []promql.Matrix - streamVec map[int64]int } func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { @@ -257,18 +244,12 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva step = params.Step() ) - index := make(map[int64]int, 0) - for i, series := range m[1] { - index[int64(series.Metric.Hash())] = i - } - return &lastOverTimeStepEvaluator{ - start: start, - end: end, - ts: start.Add(-step), // will be corrected on first Next() call - step: step, - matrices: m, - streamVec: index, + start: start, + end: end, + ts: start.Add(-step), // will be corrected on first Next() call + step: step, + matrices: m, } } @@ -289,30 +270,9 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { // Process first result // len(e.matrices) >= 1 was check during creation - for s, series := range e.matrices[0] { - if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { - continue - } - - vec = append(vec, promql.Sample{ - Metric: series.Metric, - T: series.Floats[0].T, - F: series.Floats[0].F, - }) - - e.pop(0, s) - } - - if len(e.matrices) == 1 { - return ok, ts, SampleVector(vec) - } - - if len(vec) == 0 { - return e.hasNext(), ts, SampleVector(vec) - } // Merge other results - for i, m := range e.matrices[1:] { + for i, m := range e.matrices { // TODO: verify length and same labels/metric for j, series := range m { @@ -321,13 +281,19 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } // Merge - if vec[j].T < series.Floats[0].T { + if len(vec) < len(e.matrices) { + vec = append(vec, promql.Sample{ + Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, + }) + } else if vec[j].T < series.Floats[0].T { vec[j].F = series.Floats[0].F vec[j].T = series.Floats[0].T } // We've omitted the first matrix. That's why +1. - e.pop(i+1, j) + e.pop(i, j) } } @@ -336,6 +302,14 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } + if len(e.matrices) == 1 { + return ok, ts, SampleVector(vec) + } + + if len(vec) == 0 { + return e.hasNext(), ts, SampleVector(vec) + } + return true, ts, SampleVector(vec) } From af67edd2dbb62f6127d4e8ab4c4f755f8972edc6 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 23 Jan 2024 11:30:31 +0100 Subject: [PATCH 20/30] Correct vector check. --- pkg/logql/downstream_test.go | 2 +- pkg/logql/first_last_over_time.go | 31 ++----------------------------- 2 files changed, 3 insertions(+), 30 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index fdda668a5feb..f2580f4d3246 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -136,7 +136,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { query string realtiveError float64 }{ - {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05}, + {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02}, } { q := NewMockQuerier( diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 9ca5d2b15bb2..74a374c798cd 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -34,7 +34,6 @@ type firstWithTimestampBatchRangeVectorIterator struct { at []promql.Sample } -// Step 7 func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { if r.at == nil { r.at = make([]promql.Sample, 0, len(r.window)) @@ -140,20 +139,14 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { var ( vec promql.Vector - ok bool ) - // TODO: build index metric to vec pos - e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { return false, 0, nil } ts := e.ts.UnixNano() / int64(time.Millisecond) - // Process first result - // len(e.matrices) >= 1 was check during creation - // Merge other results for i, m := range e.matrices { // TODO: verify length and same labels/metric @@ -164,7 +157,7 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } // Merge - if len(vec) < len(e.matrices) { + if len(vec) < len(m) { vec = append(vec, promql.Sample{ Metric: series.Metric, T: series.Floats[0].T, @@ -185,12 +178,6 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } - if len(e.matrices) == 1 { - //fmt.Println("just one matrix!!!") - //fmt.Println("length of vec: ", len(vec)) - return ok, ts, SampleVector(vec) - } - if len(vec) == 0 { return e.hasNext(), ts, SampleVector(vec) } @@ -257,20 +244,14 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { var ( vec promql.Vector - ok bool ) - // TODO: build index metric to vec pos - e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { return false, 0, nil } ts := e.ts.UnixNano() / int64(time.Millisecond) - // Process first result - // len(e.matrices) >= 1 was check during creation - // Merge other results for i, m := range e.matrices { // TODO: verify length and same labels/metric @@ -281,7 +262,7 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } // Merge - if len(vec) < len(e.matrices) { + if len(vec) < len(m) { vec = append(vec, promql.Sample{ Metric: series.Metric, T: series.Floats[0].T, @@ -302,14 +283,6 @@ func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } - if len(e.matrices) == 1 { - return ok, ts, SampleVector(vec) - } - - if len(vec) == 0 { - return e.hasNext(), ts, SampleVector(vec) - } - return true, ts, SampleVector(vec) } From 361babc2f9f922b475b2e2db9272de6291a6bd0d Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 23 Jan 2024 11:39:18 +0100 Subject: [PATCH 21/30] Unify first and last --- pkg/logql/explain.go | 6 +- pkg/logql/first_last_over_time.go | 160 ++++++++++-------------------- 2 files changed, 51 insertions(+), 115 deletions(-) diff --git a/pkg/logql/explain.go b/pkg/logql/explain.go index b010605c2728..b29b4eb6a42a 100644 --- a/pkg/logql/explain.go +++ b/pkg/logql/explain.go @@ -63,14 +63,10 @@ func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) { e.inner.Explain(b) } -func (e *firstOverTimeStepEvaluator) Explain(parent Node) { +func (e *mergeOverTimeStepEvaluator) Explain(parent Node) { parent.Child("MergeFirstOverTime") } -func (e *lastOverTimeStepEvaluator) Explain(parent Node) { - parent.Child("MergeLastOverTime") -} - func (EmptyEvaluator) Explain(parent Node) { parent.Child("Empty") } diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 74a374c798cd..d859bb7d83a2 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -108,34 +108,14 @@ func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) return samples[len(samples)-1] } -// Step 8 -type firstOverTimeStepEvaluator struct { +type mergeOverTimeStepEvaluator struct { start, end, ts time.Time step time.Duration matrices []promql.Matrix + merge func(promql.Vector, int, int, promql.Series) promql.Vector } -func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { - if len(m) == 0 { - return EmptyEvaluator{} - } - - var ( - start = params.Start() - end = params.End() - step = params.Step() - ) - - return &firstOverTimeStepEvaluator{ - start: start, - end: end, - ts: start.Add(-step), // will be corrected on first Next() call - step: step, - matrices: m, - } -} - -func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { +func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { var ( vec promql.Vector @@ -156,19 +136,8 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { continue } - // Merge - if len(vec) < len(m) { - vec = append(vec, promql.Sample{ - Metric: series.Metric, - T: series.Floats[0].T, - F: series.Floats[0].F, - }) - } else if vec[j].T > series.Floats[0].T { - vec[j].F = series.Floats[0].F - vec[j].T = series.Floats[0].T - } + vec = e.merge(vec, j, len(m), series) - // We've omitted the first matrix. That's why +1. e.pop(i, j) } } @@ -185,7 +154,7 @@ func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { return true, ts, SampleVector(vec) } -func (e *firstOverTimeStepEvaluator) pop(r, s int) { +func (e *mergeOverTimeStepEvaluator) pop(r, s int) { if len(e.matrices[r][s].Floats) <= 1 { e.matrices[r][s].Floats = nil return @@ -193,12 +162,12 @@ func (e *firstOverTimeStepEvaluator) pop(r, s int) { e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] } -func (e *firstOverTimeStepEvaluator) inRange(t, ts int64) bool { +func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { previous := ts - e.step.Milliseconds() return previous <= t && t < ts } -func (e *firstOverTimeStepEvaluator) hasNext() bool { +func (e *mergeOverTimeStepEvaluator) hasNext() bool { for _, m := range e.matrices { for _, s := range m { if len(s.Floats) != 0 { @@ -210,17 +179,11 @@ func (e *firstOverTimeStepEvaluator) hasNext() bool { return false } -func (*firstOverTimeStepEvaluator) Close() error { return nil } +func (*mergeOverTimeStepEvaluator) Close() error { return nil } -func (*firstOverTimeStepEvaluator) Error() error { return nil } +func (*mergeOverTimeStepEvaluator) Error() error { return nil } -type lastOverTimeStepEvaluator struct { - start, end, ts time.Time - step time.Duration - matrices []promql.Matrix -} - -func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { +func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { if len(m) == 0 { return EmptyEvaluator{} } @@ -231,86 +194,63 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva step = params.Step() ) - return &lastOverTimeStepEvaluator{ + return &mergeOverTimeStepEvaluator{ start: start, end: end, ts: start.Add(-step), // will be corrected on first Next() call step: step, matrices: m, + merge: mergeFirstOverTime, } } -func (e *lastOverTimeStepEvaluator) Next() (bool, int64, StepResult) { - - var ( - vec promql.Vector - ) - - e.ts = e.ts.Add(e.step) - if e.ts.After(e.end) { - return false, 0, nil +func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector { + if len(vec) < nSeries { + return append(vec, promql.Sample{ + Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, + }) + } else if vec[pos].T > series.Floats[0].T { + vec[pos].F = series.Floats[0].F + vec[pos].T = series.Floats[0].T } - ts := e.ts.UnixNano() / int64(time.Millisecond) - - // Merge other results - for i, m := range e.matrices { - // TODO: verify length and same labels/metric - for j, series := range m { - - if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { - continue - } - - // Merge - if len(vec) < len(m) { - vec = append(vec, promql.Sample{ - Metric: series.Metric, - T: series.Floats[0].T, - F: series.Floats[0].F, - }) - } else if vec[j].T < series.Floats[0].T { - vec[j].F = series.Floats[0].F - vec[j].T = series.Floats[0].T - } - // We've omitted the first matrix. That's why +1. - e.pop(i, j) - } - } + return vec +} - // Align vector timestamps with step - for i := range vec { - vec[i].T = ts +func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { + if len(m) == 0 { + return EmptyEvaluator{} } - return true, ts, SampleVector(vec) -} + var ( + start = params.Start() + end = params.End() + step = params.Step() + ) -func (e *lastOverTimeStepEvaluator) pop(r, s int) { - if len(e.matrices[r][s].Floats) <= 1 { - e.matrices[r][s].Floats = nil - return + return &mergeOverTimeStepEvaluator{ + start: start, + end: end, + ts: start.Add(-step), // will be corrected on first Next() call + step: step, + matrices: m, + merge: mergeLastOverTime, } - e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] -} - -func (e *lastOverTimeStepEvaluator) inRange(t, ts int64) bool { - previous := ts - e.step.Milliseconds() - return previous <= t && t < ts } -func (e *lastOverTimeStepEvaluator) hasNext() bool { - for _, m := range e.matrices { - for _, s := range m { - if len(s.Floats) != 0 { - return true - } - } +func mergeLastOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector { + if len(vec) < nSeries { + return append(vec, promql.Sample{ + Metric: series.Metric, + T: series.Floats[0].T, + F: series.Floats[0].F, + }) + } else if vec[pos].T < series.Floats[0].T { + vec[pos].F = series.Floats[0].F + vec[pos].T = series.Floats[0].T } - return false + return vec } - -func (*lastOverTimeStepEvaluator) Close() error { return nil } - -func (*lastOverTimeStepEvaluator) Error() error { return nil } From 2fb80d7cd8dd2bca2c6513b770fb33f74b7dd4ed Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 23 Jan 2024 11:52:01 +0100 Subject: [PATCH 22/30] Increase relative error --- pkg/logql/downstream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index f2580f4d3246..fdda668a5feb 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -136,7 +136,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { query string realtiveError float64 }{ - {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03}, + {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02}, } { q := NewMockQuerier( From 1b6cc0211d148a098f4e3d8e6de4208d877776cb Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 23 Jan 2024 12:01:00 +0100 Subject: [PATCH 23/30] Document some code --- pkg/logql/first_last_over_time.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index d859bb7d83a2..f054441fdd4f 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -10,6 +10,8 @@ import ( "github.com/grafana/loki/pkg/iter" ) +// newFirstWithTimestampIterator returns an iterator the returns the first value +// of a windowed aggregation. func newFirstWithTimestampIterator( it iter.PeekingSampleIterator, selRange, step, start, end, offset int64) RangeVectorIterator { @@ -52,6 +54,14 @@ func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { return ts, SampleVector(r.at) } +// The inner batchRangeVectorIterator has preloaded valid samples into each +// series (see the window filed and load). They're in increasing timestamp +// order (see next, moves the current timestamp forward by step). So this +// means that for each downstreamed shard of a first_over_time, selecting the +// first sample here in each iterator is getting us the earliest timestamped +// value. Later on when we merge we select the earliest from all the shards. +// +// For last_over_time we would do the opposite and select the last element. func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { if len(samples) == 0 { return promql.FPoint{F: math.NaN(), T: 0} @@ -78,6 +88,8 @@ func newLastWithTimestampIterator( } } +// lastWithTimestampBatchRangeVectorIterator returns an iterator that returns the +// last point in a windowed aggregation. type lastWithTimestampBatchRangeVectorIterator struct { *batchRangeVectorIterator at []promql.Sample @@ -129,7 +141,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { // Merge other results for i, m := range e.matrices { - // TODO: verify length and same labels/metric for j, series := range m { if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { @@ -137,7 +148,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } vec = e.merge(vec, j, len(m), series) - e.pop(i, j) } } @@ -154,6 +164,7 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { return true, ts, SampleVector(vec) } +// pop drops the float of the s'th series in the r'th matrix. func (e *mergeOverTimeStepEvaluator) pop(r, s int) { if len(e.matrices[r][s].Floats) <= 1 { e.matrices[r][s].Floats = nil @@ -162,9 +173,9 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) { e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] } +// inRange returns true if t is in step range of ts. func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { - previous := ts - e.step.Milliseconds() - return previous <= t && t < ts + return (ts-e.step.Milliseconds()) <= t && t < ts } func (e *mergeOverTimeStepEvaluator) hasNext() bool { From 87bbd75ae83294d8b178b9e6483b75779418e00b Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Mon, 29 Jan 2024 14:35:24 +0100 Subject: [PATCH 24/30] Split up documentation --- pkg/logql/first_last_over_time.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index f054441fdd4f..34c679474f91 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -36,6 +36,8 @@ type firstWithTimestampBatchRangeVectorIterator struct { at []promql.Sample } +// At aggregates the underlying window by picking the first sample with its +// timestamp. func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { if r.at == nil { r.at = make([]promql.Sample, 0, len(r.window)) @@ -54,14 +56,8 @@ func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { return ts, SampleVector(r.at) } -// The inner batchRangeVectorIterator has preloaded valid samples into each -// series (see the window filed and load). They're in increasing timestamp -// order (see next, moves the current timestamp forward by step). So this -// means that for each downstreamed shard of a first_over_time, selecting the -// first sample here in each iterator is getting us the earliest timestamped -// value. Later on when we merge we select the earliest from all the shards. -// -// For last_over_time we would do the opposite and select the last element. +// agg returns the first sample with its timestamp. The input is assumed to be +// in order. func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { if len(samples) == 0 { return promql.FPoint{F: math.NaN(), T: 0} @@ -95,6 +91,8 @@ type lastWithTimestampBatchRangeVectorIterator struct { at []promql.Sample } +// At aggregates the underlying window by picking the last sample with its +// timestamp. func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { if r.at == nil { r.at = make([]promql.Sample, 0, len(r.window)) @@ -113,6 +111,8 @@ func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { return ts, SampleVector(r.at) } +// agg returns the last sample with its timestamp. The input is assumed to be +// in order. func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { if len(samples) == 0 { return promql.FPoint{F: math.NaN(), T: 0} @@ -127,6 +127,7 @@ type mergeOverTimeStepEvaluator struct { merge func(promql.Vector, int, int, promql.Series) promql.Vector } +// Next returns the first or last element within one step of each matrix. func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { var ( @@ -215,6 +216,7 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv } } +// mergeFirstOverTime selects the first sample by timestamp of each series. func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector { if len(vec) < nSeries { return append(vec, promql.Sample{ @@ -251,6 +253,7 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva } } +// mergeLastOverTime selects the last sample by timestamp of each series. func mergeLastOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector { if len(vec) < nSeries { return append(vec, promql.Sample{ From 307bfc1301edb2e55de6e2827997b61e4826c065 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Mon, 29 Jan 2024 14:35:50 +0100 Subject: [PATCH 25/30] Drop commented TS --- pkg/logql/engine.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 21bc28f410c6..5932a11de09f 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -419,7 +419,6 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval seriesIndex[hash] = series } series.Floats = append(series.Floats, promql.FPoint{ - //T: ts, T: p.T, F: p.F, }) From 9e10b2cef37a44b41e4f659ae2dff36272145be5 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 23 Apr 2024 15:21:45 -0700 Subject: [PATCH 26/30] fixes after merging in main --- pkg/logql/downstream.go | 9 ++++----- pkg/logql/first_last_over_time.go | 2 +- pkg/logql/shardmapper.go | 18 ++++++------------ 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 605aa82e8547..33776c82b952 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -301,7 +301,6 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) { } } -<<<<<<< HEAD type MergeFirstOverTimeExpr struct { syntax.SampleExpr downstreams []DownstreamSampleExpr @@ -358,8 +357,6 @@ func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) { } } -======= ->>>>>>> main type Downstreamable interface { Downstreamer(context.Context) Downstreamer } @@ -559,7 +556,8 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( queries[i] = qry } - results, err := ev.Downstream(ctx, queries) + acc := NewBufferedAccumulator(len(queries)) + results, err := ev.Downstream(ctx, queries, acc) if err != nil { return nil, err } @@ -595,7 +593,8 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( queries[i] = qry } - results, err := ev.Downstream(ctx, queries) + acc := NewBufferedAccumulator(len(queries)) + results, err := ev.Downstream(ctx, queries, acc) if err != nil { return nil, err } diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 34c679474f91..df499e07626b 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -7,7 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/v3/pkg/iter" ) // newFirstWithTimestampIterator returns an iterator the returns the first value diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index f3156092bae5..be779a282d73 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -480,15 +480,12 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return nil, 0, err } - downstreams := make([]DownstreamSampleExpr, 0, shards) + downstreams := make([]DownstreamSampleExpr, 0, len(shards)) // This is the magic. We send a custom operation expr.Operation = syntax.OpRangeTypeFirstWithTimestamp - for shard := shards - 1; shard >= 0; shard-- { + for i := len(shards) - 1; i >= 0; i-- { downstreams = append(downstreams, DownstreamSampleExpr{ - shard: &astmapper.ShardAnnotation{ - Shard: shard, - Of: shards, - }, + shard: &shards[i], SampleExpr: expr, }) } @@ -507,14 +504,11 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return nil, 0, err } - downstreams := make([]DownstreamSampleExpr, 0, shards) + downstreams := make([]DownstreamSampleExpr, 0, len(shards)) expr.Operation = syntax.OpRangeTypeLastWithTimestamp - for shard := shards - 1; shard >= 0; shard-- { + for i := len(shards) - 1; i >= 0; i-- { downstreams = append(downstreams, DownstreamSampleExpr{ - shard: &astmapper.ShardAnnotation{ - Shard: shard, - Of: shards, - }, + shard: &shards[i], SampleExpr: expr, }) } From 69f504bb7c6cf57464a582defdd27fc54e75db30 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 7 May 2024 16:08:31 +0200 Subject: [PATCH 27/30] Return an empty result instead of nil --- pkg/logql/first_last_over_time.go | 4 ++-- pkg/logql/step_evaluator.go | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index df499e07626b..e24133d13bfe 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -197,7 +197,7 @@ func (*mergeOverTimeStepEvaluator) Error() error { return nil } func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { if len(m) == 0 { - return EmptyEvaluator{} + return EmptyEvaluator[SampleVector]{} } var ( @@ -234,7 +234,7 @@ func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.S func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { if len(m) == 0 { - return EmptyEvaluator{} + return EmptyEvaluator[SampleVector]{} } var ( diff --git a/pkg/logql/step_evaluator.go b/pkg/logql/step_evaluator.go index 3d4ffa192c22..6e9dfcf0645a 100644 --- a/pkg/logql/step_evaluator.go +++ b/pkg/logql/step_evaluator.go @@ -33,17 +33,19 @@ type StepEvaluator interface { Explain(Node) } -type EmptyEvaluator struct{} +type EmptyEvaluator[R StepResult] struct{ + value R +} -var _ StepEvaluator = EmptyEvaluator{} +var _ StepEvaluator = EmptyEvaluator[SampleVector]{} // Close implements StepEvaluator. -func (EmptyEvaluator) Close() error { return nil } +func (EmptyEvaluator[_]) Close() error { return nil } // Error implements StepEvaluator. -func (EmptyEvaluator) Error() error { return nil } +func (EmptyEvaluator[_]) Error() error { return nil } // Next implements StepEvaluator. -func (EmptyEvaluator) Next() (ok bool, ts int64, r StepResult) { - return false, 0, nil +func (e EmptyEvaluator[_]) Next() (ok bool, ts int64, r StepResult) { + return false, 0, e.value } From 27a5d66a0c6761668c3f8f88daba5dacad573549 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 7 May 2024 23:27:06 -0700 Subject: [PATCH 28/30] fix other minor issues Signed-off-by: Callum Styan --- pkg/logql/downstream.go | 4 ++-- pkg/logql/explain.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 6d0dcfe9be5d..f28eaddde3f1 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -540,7 +540,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( if shard := d.shard; shard != nil { qry.Params = ParamsWithShardsOverride{ Params: qry.Params, - ShardsOverride: Shards{*shard}.Encode(), + ShardsOverride: Shards{shard.Shard}.Encode(), } } queries[i] = qry @@ -577,7 +577,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( if shard := d.shard; shard != nil { qry.Params = ParamsWithShardsOverride{ Params: qry.Params, - ShardsOverride: Shards{*shard}.Encode(), + ShardsOverride: Shards{shard.Shard}.Encode(), } } queries[i] = qry diff --git a/pkg/logql/explain.go b/pkg/logql/explain.go index b29b4eb6a42a..22240f5804b3 100644 --- a/pkg/logql/explain.go +++ b/pkg/logql/explain.go @@ -67,6 +67,6 @@ func (e *mergeOverTimeStepEvaluator) Explain(parent Node) { parent.Child("MergeFirstOverTime") } -func (EmptyEvaluator) Explain(parent Node) { +func (EmptyEvaluator[SampleVector]) Explain(parent Node) { parent.Child("Empty") } From 1b09ad3e2c4fddefc8ee394e2c00e5089e7504bd Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 7 May 2024 23:27:24 -0700 Subject: [PATCH 29/30] handle sharding noop case for first/last over time Signed-off-by: Callum Styan --- pkg/logql/shardmapper.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 41d4f9475dd2..67b35b809df1 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -481,6 +481,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, if err != nil { return nil, 0, err } + if len(shards) == 0 { + return noOp(expr, m.shards.Resolver()) + } downstreams := make([]DownstreamSampleExpr, 0, len(shards)) // This is the magic. We send a custom operation @@ -505,6 +508,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, if err != nil { return nil, 0, err } + if len(shards) == 0 { + return noOp(expr, m.shards.Resolver()) + } downstreams := make([]DownstreamSampleExpr, 0, len(shards)) expr.Operation = syntax.OpRangeTypeLastWithTimestamp From bd7f2759216e2c391f0eb7117a879dcd1f2d2396 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 10 May 2024 14:59:21 -0700 Subject: [PATCH 30/30] fix formatting Signed-off-by: Callum Styan --- pkg/logql/step_evaluator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logql/step_evaluator.go b/pkg/logql/step_evaluator.go index 6e9dfcf0645a..23b313f18da2 100644 --- a/pkg/logql/step_evaluator.go +++ b/pkg/logql/step_evaluator.go @@ -33,7 +33,7 @@ type StepEvaluator interface { Explain(Node) } -type EmptyEvaluator[R StepResult] struct{ +type EmptyEvaluator[R StepResult] struct { value R } @@ -47,5 +47,5 @@ func (EmptyEvaluator[_]) Error() error { return nil } // Next implements StepEvaluator. func (e EmptyEvaluator[_]) Next() (ok bool, ts int64, r StepResult) { - return false, 0, e.value + return false, 0, e.value }