Skip to content

Commit

Permalink
Merge pull request #8776 from influxdata/js-explain-plan
Browse files Browse the repository at this point in the history
Initial implementation of explain plan
  • Loading branch information
jsternberg authored Sep 1, 2017
2 parents 51e886b + 50d404e commit 091ea5f
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml.
- [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries.
- [#6563](https://github.com/influxdata/influxdb/issues/6563): Support Ctrl+C to cancel a running query in the Influx CLI. Thanks @emluque!
- [#8776](https://github.com/influxdata/influxdb/pull/8776): Initial implementation of explain plan.

### Bugfixes

Expand Down
26 changes: 26 additions & 0 deletions coordinator/shard_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
return sg.CreateIterator(m.Name, opt)
}

func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}

sg := a.ShardMap[source]
if sg == nil {
return query.IteratorCost{}, nil
}

if m.Regex != nil {
var costs query.IteratorCost
measurements := sg.MeasurementsByRegex(m.Regex.Val)
for _, measurement := range measurements {
cost, err := sg.IteratorCost(measurement, opt)
if err != nil {
return query.IteratorCost{}, err
}
costs = costs.Combine(cost)
}
return costs, nil
}
return sg.IteratorCost(m.Name, opt)
}

// Close clears out the list of mapped shards.
func (a *LocalShardMapping) Close() error {
a.ShardMap = nil
Expand Down
35 changes: 34 additions & 1 deletion coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sort"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb"
Expand Down Expand Up @@ -401,7 +402,39 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
return nil, errors.New("unimplemented")
if q.Analyze {
return nil, errors.New("analyze is currently unimplemented")
}

opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
}

// Prepare the query for execution, but do not actually execute it.
// This should perform any needed substitutions.
p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
if err != nil {
return nil, err
}
defer p.Close()

plan, err := p.Explain()
if err != nil {
return nil, err
}
plan = strings.TrimSpace(plan)

row := &models.Row{
Columns: []string{"QUERY PLAN"},
}
for _, s := range strings.Split(plan, "\n") {
row.Values = append(row.Values, []interface{}{s})
}
return models.Rows{row}, nil
}

func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
Expand Down
5 changes: 5 additions & 0 deletions coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ type MockShard struct {
Measurements []string
FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}

Expand Down Expand Up @@ -420,6 +421,10 @@ func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOption
return sh.CreateIteratorFn(measurement, opt)
}

func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
return sh.IteratorCostFn(measurement, opt)
}

func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return sh.ExpandSourcesFn(sources)
}
Expand Down
84 changes: 84 additions & 0 deletions query/explain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package query

import (
"bytes"
"fmt"
"io"
"strings"

"github.com/influxdata/influxdb/influxql"
)

func (p *preparedStatement) Explain() (string, error) {
// Determine the cost of all iterators created as part of this plan.
ic := &explainIteratorCreator{ic: p.ic}
p.ic = ic
itrs, _, err := p.Select()
p.ic = ic.ic

if err != nil {
return "", err
}
Iterators(itrs).Close()

var buf bytes.Buffer
for i, node := range ic.nodes {
if i > 0 {
buf.WriteString("\n")
}

expr := "<nil>"
if node.Expr != nil {
expr = node.Expr.String()
}
fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr)
if len(node.Aux) != 0 {
refs := make([]string, len(node.Aux))
for i, ref := range node.Aux {
refs[i] = ref.String()
}
fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", "))
}
fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards)
fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries)
fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles)
fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead)
fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize)
}
return buf.String(), nil
}

type planNode struct {
Expr influxql.Expr
Aux []influxql.VarRef
Cost IteratorCost
}

type explainIteratorCreator struct {
ic interface {
IteratorCreator
io.Closer
}
nodes []planNode
}

func (e *explainIteratorCreator) CreateIterator(m *influxql.Measurement, opt IteratorOptions) (Iterator, error) {
cost, err := e.ic.IteratorCost(m, opt)
if err != nil {
return nil, err
}
e.nodes = append(e.nodes, planNode{
Expr: opt.Expr,
Aux: opt.Aux,
Cost: cost,
})
return &nilFloatIterator{}, nil
}

func (e *explainIteratorCreator) IteratorCost(m *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) {
return e.ic.IteratorCost(m, opt)
}

func (e *explainIteratorCreator) Close() error {
return e.ic.Close()
}
37 changes: 37 additions & 0 deletions query/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,9 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats)
type IteratorCreator interface {
// Creates a simple iterator for use in an InfluxQL query.
CreateIterator(source *influxql.Measurement, opt IteratorOptions) (Iterator, error)

// Determines the potential cost for creating an iterator.
IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error)
}

// IteratorOptions is an object passed to CreateIterator to specify creation options.
Expand Down Expand Up @@ -1341,6 +1344,40 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
}
}

// IteratorCost contains statistics retrieved for explaining what potential
// cost may be incurred by instantiating an iterator.
type IteratorCost struct {
// The total number of shards that are touched by this query.
NumShards int64

// The total number of non-unique series that are accessed by this query.
// This number matches the number of cursors created by the query since
// one cursor is created for every series.
NumSeries int64

// The total number of non-unique files that may be accessed by this query.
// This will count the number of files accessed by each series so files
// will likely be double counted.
NumFiles int64

// The number of blocks that had the potential to be accessed.
BlocksRead int64

// The amount of data that can be potentially read.
BlockSize int64
}

// Combine combines the results of two IteratorCost structures into one.
func (c IteratorCost) Combine(other IteratorCost) IteratorCost {
return IteratorCost{
NumShards: c.NumShards + other.NumShards,
NumSeries: c.NumSeries + other.NumSeries,
NumFiles: c.NumFiles + other.NumFiles,
BlocksRead: c.BlocksRead + other.BlocksRead,
BlockSize: c.BlockSize + other.BlockSize,
}
}

// floatFastDedupeIterator outputs unique points where the point has a single aux field.
type floatFastDedupeIterator struct {
input FloatIterator
Expand Down
3 changes: 3 additions & 0 deletions query/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type PreparedStatement interface {
// Select creates the Iterators that will be used to read the query.
Select() ([]Iterator, []string, error)

// Explain outputs the explain plan for this statement.
Explain() (string, error)

// Close closes the resources associated with this prepared statement.
// This must be called as the mapped shards may hold open resources such
// as network connections.
Expand Down
4 changes: 4 additions & 0 deletions query/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,10 @@ func (sh *ShardGroup) CreateIterator(m *influxql.Measurement, opt query.Iterator
return sh.CreateIteratorFn(m, opt)
}

func (sh *ShardGroup) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
return query.IteratorCost{}, nil
}

func (sh *ShardGroup) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Engine interface {
Import(r io.Reader, basePath string) error

CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error

CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
Expand Down
66 changes: 66 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2138,6 +2138,72 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt qu
return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}

// IteratorCost produces the cost of an iterator.
func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
// Determine if this measurement exists. If it does not, then no shards are
// accessed to begin with.
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
return query.IteratorCost{}, err
} else if !exists {
return query.IteratorCost{}, nil
}

// Determine all of the tag sets for this query.
tagSets, err := e.index.TagSets([]byte(measurement), opt)
if err != nil {
return query.IteratorCost{}, err
}

// Attempt to retrieve the ref from the main expression (if it exists).
var ref *influxql.VarRef
if opt.Expr != nil {
if v, ok := opt.Expr.(*influxql.VarRef); ok {
ref = v
} else if call, ok := opt.Expr.(*influxql.Call); ok {
if len(call.Args) > 0 {
ref, _ = call.Args[0].(*influxql.VarRef)
}
}
}

// Count the number of series concatenated from the tag set.
cost := query.IteratorCost{NumShards: 1}
for _, t := range tagSets {
cost.NumSeries += int64(len(t.SeriesKeys))
for i, key := range t.SeriesKeys {
// Retrieve the cost for the main expression (if it exists).
if ref != nil {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}

// Retrieve the cost for every auxiliary field since these are also
// iterators that we may have to look through.
// We may want to separate these though as we are unlikely to incur
// anywhere close to the full costs of the auxiliary iterators because
// many of the selected values are usually skipped.
for _, ref := range opt.Aux {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}

// Retrieve the expression names in the condition (if there is a condition).
// We will also create cursors for these too.
if t.Filters[i] != nil {
refs := influxql.ExprNames(t.Filters[i])
for _, ref := range refs {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}
}
}
}
return cost, nil
}

func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
return e.index.SeriesPointIterator(opt)
}
Expand Down
Loading

0 comments on commit 091ea5f

Please sign in to comment.