From 55973d2815e48b5e86e84411206015feaa160ee3 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 10 Jun 2016 12:30:49 -0500 Subject: [PATCH 1/3] Separate the task manager from the query executor The task manager now acts as its own statement executor so that a custom statement executor can perform custom actions for KillQueryStatement and ShowQueriesStatement. --- cmd/influxd/run/server.go | 7 +- coordinator/statement_executor.go | 6 + influxql/query_executor.go | 236 ++------------------ influxql/query_executor_test.go | 34 ++- influxql/task_manager.go | 236 ++++++++++++++++++++ services/continuous_querier/service_test.go | 4 - services/httpd/handler_test.go | 4 - 7 files changed, 286 insertions(+), 241 deletions(-) create mode 100644 influxql/task_manager.go diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index c5271c00a0a..6feb02e334b 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -173,6 +173,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.QueryExecutor = influxql.NewQueryExecutor() s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{ MetaClient: s.MetaClient, + TaskManager: s.QueryExecutor.TaskManager, TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore}, Monitor: s.Monitor, PointsWriter: s.PointsWriter, @@ -180,9 +181,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN, MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN, } - s.QueryExecutor.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout) - s.QueryExecutor.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter) - s.QueryExecutor.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries + s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout) + s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter) + s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries // Initialize the monitor s.Monitor.Version = s.buildInfo.Version diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index e00aaa21d05..4b5ebee75e1 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -26,6 +26,9 @@ type pointsWriter interface { type StatementExecutor struct { MetaClient MetaClient + // TaskManager holds the StatementExecutor that handles task-related commands. + TaskManager influxql.StatementExecutor + // TSDB storage for local node. TSDBStore TSDBStore @@ -182,6 +185,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) } err = e.executeSetPasswordUserStatement(stmt) + case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement: + // Send query related statements to the task manager. + return e.TaskManager.ExecuteStatement(stmt, ctx) default: return influxql.ErrInvalidQuery } diff --git a/influxql/query_executor.go b/influxql/query_executor.go index afe5123f9b5..bbe38dc0b24 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -12,7 +12,6 @@ import ( "time" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/models" ) var ( @@ -42,12 +41,6 @@ var ( ErrQueryTimeoutReached = errors.New("query timeout reached") ) -const ( - // DefaultQueryTimeout is the default timeout for executing a query. - // A value of zero will have no query timeout. - DefaultQueryTimeout = time.Duration(0) -) - // Statistics for the QueryExecutor const ( statQueriesActive = "queriesActive" // Number of queries currently being executed @@ -101,7 +94,10 @@ type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the // results channel in the ExecutionContext. ExecuteStatement(stmt Statement, ctx *ExecutionContext) error +} +// StatementNormalizer normalizes a statement before it is executed. +type StatementNormalizer interface { // NormalizeStatement adds a default database and policy to the // measurements in the statement. NormalizeStatement(stmt Statement, database string) error @@ -112,26 +108,13 @@ type QueryExecutor struct { // Used for executing a statement in the query. StatementExecutor StatementExecutor - // Query execution timeout. - QueryTimeout time.Duration - - // Log queries if they are slower than this time. - // If zero, slow queries will never be logged. - LogQueriesAfter time.Duration - - // Maximum number of concurrent queries. - MaxConcurrentQueries int + // Used for tracking running queries. + TaskManager *TaskManager // Logger to use for all logging. // Defaults to discarding all log output. Logger *log.Logger - // Used for managing and tracking running queries. - queries map[uint64]*QueryTask - nextID uint64 - mu sync.RWMutex - shutdown bool - // expvar-based stats. statMap *expvar.Map } @@ -139,32 +122,22 @@ type QueryExecutor struct { // NewQueryExecutor returns a new instance of QueryExecutor. func NewQueryExecutor() *QueryExecutor { return &QueryExecutor{ - QueryTimeout: DefaultQueryTimeout, - Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags), - queries: make(map[uint64]*QueryTask), - nextID: 1, - statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), + TaskManager: NewTaskManager(), + Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags), + statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), } } // Close kills all running queries and prevents new queries from being attached. func (e *QueryExecutor) Close() error { - e.mu.Lock() - defer e.mu.Unlock() - - e.shutdown = true - for _, query := range e.queries { - query.setError(ErrQueryEngineShutdown) - close(query.closing) - } - e.queries = nil - return nil + return e.TaskManager.Close() } // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (e *QueryExecutor) SetLogOutput(w io.Writer) { e.Logger = log.New(w, "[query] ", log.LstdFlags) + e.TaskManager.Logger = e.Logger } // ExecuteQuery executes each statement within a query. @@ -184,12 +157,12 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds()) }(time.Now()) - qid, task, err := e.attachQuery(query, opt.Database, closing) + qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) if err != nil { results <- &Result{Err: err} return } - defer e.killQuery(qid) + defer e.TaskManager.KillQuery(qid) // Setup the execution context that will be used when executing statements. ctx := ExecutionContext{ @@ -202,7 +175,6 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing } var i int -loop: for ; i < len(query.Statements); i++ { ctx.StatementID = i stmt := query.Statements[i] @@ -224,49 +196,17 @@ loop: } stmt = newStmt - // Normalize each statement. - if err := e.StatementExecutor.NormalizeStatement(stmt, defaultDB); err != nil { - results <- &Result{Err: err} - break + // Normalize each statement if possible. + if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { + if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil { + results <- &Result{Err: err} + break + } } // Log each normalized statement. e.Logger.Println(stmt.String()) - // Handle a query management queries specially so they don't go - // to the underlying statement executor. - switch stmt := stmt.(type) { - case *ShowQueriesStatement: - rows, err := e.executeShowQueriesStatement(stmt) - results <- &Result{ - StatementID: i, - Series: rows, - Err: err, - } - - if err != nil { - break loop - } - continue loop - case *KillQueryStatement: - var messages []*Message - if ctx.ReadOnly { - messages = append(messages, ReadOnlyWarning(stmt.String())) - } - - err := e.executeKillQueryStatement(stmt) - results <- &Result{ - StatementID: i, - Messages: messages, - Err: err, - } - - if err != nil { - break loop - } - continue loop - } - // Send any other statements to the underlying statement executor. err = e.StatementExecutor.ExecuteStatement(stmt, &ctx) if err == ErrQueryInterrupted { @@ -307,146 +247,6 @@ func (e *QueryExecutor) recover(query *Query, results chan *Result) { } } -func (e *QueryExecutor) executeKillQueryStatement(stmt *KillQueryStatement) error { - return e.killQuery(stmt.QueryID) -} - -func (e *QueryExecutor) executeShowQueriesStatement(q *ShowQueriesStatement) (models.Rows, error) { - e.mu.RLock() - defer e.mu.RUnlock() - - now := time.Now() - - values := make([][]interface{}, 0, len(e.queries)) - for id, qi := range e.queries { - d := now.Sub(qi.startTime) - - var ds string - if d == 0 { - ds = "0s" - } else if d < time.Second { - ds = fmt.Sprintf("%du", d) - } else { - ds = (d - (d % time.Second)).String() - } - values = append(values, []interface{}{id, qi.query, qi.database, ds}) - } - - return []*models.Row{{ - Columns: []string{"qid", "query", "database", "duration"}, - Values: values, - }}, nil -} - -func (e *QueryExecutor) query(qid uint64) (*QueryTask, bool) { - e.mu.RLock() - query, ok := e.queries[qid] - e.mu.RUnlock() - return query, ok -} - -// attachQuery attaches a running query to be managed by the QueryExecutor. -// Returns the query id of the newly attached query or an error if it was -// unable to assign a query id or attach the query to the QueryExecutor. -// This function also returns a channel that will be closed when this -// query finishes running. -// -// After a query finishes running, the system is free to reuse a query id. -func (e *QueryExecutor) attachQuery(q *Query, database string, interrupt <-chan struct{}) (uint64, *QueryTask, error) { - e.mu.Lock() - defer e.mu.Unlock() - - if e.shutdown { - return 0, nil, ErrQueryEngineShutdown - } - - if e.MaxConcurrentQueries > 0 && len(e.queries) >= e.MaxConcurrentQueries { - return 0, nil, ErrMaxConcurrentQueriesReached - } - - qid := e.nextID - query := &QueryTask{ - query: q.String(), - database: database, - startTime: time.Now(), - closing: make(chan struct{}), - monitorCh: make(chan error), - } - e.queries[qid] = query - - go e.waitForQuery(qid, query.closing, interrupt, query.monitorCh) - if e.LogQueriesAfter != 0 { - go query.monitor(func(closing <-chan struct{}) error { - t := time.NewTimer(e.LogQueriesAfter) - defer t.Stop() - - select { - case <-t.C: - e.Logger.Printf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)", - query.query, qid, query.database, e.LogQueriesAfter) - case <-closing: - } - return nil - }) - } - e.nextID++ - return qid, query, nil -} - -// killQuery stops and removes a query from the QueryExecutor. -// This method can be used to forcefully terminate a running query. -func (e *QueryExecutor) killQuery(qid uint64) error { - e.mu.Lock() - defer e.mu.Unlock() - - query, ok := e.queries[qid] - if !ok { - return fmt.Errorf("no such query id: %d", qid) - } - - close(query.closing) - delete(e.queries, qid) - return nil -} - -func (e *QueryExecutor) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) { - var timer <-chan time.Time - if e.QueryTimeout != 0 { - t := time.NewTimer(e.QueryTimeout) - timer = t.C - defer t.Stop() - } - - select { - case <-closing: - query, ok := e.query(qid) - if !ok { - break - } - query.setError(ErrQueryInterrupted) - case err := <-monitorCh: - if err == nil { - break - } - - query, ok := e.query(qid) - if !ok { - break - } - query.setError(err) - case <-timer: - query, ok := e.query(qid) - if !ok { - break - } - query.setError(ErrQueryTimeoutReached) - case <-interrupt: - // Query was manually closed so exit the select. - return - } - e.killQuery(qid) -} - // QueryMonitorFunc is a function that will be called to check if a query // is currently healthy. If the query needs to be interrupted for some reason, // the error should be returned by this function. diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 9c605dadf11..77ef1a7fea0 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -19,8 +19,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ return e.ExecuteStatementFn(stmt, ctx) } -func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error { - return nil +func NewQueryExecutor() *influxql.QueryExecutor { + return influxql.NewQueryExecutor() } func TestQueryExecutor_AttachQuery(t *testing.T) { @@ -29,7 +29,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { t.Fatal(err) } - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { if ctx.QueryID != 1 { @@ -50,9 +50,14 @@ func TestQueryExecutor_KillQuery(t *testing.T) { qid := make(chan uint64) - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + switch stmt.(type) { + case *influxql.KillQueryStatement: + return e.TaskManager.ExecuteStatement(stmt, ctx) + } + qid <- ctx.QueryID select { case <-ctx.InterruptCh: @@ -83,7 +88,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) { t.Fatal(err) } - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { select { @@ -111,9 +116,14 @@ func TestQueryExecutor_ShowQueries(t *testing.T) { t.Fatal(err) } - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + switch stmt.(type) { + case *influxql.ShowQueriesStatement: + return e.TaskManager.ExecuteStatement(stmt, ctx) + } + t.Errorf("unexpected statement: %s", stmt) return errUnexpected }, @@ -140,7 +150,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { t.Fatal(err) } - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { select { @@ -152,7 +162,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { } }, } - e.QueryTimeout = time.Nanosecond + e.TaskManager.QueryTimeout = time.Nanosecond results := e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil) result := <-results @@ -169,7 +179,7 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { qid := make(chan uint64) - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { qid <- ctx.QueryID @@ -177,7 +187,7 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { return influxql.ErrQueryInterrupted }, } - e.MaxConcurrentQueries = 1 + e.TaskManager.MaxConcurrentQueries = 1 defer e.Close() // Start first query and wait for it to be executing. @@ -209,7 +219,7 @@ func TestQueryExecutor_Close(t *testing.T) { ch1 := make(chan struct{}) ch2 := make(chan struct{}) - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { close(ch1) @@ -256,7 +266,7 @@ func TestQueryExecutor_Panic(t *testing.T) { t.Fatal(err) } - e := influxql.NewQueryExecutor() + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { panic("test error") diff --git a/influxql/task_manager.go b/influxql/task_manager.go new file mode 100644 index 00000000000..2d174d1588b --- /dev/null +++ b/influxql/task_manager.go @@ -0,0 +1,236 @@ +package influxql + +import ( + "fmt" + "io/ioutil" + "log" + "sync" + "time" + + "github.com/influxdata/influxdb/models" +) + +const ( + // DefaultQueryTimeout is the default timeout for executing a query. + // A value of zero will have no query timeout. + DefaultQueryTimeout = time.Duration(0) +) + +// TaskManager takes care of all aspects related to managing running queries. +type TaskManager struct { + // Query execution timeout. + QueryTimeout time.Duration + + // Log queries if they are slower than this time. + // If zero, slow queries will never be logged. + LogQueriesAfter time.Duration + + // Maximum number of concurrent queries. + MaxConcurrentQueries int + + // Logger to use for all logging. + // Defaults to discarding all log output. + Logger *log.Logger + + // Used for managing and tracking running queries. + queries map[uint64]*QueryTask + nextID uint64 + mu sync.RWMutex + shutdown bool +} + +// NewTaskManager creates a new TaskManager. +func NewTaskManager() *TaskManager { + return &TaskManager{ + QueryTimeout: DefaultQueryTimeout, + Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags), + queries: make(map[uint64]*QueryTask), + nextID: 1, + } +} + +// ExecuteStatement executes a statement containing one of the task management queries. +func (t *TaskManager) ExecuteStatement(stmt Statement, ctx *ExecutionContext) error { + switch stmt := stmt.(type) { + case *ShowQueriesStatement: + rows, err := t.executeShowQueriesStatement(stmt) + if err != nil { + return err + } + + ctx.Results <- &Result{ + StatementID: ctx.StatementID, + Series: rows, + } + case *KillQueryStatement: + var messages []*Message + if ctx.ReadOnly { + messages = append(messages, ReadOnlyWarning(stmt.String())) + } + + if err := t.executeKillQueryStatement(stmt); err != nil { + return err + } + ctx.Results <- &Result{ + StatementID: ctx.StatementID, + Messages: messages, + } + default: + return ErrInvalidQuery + } + return nil +} + +func (t *TaskManager) executeKillQueryStatement(stmt *KillQueryStatement) error { + return t.KillQuery(stmt.QueryID) +} + +func (t *TaskManager) executeShowQueriesStatement(q *ShowQueriesStatement) (models.Rows, error) { + t.mu.RLock() + defer t.mu.RUnlock() + + now := time.Now() + + values := make([][]interface{}, 0, len(t.queries)) + for id, qi := range t.queries { + d := now.Sub(qi.startTime) + + var ds string + if d == 0 { + ds = "0s" + } else if d < time.Second { + ds = fmt.Sprintf("%du", d) + } else { + ds = (d - (d % time.Second)).String() + } + values = append(values, []interface{}{id, qi.query, qi.database, ds}) + } + + return []*models.Row{{ + Columns: []string{"qid", "query", "database", "duration"}, + Values: values, + }}, nil +} + +func (t *TaskManager) query(qid uint64) (*QueryTask, bool) { + t.mu.RLock() + query, ok := t.queries[qid] + t.mu.RUnlock() + return query, ok +} + +// AttachQuery attaches a running query to be managed by the TaskManager. +// Returns the query id of the newly attached query or an error if it was +// unable to assign a query id or attach the query to the TaskManager. +// This function also returns a channel that will be closed when this +// query finishes running. +// +// After a query finishes running, the system is free to reuse a query id. +func (t *TaskManager) AttachQuery(q *Query, database string, interrupt <-chan struct{}) (uint64, *QueryTask, error) { + t.mu.Lock() + defer t.mu.Unlock() + + if t.shutdown { + return 0, nil, ErrQueryEngineShutdown + } + + if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries { + return 0, nil, ErrMaxConcurrentQueriesReached + } + + qid := t.nextID + query := &QueryTask{ + query: q.String(), + database: database, + startTime: time.Now(), + closing: make(chan struct{}), + monitorCh: make(chan error), + } + t.queries[qid] = query + + go t.waitForQuery(qid, query.closing, interrupt, query.monitorCh) + if t.LogQueriesAfter != 0 { + go query.monitor(func(closing <-chan struct{}) error { + timer := time.NewTimer(t.LogQueriesAfter) + defer timer.Stop() + + select { + case <-timer.C: + t.Logger.Printf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)", + query.query, qid, query.database, t.LogQueriesAfter) + case <-closing: + } + return nil + }) + } + t.nextID++ + return qid, query, nil +} + +// KillQuery stops and removes a query from the TaskManager. +// This method can be used to forcefully terminate a running query. +func (t *TaskManager) KillQuery(qid uint64) error { + t.mu.Lock() + defer t.mu.Unlock() + + query, ok := t.queries[qid] + if !ok { + return fmt.Errorf("no such query id: %d", qid) + } + + close(query.closing) + delete(t.queries, qid) + return nil +} + +func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) { + var timerCh <-chan time.Time + if t.QueryTimeout != 0 { + timer := time.NewTimer(t.QueryTimeout) + timerCh = timer.C + defer timer.Stop() + } + + select { + case <-closing: + query, ok := t.query(qid) + if !ok { + break + } + query.setError(ErrQueryInterrupted) + case err := <-monitorCh: + if err == nil { + break + } + + query, ok := t.query(qid) + if !ok { + break + } + query.setError(err) + case <-timerCh: + query, ok := t.query(qid) + if !ok { + break + } + query.setError(ErrQueryTimeoutReached) + case <-interrupt: + // Query was manually closed so exit the select. + return + } + t.KillQuery(qid) +} + +// Close kills all running queries and prevents new queries from being attached. +func (t *TaskManager) Close() error { + t.mu.Lock() + defer t.mu.Unlock() + + t.shutdown = true + for _, query := range t.queries { + query.setError(ErrQueryEngineShutdown) + close(query.closing) + } + t.queries = nil + return nil +} diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index ccb6b2c49f3..ec9815c7355 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -490,10 +490,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ return e.ExecuteStatementFn(stmt, ctx) } -func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error { - return nil -} - // NewQueryExecutor returns a *QueryExecutor. func NewQueryExecutor(t *testing.T) *QueryExecutor { e := influxql.NewQueryExecutor() diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index f61f19d1bb3..2417c8ff4e6 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -486,10 +486,6 @@ func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx return e.ExecuteStatementFn(stmt, ctx) } -func (e *HandlerStatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error { - return nil -} - // HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer. type HandlerQueryAuthorizer struct { AuthorizeQueryFn func(u *meta.UserInfo, query *influxql.Query, database string) error From a6147fa685f566cc92fa455ebf9a015911c8ed01 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 10 Jun 2016 12:31:46 -0500 Subject: [PATCH 2/3] Public method to return query information for running queries --- influxql/task_manager.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/influxql/task_manager.go b/influxql/task_manager.go index 2d174d1588b..6d6eec46608 100644 --- a/influxql/task_manager.go +++ b/influxql/task_manager.go @@ -183,6 +183,32 @@ func (t *TaskManager) KillQuery(qid uint64) error { return nil } +// QueryInfo represents the information for a query. +type QueryInfo struct { + ID uint64 `json:"id"` + Query string `json:"query"` + Database string `json:"database"` + Duration time.Duration `json:"duration"` +} + +// Queries returns a list of all running queries with information about them. +func (t *TaskManager) Queries() []QueryInfo { + t.mu.RLock() + defer t.mu.RUnlock() + + now := time.Now() + queries := make([]QueryInfo, 0, len(t.queries)) + for id, qi := range t.queries { + queries = append(queries, QueryInfo{ + ID: id, + Query: qi.query, + Database: qi.database, + Duration: now.Sub(qi.startTime), + }) + } + return queries +} + func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) { var timerCh <-chan time.Time if t.QueryTimeout != 0 { From 9db82e6bf0f5059001f1521ec8859b104a56b86b Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 7 Jun 2016 12:28:33 -0500 Subject: [PATCH 3/3] Switch ExecutionContext to be passed by value --- coordinator/statement_executor.go | 4 ++-- influxql/query_executor.go | 4 ++-- influxql/query_executor_test.go | 20 ++++++++++---------- influxql/task_manager.go | 2 +- services/continuous_querier/service_test.go | 18 +++++++++--------- services/httpd/handler_test.go | 20 ++++++++++---------- 6 files changed, 34 insertions(+), 34 deletions(-) diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 4b5ebee75e1..e9e830abab4 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -44,10 +44,10 @@ type StatementExecutor struct { MaxSelectBucketsN int } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error { // Select statements are handled separately so that they can be streamed. if stmt, ok := stmt.(*influxql.SelectStatement); ok { - return e.executeSelectStatement(stmt, ctx) + return e.executeSelectStatement(stmt, &ctx) } var rows models.Rows diff --git a/influxql/query_executor.go b/influxql/query_executor.go index bbe38dc0b24..84f2b29ae6c 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -93,7 +93,7 @@ type ExecutionContext struct { type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the // results channel in the ExecutionContext. - ExecuteStatement(stmt Statement, ctx *ExecutionContext) error + ExecuteStatement(stmt Statement, ctx ExecutionContext) error } // StatementNormalizer normalizes a statement before it is executed. @@ -208,7 +208,7 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing e.Logger.Println(stmt.String()) // Send any other statements to the underlying statement executor. - err = e.StatementExecutor.ExecuteStatement(stmt, &ctx) + err = e.StatementExecutor.ExecuteStatement(stmt, ctx) if err == ErrQueryInterrupted { // Query was interrupted so retrieve the real interrupt error from // the query task if there is one. diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 77ef1a7fea0..6deda3e699f 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -12,10 +12,10 @@ import ( var errUnexpected = errors.New("unexpected error") type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } @@ -31,7 +31,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { if ctx.QueryID != 1 { t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID) } @@ -52,7 +52,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -90,7 +90,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { select { case <-ctx.InterruptCh: return influxql.ErrQueryInterrupted @@ -118,7 +118,7 @@ func TestQueryExecutor_ShowQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { switch stmt.(type) { case *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -152,7 +152,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { select { case <-ctx.InterruptCh: return influxql.ErrQueryInterrupted @@ -181,7 +181,7 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { qid <- ctx.QueryID <-ctx.InterruptCh return influxql.ErrQueryInterrupted @@ -221,7 +221,7 @@ func TestQueryExecutor_Close(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { close(ch1) <-ctx.InterruptCh close(ch2) @@ -268,7 +268,7 @@ func TestQueryExecutor_Panic(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { panic("test error") }, } diff --git a/influxql/task_manager.go b/influxql/task_manager.go index 6d6eec46608..0ec44e69d60 100644 --- a/influxql/task_manager.go +++ b/influxql/task_manager.go @@ -50,7 +50,7 @@ func NewTaskManager() *TaskManager { } // ExecuteStatement executes a statement containing one of the task management queries. -func (t *TaskManager) ExecuteStatement(stmt Statement, ctx *ExecutionContext) error { +func (t *TaskManager) ExecuteStatement(stmt Statement, ctx ExecutionContext) error { switch stmt := stmt.(type) { case *ShowQueriesStatement: rows, err := t.executeShowQueriesStatement(stmt) diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index ec9815c7355..a4f020f2937 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { callCnt++ if callCnt >= expectCallCnt { done <- struct{}{} @@ -120,7 +120,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { callCnt++ if callCnt >= expectCallCnt { done <- struct{}{} @@ -184,7 +184,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { // Set a callback for ExecuteQuery. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { callCnt++ if callCnt >= expectCallCnt { done <- struct{}{} @@ -239,7 +239,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { done := make(chan struct{}) // Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { done <- struct{}{} ctx.Results <- &influxql.Result{Err: errUnexpected} return nil @@ -266,7 +266,7 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) { done := make(chan struct{}) // Set ExecuteQuery callback, which shouldn't get called because of meta store failure. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { done <- struct{}{} ctx.Results <- &influxql.Result{Err: errUnexpected} return nil @@ -287,7 +287,7 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) { func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return errUnexpected }, } @@ -320,7 +320,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return errExpected }, } @@ -483,10 +483,10 @@ type QueryExecutor struct { // StatementExecutor is a mock statement executor. type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 2417c8ff4e6..e402fe5357b 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -23,7 +23,7 @@ import ( // Ensure the handler returns results from a query (including nil results). func TestHandler_Query(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -86,7 +86,7 @@ func TestHandler_Query_Auth(t *testing.T) { } // Set mock statement executor for handler to use. - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -178,7 +178,7 @@ func TestHandler_Query_Auth(t *testing.T) { // Ensure the handler returns results from a query (including nil results). func TestHandler_QueryRegex(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `test` { @@ -195,7 +195,7 @@ func TestHandler_QueryRegex(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -213,7 +213,7 @@ func TestHandler_Query_MergeResults(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeEmptyResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows{}} ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -231,7 +231,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) { // Ensure the handler can parse chunked and chunk size query parameters. func TestHandler_Query_Chunked(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { if ctx.ChunkSize != 2 { t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) } @@ -292,7 +292,7 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) { // Ensure the handler returns a status 200 if an error is returned in the result. func TestHandler_Query_ErrResult(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return errors.New("measurement not found") } @@ -323,7 +323,7 @@ func TestHandler_Ping(t *testing.T) { // Ensure the handler returns the version correctly from the different endpoints. func TestHandler_Version(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return nil } tests := []struct { @@ -479,10 +479,10 @@ func (s *HandlerMetaStore) User(username string) (*meta.UserInfo, error) { // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. type HandlerStatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error } -func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error { +func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) }