From ac50a12ead8c0907c8784f1c2e6cb0fa501e43e6 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Tue, 24 Aug 2021 11:27:10 -0700 Subject: [PATCH 1/2] fix: return correct count of ErrNotExecuted (#22273) executeQuery() iterates over statements until each is processed or if an error is encountered that causes the loop to exit pre-maturely. It should return ErrNotExecuted for each remaining statement in the query closes https://github.com/influxdata/influxdb/issues/19136 (cherry-picked from commit 0090c5b11141e99ee80dc8980cff615c3b63dfd6) closes https://github.com/influxdata/influxdb/issues/22274 --- query/execution_context.go | 1 - query/executor.go | 4 +-- query/executor_test.go | 67 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/query/execution_context.go b/query/execution_context.go index 5479226f568..a17bbf7d99e 100644 --- a/query/execution_context.go +++ b/query/execution_context.go @@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} { // send sends a Result to the Results channel and will exit if the query has // been aborted. func (ctx *ExecutionContext) send(result *Result) error { - result.StatementID = ctx.statementID select { case <-ctx.AbortCh: return ErrQueryAborted diff --git a/query/executor.go b/query/executor.go index 75d4a7a320b..7b0a28d27ca 100644 --- a/query/executor.go +++ b/query/executor.go @@ -331,7 +331,7 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil { - if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted { return } break @@ -380,7 +380,7 @@ LOOP: } // Send error results for any statements which were not executed. - for ; i < len(query.Statements)-1; i++ { + for i++; i < len(query.Statements); i++ { if err := ctx.send(&Result{ StatementID: i, Err: ErrNotExecuted, diff --git a/query/executor_test.go b/query/executor_test.go index 8ed49fde87e..b8feaa317f0 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -21,6 +21,15 @@ func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt i return e.ExecuteStatementFn(stmt, ctx) } +type StatementNormalizerExecutor struct { + StatementExecutor + NormalizeStatementFn func(stmt influxql.Statement, database, retentionPolicy string) error +} + +func (e *StatementNormalizerExecutor) NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error { + return e.NormalizeStatementFn(stmt, database, retentionPolicy) +} + func NewQueryExecutor() *query.Executor { return query.NewExecutor() } @@ -478,6 +487,64 @@ func TestQueryExecutor_Panic(t *testing.T) { } } +const goodStatement = `SELECT count(value) FROM cpu` + +func TestQueryExecutor_NotExecuted(t *testing.T) { + var executorFailIndex int + var executorCallCount int + queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement} + queryStr := strings.Join(queryStatements, ";") + var closing chan struct{} + + q, err := influxql.ParseQuery(queryStr) + if err != nil { + t.Fatalf("parsing %s: %v", queryStr, err) + } + + e := NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + defer func() { executorCallCount++ }() + if executorFailIndex == executorCallCount { + closing <- struct{}{} + close(closing) + select { + case <-ctx.Done(): + return nil + } + } else { + return ctx.Send(&query.Result{Err: nil}) + } + }, + } + testFn := func(testName string, i int) { + results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing) + checkNotExecutedResults(t, results, testName, i, len(q.Statements)) + } + for i := 0; i < len(q.Statements); i++ { + closing = make(chan struct{}) + executorFailIndex = i + executorCallCount = 0 + testFn("executor", i) + } +} + +func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) { + notExecutedIndex := failIndex + 1 + for result := range results { + if result.Err == query.ErrNotExecuted { + if result.StatementID != notExecutedIndex { + t.Fatalf("StatementID for ErrNotExecuted in wrong order - expected: %d, got: %d", notExecutedIndex, result.StatementID) + } else { + notExecutedIndex++ + } + } + } + if notExecutedIndex != lenQuery { + t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, notExecutedIndex - (1 + failIndex), lenQuery-(1+failIndex)) + } +} + func TestQueryExecutor_InvalidSource(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ From 52189fd041acd73a7d9ed7f7ed95b8addbe6a36a Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Tue, 24 Aug 2021 11:38:45 -0700 Subject: [PATCH 2/2] chore: update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b62521e012..904a58b6ea2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ v1.9.4 [unreleased] - [#22091](https://github.com/influxdata/influxdb/pull/22091): fix: systemd service -- handle https, 40x, and block indefinitely - [#22214](https://github.com/influxdata/influxdb/pull/22214): fix: avoid compaction queue stats flutter - [#22289](https://github.com/influxdata/influxdb/pull/22289): fix: require database authorization to see continuous queries +- [#22294](https://github.com/influxdata/influxdb/pull/22294): fix: return correct count of ErrNotExecuted v1.9.3 [unreleased]