Skip to content

Commit

Permalink
Merge pull request #6820 from benbjohnson/http-query-node-id
Browse files Browse the repository at this point in the history
Add NodeID to execution options
  • Loading branch information
benbjohnson authored Jun 10, 2016
2 parents bdd15be + 7d4bea7 commit 48f1a6d
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing.
- [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable.
- [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service.
- [#6820](https://github.com/influxdata/influxdb/issues/6820): Add NodeID to execution options

### Bugfixes

Expand Down
13 changes: 8 additions & 5 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,10 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen

// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh}
opt := influxql.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
}

// Replace instances of "now()" with the current time, and check the resultant times.
nowValuer := influxql.NowValuer{Now: now}
Expand Down Expand Up @@ -610,7 +613,7 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
if err != nil {
return nil, err
}
return e.TSDBStore.IteratorCreator(shards)
return e.TSDBStore.IteratorCreator(shards, opt)
}

func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext, store LocalTSDBStore) error {
Expand Down Expand Up @@ -1148,19 +1151,19 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error)
}

type LocalTSDBStore struct {
*tsdb.Store
}

func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) {
func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
shardIDs := make([]uint64, len(shards))
for i, sh := range shards {
shardIDs[i] = sh.ID
}
return s.Store.IteratorCreator(shardIDs)
return s.Store.IteratorCreator(shardIDs, opt)
}

// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.
Expand Down
2 changes: 1 addition & 1 deletion coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, con
return s.DeleteSeriesFn(database, sources, condition)
}

func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) {
func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
Expand Down
3 changes: 3 additions & 0 deletions influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type ExecutionOptions struct {

// If this query is being executed in a read-only context.
ReadOnly bool

// Node to execute on.
NodeID uint64
}

// ExecutionContext contains state that the query is currently executing with.
Expand Down
4 changes: 4 additions & 0 deletions influxql/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type SelectOptions struct {
// The upper bound for a select call.
MaxTime time.Time

// Node to exclusively read from.
// If zero, all nodes are used.
NodeID uint64

// An optional channel that, if closed, signals that the select should be
// interrupted.
InterruptCh <-chan struct{}
Expand Down
2 changes: 2 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
}(time.Now())

pretty := r.FormValue("pretty") == "true"
nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64)

qp := strings.TrimSpace(r.FormValue("q"))
if qp == "" {
Expand Down Expand Up @@ -370,6 +371,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
Database: db,
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}, closing)

// if we're not chunking, this will be the in memory buffer for all results before sending to client
Expand Down
2 changes: 1 addition & 1 deletion tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (s *Store) IteratorCreators() influxql.IteratorCreators {
return a
}

func (s *Store) IteratorCreator(shards []uint64) (influxql.IteratorCreator, error) {
func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
Expand Down

0 comments on commit 48f1a6d

Please sign in to comment.