Skip to content

Commit

Permalink
Merge pull request #6806 from influxdata/js-separate-task-manager
Browse files Browse the repository at this point in the history
Refactor the TaskManager to be separate from the QueryExecutor
  • Loading branch information
jsternberg authored Jun 10, 2016
2 parents 48692a1 + 9db82e6 commit bdd15be
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 274 deletions.
7 changes: 4 additions & 3 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,17 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor = influxql.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: s.MetaClient,
TaskManager: s.QueryExecutor.TaskManager,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
}
s.QueryExecutor.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
s.QueryExecutor.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
s.QueryExecutor.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries
s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
Expand Down
10 changes: 8 additions & 2 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type pointsWriter interface {
type StatementExecutor struct {
MetaClient MetaClient

// TaskManager holds the StatementExecutor that handles task-related commands.
TaskManager influxql.StatementExecutor

// TSDB storage for local node.
TSDBStore TSDBStore

Expand All @@ -41,10 +44,10 @@ type StatementExecutor struct {
MaxSelectBucketsN int
}

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(stmt, ctx)
return e.executeSelectStatement(stmt, &ctx)
}

var rows models.Rows
Expand Down Expand Up @@ -182,6 +185,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeSetPasswordUserStatement(stmt)
case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement:
// Send query related statements to the task manager.
return e.TaskManager.ExecuteStatement(stmt, ctx)
default:
return influxql.ErrInvalidQuery
}
Expand Down
240 changes: 20 additions & 220 deletions influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
)

var (
Expand Down Expand Up @@ -42,12 +41,6 @@ var (
ErrQueryTimeoutReached = errors.New("query timeout reached")
)

const (
// DefaultQueryTimeout is the default timeout for executing a query.
// A value of zero will have no query timeout.
DefaultQueryTimeout = time.Duration(0)
)

// Statistics for the QueryExecutor
const (
statQueriesActive = "queriesActive" // Number of queries currently being executed
Expand Down Expand Up @@ -100,8 +93,11 @@ type ExecutionContext struct {
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
// results channel in the ExecutionContext.
ExecuteStatement(stmt Statement, ctx *ExecutionContext) error
ExecuteStatement(stmt Statement, ctx ExecutionContext) error
}

// StatementNormalizer normalizes a statement before it is executed.
type StatementNormalizer interface {
// NormalizeStatement adds a default database and policy to the
// measurements in the statement.
NormalizeStatement(stmt Statement, database string) error
Expand All @@ -112,59 +108,36 @@ type QueryExecutor struct {
// Used for executing a statement in the query.
StatementExecutor StatementExecutor

// Query execution timeout.
QueryTimeout time.Duration

// Log queries if they are slower than this time.
// If zero, slow queries will never be logged.
LogQueriesAfter time.Duration

// Maximum number of concurrent queries.
MaxConcurrentQueries int
// Used for tracking running queries.
TaskManager *TaskManager

// Logger to use for all logging.
// Defaults to discarding all log output.
Logger *log.Logger

// Used for managing and tracking running queries.
queries map[uint64]*QueryTask
nextID uint64
mu sync.RWMutex
shutdown bool

// expvar-based stats.
statMap *expvar.Map
}

// NewQueryExecutor returns a new instance of QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
QueryTimeout: DefaultQueryTimeout,
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
queries: make(map[uint64]*QueryTask),
nextID: 1,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
TaskManager: NewTaskManager(),
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
}
}

// Close kills all running queries and prevents new queries from being attached.
func (e *QueryExecutor) Close() error {
e.mu.Lock()
defer e.mu.Unlock()

e.shutdown = true
for _, query := range e.queries {
query.setError(ErrQueryEngineShutdown)
close(query.closing)
}
e.queries = nil
return nil
return e.TaskManager.Close()
}

// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (e *QueryExecutor) SetLogOutput(w io.Writer) {
e.Logger = log.New(w, "[query] ", log.LstdFlags)
e.TaskManager.Logger = e.Logger
}

// ExecuteQuery executes each statement within a query.
Expand All @@ -184,12 +157,12 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())

qid, task, err := e.attachQuery(query, opt.Database, closing)
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
if err != nil {
results <- &Result{Err: err}
return
}
defer e.killQuery(qid)
defer e.TaskManager.KillQuery(qid)

// Setup the execution context that will be used when executing statements.
ctx := ExecutionContext{
Expand All @@ -202,7 +175,6 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
}

var i int
loop:
for ; i < len(query.Statements); i++ {
ctx.StatementID = i
stmt := query.Statements[i]
Expand All @@ -224,51 +196,19 @@ loop:
}
stmt = newStmt

// Normalize each statement.
if err := e.StatementExecutor.NormalizeStatement(stmt, defaultDB); err != nil {
results <- &Result{Err: err}
break
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil {
results <- &Result{Err: err}
break
}
}

// Log each normalized statement.
e.Logger.Println(stmt.String())

// Handle a query management queries specially so they don't go
// to the underlying statement executor.
switch stmt := stmt.(type) {
case *ShowQueriesStatement:
rows, err := e.executeShowQueriesStatement(stmt)
results <- &Result{
StatementID: i,
Series: rows,
Err: err,
}

if err != nil {
break loop
}
continue loop
case *KillQueryStatement:
var messages []*Message
if ctx.ReadOnly {
messages = append(messages, ReadOnlyWarning(stmt.String()))
}

err := e.executeKillQueryStatement(stmt)
results <- &Result{
StatementID: i,
Messages: messages,
Err: err,
}

if err != nil {
break loop
}
continue loop
}

// Send any other statements to the underlying statement executor.
err = e.StatementExecutor.ExecuteStatement(stmt, &ctx)
err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
if err == ErrQueryInterrupted {
// Query was interrupted so retrieve the real interrupt error from
// the query task if there is one.
Expand Down Expand Up @@ -307,146 +247,6 @@ func (e *QueryExecutor) recover(query *Query, results chan *Result) {
}
}

func (e *QueryExecutor) executeKillQueryStatement(stmt *KillQueryStatement) error {
return e.killQuery(stmt.QueryID)
}

func (e *QueryExecutor) executeShowQueriesStatement(q *ShowQueriesStatement) (models.Rows, error) {
e.mu.RLock()
defer e.mu.RUnlock()

now := time.Now()

values := make([][]interface{}, 0, len(e.queries))
for id, qi := range e.queries {
d := now.Sub(qi.startTime)

var ds string
if d == 0 {
ds = "0s"
} else if d < time.Second {
ds = fmt.Sprintf("%du", d)
} else {
ds = (d - (d % time.Second)).String()
}
values = append(values, []interface{}{id, qi.query, qi.database, ds})
}

return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Values: values,
}}, nil
}

func (e *QueryExecutor) query(qid uint64) (*QueryTask, bool) {
e.mu.RLock()
query, ok := e.queries[qid]
e.mu.RUnlock()
return query, ok
}

// attachQuery attaches a running query to be managed by the QueryExecutor.
// Returns the query id of the newly attached query or an error if it was
// unable to assign a query id or attach the query to the QueryExecutor.
// This function also returns a channel that will be closed when this
// query finishes running.
//
// After a query finishes running, the system is free to reuse a query id.
func (e *QueryExecutor) attachQuery(q *Query, database string, interrupt <-chan struct{}) (uint64, *QueryTask, error) {
e.mu.Lock()
defer e.mu.Unlock()

if e.shutdown {
return 0, nil, ErrQueryEngineShutdown
}

if e.MaxConcurrentQueries > 0 && len(e.queries) >= e.MaxConcurrentQueries {
return 0, nil, ErrMaxConcurrentQueriesReached
}

qid := e.nextID
query := &QueryTask{
query: q.String(),
database: database,
startTime: time.Now(),
closing: make(chan struct{}),
monitorCh: make(chan error),
}
e.queries[qid] = query

go e.waitForQuery(qid, query.closing, interrupt, query.monitorCh)
if e.LogQueriesAfter != 0 {
go query.monitor(func(closing <-chan struct{}) error {
t := time.NewTimer(e.LogQueriesAfter)
defer t.Stop()

select {
case <-t.C:
e.Logger.Printf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
query.query, qid, query.database, e.LogQueriesAfter)
case <-closing:
}
return nil
})
}
e.nextID++
return qid, query, nil
}

// killQuery stops and removes a query from the QueryExecutor.
// This method can be used to forcefully terminate a running query.
func (e *QueryExecutor) killQuery(qid uint64) error {
e.mu.Lock()
defer e.mu.Unlock()

query, ok := e.queries[qid]
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}

close(query.closing)
delete(e.queries, qid)
return nil
}

func (e *QueryExecutor) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) {
var timer <-chan time.Time
if e.QueryTimeout != 0 {
t := time.NewTimer(e.QueryTimeout)
timer = t.C
defer t.Stop()
}

select {
case <-closing:
query, ok := e.query(qid)
if !ok {
break
}
query.setError(ErrQueryInterrupted)
case err := <-monitorCh:
if err == nil {
break
}

query, ok := e.query(qid)
if !ok {
break
}
query.setError(err)
case <-timer:
query, ok := e.query(qid)
if !ok {
break
}
query.setError(ErrQueryTimeoutReached)
case <-interrupt:
// Query was manually closed so exit the select.
return
}
e.killQuery(qid)
}

// QueryMonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
Expand Down
Loading

0 comments on commit bdd15be

Please sign in to comment.