TraceQL performance improvement: dynamic reordering of binop branches #4163

Merged
merged 3 commits into from
Oct 11, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -20,6 +20,8 @@
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
* [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Improve performance of TraceQL queries [#4114](https://github.com/grafana/tempo/pull/4114) (@mdisibio)
* [ENHANCEMENT] Improve performance of TraceQL queries [#4163](https://github.com/grafana/tempo/pull/4163) (@mdisibio)
* [ENHANCEMENT] Implement simple Fetch by key for cache items [#4032](https://github.com/grafana/tempo/pull/4032) (@javiermolinar)
* [ENHANCEMENT] Replace Grafana Agent example by Grafana Alloy [#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
6 changes: 6 additions & 0 deletions pkg/traceql/ast.go
@@ -402,6 +402,8 @@ type BinaryOperation struct {
RHS FieldExpression

compiledExpression *regexp.Regexp

b branchOptimizer
}

func newBinaryOperation(op Operator, lhs, rhs FieldExpression) FieldExpression {
@@ -417,6 +419,10 @@ func newBinaryOperation(op Operator, lhs, rhs FieldExpression) FieldExpression {
}
}

if (op == OpAnd || op == OpOr) && binop.referencesSpan() {
binop.b = newBranchPredictor(2, 1000)
}

return binop
}

71 changes: 61 additions & 10 deletions pkg/traceql/ast_execute.go
@@ -325,30 +325,50 @@ func (a Aggregate) evaluate(input []*Spanset) (output []*Spanset, err error) {
}

func (o *BinaryOperation) execute(span Span) (Static, error) {
recording := o.b.Recording
if recording {
Review comment (Contributor Author): This code is crafted to stay out of the hot path as much as possible. There may be a way to improve it even further. However, I can say that previously pulling LHS.execute into a function pointer made things slower.

o.b.Start()
}

lhs, err := o.LHS.execute(span)
if err != nil {
return NewStaticNil(), err
}

if recording {
o.b.Finish(leftBranch)
}

// Look for cases where we don't even need to evaluate the RHS
if lhsB, ok := lhs.Bool(); ok {
if o.Op == OpAnd && !lhsB {
// x && y
// x is false so we don't need to evaluate y
return StaticFalse, nil
}
if o.Op == OpOr && lhsB {
// x || y
// x is true so we don't need to evaluate y
return StaticTrue, nil
// But wait until we have enough samples so we can optimize
if !recording {
if lhsB, ok := lhs.Bool(); ok {
if o.Op == OpAnd && !lhsB {
// x && y
// x is false so we don't need to evaluate y
return StaticFalse, nil
}
if o.Op == OpOr && lhsB {
// x || y
// x is true so we don't need to evaluate y
return StaticTrue, nil
}
}
}

if recording {
o.b.Start()
}

rhs, err := o.RHS.execute(span)
if err != nil {
return NewStaticNil(), err
}

if recording {
o.b.Finish(rightBranch)
}

// Ensure the resolved types are still valid
lhsT := lhs.Type
rhsT := rhs.Type
@@ -428,6 +448,37 @@ func (o *BinaryOperation) execute(span Span) (Static, error) {
lhsB, _ := lhs.Bool()
rhsB, _ := rhs.Bool()

if recording {
switch o.Op {
case OpAnd:
if !lhsB {
// Record cost of wasted rhs execution
o.b.Penalize(rightBranch)
}
if !rhsB {
// Record cost of wasted lhs execution
o.b.Penalize(leftBranch)
}
case OpOr:
if rhsB {
// Record cost of wasted lhs execution
o.b.Penalize(rightBranch)
}
if lhsB {
// Record cost of wasted rhs execution
o.b.Penalize(leftBranch)
}
}

if done := o.b.Sampled(); done {
if o.b.OptimalBranch() == rightBranch {
// RHS is the optimal starting branch,
// so swap the elements now.
o.LHS, o.RHS = o.RHS, o.LHS
}
}
}

switch o.Op {
case OpAnd:
return NewStaticBool(lhsB && rhsB), nil
60 changes: 60 additions & 0 deletions pkg/traceql/util.go
@@ -1,6 +1,8 @@
package traceql

import (
"time"

"github.com/grafana/tempo/pkg/tempopb"
"go.opentelemetry.io/otel"
)
@@ -81,3 +83,61 @@ func (b *bucketSet) addAndTest(i int) bool {
b.buckets[b.sz]++
return false
}

const (
leftBranch = 0
rightBranch = 1
)

type branchOptimizer struct {
start time.Time
last []time.Duration
totals []time.Duration
Recording bool
samplesRemaining int
}

func newBranchPredictor(numBranches int, numSamples int) branchOptimizer {
return branchOptimizer{
totals: make([]time.Duration, numBranches),
last: make([]time.Duration, numBranches),
samplesRemaining: numSamples,
Recording: true,
}
}

// Start recording. Should be called immediately prior to a branch execution.
func (b *branchOptimizer) Start() {
Review comment (Contributor): What about StartRecording and StopRecording?

b.start = time.Now()
Review comment (Contributor): Why don't we use the same last slice to track the start value? That way we don't need to share that variable:

b.last[branch] = time.Now()

and in the Finish method:

b.last[branch] = time.Since(b.last[branch])

We can rename last to be more accurate.

Reply (Contributor Author): Interesting idea. The variable types are different (time.Time vs. time.Duration), so we will have to see whether the overhead of getting Unix nanoseconds is less than that of keeping the separate variable.

joe-elliott marked this conversation as resolved.
}

// Finish the recording and temporarily save the cost for the given branch number.
func (b *branchOptimizer) Finish(branch int) {
b.last[branch] = time.Since(b.start)
}

// Penalize the given branch using its previously recorded cost. This is called after
// executing all branches, once it is known in retrospect which ones were not needed.
func (b *branchOptimizer) Penalize(branch int) {
b.totals[branch] += b.last[branch]
}

// Sampled records that a full execution was done and reports whether enough samples have been collected.
func (b *branchOptimizer) Sampled() (done bool) {
b.samplesRemaining--
b.Recording = b.samplesRemaining > 0
return !b.Recording
}

// OptimalBranch returns the branch with the least penalized cost over time, i.e. the optimal one to start with.
func (b *branchOptimizer) OptimalBranch() int {
mini := 0
min := b.totals[0]
for i := 1; i < len(b.totals); i++ {
if b.totals[i] < min {
mini = i
min = b.totals[i]
}
}
return mini
}
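
To illustrate the overall technique in this diff (sample both branches, penalize wasted work, then swap once), here is a small self-contained sketch restricted to the AND case. It is a simplified model, not the Tempo implementation: the pred and andOp types and the newDemoOp helper are hypothetical, and a fixed synthetic cost per predicate stands in for the durations that the real code measures with time.Now and time.Since.

```go
package main

import "fmt"

const (
	leftBranch  = 0
	rightBranch = 1
)

// pred is a hypothetical predicate. cost is a synthetic stand-in for the
// wall-clock duration that the real code measures.
type pred struct {
	name string
	cost int
	eval func(v int) bool
}

// andOp models "lhs && rhs" with a sampling phase followed by a one-time swap.
type andOp struct {
	lhs, rhs         pred
	totals           [2]int // penalized cost per branch
	samplesRemaining int
}

func (a *andOp) execute(v int) bool {
	l := a.lhs.eval(v)
	if a.samplesRemaining <= 0 {
		// Sampling finished: short-circuit as usual.
		return l && a.rhs.eval(v)
	}

	// Still sampling: run both sides even if the result is already known.
	r := a.rhs.eval(v)
	if !l {
		a.totals[rightBranch] += a.rhs.cost // rhs work was wasted
	}
	if !r {
		a.totals[leftBranch] += a.lhs.cost // lhs work was wasted
	}

	a.samplesRemaining--
	if a.samplesRemaining == 0 && a.totals[rightBranch] < a.totals[leftBranch] {
		// rhs has the lower penalized cost, so it should run first. Swap once.
		a.lhs, a.rhs = a.rhs, a.lhs
	}
	return l && r
}

// newDemoOp pairs an expensive, always-true predicate with a cheap, selective one.
func newDemoOp(samples int) *andOp {
	return &andOp{
		lhs:              pred{name: "expensive", cost: 10, eval: func(v int) bool { return v >= 0 }},
		rhs:              pred{name: "cheap", cost: 1, eval: func(v int) bool { return v%10 == 0 }},
		samplesRemaining: samples,
	}
}

func main() {
	op := newDemoOp(100)
	for v := 0; v < 100; v++ {
		op.execute(v)
	}
	fmt.Println(op.lhs.name) // prints "cheap"
}
```

In this run the expensive predicate is never false, so the right branch is never penalized, while the left branch is penalized each time the cheap predicate is false. The cheap, selective predicate therefore moves to the front, and later evaluations can short-circuit without running the expensive one.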