From 50d404e6908c4d3e673ab0f3c6d38466b86201d9 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 24 Aug 2017 11:27:29 -0500 Subject: [PATCH] Initial implementation of explain plan It prints the statistics of each iterator that will access the storage engine. For each access of the storage engine, it will print the number of shards that will potentially be accessed, the number of files that may be accessed, the number of series that will be created, the number of blocks, and the size of those blocks. --- CHANGELOG.md | 1 + coordinator/shard_mapper.go | 26 ++++++++ coordinator/statement_executor.go | 35 ++++++++++- coordinator/statement_executor_test.go | 5 ++ query/explain.go | 84 ++++++++++++++++++++++++++ query/iterator.go | 37 ++++++++++++ query/select.go | 3 + query/select_test.go | 4 ++ tsdb/engine.go | 1 + tsdb/engine/tsm1/engine.go | 66 ++++++++++++++++++++ tsdb/engine/tsm1/file_store.go | 50 ++++++++++++++- tsdb/shard.go | 51 ++++++++++++++++ 12 files changed, 361 insertions(+), 2 deletions(-) create mode 100644 query/explain.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c0d0e116494..ba1f26befd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml. - [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries. - [#6563](https://github.com/influxdata/influxdb/issues/6563): Support Ctrl+C to cancel a running query in the Influx CLI. Thanks @emluque! +- [#8776](https://github.com/influxdata/influxdb/pull/8776): Initial implementation of explain plan. 
### Bugfixes diff --git a/coordinator/shard_mapper.go b/coordinator/shard_mapper.go index ff28ae326ab..961affc548a 100644 --- a/coordinator/shard_mapper.go +++ b/coordinator/shard_mapper.go @@ -181,6 +181,32 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It return sg.CreateIterator(m.Name, opt) } +func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) { + source := Source{ + Database: m.Database, + RetentionPolicy: m.RetentionPolicy, + } + + sg := a.ShardMap[source] + if sg == nil { + return query.IteratorCost{}, nil + } + + if m.Regex != nil { + var costs query.IteratorCost + measurements := sg.MeasurementsByRegex(m.Regex.Val) + for _, measurement := range measurements { + cost, err := sg.IteratorCost(measurement, opt) + if err != nil { + return query.IteratorCost{}, err + } + costs = costs.Combine(cost) + } + return costs, nil + } + return sg.IteratorCost(m.Name, opt) +} + // Close clears out the list of mapped shards. 
func (a *LocalShardMapping) Close() error { a.ShardMap = nil diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 0afe9fc8438..06e8efff76c 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -7,6 +7,7 @@ import ( "io" "sort" "strconv" + "strings" "time" "github.com/influxdata/influxdb" @@ -401,7 +402,39 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme } func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) { - return nil, errors.New("unimplemented") + if q.Analyze { + return nil, errors.New("analyze is currently unimplemented") + } + + opt := query.SelectOptions{ + InterruptCh: ctx.InterruptCh, + NodeID: ctx.ExecutionOptions.NodeID, + MaxSeriesN: e.MaxSelectSeriesN, + MaxBucketsN: e.MaxSelectBucketsN, + Authorizer: ctx.Authorizer, + } + + // Prepare the query for execution, but do not actually execute it. + // This should perform any needed substitutions. 
+ p, err := query.Prepare(q.Statement, e.ShardMapper, opt) + if err != nil { + return nil, err + } + defer p.Close() + + plan, err := p.Explain() + if err != nil { + return nil, err + } + plan = strings.TrimSpace(plan) + + row := &models.Row{ + Columns: []string{"QUERY PLAN"}, + } + for _, s := range strings.Split(plan, "\n") { + row.Values = append(row.Values, []interface{}{s}) + } + return models.Rows{row}, nil } func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error { diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 893c190860b..d301d17004f 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -385,6 +385,7 @@ type MockShard struct { Measurements []string FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error) + IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error) ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) } @@ -420,6 +421,10 @@ func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOption return sh.CreateIteratorFn(measurement, opt) } +func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { + return sh.IteratorCostFn(measurement, opt) +} + func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { return sh.ExpandSourcesFn(sources) } diff --git a/query/explain.go b/query/explain.go new file mode 100644 index 00000000000..71f99ed24f7 --- /dev/null +++ b/query/explain.go @@ -0,0 +1,84 @@ +package query + +import ( + "bytes" + "fmt" + "io" + "strings" + + "github.com/influxdata/influxdb/influxql" +) + +func (p *preparedStatement) Explain() (string, error) { + // Determine the cost of all iterators created as part of this 
plan. + ic := &explainIteratorCreator{ic: p.ic} + p.ic = ic + itrs, _, err := p.Select() + p.ic = ic.ic + + if err != nil { + return "", err + } + Iterators(itrs).Close() + + var buf bytes.Buffer + for i, node := range ic.nodes { + if i > 0 { + buf.WriteString("\n") + } + + expr := "" + if node.Expr != nil { + expr = node.Expr.String() + } + fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr) + if len(node.Aux) != 0 { + refs := make([]string, len(node.Aux)) + for i, ref := range node.Aux { + refs[i] = ref.String() + } + fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", ")) + } + fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards) + fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries) + fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles) + fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead) + fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize) + } + return buf.String(), nil +} + +type planNode struct { + Expr influxql.Expr + Aux []influxql.VarRef + Cost IteratorCost +} + +type explainIteratorCreator struct { + ic interface { + IteratorCreator + io.Closer + } + nodes []planNode +} + +func (e *explainIteratorCreator) CreateIterator(m *influxql.Measurement, opt IteratorOptions) (Iterator, error) { + cost, err := e.ic.IteratorCost(m, opt) + if err != nil { + return nil, err + } + e.nodes = append(e.nodes, planNode{ + Expr: opt.Expr, + Aux: opt.Aux, + Cost: cost, + }) + return &nilFloatIterator{}, nil +} + +func (e *explainIteratorCreator) IteratorCost(m *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) { + return e.ic.IteratorCost(m, opt) +} + +func (e *explainIteratorCreator) Close() error { + return e.ic.Close() +} diff --git a/query/iterator.go b/query/iterator.go index 4b692dc83a3..07767a529c8 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -625,6 +625,9 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats) type IteratorCreator interface { // 
Creates a simple iterator for use in an InfluxQL query. CreateIterator(source *influxql.Measurement, opt IteratorOptions) (Iterator, error) + + // Determines the potential cost for creating an iterator. + IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) } // IteratorOptions is an object passed to CreateIterator to specify creation options. @@ -1341,6 +1344,40 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats { } } +// IteratorCost contains statistics retrieved for explaining what potential +// cost may be incurred by instantiating an iterator. +type IteratorCost struct { + // The total number of shards that are touched by this query. + NumShards int64 + + // The total number of non-unique series that are accessed by this query. + // This number matches the number of cursors created by the query since + // one cursor is created for every series. + NumSeries int64 + + // The total number of non-unique files that may be accessed by this query. + // This will count the number of files accessed by each series so files + // will likely be double counted. + NumFiles int64 + + // The number of blocks that had the potential to be accessed. + BlocksRead int64 + + // The amount of data that can be potentially read. + BlockSize int64 +} + +// Combine combines the results of two IteratorCost structures into one. +func (c IteratorCost) Combine(other IteratorCost) IteratorCost { + return IteratorCost{ + NumShards: c.NumShards + other.NumShards, + NumSeries: c.NumSeries + other.NumSeries, + NumFiles: c.NumFiles + other.NumFiles, + BlocksRead: c.BlocksRead + other.BlocksRead, + BlockSize: c.BlockSize + other.BlockSize, + } +} + // floatFastDedupeIterator outputs unique points where the point has a single aux field. 
type floatFastDedupeIterator struct { input FloatIterator diff --git a/query/select.go b/query/select.go index d35f6962f8d..ef76ac7561f 100644 --- a/query/select.go +++ b/query/select.go @@ -56,6 +56,9 @@ type PreparedStatement interface { // Select creates the Iterators that will be used to read the query. Select() ([]Iterator, []string, error) + // Explain outputs the explain plan for this statement. + Explain() (string, error) + // Close closes the resources associated with this prepared statement. // This must be called as the mapped shards may hold open resources such // as network connections. diff --git a/query/select_test.go b/query/select_test.go index da08f800ed4..733e43e4d76 100644 --- a/query/select_test.go +++ b/query/select_test.go @@ -2785,6 +2785,10 @@ func (sh *ShardGroup) CreateIterator(m *influxql.Measurement, opt query.Iterator return sh.CreateIteratorFn(m, opt) } +func (sh *ShardGroup) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) { + return query.IteratorCost{}, nil +} + func (sh *ShardGroup) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) diff --git a/tsdb/engine.go b/tsdb/engine.go index cd66691b87c..1976ca53ef8 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -44,6 +44,7 @@ type Engine interface { Import(r io.Reader, basePath string) error CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error) + IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) WritePoints(points []models.Point) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 829324b7707..ce8edf60cdb 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2138,6 +2138,72 @@ func (e *Engine) 
buildBooleanCursor(measurement, seriesKey, field string, opt qu return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor) } +// IteratorCost produces the cost of an iterator. +func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { + // Determine if this measurement exists. If it does not, then no shards are + // accessed to begin with. + if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { + return query.IteratorCost{}, err + } else if !exists { + return query.IteratorCost{}, nil + } + + // Determine all of the tag sets for this query. + tagSets, err := e.index.TagSets([]byte(measurement), opt) + if err != nil { + return query.IteratorCost{}, err + } + + // Attempt to retrieve the ref from the main expression (if it exists). + var ref *influxql.VarRef + if opt.Expr != nil { + if v, ok := opt.Expr.(*influxql.VarRef); ok { + ref = v + } else if call, ok := opt.Expr.(*influxql.Call); ok { + if len(call.Args) > 0 { + ref, _ = call.Args[0].(*influxql.VarRef) + } + } + } + + // Count the number of series concatenated from the tag set. + cost := query.IteratorCost{NumShards: 1} + for _, t := range tagSets { + cost.NumSeries += int64(len(t.SeriesKeys)) + for i, key := range t.SeriesKeys { + // Retrieve the cost for the main expression (if it exists). + if ref != nil { + k := SeriesFieldKey(key, ref.Val) + c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime) + cost = cost.Combine(c) + } + + // Retrieve the cost for every auxiliary field since these are also + // iterators that we may have to look through. + // We may want to separate these though as we are unlikely to incur + // anywhere close to the full costs of the auxiliary iterators because + // many of the selected values are usually skipped. 
+ for _, ref := range opt.Aux { + k := SeriesFieldKey(key, ref.Val) + c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime) + cost = cost.Combine(c) + } + + // Retrieve the expression names in the condition (if there is a condition). + // We will also create cursors for these too. + if t.Filters[i] != nil { + refs := influxql.ExprNames(t.Filters[i]) + for _, ref := range refs { + k := SeriesFieldKey(key, ref.Val) + c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime) + cost = cost.Combine(c) + } + } + } + } + return cost, nil +} + func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) { return e.index.SeriesPointIterator(opt) } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index e09a422eb15..0aee0588cd6 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -15,6 +15,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/query" "github.com/uber-go/zap" ) @@ -472,6 +473,12 @@ func (f *FileStore) Read(key []byte, t int64) ([]Value, error) { return nil, nil } +func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost { + f.mu.RLock() + defer f.mu.RUnlock() + return f.cost(key, min, max) +} + // KeyCursor returns a KeyCursor for key and t across the files in the FileStore. func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor { f.mu.RLock() @@ -726,6 +733,47 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error { return nil } +// We need to determine the possible files that may be accessed by this query given +// the time range. 
+func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
+	var entries []IndexEntry // scratch slice handed to ReadEntries for every file
+	cost := query.IteratorCost{}
+	for _, fd := range f.files {
+		minTime, maxTime := fd.TimeRange()
+		if !(maxTime > min && minTime < max) { // file lies outside the window; both bounds are compared strictly
+			continue
+		}
+		skipped := true // flipped to false once a block from this file is counted
+		tombstones := fd.TombstoneRange(key)
+
+		fd.ReadEntries(key, &entries) // NOTE(review): entries is reused across files; presumably ReadEntries resets it when key is absent - confirm
+	ENTRIES:
+		for i := 0; i < len(entries); i++ {
+			ie := entries[i]
+
+			if !(ie.MaxTime > min && ie.MinTime < max) { // same strict overlap test, applied per block
+				continue
+			}
+
+			// Skip any blocks that only contain values that are tombstoned.
+			for _, t := range tombstones {
+				if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
+					continue ENTRIES
+				}
+			}
+
+			cost.BlocksRead++
+			cost.BlockSize += int64(ie.Size)
+			skipped = false
+		}
+
+		if !skipped { // count the file only if at least one of its blocks was counted
+			cost.NumFiles++
+		}
+	}
+	return cost
+}
+
 // locations returns the files and index blocks for a key and time. ascending indicates
 // whether the key will be scan in ascending time order or descenging time order.
 // This function assumes the read-lock has been taken.
@@ -735,7 +783,6 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
 	for _, fd := range f.files {
 		minTime, maxTime := fd.TimeRange()
 
-		tombstones := fd.TombstoneRange(key)
 		// If we ascending and the max time of the file is before where we want to start
 		// skip it.
 		if ascending && maxTime < t {
@@ -745,6 +792,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
 		} else if !ascending && minTime > t {
 			continue
 		}
+		tombstones := fd.TombstoneRange(key)
 
 		// This file could potential contain points we are looking for so find the blocks for
 		// the given key.
diff --git a/tsdb/shard.go b/tsdb/shard.go
index e709bb41bbf..119437c6b4e 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -8,6 +8,7 @@ import (
 	"math"
 	"path/filepath"
 	"regexp"
+	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -18,6 +19,7 @@ import (
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/estimator"
+	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/query"
 	internal "github.com/influxdata/influxdb/tsdb/internal"
 	"github.com/uber-go/zap"
@@ -776,6 +778,14 @@ func (s *Shard) createSeriesIterator(opt query.IteratorOptions) (query.Iterator,
 	return s.engine.SeriesPointIterator(opt)
 }
 
+// IteratorCost returns the estimated cost of constructing and reading an iterator.
+func (s *Shard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	if err := s.ready(); err != nil {
+		return query.IteratorCost{}, err
+	}
+	return s.engine.IteratorCost(measurement, opt)
+}
+
 // FieldDimensions returns unique sets of fields and dimensions across a list of sources.
 func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
 	if err := s.ready(); err != nil {
@@ -1008,6 +1018,7 @@ type ShardGroup interface {
 	FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
 	MapType(measurement, field string) influxql.DataType
 	CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
+	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
 	ExpandSources(sources influxql.Sources) (influxql.Sources, error)
 }
 
@@ -1106,6 +1117,54 @@ func (a Shards) CreateIterator(measurement string, opt query.IteratorOptions) (q
 	return query.Iterators(itrs).Merge(opt)
 }
 
+// IteratorCost returns the combined estimated cost of creating an iterator for
+// measurement on every shard in a. Per-shard costs are computed concurrently,
+// bounded by a limiter sized to GOMAXPROCS. After a shard reports an error no
+// further shards are started and the first error recorded is returned.
+func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	var costs query.IteratorCost
+	var costerr error
+	var mu sync.RWMutex
+
+	limit := limiter.NewFixed(runtime.GOMAXPROCS(0))
+	var wg sync.WaitGroup
+	for _, sh := range a {
+		limit.Take()
+
+		// Stop scheduling once a shard has failed. The slot is handed back and
+		// wg.Add has not run yet, so wg.Wait below cannot block on a goroutine
+		// that was never started.
+		mu.RLock()
+		failed := costerr != nil
+		mu.RUnlock()
+		if failed {
+			limit.Release()
+			break
+		}
+
+		wg.Add(1)
+		go func(sh *Shard) {
+			defer limit.Release()
+			defer wg.Done()
+
+			cost, err := sh.IteratorCost(measurement, opt)
+
+			mu.Lock()
+			defer mu.Unlock()
+
+			if err != nil {
+				if costerr == nil {
+					costerr = err
+				}
+				return
+			}
+			costs = costs.Combine(cost)
+		}(sh)
+	}
+	wg.Wait()
+	return costs, costerr
+}
+
 func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
 	// Use a map as a set to prevent duplicates.
 	set := map[string]influxql.Source{}