Skip to content

Commit

Permalink
Avoid deadlock when max-row-limit is hit
Browse files Browse the repository at this point in the history
When the `max-row-limit` was hit, the goroutine reading from the results
channel would stop reading from the channel, but it didn't signal to the
sender that it was no longer reading from the results. This caused the
sender to continue trying to send results even though nobody would ever
read it and this created a deadlock.

Include an `AbortCh` on the `ExecutionContext` that will signal when
results are no longer desired so the sender can abort instead of
deadlocking.
  • Loading branch information
jsternberg committed Nov 8, 2016
1 parent 3c0aaae commit 40b66c3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ All Changes:
- [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES.
- [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data.
- [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies
- [#7606](https://github.com/influxdata/influxdb/pull/7606): Avoid deadlock when `max-row-limit` is hit.

## v1.0.2 [2016-10-05]

Expand Down
48 changes: 21 additions & 27 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
return err
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: rows,
Messages: messages,
}
return nil
})
}

func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
Expand Down Expand Up @@ -441,10 +440,8 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

// Send results or exit if closing.
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case ctx.Results <- result:
if err := ctx.Send(result); err != nil {
return err
}

emitted = true
Expand All @@ -461,24 +458,23 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Messages: messages,
Series: []*models.Row{{
Name: "result",
Columns: []string{"time", "written"},
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
}},
}
return nil
})
}

// Always emit at least one result.
if !emitted {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: make([]*models.Row, 0),
}
})
}

return nil
Expand Down Expand Up @@ -673,11 +669,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea

measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition)
if err != nil || len(measurements) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}

if q.Offset > 0 {
Expand All @@ -700,21 +695,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
}

if len(values) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}
return nil
})
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{{
Name: "measurements",
Columns: []string{"name"},
Values: values,
}},
}
return nil
})
}

func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
Expand Down Expand Up @@ -849,11 +842,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem

tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition)
if err != nil {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}

emitted := false
Expand Down Expand Up @@ -887,18 +879,20 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
row.Values[i] = []interface{}{v.Key, v.Value}
}

ctx.Results <- &influxql.Result{
if err := ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}); err != nil {
return err
}
emitted = true
}

// Ensure at least one result is emitted.
if !emitted {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}
})
}
return nil
}
Expand Down
49 changes: 44 additions & 5 deletions influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")

// ErrQueryAborted is an error returned when the query is aborted.
ErrQueryAborted = errors.New("query aborted")

// ErrQueryEngineShutdown is an error sent when the query cannot be
// created because the query engine was shutdown.
ErrQueryEngineShutdown = errors.New("query engine shutdown")
Expand Down Expand Up @@ -74,6 +77,9 @@ type ExecutionOptions struct {

// Quiet suppresses non-essential output from the query executor.
Quiet bool

// AbortCh is a channel that signals when results are no longer desired by the caller.
AbortCh <-chan struct{}
}

// ExecutionContext contains state that the query is currently executing with.
Expand All @@ -100,6 +106,30 @@ type ExecutionContext struct {
ExecutionOptions
}

// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
select {
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}

// Send sends a Result to the Results channel and will exit if the query has
// been interrupted or aborted.
func (ctx *ExecutionContext) Send(result *Result) error {
select {
case <-ctx.InterruptCh:
return ErrQueryInterrupted
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}

// StatementExecutor executes a statement within the QueryExecutor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
Expand Down Expand Up @@ -194,7 +224,11 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing

qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
if err != nil {
results <- &Result{Err: err}
select {
case results <- &Result{Err: err}:
case <-closing:
case <-opt.AbortCh:
}
return
}
defer e.TaskManager.KillQuery(qid)
Expand Down Expand Up @@ -265,8 +299,9 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil {
results <- &Result{Err: err}
break
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
return
}
}
}

Expand All @@ -287,9 +322,11 @@ LOOP:

// Send an error for this result if it failed for some reason.
if err != nil {
results <- &Result{
if err := ctx.send(&Result{
StatementID: i,
Err: err,
}); err == ErrQueryAborted {
return
}
// Stop after the first error.
break
Expand All @@ -313,9 +350,11 @@ LOOP:

// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
results <- &Result{
if err := ctx.send(&Result{
StatementID: i,
Err: ErrNotExecuted,
}); err == ErrQueryAborted {
return
}
}
}
Expand Down
32 changes: 30 additions & 2 deletions influxql/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
}
}

func TestQueryExecutor_Abort(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected
}
},
}

closing := make(chan struct{})
results := e.ExecuteQuery(q, influxql.ExecutionOptions{}, closing)
close(closing)
result := <-results
if result.Err != influxql.ErrQueryInterrupted {
t.Errorf("unexpected error: %s", result.Err)
}
}

func TestQueryExecutor_ShowQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
Expand Down Expand Up @@ -225,7 +253,6 @@ func TestQueryExecutor_Close(t *testing.T) {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
close(ch1)
<-ctx.InterruptCh
close(ch2)
return influxql.ErrQueryInterrupted
},
}
Expand All @@ -236,6 +263,7 @@ func TestQueryExecutor_Close(t *testing.T) {
if result.Err != influxql.ErrQueryEngineShutdown {
t.Errorf("unexpected error: %s", result.Err)
}
close(ch2)
}(results)

// Wait for the statement to start executing.
Expand All @@ -248,7 +276,7 @@ func TestQueryExecutor_Close(t *testing.T) {
select {
case <-ch2:
case <-time.After(100 * time.Millisecond):
t.Error("closing the query manager did not kill the query after 100 milliseconds")
t.Fatal("closing the query manager did not kill the query after 100 milliseconds")
}

results = e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil)
Expand Down
15 changes: 9 additions & 6 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Parse whether this is an async command.
async := r.FormValue("async") == "true"

opts := influxql.ExecutionOptions{
Database: db,
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}

// Make sure if the client disconnects we signal the query to abort
var closing chan struct{}
if !async {
Expand All @@ -398,19 +405,15 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
close(closing)
}
}()
opts.AbortCh = done
} else {
defer close(closing)
}
}

// Execute query.
rw.Header().Add("Connection", "close")
results := h.QueryExecutor.ExecuteQuery(query, influxql.ExecutionOptions{
Database: db,
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}, closing)
results := h.QueryExecutor.ExecuteQuery(query, opts, closing)

// If we are running in async mode, open a goroutine to drain the results
// and return with a StatusNoContent.
Expand Down

0 comments on commit 40b66c3

Please sign in to comment.