diff --git a/query/stdlib/influxdata/influxdb/multi_measure_test.flux b/query/stdlib/influxdata/influxdb/multi_measure_test.flux new file mode 100644 index 00000000000..6fecd0ddd62 --- /dev/null +++ b/query/stdlib/influxdata/influxdb/multi_measure_test.flux @@ -0,0 +1,261 @@ +package influxdb_test + +import "csv" +import "testing" + +option now = () => 2030-01-01T00:00:00Z + +input = " +#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.72 +,,0,2018-05-22T19:53:46Z,system,host.local,load1,1.74 +,,0,2018-05-22T19:53:56Z,system,host.local,load1,1.63 +,,0,2018-05-22T19:54:06Z,system,host.local,load1,1.91 +,,0,2018-05-22T19:54:16Z,system,host.local,load1,1.84 + +,,1,2018-05-22T19:53:26Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:53:36Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:46Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:56Z,sys,host.local,load3,1.96 +,,1,2018-05-22T19:54:06Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:54:16Z,sys,host.local,load3,1.97 + +,,2,2018-05-22T19:53:26Z,system,host.local,load5,1.95 +,,2,2018-05-22T19:53:36Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:46Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:56Z,system,host.local,load5,1.89 +,,2,2018-05-22T19:54:06Z,system,host.local,load5,1.94 +,,2,2018-05-22T19:54:16Z,system,host.local,load5,1.93 + +,,3,2018-05-22T19:53:26Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:53:36Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:46Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:56Z,var,host.local,load3,91.96 +,,3,2018-05-22T19:54:06Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:54:16Z,var,host.local,load3,91.97 + +,,4,2018-05-22T19:53:26Z,swap,host.global,used_percent,82.98 
+,,4,2018-05-22T19:53:36Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:46Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:56Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:06Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:16Z,swap,host.global,used_percent,82.64 + +#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,loc,_field,_value +,,0,2018-05-22T19:53:26Z,locale,en,lat,37.09 +,,0,2018-05-22T19:53:36Z,locale,en,lat,37.10 +,,0,2018-05-22T19:53:46Z,locale,en,lat,37.08 +" + +testcase multi_measure { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "system" or r["_measurement"] == "sys") + |> filter(fn: (r) => r["_field"] == "load1" or r["_field"] == "load3") + |> drop(columns: ["_start", "_stop"]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.72 +,,0,2018-05-22T19:53:46Z,system,host.local,load1,1.74 +,,0,2018-05-22T19:53:56Z,system,host.local,load1,1.63 +,,0,2018-05-22T19:54:06Z,system,host.local,load1,1.91 +,,0,2018-05-22T19:54:16Z,system,host.local,load1,1.84 +,,1,2018-05-22T19:53:26Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:53:36Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:46Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:56Z,sys,host.local,load3,1.96 +,,1,2018-05-22T19:54:06Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:54:16Z,sys,host.local,load3,1.97 +") + + testing.diff(got, want) +} + +testcase multi_measure_match_all { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 
2019-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "system" or r["_measurement"] == "sys" or r["_measurement"] == "var" or r["_measurement"] == "swap") + |> filter(fn: (r) => r["_field"] == "load1" or r["_field"] == "load3" or r["_field"] == "load5" or r["_field"] == "used_percent") + |> drop(columns: ["_start", "_stop"]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.72 +,,0,2018-05-22T19:53:46Z,system,host.local,load1,1.74 +,,0,2018-05-22T19:53:56Z,system,host.local,load1,1.63 +,,0,2018-05-22T19:54:06Z,system,host.local,load1,1.91 +,,0,2018-05-22T19:54:16Z,system,host.local,load1,1.84 +,,1,2018-05-22T19:53:26Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:53:36Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:46Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:56Z,sys,host.local,load3,1.96 +,,1,2018-05-22T19:54:06Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:54:16Z,sys,host.local,load3,1.97 +,,2,2018-05-22T19:53:26Z,system,host.local,load5,1.95 +,,2,2018-05-22T19:53:36Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:46Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:56Z,system,host.local,load5,1.89 +,,2,2018-05-22T19:54:06Z,system,host.local,load5,1.94 +,,2,2018-05-22T19:54:16Z,system,host.local,load5,1.93 +,,3,2018-05-22T19:53:26Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:53:36Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:46Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:56Z,var,host.local,load3,91.96 +,,3,2018-05-22T19:54:06Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:54:16Z,var,host.local,load3,91.97 +,,4,2018-05-22T19:53:26Z,swap,host.global,used_percent,82.98 +,,4,2018-05-22T19:53:36Z,swap,host.global,used_percent,82.59 
+,,4,2018-05-22T19:53:46Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:56Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:06Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:16Z,swap,host.global,used_percent,82.64 +") + + testing.diff(got, want) +} + +testcase multi_measure_tag_filter { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "system" or r["_measurement"] == "swap") + |> filter(fn: (r) => r["_field"] == "load1" or r["_field"] == "load3" or r["_field"] == "used_percent") + |> filter(fn: (r) => r["host"] == "host.local" or r["host"] == "host.global") + |> drop(columns: ["_start", "_stop"]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.72 +,,0,2018-05-22T19:53:46Z,system,host.local,load1,1.74 +,,0,2018-05-22T19:53:56Z,system,host.local,load1,1.63 +,,0,2018-05-22T19:54:06Z,system,host.local,load1,1.91 +,,0,2018-05-22T19:54:16Z,system,host.local,load1,1.84 +,,4,2018-05-22T19:53:26Z,swap,host.global,used_percent,82.98 +,,4,2018-05-22T19:53:36Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:46Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:56Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:06Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:16Z,swap,host.global,used_percent,82.64 +") + + testing.diff(got, want) +} + +testcase multi_measure_complex_or { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z) + |> filter(fn: (r) => (r["_measurement"] == "system" or r["_measurement"] == "swap") or (r["_measurement"] != "var" and r["host"] == "host.local")) + |> 
drop(columns: ["_start", "_stop"]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.72 +,,0,2018-05-22T19:53:46Z,system,host.local,load1,1.74 +,,0,2018-05-22T19:53:56Z,system,host.local,load1,1.63 +,,0,2018-05-22T19:54:06Z,system,host.local,load1,1.91 +,,0,2018-05-22T19:54:16Z,system,host.local,load1,1.84 +,,2,2018-05-22T19:53:26Z,system,host.local,load5,1.95 +,,2,2018-05-22T19:53:36Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:46Z,system,host.local,load5,1.92 +,,2,2018-05-22T19:53:56Z,system,host.local,load5,1.89 +,,2,2018-05-22T19:54:06Z,system,host.local,load5,1.94 +,,2,2018-05-22T19:54:16Z,system,host.local,load5,1.93 +,,4,2018-05-22T19:53:26Z,swap,host.global,used_percent,82.98 +,,4,2018-05-22T19:53:36Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:46Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:56Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:06Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:16Z,swap,host.global,used_percent,82.64 +,,1,2018-05-22T19:53:26Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:53:36Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:46Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:56Z,sys,host.local,load3,1.96 +,,1,2018-05-22T19:54:06Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:54:16Z,sys,host.local,load3,1.97 +") + + testing.diff(got, want) +} + +testcase multi_measure_complex_and { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] != "system" or r["_measurement"] == "swap") + |> filter(fn: (r) => r["_measurement"] == "swap" or r["_measurement"] == "var") + |> drop(columns: ["_start", "_stop"]) + + want = 
csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,4,2018-05-22T19:53:26Z,swap,host.global,used_percent,82.98 +,,4,2018-05-22T19:53:36Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:46Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:53:56Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:06Z,swap,host.global,used_percent,82.59 +,,4,2018-05-22T19:54:16Z,swap,host.global,used_percent,82.64 +,,3,2018-05-22T19:53:26Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:53:36Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:46Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:56Z,var,host.local,load3,91.96 +,,3,2018-05-22T19:54:06Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:54:16Z,var,host.local,load3,91.97 +") + + testing.diff(got, want) +} + +testcase multi_measure_negation { + got = testing.loadStorage(csv: input) + |> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] != "system") + |> filter(fn: (r) => r["host"] == "host.local" or not exists r["host"]) + |> drop(columns: ["_start", "_stop"]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,1,2018-05-22T19:53:26Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:53:36Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:46Z,sys,host.local,load3,1.97 +,,1,2018-05-22T19:53:56Z,sys,host.local,load3,1.96 +,,1,2018-05-22T19:54:06Z,sys,host.local,load3,1.98 +,,1,2018-05-22T19:54:16Z,sys,host.local,load3,1.97 +,,3,2018-05-22T19:53:26Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:53:36Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:46Z,var,host.local,load3,91.97 +,,3,2018-05-22T19:53:56Z,var,host.local,load3,91.96 
+,,3,2018-05-22T19:54:06Z,var,host.local,load3,91.98 +,,3,2018-05-22T19:54:16Z,var,host.local,load3,91.97 + +#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,loc,_field,_value +,,0,2018-05-22T19:53:26Z,locale,en,lat,37.09 +,,0,2018-05-22T19:53:36Z,locale,en,lat,37.10 +,,0,2018-05-22T19:53:46Z,locale,en,lat,37.08 +") + + testing.diff(got, want) +} diff --git a/tsdb/shard.go b/tsdb/shard.go index 21e67b77ec6..9a61c66b530 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -42,6 +42,7 @@ const ( statWritePointsOK = "writePointsOk" statWriteBytes = "writeBytes" statDiskBytes = "diskBytes" + measurementKey = "_name" ) var ( @@ -1289,9 +1290,12 @@ func (a Shards) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error // FieldKeysByPredicate returns the field keys for series that match // the given predicate. func (a Shards) FieldKeysByPredicate(expr influxql.Expr) (map[string][]string, error) { - names, err := a.MeasurementNamesByPredicate(expr) - if err != nil { - return nil, err + names, ok := measurementOptimization(expr, measurementKey) + if !ok { + var err error + if names, err = a.MeasurementNamesByPredicate(expr); err != nil { + return nil, err + } } all := make(map[string][]string, len(names)) @@ -1301,6 +1305,134 @@ func (a Shards) FieldKeysByPredicate(expr influxql.Expr) (map[string][]string, e return all, nil } +// consecutiveAndChildren finds all child nodes of consecutive +// influxql.BinaryExpr with AND operator nodes ("AND nodes") which are not +// themselves AND nodes. This may be the root of the tree if the root of the +// tree is not an AND node. 
+type consecutiveAndChildren struct { + children []influxql.Node +} + +func (v *consecutiveAndChildren) Visit(node influxql.Node) influxql.Visitor { + switch n := node.(type) { + case *influxql.BinaryExpr: + if n.Op == influxql.AND { + return v + } + case *influxql.ParenExpr: + // Parens are essentially a no-op and can be traversed through. + return v + } + + // If this wasn't a BinaryExpr with an AND operator or a Paren, record this + // child node and stop the search for this branch. + v.children = append(v.children, node) + return nil +} + +// orMeasurementTree determines if a tree (or subtree) represents a grouping of +// exclusively measurement names OR'd together with EQ operators for the +// measurements themselves. It collects the list of measurement names +// encountered and records the validity of the tree. +type orMeasurementTree struct { + measurementKey string + measurementNames []string + valid bool +} + +func (v *orMeasurementTree) Visit(node influxql.Node) influxql.Visitor { + // Return early if this tree has already been invalidated - no reason to + // continue evaluating at that point. + if !v.valid { + return nil + } + + switch n := node.(type) { + case *influxql.BinaryExpr: + // A BinaryExpr must have an operation of OR or EQ in a valid tree + if n.Op == influxql.OR { + return v + } else if n.Op == influxql.EQ { + // An EQ must be in the form of "v.measurementKey == measurementName" in a + // valid tree + if name, ok := measurementNameFromEqBinary(n, v.measurementKey); ok { + v.measurementNames = append(v.measurementNames, name) + // If a valid measurement key/value was found, there is no need to + // continue evaluating the VarRef/StringLiteral child nodes of this + // node. + return nil + } + } + case *influxql.ParenExpr: + // Parens are essentially a no-op and can be traversed through. + return v + } + + // If the type switch didn't already return, this tree is invalid. 
+ v.valid = false + return nil +} + +func measurementOptimization(expr influxql.Expr, key string) ([][]byte, bool) { + // A measurement optimization is possible if the query contains a single group + // of one or more measurements (in the form of _measurement = measName, + // equality operator only) grouped together by OR operators, with the subtree + // containing the OR'd measurements accessible from the root of the tree either + // directly (tree contains nothing but OR'd measurements) or by traversing AND + // binary expression nodes. + + // Get a list of "candidate" measurement subtrees. + v := consecutiveAndChildren{} + influxql.Walk(&v, expr) + possibleSubtrees := v.children + + // Evaluate the candidate subtrees to determine which measurement names they + // contain, and to see if they are valid for the optimization. + validSubtrees := []orMeasurementTree{} + for _, h := range possibleSubtrees { + t := orMeasurementTree{ + measurementKey: key, + valid: true, + } + influxql.Walk(&t, h) + if t.valid { + validSubtrees = append(validSubtrees, t) + } + } + + // There must be exactly one valid measurement subtree for this optimization + // to be applied. Note: It may also be possible to have measurements in + // multiple subtrees, as long as there are no measurements in invalid + // subtrees, by determining an intersection of the measurement names across + // all valid subtrees - this is not currently implemented. + if len(validSubtrees) != 1 { + return nil, false + } + + return slices.StringsToBytes(validSubtrees[0].measurementNames...), true +} + +// measurementNameFromEqBinary returns the name of a measurement from a binary +// expression if possible, and a boolean status indicating if the binary +// expression contained a measurement name. A measurement name will only be +// returned if the operator for the binary is EQ, and the measurement key is on +// the LHS with the measurement name on the RHS. 
+func measurementNameFromEqBinary(be *influxql.BinaryExpr, key string) (string, bool) { + lhs, ok := be.LHS.(*influxql.VarRef) + if !ok { + return "", false + } else if lhs.Val != key { + return "", false + } + + rhs, ok := be.RHS.(*influxql.StringLiteral) + if !ok { + return "", false + } + + return rhs.Val, true +} + func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) diff --git a/tsdb/shard_internal_test.go b/tsdb/shard_internal_test.go index 8858371d792..e6ca2a53a9d 100644 --- a/tsdb/shard_internal_test.go +++ b/tsdb/shard_internal_test.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxql" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -203,6 +204,125 @@ mem,host=serverB value=50i,val3=t 10 } } +func TestShard_MeasurementOptimization(t *testing.T) { + t.Parallel() + + cases := []struct { + expr influxql.Expr + name string + ok bool + names [][]byte + }{ + { + expr: influxql.MustParseExpr(`_name = 'm0'`), + name: "single measurement", + ok: true, + names: [][]byte{[]byte("m0")}, + }, + { + expr: influxql.MustParseExpr(`_something = 'f' AND _name = 'm0'`), + name: "single measurement with AND", + ok: true, + names: [][]byte{[]byte("m0")}, + }, + { + expr: influxql.MustParseExpr(`_something = 'f' AND (a =~ /x0/ AND _name = 'm0')`), + name: "single measurement with multiple AND", + ok: true, + names: [][]byte{[]byte("m0")}, + }, + { + expr: influxql.MustParseExpr(`_name = 'm0' OR _name = 'm1' OR _name = 'm2'`), + name: "multiple measurements alone", + ok: true, + names: [][]byte{[]byte("m0"), []byte("m1"), []byte("m2")}, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR _name = 'm1' OR _name = 'm2') AND (_field = 'foo' OR _field = 'bar' OR _field = 'qux')`), + name: "multiple 
measurements combined", + ok: true, + names: [][]byte{[]byte("m0"), []byte("m1"), []byte("m2")}, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR (_name = 'm1' OR _name = 'm2')) AND tag1 != 'foo'`), + name: "parens in expression", + ok: true, + names: [][]byte{[]byte("m0"), []byte("m1"), []byte("m2")}, + }, + { + expr: influxql.MustParseExpr(`(tag1 != 'foo' OR tag2 = 'bar') AND (_name = 'm0' OR _name = 'm1' OR _name = 'm2') AND (_field = 'val1' OR _field = 'val2')`), + name: "multiple AND", + ok: true, + names: [][]byte{[]byte("m0"), []byte("m1"), []byte("m2")}, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR _name = 'm1' OR _name = 'm2') AND (tag1 != 'foo' OR _name = 'm1')`), + name: "measurements on in multiple groups, only one valid group", + ok: true, + names: [][]byte{[]byte("m0"), []byte("m1"), []byte("m2")}, + }, + { + expr: influxql.MustParseExpr(`_name = 'm0' OR tag1 != 'foo'`), + name: "single measurement with OR", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`_name = 'm0' OR true`), + name: "measurement with OR boolean literal", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`_name != 'm0' AND tag1 != 'foo'`), + name: "single measurement with non-equal", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR _name != 'm1' OR _name = 'm2') AND (_field = 'foo' OR _field = 'bar' OR _field = 'qux')`), + name: "multiple measurements with non-equal", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`tag1 = 'foo' AND tag2 = 'bar'`), + name: "no measurements - multiple tags", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`_field = 'foo'`), + name: "no measurements - single field", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR _name = 'm1' AND _name = 'm2') AND tag1 != 'foo'`), + name: "measurements with AND", + ok: false, + names: nil, + }, + { + expr: influxql.MustParseExpr(`(_name = 'm0' OR 
_name = 'm1' OR _name = 'm2') OR (tag1 != 'foo' OR _name = 'm1')`), + name: "top level is not AND", + ok: false, + names: nil, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + names, ok := measurementOptimization(tc.expr, measurementKey) + require.Equal(t, tc.names, names) + require.Equal(t, tc.ok, ok) + }) + } +} + // TempShard represents a test wrapper for Shard that uses temporary // filesystem paths. type TempShard struct { diff --git a/v1/services/storage/predicate_influxql.go b/v1/services/storage/predicate_influxql.go index 76f07110016..66b72df3dc5 100644 --- a/v1/services/storage/predicate_influxql.go +++ b/v1/services/storage/predicate_influxql.go @@ -25,56 +25,6 @@ func RewriteExprRemoveFieldKeyAndValue(expr influxql.Expr) influxql.Expr { }) } -// HasSingleMeasurementNoOR determines if an index optimisation is available. -// -// Typically the read service will use the query engine to retrieve all field -// keys for all measurements that match the expression, which can be very -// inefficient if it can be proved that only one measurement matches the expression. -// -// This condition is determined when the following is true: -// -// * there is only one occurrence of the tag key `_measurement`. -// * there are no OR operators in the expression tree. -// * the operator for the `_measurement` binary expression is ==. 
-// -func HasSingleMeasurementNoOR(expr influxql.Expr) (string, bool) { - var lastMeasurement string - foundOnce := true - var invalidOP bool - - influxql.WalkFunc(expr, func(node influxql.Node) { - if !foundOnce || invalidOP { - return - } - - if be, ok := node.(*influxql.BinaryExpr); ok { - if be.Op == influxql.OR { - invalidOP = true - return - } - - if ref, ok := be.LHS.(*influxql.VarRef); ok { - if ref.Val == measurementRemap[measurementKey] { - if be.Op != influxql.EQ { - invalidOP = true - return - } - - if lastMeasurement != "" { - foundOnce = false - } - - // Check that RHS is a literal string - if ref, ok := be.RHS.(*influxql.StringLiteral); ok { - lastMeasurement = ref.Val - } - } - } - } - }) - return lastMeasurement, len(lastMeasurement) > 0 && foundOnce && !invalidOP -} - type hasRefs struct { refs []string found []bool diff --git a/v1/services/storage/predicate_test.go b/v1/services/storage/predicate_test.go index 13aceabdf3e..25036b70669 100644 --- a/v1/services/storage/predicate_test.go +++ b/v1/services/storage/predicate_test.go @@ -10,65 +10,6 @@ import ( "github.com/influxdata/influxql" ) -func TestHasSingleMeasurementNoOR(t *testing.T) { - cases := []struct { - expr influxql.Expr - name string - ok bool - }{ - { - expr: influxql.MustParseExpr(`_name = 'm0'`), - name: "m0", - ok: true, - }, - { - expr: influxql.MustParseExpr(`_something = 'f' AND _name = 'm0'`), - name: "m0", - ok: true, - }, - { - expr: influxql.MustParseExpr(`_something = 'f' AND (a =~ /x0/ AND _name = 'm0')`), - name: "m0", - ok: true, - }, - { - expr: influxql.MustParseExpr(`tag1 != 'foo'`), - ok: false, - }, - { - expr: influxql.MustParseExpr(`_name = 'm0' OR tag1 != 'foo'`), - ok: false, - }, - { - expr: influxql.MustParseExpr(`_name = 'm0' AND tag1 != 'foo' AND _name = 'other'`), - ok: false, - }, - { - expr: influxql.MustParseExpr(`_name = 'm0' AND tag1 != 'foo' OR _name = 'other'`), - ok: false, - }, - { - expr: influxql.MustParseExpr(`_name = 'm0' AND (tag1 != 'foo' OR 
tag2 = 'other')`), - ok: false, - }, - { - expr: influxql.MustParseExpr(`(tag1 != 'foo' OR tag2 = 'other') OR _name = 'm0'`), - ok: false, - }, - } - - for _, tc := range cases { - name, ok := storage.HasSingleMeasurementNoOR(tc.expr) - if ok != tc.ok { - t.Fatalf("got %q, %v for expression %q, expected %q, %v", name, ok, tc.expr, tc.name, tc.ok) - } - - if ok && name != tc.name { - t.Fatalf("got %q, %v for expression %q, expected %q, %v", name, ok, tc.expr, tc.name, tc.ok) - } - } -} - func TestRewriteExprRemoveFieldKeyAndValue(t *testing.T) { node := &datatypes.Node{ NodeType: datatypes.NodeTypeLogicalExpression, diff --git a/v1/services/storage/series_cursor.go b/v1/services/storage/series_cursor.go index d000ce29d99..522f3942db4 100644 --- a/v1/services/storage/series_cursor.go +++ b/v1/services/storage/series_cursor.go @@ -2,9 +2,11 @@ package storage import ( "context" + "sort" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" "github.com/influxdata/influxdb/v2/tsdb" @@ -93,51 +95,27 @@ func newIndexSeriesCursorInfluxQLPred(ctx context.Context, predicate influxql.Ex } } - var mitr tsdb.MeasurementIterator - name, singleMeasurement := HasSingleMeasurementNoOR(p.measurementCond) - if singleMeasurement { - mitr = tsdb.NewMeasurementSliceIterator([][]byte{[]byte(name)}) - } - sg := tsdb.Shards(shards) - p.sqry, err = sg.CreateSeriesCursor(ctx, tsdb.SeriesCursorRequest{Measurements: mitr}, opt.Condition) - if p.sqry != nil && err == nil { - // Optimisation to check if request is only interested in results for a - // single measurement. In this case we can efficiently produce all known - // field keys from the collection of shards without having to go via - // the query engine. 
- if singleMeasurement { - fkeys := sg.FieldKeysByMeasurement([]byte(name)) - if len(fkeys) == 0 { - goto CLEANUP - } - - fields := make([]field, 0, len(fkeys)) - for _, key := range fkeys { - fields = append(fields, field{n: key, nb: []byte(key)}) - } - p.fields = map[string][]field{name: fields} - return p, nil - } - - var mfkeys map[string][]string - mfkeys, err = sg.FieldKeysByPredicate(opt.Condition) - if err != nil { - goto CLEANUP - } - + if mfkeys, err := sg.FieldKeysByPredicate(opt.Condition); err == nil { p.fields = make(map[string][]field, len(mfkeys)) + measurementNamesForFields := []string{} for name, fkeys := range mfkeys { fields := make([]field, 0, len(fkeys)) for _, key := range fkeys { fields = append(fields, field{n: key, nb: []byte(key)}) } p.fields[name] = fields + measurementNamesForFields = append(measurementNamesForFields, name) + } + + sort.Strings(measurementNamesForFields) + mitr := tsdb.NewMeasurementSliceIterator(slices.StringsToBytes(measurementNamesForFields...)) + p.sqry, err = sg.CreateSeriesCursor(ctx, tsdb.SeriesCursorRequest{Measurements: mitr}, opt.Condition) + if p.sqry != nil && err == nil { + return p, nil } - return p, nil } -CLEANUP: p.Close() return nil, err }