Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of explain plan #8776

Merged
merged 1 commit into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml.
- [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries.
- [#6563](https://github.com/influxdata/influxdb/issues/6563): Support Ctrl+C to cancel a running query in the Influx CLI. Thanks @emluque!
- [#8776](https://github.com/influxdata/influxdb/pull/8776): Initial implementation of explain plan.

### Bugfixes

Expand Down
26 changes: 26 additions & 0 deletions coordinator/shard_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
return sg.CreateIterator(m.Name, opt)
}

func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}

sg := a.ShardMap[source]
if sg == nil {
return query.IteratorCost{}, nil
}

if m.Regex != nil {
var costs query.IteratorCost
measurements := sg.MeasurementsByRegex(m.Regex.Val)
for _, measurement := range measurements {
cost, err := sg.IteratorCost(measurement, opt)
if err != nil {
return query.IteratorCost{}, err
}
costs = costs.Combine(cost)
}
return costs, nil
}
return sg.IteratorCost(m.Name, opt)
}

// Close clears out the list of mapped shards.
func (a *LocalShardMapping) Close() error {
a.ShardMap = nil
Expand Down
35 changes: 34 additions & 1 deletion coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sort"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb"
Expand Down Expand Up @@ -401,7 +402,39 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
return nil, errors.New("unimplemented")
if q.Analyze {
return nil, errors.New("analyze is currently unimplemented")
}

opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
}

// Prepare the query for execution, but do not actually execute it.
// This should perform any needed substitutions.
p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
if err != nil {
return nil, err
}
defer p.Close()

plan, err := p.Explain()
if err != nil {
return nil, err
}
plan = strings.TrimSpace(plan)

row := &models.Row{
Columns: []string{"QUERY PLAN"},
}
for _, s := range strings.Split(plan, "\n") {
row.Values = append(row.Values, []interface{}{s})
}
return models.Rows{row}, nil
}

func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
Expand Down
5 changes: 5 additions & 0 deletions coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ type MockShard struct {
Measurements []string
FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}

Expand Down Expand Up @@ -420,6 +421,10 @@ func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOption
return sh.CreateIteratorFn(measurement, opt)
}

func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
return sh.IteratorCostFn(measurement, opt)
}

func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return sh.ExpandSourcesFn(sources)
}
Expand Down
84 changes: 84 additions & 0 deletions query/explain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package query

import (
"bytes"
"fmt"
"io"
"strings"

"github.com/influxdata/influxdb/influxql"
)

func (p *preparedStatement) Explain() (string, error) {
// Determine the cost of all iterators created as part of this plan.
ic := &explainIteratorCreator{ic: p.ic}
p.ic = ic
itrs, _, err := p.Select()
p.ic = ic.ic

if err != nil {
return "", err
}
Iterators(itrs).Close()

var buf bytes.Buffer
for i, node := range ic.nodes {
if i > 0 {
buf.WriteString("\n")
}

expr := "<nil>"
if node.Expr != nil {
expr = node.Expr.String()
}
fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr)
if len(node.Aux) != 0 {
refs := make([]string, len(node.Aux))
for i, ref := range node.Aux {
refs[i] = ref.String()
}
fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", "))
}
fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards)
fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries)
fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles)
fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead)
fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize)
}
return buf.String(), nil
}

type planNode struct {
Expr influxql.Expr
Aux []influxql.VarRef
Cost IteratorCost
}

type explainIteratorCreator struct {
ic interface {
IteratorCreator
io.Closer
}
nodes []planNode
}

func (e *explainIteratorCreator) CreateIterator(m *influxql.Measurement, opt IteratorOptions) (Iterator, error) {
cost, err := e.ic.IteratorCost(m, opt)
if err != nil {
return nil, err
}
e.nodes = append(e.nodes, planNode{
Expr: opt.Expr,
Aux: opt.Aux,
Cost: cost,
})
return &nilFloatIterator{}, nil
}

func (e *explainIteratorCreator) IteratorCost(m *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) {
return e.ic.IteratorCost(m, opt)
}

func (e *explainIteratorCreator) Close() error {
return e.ic.Close()
}
37 changes: 37 additions & 0 deletions query/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,9 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats)
type IteratorCreator interface {
// Creates a simple iterator for use in an InfluxQL query.
CreateIterator(source *influxql.Measurement, opt IteratorOptions) (Iterator, error)

// Determines the potential cost for creating an iterator.
IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error)
}

// IteratorOptions is an object passed to CreateIterator to specify creation options.
Expand Down Expand Up @@ -1341,6 +1344,40 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
}
}

// IteratorCost contains statistics retrieved for explaining what potential
// cost may be incurred by instantiating an iterator.
type IteratorCost struct {
// The total number of shards that are touched by this query.
NumShards int64

// The total number of non-unique series that are accessed by this query.
// This number matches the number of cursors created by the query since
// one cursor is created for every series.
NumSeries int64

// The total number of non-unique files that may be accessed by this query.
// This will count the number of files accessed by each series so files
// will likely be double counted.
NumFiles int64

// The number of blocks that had the potential to be accessed.
BlocksRead int64

// The amount of data that can be potentially read.
BlockSize int64
}

// Combine combines the results of two IteratorCost structures into one.
func (c IteratorCost) Combine(other IteratorCost) IteratorCost {
return IteratorCost{
NumShards: c.NumShards + other.NumShards,
NumSeries: c.NumSeries + other.NumSeries,
NumFiles: c.NumFiles + other.NumFiles,
BlocksRead: c.BlocksRead + other.BlocksRead,
BlockSize: c.BlockSize + other.BlockSize,
}
}

// floatFastDedupeIterator outputs unique points where the point has a single aux field.
type floatFastDedupeIterator struct {
input FloatIterator
Expand Down
3 changes: 3 additions & 0 deletions query/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type PreparedStatement interface {
// Select creates the Iterators that will be used to read the query.
Select() ([]Iterator, []string, error)

// Explain outputs the explain plan for this statement.
Explain() (string, error)

// Close closes the resources associated with this prepared statement.
// This must be called as the mapped shards may hold open resources such
// as network connections.
Expand Down
4 changes: 4 additions & 0 deletions query/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,10 @@ func (sh *ShardGroup) CreateIterator(m *influxql.Measurement, opt query.Iterator
return sh.CreateIteratorFn(m, opt)
}

func (sh *ShardGroup) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
return query.IteratorCost{}, nil
}

func (sh *ShardGroup) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Engine interface {
Import(r io.Reader, basePath string) error

CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error

CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
Expand Down
66 changes: 66 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2138,6 +2138,72 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt qu
return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}

// IteratorCost produces the cost of an iterator.
func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
// Determine if this measurement exists. If it does not, then no shards are
// accessed to begin with.
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
return query.IteratorCost{}, err
} else if !exists {
return query.IteratorCost{}, nil
}

// Determine all of the tag sets for this query.
tagSets, err := e.index.TagSets([]byte(measurement), opt)
if err != nil {
return query.IteratorCost{}, err
}

// Attempt to retrieve the ref from the main expression (if it exists).
var ref *influxql.VarRef
if opt.Expr != nil {
if v, ok := opt.Expr.(*influxql.VarRef); ok {
ref = v
} else if call, ok := opt.Expr.(*influxql.Call); ok {
if len(call.Args) > 0 {
ref, _ = call.Args[0].(*influxql.VarRef)
}
}
}

// Count the number of series concatenated from the tag set.
cost := query.IteratorCost{NumShards: 1}
for _, t := range tagSets {
cost.NumSeries += int64(len(t.SeriesKeys))
for i, key := range t.SeriesKeys {
// Retrieve the cost for the main expression (if it exists).
if ref != nil {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}

// Retrieve the cost for every auxiliary field since these are also
// iterators that we may have to look through.
// We may want to separate these though as we are unlikely to incur
// anywhere close to the full costs of the auxiliary iterators because
// many of the selected values are usually skipped.
for _, ref := range opt.Aux {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}

// Retrieve the expression names in the condition (if there is a condition).
// We will also create cursors for these too.
if t.Filters[i] != nil {
refs := influxql.ExprNames(t.Filters[i])
for _, ref := range refs {
k := SeriesFieldKey(key, ref.Val)
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}
}
}
}
return cost, nil
}

func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
return e.index.SeriesPointIterator(opt)
}
Expand Down
Loading