From 7d4bea71530a136612ac9d3657550b16e7564a13 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 10 Jun 2016 09:14:21 -0600 Subject: [PATCH] add node id to execution options This commit changes the `ExecutionOptions` and `SelectOptions` to allow a `NodeID` for specifying an exact node to query against. --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 13 ++++++++----- coordinator/statement_executor_test.go | 2 +- influxql/query_executor.go | 3 +++ influxql/select.go | 4 ++++ services/httpd/handler.go | 2 ++ tsdb/store.go | 2 +- 7 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a08b02fb66a..802ef7b2e42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing. - [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable. - [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service. +- [#6820](https://github.com/influxdata/influxdb/issues/6820): Add NodeID to execution options ### Bugfixes diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index e00aaa21d05..478654a6252 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -415,7 +415,10 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now` now := time.Now().UTC() - opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh} + opt := influxql.SelectOptions{ + InterruptCh: ctx.InterruptCh, + NodeID: ctx.ExecutionOptions.NodeID, + } // Replace instances of "now()" with the current time, and check the resultant times. nowValuer := influxql.NowValuer{Now: now} @@ -604,7 +607,7 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt if err != nil { return nil, err } - return e.TSDBStore.IteratorCreator(shards) + return e.TSDBStore.IteratorCreator(shards, opt) } func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext, store LocalTSDBStore) error { @@ -1142,19 +1145,19 @@ type TSDBStore interface { DeleteRetentionPolicy(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error - IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) + IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) } type LocalTSDBStore struct { *tsdb.Store } -func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) { +func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) { shardIDs := make([]uint64, len(shards)) for i, sh := range shards { shardIDs[i] = sh.ID } - return s.Store.IteratorCreator(shardIDs) + return s.Store.IteratorCreator(shardIDs, opt) } // ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard. diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index a672c62cb93..9b39edf7279 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -247,7 +247,7 @@ func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, con return s.DeleteSeriesFn(database, sources, condition) } -func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) { +func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) { // Generate iterators for each node. ics := make([]influxql.IteratorCreator, 0) if err := func() error { diff --git a/influxql/query_executor.go b/influxql/query_executor.go index afe5123f9b5..c8b807644f4 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -70,6 +70,9 @@ type ExecutionOptions struct { // If this query is being executed in a read-only context. ReadOnly bool + + // Node to execute on. + NodeID uint64 } // ExecutionContext contains state that the query is currently executing with. diff --git a/influxql/select.go b/influxql/select.go index 843dbac3001..fbceded1c7e 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -15,6 +15,10 @@ type SelectOptions struct { // The upper bound for a select call. MaxTime time.Time + // Node to exclusively read from. + // If zero, all nodes are used. + NodeID uint64 + // An optional channel that, if closed, signals that the select should be // interrupted. InterruptCh <-chan struct{} diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 5ef24ac9b19..7756d6103aa 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -267,6 +267,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. }(time.Now()) pretty := r.FormValue("pretty") == "true" + nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64) qp := strings.TrimSpace(r.FormValue("q")) if qp == "" { @@ -370,6 +371,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. Database: db, ChunkSize: chunkSize, ReadOnly: r.Method == "GET", + NodeID: nodeID, }, closing) // if we're not chunking, this will be the in memory buffer for all results before sending to client diff --git a/tsdb/store.go b/tsdb/store.go index 4a3e76228c6..4de4c022254 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -709,7 +709,7 @@ func (s *Store) IteratorCreators() influxql.IteratorCreators { return a } -func (s *Store) IteratorCreator(shards []uint64) (influxql.IteratorCreator, error) { +func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) { // Generate iterators for each node. ics := make([]influxql.IteratorCreator, 0) if err := func() error {