Skip to content

Commit

Permalink
feat: Add WITH KEY to show tag keys (influxdata#20793)
Browse files Browse the repository at this point in the history
* fix: Change from RewriteExpr to PartitionExpr

Also remove some dead code

* feat: WITH KEY implementation

* feat: query rewriting for WITH KEY in SHOW TAG KEYS
  • Loading branch information
lesam authored and chengshiwen committed Sep 1, 2024
1 parent 94e2777 commit f1cb8cc
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 389 deletions.
36 changes: 31 additions & 5 deletions query/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,12 @@ func rewriteShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityS
}, nil
}

func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influxql.Statement, error) {
func withKeyExpr(tagKeyExpr influxql.Expr, op influxql.Token) influxql.Expr {
var expr influxql.Expr
if list, ok := stmt.TagKeyExpr.(*influxql.ListLiteral); ok {
if tagKeyExpr == nil {
return nil
}
if list, ok := tagKeyExpr.(*influxql.ListLiteral); ok {
for _, tagKey := range list.Vals {
tagExpr := &influxql.BinaryExpr{
Op: influxql.EQ,
Expand All @@ -246,11 +249,17 @@ func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influ
}
} else {
expr = &influxql.BinaryExpr{
Op: stmt.Op,
Op: op,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: stmt.TagKeyExpr,
RHS: tagKeyExpr,
}
}
return expr
}

func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influxql.Statement, error) {
// parser enforces that TagKeyExpr is non-nil
expr := withKeyExpr(stmt.TagKeyExpr, stmt.Op)

// Set condition or "AND" together.
condition := stmt.Condition
Expand Down Expand Up @@ -348,9 +357,26 @@ func rewriteShowTagValuesCardinalityStatement(stmt *influxql.ShowTagValuesCardin
}

func rewriteShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement) (influxql.Statement, error) {
condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition)
tagExpr := withKeyExpr(stmt.TagKeyExpr, stmt.TagKeyOp)

// if tagExpr == nil, condition is already set correctly
if tagExpr != nil {
if condition != nil {
condition = &influxql.BinaryExpr{
LHS: &influxql.ParenExpr{Expr: condition},
RHS: &influxql.ParenExpr{Expr: tagExpr},
Op: influxql.AND,
}
} else {
// condition is nil, replace with tagExpr
condition = tagExpr
}
}

return &influxql.ShowTagKeysStatement{
Database: stmt.Database,
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Condition: condition,
SortFields: stmt.SortFields,
Limit: stmt.Limit,
Offset: stmt.Offset,
Expand Down
32 changes: 32 additions & 0 deletions query/statement_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,38 @@ func TestRewriteStatement(t *testing.T) {
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE region = 'uswest'`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS WITH KEY = host`,
s: `SHOW TAG KEYS WHERE _tagKey = 'host'`,
},
{
stmt: `SHOW TAG KEYS ON db0 WITH KEY IN (host, region)`,
s: `SHOW TAG KEYS ON db0 WHERE _tagKey = 'host' OR _tagKey = 'region'`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WITH KEY =~ /h.*/`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (_tagKey =~ /h.*/)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WITH KEY = host`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/ WITH KEY = host`,
s: `SHOW TAG KEYS WHERE (_name =~ /c.*/) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM /c.*/ WITH KEY = host`,
s: `SHOW TAG KEYS ON db0 WHERE (_name =~ /c.*/) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WITH KEY = host WHERE region = 'uswest'`,
s: `SHOW TAG KEYS WHERE ((_name = 'cpu') AND (region = 'uswest')) AND (_tagKey = 'host')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WITH KEY in (host, region) WHERE region = 'uswest'`,
s: `SHOW TAG KEYS ON db0 WHERE ((_name = 'cpu') AND (region = 'uswest')) AND (_tagKey = 'host' OR _tagKey = 'region')`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu`,
s: `SHOW TAG KEYS WHERE _name = 'cpu'`,
Expand Down
23 changes: 23 additions & 0 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7929,6 +7929,23 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show tag keys on db0 with key`,
command: "SHOW TAG KEYS ON db0 WITH KEY =~ /ho/",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"]]},{"name":"disk","columns":["tagKey"],"values":[["host"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
},
&Query{
name: "show tag keys from with key",
command: "SHOW TAG KEYS FROM cpu WITH KEY = host",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys from regex with key in",
command: "SHOW TAG KEYS FROM /[cg]pu/ WITH KEY IN (host, region) ",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys measurement not found",
command: "SHOW TAG KEYS FROM doesntexist",
Expand Down Expand Up @@ -7964,6 +7981,12 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys with key with time where",
command: "SHOW TAG KEYS WITH KEY = host WHERE host = 'server03' AND time > 0",
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["tagKey"],"values":[["host"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys with time measurement not found",
command: "SHOW TAG KEYS FROM doesntexist WHERE time > 0",
Expand Down
43 changes: 26 additions & 17 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2753,43 +2753,40 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}

// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
//
// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// tagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// position of the tag key in the keys argument.
//
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
func (is IndexSet) TagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
}

// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
// TagValuesByKeyAndExpr for more details.
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
database := is.Database()

valueExpr := influxql.CloneExpr(expr)
valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr {
valueExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "value" {
return nil
if ok && tag.Val == "value" {
return true, nil
}
}
}
return e
}), nil)
return false, nil
})
if err != nil {
return nil, err
}
if remainingExpr == nil {
remainingExpr = &influxql.BooleanLiteral{Val: true}
}

itr, err := is.seriesByExprIterator(name, expr)
itr, err := is.seriesByExprIterator(name, remainingExpr)
if err != nil {
return nil, err
} else if itr == nil {
Expand Down Expand Up @@ -2822,6 +2819,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,
break
}

if e.Expr != nil {
// We don't yet have code that correctly processes expressions that
// seriesByExprIterator doesn't handle
lit, ok := e.Expr.(*influxql.BooleanLiteral)
if !ok {
return nil, fmt.Errorf("Expression too complex for metaquery: %v", e.Expr)
}
if !lit.Val {
continue
}
}

buf := is.SeriesFile.SeriesKey(e.SeriesID)
if len(buf) == 0 {
continue
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
}
return nil, nil
default:
return nil, fmt.Errorf("invalid operator")
return nil, fmt.Errorf("invalid operator for tag keys by expression")
}

case *influxql.ParenExpr:
return fs.MeasurementTagKeysByExpr(name, e.Expr)
}

return nil, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("Invalid measurement tag keys expression: %#v", expr)
}

// tagKeysByFilter will filter the tag keys for the measurement.
Expand Down
20 changes: 0 additions & 20 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,26 +857,6 @@ func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return engine.MeasurementNamesByRegex(re)
}

// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
engine, err := s.Engine()
if err != nil {
return nil, err
}
return engine.MeasurementTagKeysByExpr(name, expr)
}

// MeasurementTagKeyValuesByExpr returns all the tag keys values for the
// provided expression.
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
index, err := s.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
}

// MeasurementNamesByPredicate returns fields for a measurement filtered by an expression.
func (s *Shard) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) {
index, err := s.Index()
Expand Down
Loading

0 comments on commit f1cb8cc

Please sign in to comment.