Skip to content

Commit

Permalink
fix #279. Limit not working for regex
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Feb 27, 2014
1 parent 4698a4f commit 245174a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 9 deletions.
36 changes: 27 additions & 9 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type QueryEngine struct {
responseChan chan *protocol.Response
shouldLimit bool
limit int
limits map[string]int
seriesToPoints map[string]*protocol.Series
yield func(*protocol.Series) error

Expand Down Expand Up @@ -68,6 +69,7 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo
query: query,
where: query.GetWhereCondition(),
limit: limit,
limits: make(map[string]int),
shouldLimit: shouldLimit,
responseChan: responseChan,
seriesToPoints: make(map[string]*protocol.Series),
Expand Down Expand Up @@ -131,29 +133,45 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
}
}
if err != nil {
log.Error(err)
return false
}
return !self.hitLimit()
return !self.hitLimit(*series.Name)
}

// TODO: make limits work for aggregate queries and for queries that pull from multiple series.
func (self *QueryEngine) calculateLimitAndSlicePoints(series *protocol.Series) {
if !self.isAggregateQuery && self.shouldLimit {
self.limit -= len(series.Points)
if self.limit < 0 {
sliceTo := len(series.Points) + self.limit
if sliceTo > 0 {
series.Points = series.Points[0:sliceTo]
}
// if the limit is 0, stop returning any points
limit := self.limitForSeries(*series.Name)
defer func() { self.limits[*series.Name] = limit }()
if limit == 0 {
series.Points = nil
return
}
limit -= len(series.Points)
if limit <= 0 {
sliceTo := len(series.Points) + limit
series.Points = series.Points[0:sliceTo]
limit = 0
}
}
}

func (self *QueryEngine) hitLimit() bool {
func (self *QueryEngine) hitLimit(seriesName string) bool {
if self.isAggregateQuery || !self.shouldLimit {
return false
}
return self.limit < 1
return self.limitForSeries(seriesName) <= 0
}

func (self *QueryEngine) limitForSeries(name string) int {
currentLimit, ok := self.limits[name]
if !ok {
currentLimit = self.limit
self.limits[name] = currentLimit
}
return currentLimit
}

func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) {
Expand Down
40 changes: 40 additions & 0 deletions src/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,46 @@ func (self *ServerSuite) TestColumnNamesReturnInDistributedQuery(c *C) {
}
}

func generateHttpApiSeries(name string, n int) *common.SerializedSeries {
points := [][]interface{}{}

for i := 0; i < n; i++ {
points = append(points, []interface{}{i})
}

return &common.SerializedSeries{
Name: name,
Columns: []string{"value"},
Points: points,
}
}

func (self *ServerSuite) TestLimitWithRegex(c *C) {
// run the test once with less than POINT_BATCH_SIZE points and once
// with more than POINT_BATCH_SIZE points
for _, numberOfPoints := range []int{100, 1000} {
for i := 0; i < 100; i++ {
name := fmt.Sprintf("limit_with_regex_%d_%d", numberOfPoints, i)
series := generateHttpApiSeries(name, numberOfPoints)
bytes, err := json.Marshal([]*common.SerializedSeries{series})
c.Assert(err, IsNil)
resp := self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", string(bytes), c)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
}
time.Sleep(time.Second * 2)
query := fmt.Sprintf("select * from /.*limit_with_regex_%d.*/ limit 1", numberOfPoints)
collection := self.serverProcesses[0].Query("test_rep", query, false, c)
// make sure all series get back 1 point only
for i := 0; i < 100; i++ {
table := fmt.Sprintf("limit_with_regex_%d_%d", numberOfPoints, i)
series := collection.GetSeries(table, c)
c.Assert(series.SerializedSeries.Points, HasLen, 1)
c.Assert(series.GetValueForPointAndColumn(0, "value", c), Equals, float64(numberOfPoints-1))
}
}
}

// For issue #131 https://github.com/influxdb/influxdb/issues/131
func (self *ServerSuite) TestSelectFromRegexInCluster(c *C) {
data := `[{
Expand Down

0 comments on commit 245174a

Please sign in to comment.