Skip to content

Commit

Permalink
Use system cursors for measurement, series, and tag key meta queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Aug 30, 2017
1 parent 0da7116 commit 1dbe066
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 142 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [#8662](https://github.com/influxdata/influxdb/pull/8662): Improve test coverage across both indexes.
- [#8611](https://github.com/influxdata/influxdb/issues/8611): Respect X-Request-Id/Request-Id headers.
- [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml.
- [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
em.Location = stmt.Location
}
em.OmitTime = stmt.OmitTime
em.EmitName = stmt.EmitName
defer em.Close()

// Emit rows to the results channel.
Expand Down
6 changes: 6 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,12 @@ type SelectStatement struct {
// Removes the "time" column from the output.
OmitTime bool

// Removes measurement name from resulting query. Useful for meta queries.
StripName bool

// Overrides the output measurement name.
EmitName string

// Removes duplicate rows from raw queries.
Dedupe bool
}
Expand Down
78 changes: 46 additions & 32 deletions influxql/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,25 @@ func rewriteShowFieldKeyCardinalityStatement(stmt *ShowFieldKeyCardinalityStatem
}

func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW MEASUREMENTS doesn't support time in WHERE clause")
}

condition := stmt.Condition
var sources Sources
if stmt.Source != nil {
condition = rewriteSourcesCondition(Sources([]Source{stmt.Source}), stmt.Condition)
sources = Sources{stmt.Source}
}
return &ShowMeasurementsStatement{
Database: stmt.Database,
Condition: condition,
Limit: stmt.Limit,

return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "_name"}, Alias: "name"},
},
Sources: rewriteSources2(sources, stmt.Database),
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
StripName: true,
EmitName: "measurements",
Dedupe: true,
IsRawQuery: true,
}, nil
}

Expand Down Expand Up @@ -140,25 +144,22 @@ func rewriteShowMeasurementCardinalityStatement(stmt *ShowMeasurementCardinality
Offset: stmt.Offset,
Limit: stmt.Limit,
OmitTime: true,
StripName: true,
}, nil
}

func rewriteShowSeriesStatement(stmt *ShowSeriesStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW SERIES doesn't support time in WHERE clause")
}

return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "key"}},
{Expr: &VarRef{Val: "_seriesKey"}, Alias: "key"},
},
Sources: rewriteSources(stmt.Sources, "_series", stmt.Database),
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Sources: rewriteSources2(stmt.Sources, stmt.Database),
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
StripName: true,
Dedupe: true,
IsRawQuery: true,
}, nil
Expand Down Expand Up @@ -248,11 +249,6 @@ func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, err
}

func rewriteShowTagValuesCardinalityStatement(stmt *ShowTagValuesCardinalityStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG VALUES CARDINALITY doesn't support time in WHERE clause")
}

// Use all measurements, if zero.
if len(stmt.Sources) == 0 {
stmt.Sources = Sources{
Expand Down Expand Up @@ -324,17 +320,18 @@ func rewriteShowTagValuesCardinalityStatement(stmt *ShowTagValuesCardinalityStat
}

func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
}

return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "tagKey"}},
{
Expr: &Call{
Name: "distinct",
Args: []Expr{&VarRef{Val: "_tagKey"}},
},
Alias: "tagKey",
},
},
Sources: rewriteSources(stmt.Sources, "_tagKeys", stmt.Database),
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Sources: rewriteSources2(stmt.Sources, stmt.Database),
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
Expand Down Expand Up @@ -457,3 +454,20 @@ func rewriteSourcesCondition(sources Sources, cond Expr) Expr {
}
return scond
}

func rewriteSources2(sources Sources, database string) Sources {
if len(sources) == 0 {
sources = Sources{&Measurement{Regex: &RegexLiteral{Val: matchAllRegex.Copy()}}}
}
for _, source := range sources {
switch source := source.(type) {
case *Measurement:
if source.Database == "" {
source.Database = database
}
}
}
return sources
}

var matchAllRegex = regexp.MustCompile(`.+`)
44 changes: 22 additions & 22 deletions influxql/statement_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,91 +53,91 @@ func TestRewriteStatement(t *testing.T) {
},
{
stmt: `SHOW SERIES`,
s: `SELECT "key" FROM _series`,
s: `SELECT _seriesKey AS "key" FROM /.+/`,
},
{
stmt: `SHOW SERIES ON db0`,
s: `SELECT "key" FROM db0.._series`,
s: `SELECT _seriesKey AS "key" FROM db0../.+/`,
},
{
stmt: `SHOW SERIES FROM cpu`,
s: `SELECT "key" FROM _series WHERE _name = 'cpu'`,
s: `SELECT _seriesKey AS "key" FROM cpu`,
},
{
stmt: `SHOW SERIES ON db0 FROM cpu`,
s: `SELECT "key" FROM db0.._series WHERE _name = 'cpu'`,
s: `SELECT _seriesKey AS "key" FROM db0..cpu`,
},
{
stmt: `SHOW SERIES FROM mydb.myrp1.cpu`,
s: `SELECT "key" FROM mydb.myrp1._series WHERE _name = 'cpu'`,
s: `SELECT _seriesKey AS "key" FROM mydb.myrp1.cpu`,
},
{
stmt: `SHOW SERIES ON db0 FROM mydb.myrp1.cpu`,
s: `SELECT "key" FROM mydb.myrp1._series WHERE _name = 'cpu'`,
s: `SELECT _seriesKey AS "key" FROM mydb.myrp1.cpu`,
},
{
stmt: `SHOW SERIES FROM mydb.myrp1./c.*/`,
s: `SELECT "key" FROM mydb.myrp1._series WHERE _name =~ /c.*/`,
s: `SELECT _seriesKey AS "key" FROM mydb.myrp1./c.*/`,
},
{
stmt: `SHOW SERIES ON db0 FROM mydb.myrp1./c.*/`,
s: `SELECT "key" FROM mydb.myrp1._series WHERE _name =~ /c.*/`,
s: `SELECT _seriesKey AS "key" FROM mydb.myrp1./c.*/`,
},
{
stmt: `SHOW TAG KEYS`,
s: `SELECT tagKey FROM _tagKeys`,
s: `SELECT distinct(_tagKey) AS tagKey FROM /.+/`,
},
{
stmt: `SHOW TAG KEYS ON db0`,
s: `SELECT tagKey FROM db0.._tagKeys`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0../.+/`,
},
{
stmt: `SHOW TAG KEYS FROM cpu`,
s: `SELECT tagKey FROM _tagKeys WHERE _name = 'cpu'`,
s: `SELECT distinct(_tagKey) AS tagKey FROM cpu`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE _name = 'cpu'`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0..cpu`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/`,
s: `SELECT tagKey FROM _tagKeys WHERE _name =~ /c.*/`,
s: `SELECT distinct(_tagKey) AS tagKey FROM /c.*/`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM /c.*/`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE _name =~ /c.*/`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0../c.*/`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM _tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SELECT distinct(_tagKey) AS tagKey FROM cpu WHERE region = 'uswest'`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0..cpu WHERE region = 'uswest'`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name = 'cpu'`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name = 'cpu'`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1./c.*/`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name =~ /c.*/`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1./c.*/`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1./c.*/`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name =~ /c.*/`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1./c.*/`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
},
{
stmt: `SELECT value FROM cpu`,
Expand Down
7 changes: 7 additions & 0 deletions query/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Emitter struct {
// The columns to attach to each row.
Columns []string

// Overridden measurement name to emit.
EmitName string

// The time zone location.
Location *time.Location

Expand Down Expand Up @@ -139,6 +142,10 @@ func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) {

// createRow creates a new row attached to the emitter.
func (e *Emitter) createRow(name string, tags Tags, values []interface{}) {
if e.EmitName != "" {
name = e.EmitName
}

e.tags = tags
e.row = &models.Row{
Name: name,
Expand Down
Loading

0 comments on commit 1dbe066

Please sign in to comment.