From c1643e69c1cb0e97e2d3f28f213c17a61093c5f5 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 29 Mar 2016 11:29:34 -0400 Subject: [PATCH] Have the server kill all queries on shutdown Related to #6140, but won't actually fix that problem. It will correctly stop new queries from being started during shutdown and will send the interrupt signal to queries during shutdown. Since the interrupt signal is asynchronous, there isn't currently a way to wait for the queries to complete themselves before shutting down the engine. --- CHANGELOG.md | 1 + cmd/influxd/run/server.go | 4 ++++ influxql/query_manager.go | 24 ++++++++++++++++++++++++ influxql/query_manager_test.go | 34 ++++++++++++++++++++++++++++++++-- 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfe70d25101..4c07dec478b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#6116](https://github.com/influxdata/influxdb/pull/6116): Allow `httpd` service to be extensible for routes - [#6111](https://github.com/influxdata/influxdb/pull/6111): Add ability to build static assest. Improved handling of TAR and ZIP package outputs. - [#1825](https://github.com/influxdata/influxdb/issues/1825): Implement difference function. +- [#6149](https://github.com/influxdata/influxdb/pull/6149): Kill running queries when server is shutdown. ### Bugfixes diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 0e327fc2c6f..d82204f24fb 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -310,6 +310,10 @@ func (s *Server) Close() error { s.PointsWriter.Close() } + if s.QueryExecutor.QueryManager != nil { + s.QueryExecutor.QueryManager.Close() + } + // Close the TSDBStore, no more reads or writes at this point if s.TSDBStore != nil { s.TSDBStore.Close() diff --git a/influxql/query_manager.go b/influxql/query_manager.go index 5c7a99fc87b..3b7a68fb4bd 100644 --- a/influxql/query_manager.go +++ b/influxql/query_manager.go @@ -20,6 +20,10 @@ var ( // ErrMaxConcurrentQueriesReached is an error when a query cannot be run // because the maximum number of queries has been reached. ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached") + + // ErrQueryManagerShutdown is an error sent when the query cannot be + // attached because it was previous shutdown. + ErrQueryManagerShutdown = errors.New("query manager shutdown") ) // QueryTaskInfo holds information about a currently running query. @@ -61,6 +65,9 @@ type QueryManager interface { // This method can be used to forcefully terminate a running query. KillQuery(qid uint64) error + // Close kills all running queries and prevents new queries from being attached. + Close() error + // Queries lists the currently running tasks. Queries() []QueryTaskInfo } @@ -116,12 +123,17 @@ type defaultQueryManager struct { nextID uint64 maxQueries int mu sync.Mutex + shutdown bool } func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) { qm.mu.Lock() defer qm.mu.Unlock() + if qm.shutdown { + return 0, nil, ErrQueryManagerShutdown + } + if qm.maxQueries > 0 && len(qm.queries) >= qm.maxQueries { return 0, nil, ErrMaxConcurrentQueriesReached } @@ -169,6 +181,18 @@ func (qm *defaultQueryManager) KillQuery(qid uint64) error { return nil } +func (qm *defaultQueryManager) Close() error { + qm.mu.Lock() + defer qm.mu.Unlock() + + qm.shutdown = true + for _, query := range qm.queries { + close(query.closing) + } + qm.queries = nil + return nil +} + func (qm *defaultQueryManager) Queries() []QueryTaskInfo { qm.mu.Lock() defer qm.mu.Unlock() diff --git a/influxql/query_manager_test.go b/influxql/query_manager_test.go index b01932d1f09..eeee290b491 100644 --- a/influxql/query_manager_test.go +++ b/influxql/query_manager_test.go @@ -52,11 +52,11 @@ func TestQueryManager_KillQuery(t *testing.T) { select { case <-ch: case <-time.After(100 * time.Millisecond): - t.Error("detaching the query did not close the channel after 100 milliseconds") + t.Error("killing the query did not close the channel after 100 milliseconds") } if err := qm.KillQuery(qid); err == nil || err.Error() != fmt.Sprintf("no such query id: %d", qid) { - t.Errorf("incorrect error detaching query, got %s", err) + t.Errorf("incorrect error killing query, got %s", err) } } @@ -175,3 +175,33 @@ func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) { t.Errorf("unexpected error: %s", err) } } + +func TestQueryManager_Close(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + qm := influxql.DefaultQueryManager(0) + params := influxql.QueryParams{ + Query: q, + Database: `mydb`, + } + + _, ch, err := qm.AttachQuery(¶ms) + if err != nil { + t.Fatal(err) + } + qm.Close() + + select { + case <-ch: + case <-time.After(100 * time.Millisecond): + t.Error("closing the query manager did not kill the query after 100 milliseconds") + } + + _, _, err = qm.AttachQuery(¶ms) + if err == nil || err != influxql.ErrQueryManagerShutdown { + t.Errorf("unexpected error: %s", err) + } +}