Skip to content

Commit

Permalink
feat: Add WITH KEY to show tag keys (#20793)
Browse files Browse the repository at this point in the history
* fix: Change from RewriteExpr to PartitionExpr

Also remove some dead code

* feat: WITH KEY implementation

* feat: query rewriting for WITH KEY in SHOW TAG KEYS
  • Loading branch information
lesam authored Feb 25, 2021
1 parent e85a102 commit 17b9ea8
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 389 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.0
github.com/influxdata/flux v0.65.0
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jsternberg/zap-logfmt v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod
github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385 h1:ED4e5Cc3z5vSN2Tz2GkOHN7vs4Sxe2yds6CXvDnvZFE=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 h1:4t/8PcmLnI2vrcaHcEKeeLsGxC0WMRaOQdPX9b7DF8Y=
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8=
Expand Down
36 changes: 31 additions & 5 deletions query/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,12 @@ func rewriteShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityS
}, nil
}

func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influxql.Statement, error) {
func withKeyExpr(tagKeyExpr influxql.Expr, op influxql.Token) influxql.Expr {
var expr influxql.Expr
if list, ok := stmt.TagKeyExpr.(*influxql.ListLiteral); ok {
if tagKeyExpr == nil {
return nil
}
if list, ok := tagKeyExpr.(*influxql.ListLiteral); ok {
for _, tagKey := range list.Vals {
tagExpr := &influxql.BinaryExpr{
Op: influxql.EQ,
Expand All @@ -270,11 +273,17 @@ func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influ
}
} else {
expr = &influxql.BinaryExpr{
Op: stmt.Op,
Op: op,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: stmt.TagKeyExpr,
RHS: tagKeyExpr,
}
}
return expr
}

func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influxql.Statement, error) {
// parser enforces that TagKeyExpr is non-nil
expr := withKeyExpr(stmt.TagKeyExpr, stmt.Op)

// Set condition or "AND" together.
condition := stmt.Condition
Expand Down Expand Up @@ -372,9 +381,26 @@ func rewriteShowTagValuesCardinalityStatement(stmt *influxql.ShowTagValuesCardin
}

func rewriteShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement) (influxql.Statement, error) {
condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition)
tagExpr := withKeyExpr(stmt.TagKeyExpr, stmt.TagKeyOp)

// if tagExpr == nil, condition is already set correctly
if tagExpr != nil {
if condition != nil {
condition = &influxql.BinaryExpr{
LHS: &influxql.ParenExpr{Expr: condition},
RHS: &influxql.ParenExpr{Expr: tagExpr},
Op: influxql.AND,
}
} else {
// condition is nil, replace with tagExpr
condition = tagExpr
}
}

return &influxql.ShowTagKeysStatement{
Database: stmt.Database,
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Condition: condition,
SortFields: stmt.SortFields,
Limit: stmt.Limit,
Offset: stmt.Offset,
Expand Down
32 changes: 32 additions & 0 deletions query/statement_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,38 @@ func TestRewriteStatement(t *testing.T) {
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE region = 'uswest'`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS WITH KEY = host`,
s: `SHOW TAG KEYS WHERE _tagKey = 'host'`,
},
{
stmt: `SHOW TAG KEYS ON db0 WITH KEY IN (host, region)`,
s: `SHOW TAG KEYS ON db0 WHERE _tagKey = 'host' OR _tagKey = 'region'`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WITH KEY =~ /h.*/`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (_tagKey =~ /h.*/)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WITH KEY = host`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/ WITH KEY = host`,
s: `SHOW TAG KEYS WHERE (_name =~ /c.*/) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM /c.*/ WITH KEY = host`,
s: `SHOW TAG KEYS ON db0 WHERE (_name =~ /c.*/) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WITH KEY = host WHERE region = 'uswest'`,
s: `SHOW TAG KEYS WHERE ((_name = 'cpu') AND (region = 'uswest')) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WITH KEY in (host, region) WHERE region = 'uswest'`,
s: `SHOW TAG KEYS ON db0 WHERE ((_name = 'cpu') AND (region = 'uswest')) AND (_tagKey = 'host' OR _tagKey = 'region')`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu`,
s: `SHOW TAG KEYS WHERE _name = 'cpu'`,
Expand Down
23 changes: 23 additions & 0 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7983,6 +7983,23 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show tag keys on db0 with key`,
command: "SHOW TAG KEYS ON db0 WITH KEY =~ /ho/",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"]]},{"name":"disk","columns":["tagKey"],"values":[["host"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
},
&Query{
name: "show tag keys from with key",
command: "SHOW TAG KEYS FROM cpu WITH KEY = host",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys from regex with key in",
command: "SHOW TAG KEYS FROM /[cg]pu/ WITH KEY IN (host, region) ",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys measurement not found",
command: "SHOW TAG KEYS FROM doesntexist",
Expand Down Expand Up @@ -8018,6 +8035,12 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys with key with time where",
command: "SHOW TAG KEYS WITH KEY = host WHERE host = 'server03' AND time > 0",
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["tagKey"],"values":[["host"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys with time measurement not found",
command: "SHOW TAG KEYS FROM doesntexist WHERE time > 0",
Expand Down
45 changes: 27 additions & 18 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E
case *influxql.ParenExpr:
return is.measurementNamesByExpr(auth, e.Expr)
default:
return nil, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("Invalid measurement expression %#v", expr)
}
}

Expand Down Expand Up @@ -2443,43 +2443,40 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}

// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
//
// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// tagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// position of the tag key in the keys argument.
//
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
}

// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
// TagValuesByKeyAndExpr for more details.
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
database := is.Database()

valueExpr := influxql.CloneExpr(expr)
valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr {
valueExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "value" {
return nil
if ok && tag.Val == "value" {
return true, nil
}
}
}
return e
}), nil)
return false, nil
})
if err != nil {
return nil, err
}
if remainingExpr == nil {
remainingExpr = &influxql.BooleanLiteral{Val: true}
}

itr, err := is.seriesByExprIterator(name, expr)
itr, err := is.seriesByExprIterator(name, remainingExpr)
if err != nil {
return nil, err
} else if itr == nil {
Expand Down Expand Up @@ -2512,6 +2509,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
break
}

if e.Expr != nil {
// We don't yet have code that correctly processes expressions that
// seriesByExprIterator doesn't handle
lit, ok := e.Expr.(*influxql.BooleanLiteral)
if !ok {
return nil, fmt.Errorf("Expression too complex for metaquery: %v", e.Expr)
}
if !lit.Val {
continue
}
}

buf := is.SeriesFile.SeriesKey(e.SeriesID)
if len(buf) == 0 {
continue
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,14 @@ func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
}
return nil, nil
default:
return nil, fmt.Errorf("invalid operator")
return nil, fmt.Errorf("invalid operator for tag keys by expression")
}

case *influxql.ParenExpr:
return fs.MeasurementTagKeysByExpr(name, e.Expr)
}

return nil, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("Invalid measurement tag keys expression: %#v", expr)
}

// tagKeysByFilter will filter the tag keys for the measurement.
Expand Down
20 changes: 0 additions & 20 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,26 +801,6 @@ func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return engine.MeasurementNamesByRegex(re)
}

// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
engine, err := s.Engine()
if err != nil {
return nil, err
}
return engine.MeasurementTagKeysByExpr(name, expr)
}

// MeasurementTagKeyValuesByExpr returns all the tag keys values for the
// provided expression.
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
index, err := s.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
}

// MeasurementFields returns fields for a measurement.
// TODO(edd): This method is currently only being called from tests; do we
// really need it?
Expand Down
Loading

0 comments on commit 17b9ea8

Please sign in to comment.