Skip to content

Commit

Permalink
Merge pull request #8947 from influxdata/sgc-explain
Browse files Browse the repository at this point in the history
EXPLAIN ANALYZE implementation
  • Loading branch information
stuartcarnie authored Oct 20, 2017
2 parents 4cdb782 + e931387 commit 618f0d0
Show file tree
Hide file tree
Showing 64 changed files with 3,997 additions and 438 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce
github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
github.com/xlab/treeprint 06dfc6fa17cdde904617990a0c2d89e3e332dbb3
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
1 change: 1 addition & 0 deletions LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
- github.com/uber-go/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
- github.com/xlab/treeprint [MIT LICENSE](https://github.com/xlab/treeprint/blob/master/LICENSE)
7 changes: 4 additions & 3 deletions coordinator/shard_mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"io"
"time"

Expand Down Expand Up @@ -160,7 +161,7 @@ func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influ
return typ
}

func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
Expand All @@ -184,7 +185,7 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
inputs := make([]query.Iterator, 0, len(measurements))
if err := func() error {
for _, measurement := range measurements {
input, err := sg.CreateIterator(measurement, opt)
input, err := sg.CreateIterator(ctx, measurement, opt)
if err != nil {
return err
}
Expand All @@ -197,7 +198,7 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
}
return query.Iterators(inputs).Merge(opt)
}
return sg.CreateIterator(m.Name, opt)
return sg.CreateIterator(ctx, m.Name, opt)
}

func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
Expand Down
7 changes: 4 additions & 3 deletions coordinator/shard_mapper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator_test

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestLocalShardMapper(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(measurement string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
if measurement != "cpu" {
t.Errorf("unexpected measurement: %s", measurement)
}
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestLocalShardMapper(t *testing.T) {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

if _, err := ic.CreateIterator(measurement, query.IteratorOptions{}); err != nil {
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{}); err != nil {
t.Fatalf("unexpected error: %s", err)
}

Expand All @@ -97,7 +98,7 @@ func TestLocalShardMapper(t *testing.T) {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

if _, err := ic.CreateIterator(measurement, query.IteratorOptions{}); err != nil {
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
129 changes: 100 additions & 29 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/pkg/tracing"
"github.com/influxdata/influxdb/pkg/tracing/fields"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -56,7 +59,7 @@ type StatementExecutor struct {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(stmt, &ctx)
return e.executeSelectStatement(context.Background(), stmt, &ctx)
}

var rows models.Rows
Expand Down Expand Up @@ -136,7 +139,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
}
err = e.executeDropUserStatement(stmt)
case *influxql.ExplainStatement:
rows, err = e.executeExplainStatement(stmt, &ctx)
if stmt.Analyze {
rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx)
} else {
rows, err = e.executeExplainStatement(stmt, &ctx)
}
case *influxql.GrantStatement:
if ctx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
Expand Down Expand Up @@ -401,17 +408,13 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
return e.MetaClient.DropUser(q.Name)
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
if q.Analyze {
return nil, errors.New("analyze is currently unimplemented")
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
InterruptCh: ectx.InterruptCh,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
Authorizer: ectx.Authorizer,
}

// Prepare the query for execution, but do not actually execute it.
Expand All @@ -437,6 +440,74 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement
return models.Rows{row}, nil
}

func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
stmt := q.Statement
t, span := tracing.NewTrace("select")
ctx := tracing.NewContextWithTrace(context.Background(), t)
ctx = tracing.NewContextWithSpan(ctx, span)
start := time.Now()

itrs, columns, err := e.createIterators(ctx, stmt, ectx)
if err != nil {
return nil, err
}

iterTime := time.Since(start)

// Generate a row emitter from the iterator set.
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize)
em.Columns = columns
if stmt.Location != nil {
em.Location = stmt.Location
}
em.OmitTime = stmt.OmitTime
em.EmitName = stmt.EmitName

// Emit rows to the results channel.
var writeN int64
for {
var row *models.Row
row, _, err = em.Emit()
if err != nil {
goto CLEANUP
} else if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-ectx.InterruptCh:
err = query.ErrQueryInterrupted
goto CLEANUP
default:
}
break
}

writeN += int64(len(row.Values))
}

CLEANUP:
em.Close()
if err != nil {
return nil, err
}

totalTime := time.Since(start)
span.MergeFields(
fields.Duration("total_time", totalTime),
fields.Duration("planning_time", iterTime),
fields.Duration("execution_time", totalTime-iterTime),
)
span.Finish()

row := &models.Row{
Columns: []string{"EXPLAIN ANALYZE"},
}
for _, s := range strings.Split(t.Tree().String(), "\n") {
row.Values = append(row.Values, []interface{}{s})
}

return models.Rows{row}, nil
}

func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
}
Expand Down Expand Up @@ -469,14 +540,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
return e.MetaClient.UpdateUser(q.Name, q.Password)
}

func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
itrs, columns, err := e.createIterators(stmt, ctx)
func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error {
itrs, columns, err := e.createIterators(ctx, stmt, ectx)
if err != nil {
return err
}

// Generate a row emitter from the iterator set.
em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize)
em.Columns = columns
if stmt.Location != nil {
em.Location = stmt.Location
Expand All @@ -501,7 +572,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
} else if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-ctx.InterruptCh:
case <-ectx.InterruptCh:
return query.ErrQueryInterrupted
default:
}
Expand All @@ -518,13 +589,13 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

result := &query.Result{
StatementID: ctx.StatementID,
StatementID: ectx.StatementID,
Series: []*models.Row{row},
Partial: partial,
}

// Send results or exit if closing.
if err := ctx.Send(result); err != nil {
if err := ectx.Send(result); err != nil {
return err
}

Expand All @@ -538,12 +609,12 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

var messages []*query.Message
if ctx.ReadOnly {
if ectx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
}

return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
return ectx.Send(&query.Result{
StatementID: ectx.StatementID,
Messages: messages,
Series: []*models.Row{{
Name: "result",
Expand All @@ -555,33 +626,33 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen

// Always emit at least one result.
if !emitted {
return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
return ectx.Send(&query.Result{
StatementID: ectx.StatementID,
Series: make([]*models.Row, 0),
})
}

return nil
}

func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) ([]query.Iterator, []string, error) {
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) {
opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
InterruptCh: ectx.InterruptCh,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
Authorizer: ectx.Authorizer,
}

// Create a set of iterators from a selection.
itrs, columns, err := query.Select(stmt, e.ShardMapper, opt)
itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt)
if err != nil {
return nil, nil, err
}

if e.MaxSelectPointN > 0 {
monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN)
ctx.Query.Monitor(monitor)
ectx.Query.Monitor(monitor)
}
return itrs, columns, nil
}
Expand Down Expand Up @@ -1073,8 +1144,8 @@ func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultD
case *influxql.Measurement:
switch stmt.(type) {
case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
// DB and RP not supported by these statements so don't rewrite into invalid
// statements
// DB and RP not supported by these statements so don't rewrite into invalid
// statements
default:
err = e.normalizeMeasurement(node, defaultDatabase)
}
Expand Down
11 changes: 6 additions & 5 deletions coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator_test

import (
"bytes"
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(m string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error) {
return &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}},
{Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}},
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(m string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error) {
return &FloatIterator{
Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
}, nil
Expand Down Expand Up @@ -384,7 +385,7 @@ func (s *TSDBStore) TagValues(_ query.Authorizer, database string, cond influxql
type MockShard struct {
Measurements []string
FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error)
CreateIteratorFn func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}
Expand Down Expand Up @@ -417,8 +418,8 @@ func (sh *MockShard) MapType(measurement, field string) influxql.DataType {
return influxql.Unknown
}

func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error) {
return sh.CreateIteratorFn(measurement, opt)
func (sh *MockShard) CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
return sh.CreateIteratorFn(ctx, measurement, opt)
}

func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/metrics/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metrics

import "context"

type key int

const (
groupKey key = iota
)

// NewContextWithGroup returns a new context with the given Group added.
func NewContextWithGroup(ctx context.Context, c *Group) context.Context {
return context.WithValue(ctx, groupKey, c)
}

// GroupFromContext returns the Group associated with ctx or nil if no Group has been assigned.
func GroupFromContext(ctx context.Context) *Group {
c, _ := ctx.Value(groupKey).(*Group)
return c
}
Loading

0 comments on commit 618f0d0

Please sign in to comment.