Skip to content

Commit

Permalink
fix: correct tests
Browse files Browse the repository at this point in the history
  • Loading branch information
davidby-influx committed Aug 23, 2021
1 parent 57b7921 commit b3aa31b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 66 deletions.
1 change: 0 additions & 1 deletion query/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} {
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.AbortCh:
return ErrQueryAborted
Expand Down
2 changes: 1 addition & 1 deletion query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil {
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted {
return
}
break
Expand Down
87 changes: 23 additions & 64 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,100 +491,59 @@ const goodStatement = `SELECT count(value) FROM cpu`

func TestQueryExecutor_NotExecuted(t *testing.T) {
var executorFailIndex int
var normalizerFailIndex int
var executorCallCount int
var normalizerCallCount int
queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement}
queryStr := strings.Join(queryStatements, ";")
var closing chan struct{}

q, err := influxql.ParseQuery(queryStr)
if err != nil {
t.Fatalf("parsing %s: %v", queryStr, err)
}

e := NewQueryExecutor()
e.StatementExecutor = &StatementNormalizerExecutor{
StatementExecutor: StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
defer func() { executorCallCount++ }()
if executorFailIndex == executorCallCount {
return fmt.Errorf("executor failure on call %d", executorCallCount)
} else {
return ctx.Send(&query.Result{Err: nil})
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
defer func() { executorCallCount++ }()
if executorFailIndex == executorCallCount {
closing <- struct{}{}
close(closing)
select {
case <-ctx.Done():
return nil
}
},
},
NormalizeStatementFn: func(stmt influxql.Statement, database, retentionPolicy string) error {
defer func() { normalizerCallCount++ }()
if normalizerFailIndex == normalizerCallCount {
return fmt.Errorf("normalizer failure on call %d", normalizerCallCount)
} else {
return nil
return ctx.Send(&query.Result{Err: nil})
}
},
}
testFn := func(testName string, i int, failIndex, ignoreIndex *int) {
testFn := func(testName string, i int, failIndex *int) {
*failIndex = i
*ignoreIndex = -1
executorCallCount = 0
normalizerCallCount = 0
results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing)

results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
checkNotExecutedResults(t, results, testName, *failIndex, len(q.Statements))
}
for i := 0; i < len(q.Statements); i++ {
testFn("executor", i, &executorFailIndex, &normalizerFailIndex)
testFn("normalizer", i, &normalizerFailIndex, &executorFailIndex)
}
}

func TestQueryExecutor_SystemNameNotExecuted(t *testing.T) {
e := NewQueryExecutor()
e.StatementExecutor = &StatementNormalizerExecutor{
StatementExecutor: StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return ctx.Send(&query.Result{Err: nil})
},
},
NormalizeStatementFn: func(stmt influxql.Statement, database, retentionPolicy string) error {
return nil
},
}
const testName = "system measurement detection"
const metaTestCnt = 3
for j := 0; j < metaTestCnt; j++ {
stmt := make([]string, 0, metaTestCnt)
for i := 0; i < metaTestCnt; i++ {
if i != j {
stmt = append(stmt, goodStatement)
} else {
stmt = append(stmt, "SELECT * FROM _fieldKeys")
}
}
queryStr := strings.Join(stmt, ";")
q, err := influxql.ParseQuery(queryStr)
if err != nil {
t.Fatalf("parsing %s: %v", queryStr, err)
}
results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
checkNotExecutedResults(t, results, testName, j, len(q.Statements))
closing = make(chan struct{})
testFn("executor", i, &executorFailIndex)
}
}

func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) {
r := 0
cnt := 0
for result := range results {
if r < failIndex && result.Err != nil {
t.Fatalf("%s failed early: %v", testName, result.Err)
} else if r == failIndex && result.Err == nil {
t.Fatalf("%s unexpected success at %d", testName, r)
} else if r > failIndex && result.Err != query.ErrNotExecuted {
t.Fatalf("expected ErrorNotExecuted from %s but got: %v", testName, result.Err)
if result.Err == query.ErrNotExecuted {
cnt++
if result.StatementID <= failIndex {
t.Fatalf("StatementID for ErrNotExecuted is wrong index: %d", result.StatementID)
}
}
r++
}
if r != lenQuery {
t.Fatalf("wrong number of results from %s - got: %d, expected: %d", testName, r, lenQuery)
if cnt != (lenQuery - (1 + failIndex)) {
t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, cnt, lenQuery-(1+failIndex))
}
}

Expand Down

0 comments on commit b3aa31b

Please sign in to comment.