diff --git a/CHANGELOG.md b/CHANGELOG.md index dfe70d25101..4c07dec478b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#6116](https://github.com/influxdata/influxdb/pull/6116): Allow `httpd` service to be extensible for routes - [#6111](https://github.com/influxdata/influxdb/pull/6111): Add ability to build static assest. Improved handling of TAR and ZIP package outputs. - [#1825](https://github.com/influxdata/influxdb/issues/1825): Implement difference function. +- [#6149](https://github.com/influxdata/influxdb/pull/6149): Kill running queries when server is shutdown. ### Bugfixes diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 0e327fc2c6f..d82204f24fb 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -310,6 +310,10 @@ func (s *Server) Close() error { s.PointsWriter.Close() } + if s.QueryExecutor.QueryManager != nil { + s.QueryExecutor.QueryManager.Close() + } + // Close the TSDBStore, no more reads or writes at this point if s.TSDBStore != nil { s.TSDBStore.Close() diff --git a/influxql/query_manager.go b/influxql/query_manager.go index 5c7a99fc87b..3b7a68fb4bd 100644 --- a/influxql/query_manager.go +++ b/influxql/query_manager.go @@ -20,6 +20,10 @@ var ( // ErrMaxConcurrentQueriesReached is an error when a query cannot be run // because the maximum number of queries has been reached. ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached") + + // ErrQueryManagerShutdown is an error sent when the query cannot be + // attached because it was previous shutdown. + ErrQueryManagerShutdown = errors.New("query manager shutdown") ) // QueryTaskInfo holds information about a currently running query. @@ -61,6 +65,9 @@ type QueryManager interface { // This method can be used to forcefully terminate a running query. KillQuery(qid uint64) error + // Close kills all running queries and prevents new queries from being attached. + Close() error + // Queries lists the currently running tasks. Queries() []QueryTaskInfo } @@ -116,12 +123,17 @@ type defaultQueryManager struct { nextID uint64 maxQueries int mu sync.Mutex + shutdown bool } func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) { qm.mu.Lock() defer qm.mu.Unlock() + if qm.shutdown { + return 0, nil, ErrQueryManagerShutdown + } + if qm.maxQueries > 0 && len(qm.queries) >= qm.maxQueries { return 0, nil, ErrMaxConcurrentQueriesReached } @@ -169,6 +181,18 @@ func (qm *defaultQueryManager) KillQuery(qid uint64) error { return nil } +func (qm *defaultQueryManager) Close() error { + qm.mu.Lock() + defer qm.mu.Unlock() + + qm.shutdown = true + for _, query := range qm.queries { + close(query.closing) + } + qm.queries = nil + return nil +} + func (qm *defaultQueryManager) Queries() []QueryTaskInfo { qm.mu.Lock() defer qm.mu.Unlock() diff --git a/influxql/query_manager_test.go b/influxql/query_manager_test.go index b01932d1f09..eeee290b491 100644 --- a/influxql/query_manager_test.go +++ b/influxql/query_manager_test.go @@ -52,11 +52,11 @@ func TestQueryManager_KillQuery(t *testing.T) { select { case <-ch: case <-time.After(100 * time.Millisecond): - t.Error("detaching the query did not close the channel after 100 milliseconds") + t.Error("killing the query did not close the channel after 100 milliseconds") } if err := qm.KillQuery(qid); err == nil || err.Error() != fmt.Sprintf("no such query id: %d", qid) { - t.Errorf("incorrect error detaching query, got %s", err) + t.Errorf("incorrect error killing query, got %s", err) } } @@ -175,3 +175,33 @@ func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) { t.Errorf("unexpected error: %s", err) } } + +func TestQueryManager_Close(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + qm := influxql.DefaultQueryManager(0) + params := influxql.QueryParams{ + Query: q, + Database: `mydb`, + } + + _, ch, err := qm.AttachQuery(¶ms) + if err != nil { + t.Fatal(err) + } + qm.Close() + + select { + case <-ch: + case <-time.After(100 * time.Millisecond): + t.Error("closing the query manager did not kill the query after 100 milliseconds") + } + + _, _, err = qm.AttachQuery(¶ms) + if err == nil || err != influxql.ErrQueryManagerShutdown { + t.Errorf("unexpected error: %s", err) + } +}