Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sharding optimizations I: AST mapping #1846

Merged
merged 21 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,18 @@ func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string)
}

const (
OpTypeSum = "sum"
OpTypeAvg = "avg"
OpTypeMax = "max"
OpTypeMin = "min"
OpTypeCount = "count"
OpTypeStddev = "stddev"
OpTypeStdvar = "stdvar"
OpTypeBottomK = "bottomk"
OpTypeTopK = "topk"
// vector ops
OpTypeSum = "sum"
OpTypeAvg = "avg"
OpTypeMax = "max"
OpTypeMin = "min"
OpTypeCount = "count"
OpTypeStddev = "stddev"
OpTypeStdvar = "stdvar"
OpTypeBottomK = "bottomk"
OpTypeTopK = "topk"

// range vector ops
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"

Expand All @@ -217,6 +220,8 @@ func IsLogicalBinOp(op string) bool {
type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
// Operations returns the list of operations used in this SampleExpr
Operations() []string
Expr
}

Expand Down Expand Up @@ -244,6 +249,11 @@ func (e *rangeAggregationExpr) String() string {
return formatOperation(e.operation, nil, e.left.String())
}

// impl SampleExpr
func (e *rangeAggregationExpr) Operations() []string {
return []string{e.operation}
}

type grouping struct {
groups []string
without bool
Expand Down Expand Up @@ -320,6 +330,11 @@ func (e *vectorAggregationExpr) String() string {
return formatOperation(e.operation, e.grouping, params...)
}

// impl SampleExpr
func (e *vectorAggregationExpr) Operations() []string {
return append(e.left.Operations(), e.operation)
}

type binOpExpr struct {
SampleExpr
RHS SampleExpr
Expand All @@ -330,6 +345,12 @@ func (e *binOpExpr) String() string {
return fmt.Sprintf("%s %s %s", e.SampleExpr.String(), e.op, e.RHS.String())
}

// impl SampleExpr
func (e *binOpExpr) Operations() []string {
ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...)
return append(ops, e.op)
}

func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
left, ok := lhs.(SampleExpr)
if !ok {
Expand Down Expand Up @@ -386,7 +407,7 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
// This is because literals need match all labels, which is currently difficult to encode into StepEvaluators.
// Therefore, we ensure a binop can be reduced/simplified, maintaining the invariant that it does not have two literal legs.
func reduceBinOp(op string, left, right *literalExpr) *literalExpr {
merged := (&defaultEvaluator{}).mergeBinOp(
merged := mergeBinOp(
op,
&promql.Sample{Point: promql.Point{V: left.value}},
&promql.Sample{Point: promql.Point{V: right.value}},
Expand Down Expand Up @@ -423,6 +444,7 @@ func (e *literalExpr) String() string {
// to facilitate sum types. We'll be type switching when evaluating them anyways
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) Operations() []string { return nil }
func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }

Expand Down
26 changes: 26 additions & 0 deletions pkg/logql/astmapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package logql

import (
"fmt"

"github.com/pkg/errors"
)

// ASTMapper is the exported interface for mapping between multiple AST representations
type ASTMapper interface {
Map(Expr) (Expr, error)
}

// CloneExpr is a helper function to clone a node.
func CloneExpr(expr Expr) (Expr, error) {
return ParseExpr(expr.String())
}

func badASTMapping(expected string, got Expr) error {
return fmt.Errorf("Bad AST mapping: expected one type (%s), but got (%T)", expected, got)
}

// MapperUnsuportedType is a helper for signaling that an evaluator does not support an Expr type
func MapperUnsupportedType(expr Expr, m ASTMapper) error {
return errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m)
}
2 changes: 1 addition & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
return ng.evalLiteral(ctx, lit, q)
}

stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q)
stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q)
if err != nil {
return nil, err
}
Expand Down
93 changes: 56 additions & 37 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,20 @@ func GetRangeType(q Params) QueryRangeType {
return RangeType
}

// Evaluator is an interface for iterating over data at different nodes in the AST
// StepEvaluator is an interface for iterating over data at different nodes in the AST
owen-d marked this conversation as resolved.
Show resolved Hide resolved
type Evaluator interface {
// Evaluator returns a StepEvaluator for a given SampleExpr
Evaluator(context.Context, SampleExpr, Params) (StepEvaluator, error)
// StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible
// StepEvaluator implementations which can be composed.
StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error)
// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error)
}

// EvaluatorUnsupportedType is a helper for signaling that an evaluator does not support an Expr type
func EvaluatorUnsupportedType(expr Expr, ev Evaluator) error {
return errors.Errorf("unexpected expr type (%T) for Evaluator type (%T) ", expr, ev)
}

type defaultEvaluator struct {
maxLookBackPeriod time.Duration
querier Querier
Expand All @@ -99,21 +105,43 @@ func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr,

}

func (ev *defaultEvaluator) Evaluator(ctx context.Context, expr SampleExpr, q Params) (StepEvaluator, error) {
func (ev *defaultEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
expr SampleExpr,
q Params,
) (StepEvaluator, error) {
switch e := expr.(type) {
case *vectorAggregationExpr:
return ev.vectorAggEvaluator(ctx, e, q)
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
return ev.rangeAggEvaluator(ctx, e, q)
entryIter, err := ev.querier.Select(ctx, SelectParams{
&logproto.QueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Limit: 0,
Direction: logproto.FORWARD,
Selector: expr.Selector().String(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(entryIter, e, q)
case *binOpExpr:
return ev.binOpEvaluator(ctx, e, q)
return binOpStepEvaluator(ctx, nextEv, e, q)
default:
return nil, errors.Errorf("unexpected type (%T): %v", e, e)
return nil, EvaluatorUnsupportedType(e, ev)
}
}

func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vectorAggregationExpr, q Params) (StepEvaluator, error) {
nextEvaluator, err := ev.Evaluator(ctx, expr.left, q)
func vectorAggEvaluator(
ctx context.Context,
ev Evaluator,
expr *vectorAggregationExpr,
q Params,
) (StepEvaluator, error) {
nextEvaluator, err := ev.StepEvaluator(ctx, ev, expr.left, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,21 +330,11 @@ func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vector
}, nextEvaluator.Close)
}

func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAggregationExpr, q Params) (StepEvaluator, error) {
entryIter, err := ev.querier.Select(ctx, SelectParams{
&logproto.QueryRequest{
Start: q.Start().Add(-expr.left.interval),
End: q.End(),
Limit: 0,
Direction: logproto.FORWARD,
Selector: expr.Selector().String(),
},
})

if err != nil {
return nil, err
}

func rangeAggEvaluator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love those changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if evaluator is big enough to become a package. Just a thought, not action required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's a good idea, let's see where everything settles and then do some refactoring

entryIter iter.EntryIterator,
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {
vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano())

Expand All @@ -341,8 +359,9 @@ func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAg

// binOpExpr explicly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func (ev *defaultEvaluator) binOpEvaluator(
func binOpStepEvaluator(
ctx context.Context,
ev Evaluator,
expr *binOpExpr,
q Params,
) (StepEvaluator, error) {
Expand All @@ -352,26 +371,26 @@ func (ev *defaultEvaluator) binOpEvaluator(

// match a literal expr with all labels in the other leg
if lOk {
rhs, err := ev.Evaluator(ctx, expr.RHS, q)
rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
if err != nil {
return nil, err
}
return ev.literalEvaluator(expr.op, leftLit, rhs, false)
return literalStepEvaluator(expr.op, leftLit, rhs, false)
}
if rOk {
lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q)
lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
if err != nil {
return nil, err
}
return ev.literalEvaluator(expr.op, rightLit, lhs, true)
return literalStepEvaluator(expr.op, rightLit, lhs, true)
}

// we have two non literal legs
lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q)
lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
if err != nil {
return nil, err
}
rhs, err := ev.Evaluator(ctx, expr.RHS, q)
rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -409,7 +428,7 @@ func (ev *defaultEvaluator) binOpEvaluator(
for _, pair := range pairs {

// merge
if merged := ev.mergeBinOp(expr.op, pair[0], pair[1]); merged != nil {
if merged := mergeBinOp(expr.op, pair[0], pair[1]); merged != nil {
results = append(results, *merged)
}
}
Expand All @@ -425,7 +444,7 @@ func (ev *defaultEvaluator) binOpEvaluator(
})
}

func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *promql.Sample {
func mergeBinOp(op string, left, right *promql.Sample) *promql.Sample {
var merger func(left, right *promql.Sample) *promql.Sample

switch op {
Expand Down Expand Up @@ -554,9 +573,9 @@ func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *p

}

// literalEvaluator merges a literal with a StepEvaluator. Since order matters in
// literalStepEvaluator merges a literal with a StepEvaluator. Since order matters in
// non commutative operations, inverted should be true when the literalExpr is not the left argument.
func (ev *defaultEvaluator) literalEvaluator(
func literalStepEvaluator(
op string,
lit *literalExpr,
eval StepEvaluator,
Expand All @@ -578,7 +597,7 @@ func (ev *defaultEvaluator) literalEvaluator(
left, right = right, left
}

if merged := ev.mergeBinOp(
if merged := mergeBinOp(
op,
left,
right,
Expand Down
5 changes: 2 additions & 3 deletions pkg/logql/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
)

func TestDefaultEvaluator_DivideByZero(t *testing.T) {
ev := &defaultEvaluator{}

require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeDiv,
require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeDiv,
&promql.Sample{
Point: promql.Point{T: 1, V: 1},
},
Expand All @@ -20,7 +19,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
).Point.V))

require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeMod,
require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeMod,
&promql.Sample{
Point: promql.Point{T: 1, V: 1},
},
Expand Down
38 changes: 38 additions & 0 deletions pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,44 @@ func TestParse(t *testing.T) {
exp Expr
err error
}{
{
// test [12h] before filter expr
in: `count_over_time({foo="bar"}[12h] |= "error")`,
exp: &rangeAggregationExpr{
operation: "count_over_time",
left: &logRange{
left: &filterExpr{
ty: labels.MatchEqual,
match: "error",
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
interval: 12 * time.Hour,
},
},
},
{
// test [12h] after filter expr
in: `count_over_time({foo="bar"} |= "error" [12h])`,
exp: &rangeAggregationExpr{
operation: "count_over_time",
left: &logRange{
left: &filterExpr{
ty: labels.MatchEqual,
match: "error",
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
interval: 12 * time.Hour,
},
},
},
{
in: `{foo="bar"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Expand Down
Loading