Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite SHOW SERIES to SELECT #5937

Merged
merged 1 commit into from
Mar 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- [#5880](https://github.com/influxdata/influxdb/issues/5880): TCP connection closed after write (regression/change from 0.9.6)
- [#5865](https://github.com/influxdata/influxdb/issues/5865): Conversion to tsm fails with exceeds max index value
- [#5924](https://github.com/influxdata/influxdb/issues/5924): Missing data after using influx\_tsm
- [#5937](https://github.com/influxdata/influxdb/pull/5937): Rewrite SHOW SERIES to use query engine

## v0.10.2 [2016-03-03]
### Bugfixes
Expand Down
7 changes: 0 additions & 7 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowSeriesStatement:
rows, err = e.executeShowSeriesStatement(stmt, database)
case *influxql.ShowServersStatement:
rows, err = e.executeShowServersStatement(stmt)
case *influxql.ShowShardsStatement:
Expand Down Expand Up @@ -667,10 +665,6 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe
return []*models.Row{row}, nil
}

func (e *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
return e.TSDBStore.ExecuteShowSeriesStatement(stmt, database)
}

func (e *QueryExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) (models.Rows, error) {
nis, err := e.MetaClient.DataNodes()
if err != nil {
Expand Down Expand Up @@ -1100,7 +1094,6 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
Expand Down
5 changes: 0 additions & 5 deletions cluster/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatementFn func(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
Expand Down Expand Up @@ -232,10 +231,6 @@ func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysSt
return s.ExecuteShowFieldKeysStatementFn(stmt, database)
}

func (s *TSDBStore) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
return s.ExecuteShowSeriesStatementFn(stmt, database)
}

func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
return s.ExecuteShowTagValuesStatementFn(stmt, database)
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/influxd/run/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func init() {
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down Expand Up @@ -239,7 +239,7 @@ func init() {
&Query{
name: "Show series is present again after re-write",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
},
Expand All @@ -260,7 +260,7 @@ func init() {
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["a,host=serverA,region=uswest"],["aa,host=serverA,region=uswest"],["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -273,7 +273,7 @@ func init() {
&Query{
name: "Show series is gone",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -286,7 +286,7 @@ func init() {
&Query{
name: "make sure DROP SERIES doesn't delete anything when regex doesn't match",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -298,7 +298,7 @@ func init() {
&Query{
name: "make sure DROP SERIES with field in WHERE didn't delete data",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down
20 changes: 10 additions & 10 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4599,7 +4599,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "show series",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]},{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["memory,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -4623,7 +4623,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "verify series",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["memory,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down Expand Up @@ -4751,43 +4751,43 @@ func TestServer_Query_ShowSeries(t *testing.T) {
&Query{
name: `show series`,
command: "SHOW SERIES",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["disk,host=server03,region=caeast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series from measurement`,
command: "SHOW SERIES FROM cpu",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series from regular expression`,
command: "SHOW SERIES FROM /[cg]pu/",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series with where tag`,
command: "SHOW SERIES WHERE region = 'uswest'",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=uswest","server01","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series where tag matches regular expression`,
command: "SHOW SERIES WHERE region =~ /ca.*/",
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series`,
command: "SHOW SERIES WHERE host !~ /server0[12]/",
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series with from and where`,
command: "SHOW SERIES FROM cpu WHERE region = 'useast'",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=useast"],["cpu,host=server02,region=useast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -4799,7 +4799,7 @@ func TestServer_Query_ShowSeries(t *testing.T) {
&Query{
name: `show series with WHERE field should fail`,
command: "SHOW SERIES WHERE value > 10.0",
exp: `{"results":[{"error":"SHOW SERIES doesn't support fields in WHERE clause"}]}`,
exp: `{"results":[{"error":"invalid tag comparison operator"}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)
Expand Down
35 changes: 29 additions & 6 deletions influxql/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ func RewriteStatement(stmt Statement) (Statement, error) {
return rewriteShowFieldKeysStatement(stmt)
case *ShowMeasurementsStatement:
return rewriteShowMeasurementsStatement(stmt)
case *ShowSeriesStatement:
return rewriteShowSeriesStatement(stmt)
case *ShowTagKeysStatement:
return rewriteShowTagKeysStatement(stmt)
case *ShowTagValuesStatement:
Expand Down Expand Up @@ -81,21 +83,20 @@ func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statemen
}, nil
}

func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
func rewriteShowSeriesStatement(stmt *ShowSeriesStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
return nil, errors.New("SHOW SERIES doesn't support time in WHERE clause")
}

condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition)
return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "tagKey"}},
{Expr: &VarRef{Val: "key"}},
},
Sources: []Source{
&Measurement{Name: "_tagKeys"},
&Measurement{Name: "_series"},
},
Condition: condition,
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
Expand Down Expand Up @@ -161,6 +162,28 @@ func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, err
}, nil
}

func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
}

return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "tagKey"}},
},
Sources: []Source{
&Measurement{Name: "_tagKeys"},
},
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
Dedupe: true,
}, nil
}

// rewriteSourcesCondition rewrites sources into `name` expressions.
// Merges with cond and returns a new condition.
func rewriteSourcesCondition(sources Sources, cond Expr) Expr {
Expand Down
23 changes: 20 additions & 3 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bo
}
return nil, false, nil
default:
return nil, false, fmt.Errorf("invalid operator")
return nil, false, fmt.Errorf("invalid tag comparison operator")
}
case *influxql.ParenExpr:
return d.measurementsByExpr(e.Expr)
Expand Down Expand Up @@ -740,19 +740,27 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex

// For fields, return all series IDs from this measurement and return
// the expression passed in, as the filter.
if m.HasField(name.Val) {
if name.Val != "name" && m.HasField(name.Val) {
return m.seriesIDs, n, nil
}

tagVals, ok := m.seriesByTagKeyValue[name.Val]
if !ok {
if name.Val != "name" && !ok {
return nil, nil, nil
}

// if we're looking for series with a specific tag value
if str, ok := value.(*influxql.StringLiteral); ok {
var ids SeriesIDs

// Special handling for "name" to match measurement name.
if name.Val == "name" {
if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) {
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
}
return nil, &influxql.BooleanLiteral{Val: true}, nil
}

if n.Op == influxql.EQ {
// return series that have a tag of specific value.
ids = tagVals[str.Val]
Expand All @@ -766,6 +774,15 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
if re, ok := value.(*influxql.RegexLiteral); ok {
var ids SeriesIDs

// Special handling for "name" to match measurement name.
if name.Val == "name" {
match := re.Val.MatchString(m.Name)
if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) {
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
}
return nil, &influxql.BooleanLiteral{Val: true}, nil
}

// The operation is a NEQREGEX, code must start by assuming all match, even
// series without any tags.
if n.Op == influxql.NEQREGEX {
Expand Down
78 changes: 78 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
return NewFieldKeysIterator(s, opt)
case "_measurements":
return NewMeasurementIterator(s, opt)
case "_series":
return NewSeriesIterator(s, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
case "_tags":
Expand Down Expand Up @@ -882,6 +884,82 @@ func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
}
}

// seriesIterator emits series ids.
type seriesIterator struct {
keys []string // remaining series
fields []string // fields to emit (key)
}

// NewSeriesIterator returns a new instance of SeriesIterator.
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Retrieve a list of all measurements.
mms := sh.index.Measurements()
sort.Sort(mms)

// Only equality operators are allowed.
var err error
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
influxql.OR, influxql.AND:
default:
err = errors.New("invalid tag comparison operator")
}
}
})
if err != nil {
return nil, err
}

// Generate a list of all series keys.
keys := newStringSet()
for _, mm := range mms {
ids, err := mm.seriesIDsAllOrByExpr(opt.Condition)
if err != nil {
return nil, err
}

for _, id := range ids {
keys.add(mm.SeriesByID(id).Key)
}
}

return &seriesIterator{
keys: keys.list(),
fields: opt.Aux,
}, nil
}

// Close closes the iterator.
func (itr *seriesIterator) Close() error { return nil }

// Next emits the next point in the iterator.
func (itr *seriesIterator) Next() *influxql.FloatPoint {
// If there are no more keys then return nil.
if len(itr.keys) == 0 {
return nil
}

// Prepare auxiliary fields.
aux := make([]interface{}, len(itr.fields))
for i, f := range itr.fields {
switch f {
case "key":
aux[i] = itr.keys[0]
}
}

// Return next key.
p := &influxql.FloatPoint{
Aux: aux,
}
itr.keys = itr.keys[1:]

return p
}

// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string {
Expand Down
Loading