Skip to content

Commit

Permalink
fix: return correct count of ErrNotExecuted (influxdata#22273)
Browse files Browse the repository at this point in the history
executeQuery() iterates over statements until each is
processed or if an error is encountered that causes
the loop to exit pre-maturely. It should return
ErrNotExecuted for each remaining statement in the
query

closes influxdata#19136
  • Loading branch information
davidby-influx authored and chengshiwen committed Aug 11, 2024
1 parent 0858af3 commit aa6c21e
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ v1.8.11 [unreleased]
- [#21792](https://github.com/influxdata/influxdb/pull/21792): fix: error instead of panic for statement rewrite failure
- [#21795](https://github.com/influxdata/influxdb/pull/21795): fix: show shards gives empty expiry time for inf duration shards
- [#22040](https://github.com/influxdata/influxdb/pull/22040): fix: copy names from mmapped memory before closing iterator
- [#22273](https://github.com/influxdata/influxdb/pull/22273): fix: return correct count of ErrNotExecuted

v1.8.10 [2021-10-11]
-------------------
Expand Down
1 change: 0 additions & 1 deletion query/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} {
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.AbortCh:
return ErrQueryAborted
Expand Down
4 changes: 2 additions & 2 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil {
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted {
return
}
break
Expand Down Expand Up @@ -380,7 +380,7 @@ LOOP:
}

// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
for i++; i < len(query.Statements); i++ {
if err := ctx.send(&Result{
StatementID: i,
Err: ErrNotExecuted,
Expand Down
67 changes: 67 additions & 0 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt i
return e.ExecuteStatementFn(stmt, ctx)
}

type StatementNormalizerExecutor struct {
StatementExecutor
NormalizeStatementFn func(stmt influxql.Statement, database, retentionPolicy string) error
}

func (e *StatementNormalizerExecutor) NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error {
return e.NormalizeStatementFn(stmt, database, retentionPolicy)
}

func NewQueryExecutor() *query.Executor {
return query.NewExecutor()
}
Expand Down Expand Up @@ -478,6 +487,64 @@ func TestQueryExecutor_Panic(t *testing.T) {
}
}

const goodStatement = `SELECT count(value) FROM cpu`

func TestQueryExecutor_NotExecuted(t *testing.T) {
var executorFailIndex int
var executorCallCount int
queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement}
queryStr := strings.Join(queryStatements, ";")
var closing chan struct{}

q, err := influxql.ParseQuery(queryStr)
if err != nil {
t.Fatalf("parsing %s: %v", queryStr, err)
}

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
defer func() { executorCallCount++ }()
if executorFailIndex == executorCallCount {
closing <- struct{}{}
close(closing)
select {
case <-ctx.Done():
return nil
}
} else {
return ctx.Send(&query.Result{Err: nil})
}
},
}
testFn := func(testName string, i int) {
results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing)
checkNotExecutedResults(t, results, testName, i, len(q.Statements))
}
for i := 0; i < len(q.Statements); i++ {
closing = make(chan struct{})
executorFailIndex = i
executorCallCount = 0
testFn("executor", i)
}
}

func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) {
notExecutedIndex := failIndex + 1
for result := range results {
if result.Err == query.ErrNotExecuted {
if result.StatementID != notExecutedIndex {
t.Fatalf("StatementID for ErrNotExecuted in wrong order - expected: %d, got: %d", notExecutedIndex, result.StatementID)
} else {
notExecutedIndex++
}
}
}
if notExecutedIndex != lenQuery {
t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, notExecutedIndex - (1 + failIndex), lenQuery-(1+failIndex))
}
}

func TestQueryExecutor_InvalidSource(t *testing.T) {
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
Expand Down

0 comments on commit aa6c21e

Please sign in to comment.