Skip to content

Commit

Permalink
Merge pull request #370 from influxdb/fix-370-shards-limit
Browse files Browse the repository at this point in the history
Shards should check the limit on the query and stop returning data as soon as the limit is hit
  • Loading branch information
pauldix committed Mar 31, 2014
2 parents 67545b6 + aa61caa commit f76762d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 48 deletions.
10 changes: 8 additions & 2 deletions src/cluster/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,26 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
maxDeleteResults := 10000
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
query := querySpec.SelectQuery()
if self.ShouldAggregateLocally(querySpec) {
log.Debug("creating a query engine\n")
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
processor, err = engine.NewQueryEngine(query, response)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while creating engine: %s", err)
return
}
processor.SetShardInfo(int(self.Id()), self.IsLocal)
} else {
} else if query.HasAggregates() {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine\n")
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
} else {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine with limit\n")
processor = engine.NewPassthroughEngineWithLimit(response, maxPointsToBufferBeforeSending, query.Limit)
}
processor = engine.NewFilteringEngine(query, processor)
}
shard, err := self.store.GetOrCreateShard(self.id)
if err != nil {
Expand Down
50 changes: 4 additions & 46 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
log "code.google.com/p/log4go"
"common"
"fmt"
"parser"
Expand All @@ -10,6 +9,8 @@ import (
"strconv"
"strings"
"time"

log "code.google.com/p/log4go"
)

type QueryEngine struct {
Expand Down Expand Up @@ -158,59 +159,16 @@ func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldCon
}

// yieldSeriesData applies the query's limit to the series, yields whatever
// survives, and reports whether the caller should keep sending data.
// It returns false either when yielding fails or when the limit for this
// series name has been reached.
//
// NOTE(review): this span in the source is a diff rendering that interleaves
// the removed and added versions of the function (duplicate
// calculateLimitAndSlicePoints calls, an orphan `} else {`, two `err`
// declarations); this body is the added (post-commit) version — the
// where-clause filtering that the removed branch performed now lives in
// FilteringEngine.
func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
	// Trim the series in place so we never emit more points than the limit.
	self.limiter.calculateLimitAndSlicePoints(series)

	if err := self.yield(series); err != nil {
		log.Error(err)
		return false
	}
	// Stop as soon as the limit is hit for this series name.
	return !self.limiter.hitLimit(*series.Name)
}

// filter produces one copy of series per table alias of its name, running
// each copy through the query's where-clause filter. Series belonging to an
// inner-join from-clause are passed through unfiltered here (presumably the
// join machinery filters after merging — TODO confirm).
//
// Fixes applied: removed the leftover commented-out `// var err error` line,
// dropped the redundant capacity argument in make, and replaced the `_alias`
// shadow-copy idiom with a clearer per-iteration copy.
func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) {
	aliases := self.query.GetTableAliases(*series.Name)
	result := make([]*protocol.Series, len(aliases))
	for i, alias := range aliases {
		// Take a fresh copy so each series' Name pointer is independent of
		// the loop variable.
		name := alias
		aliased := &protocol.Series{Name: &name, Points: series.Points, Fields: series.Fields}

		if self.query.GetFromClause().Type == parser.FromClauseInnerJoin {
			result[i] = aliased
			continue
		}

		filtered, err := Filter(self.query, aliased)
		if err != nil {
			return nil, err
		}
		result[i] = filtered
	}
	return result, nil
}

func (self *QueryEngine) Close() {
for _, series := range self.seriesToPoints {
if len(series.Points) == 0 {
Expand Down
52 changes: 52 additions & 0 deletions src/engine/filtering_engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package engine

import (
"parser"
p "protocol"
)

// FilteringEngine wraps another QueryProcessor and runs each incoming series
// through the query's where-clause filter before forwarding it downstream.
type FilteringEngine struct {
	query        *parser.SelectQuery // query whose where clause drives the filtering
	processor    QueryProcessor      // downstream processor that receives the (filtered) series
	shouldFilter bool                // cached at construction: true when the query has a where condition
}

// NewFilteringEngine wraps processor in a FilteringEngine for the given
// query. Filtering is only performed when the query has a where condition;
// that decision is made once here rather than on every yielded series.
func NewFilteringEngine(query *parser.SelectQuery, processor QueryProcessor) *FilteringEngine {
	return &FilteringEngine{
		query:        query,
		processor:    processor,
		shouldFilter: query.GetWhereCondition() != nil,
	}
}

// YieldPoint wraps a single point into a one-point series and delegates to
// YieldSeries. TODO: optimize by feeding series through directly instead of
// wrapping every point individually.
func (self *FilteringEngine) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool {
	singlePointSeries := &p.Series{
		Name:   seriesName,
		Fields: columnNames,
		Points: []*p.Point{point},
	}
	return self.YieldSeries(singlePointSeries)
}

// YieldSeries forwards the incoming series to the wrapped processor, first
// applying the query's where-clause filter when one is present. Per the
// QueryProcessor contract it returns true if the query should continue
// receiving data and false if it should stop.
//
// Fix: the original returned false when filtering left zero points, which
// tells the caller to stop the entire query even though no limit was hit —
// one fully-filtered batch would prematurely terminate the query. An empty
// filtered batch now returns true so more data keeps flowing.
func (self *FilteringEngine) YieldSeries(seriesIncoming *p.Series) bool {
	if !self.shouldFilter {
		return self.processor.YieldSeries(seriesIncoming)
	}

	series, err := Filter(self.query, seriesIncoming)
	if err != nil {
		// NOTE(review): panicking on a filter error is harsh for library
		// code; kept as-is to avoid changing the error contract, but this
		// should probably be surfaced to the caller instead.
		panic(err)
	}
	if len(series.Points) == 0 {
		// Nothing survived the filter for this batch; that is not a reason
		// to abort the query, so ask for more data.
		return true
	}
	return self.processor.YieldSeries(series)
}

// Close closes the wrapped processor.
func (self *FilteringEngine) Close() {
	self.processor.Close()
}

// SetShardInfo forwards the shard id and locality flag to the wrapped processor.
func (self *FilteringEngine) SetShardInfo(shardId int, shardLocal bool) {
	self.processor.SetShardInfo(shardId, shardLocal)
}
// GetName returns the name of the wrapped processor.
func (self *FilteringEngine) GetName() string {
	return self.processor.GetName()
}
19 changes: 19 additions & 0 deletions src/engine/query_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package engine

import (
p "protocol"
)

// QueryProcessor is implemented by the engines (query, passthrough,
// filtering) that consume points produced while running a query.
type QueryProcessor interface {
	// YieldPoint and YieldSeries feed data into the processor. They return
	// true if the query should continue; false means the caller should stop
	// sending data (for example, because the query's limit was hit).
	YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
	YieldSeries(seriesIncoming *p.Series) bool
	Close()

	// SetShardInfo is called by the shard so an EXPLAIN query can report
	// which shard a measurement was taken against.
	SetShardInfo(shardId int, shardLocal bool)

	// GetName returns a name identifying this processor.
	GetName() string
}

0 comments on commit f76762d

Please sign in to comment.