Skip to content

Commit

Permalink
Don't accidentally trigger continuous query backfilling
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Sep 2, 2014
1 parent 69df51a commit 1272492
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
8 changes: 8 additions & 0 deletions coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,14 @@ func (s *RaftServer) checkContinuousQueries() {

currentBoundary := runTime.Truncate(*duration)
lastRun := s.clusterConfig.LastContinuousQueryRunTime()
if lastRun.IsZero() && !query.GetIntoClause().Backfill {
// don't backfill for this continuous query, leave it for next
// iteration. Otherwise, we're going to run the continuous
// query from lastRun which is the beginning of time until
// now. This is equivalent to backfilling.
continue
}

lastBoundary := lastRun.Truncate(*duration)

if currentBoundary.After(lastRun) {
Expand Down
82 changes: 82 additions & 0 deletions integration/continuous_queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package integration

import (
"fmt"
"net/http"
"os"
"time"

. "github.com/influxdb/influxdb/integration/helpers"
. "launchpad.net/gocheck"
)

type ContinuousQueriesSuite struct {
serverProcesses []*Server
}

var _ = Suite(&ContinuousQueriesSuite{})

func (self *ContinuousQueriesSuite) SetUpSuite(c *C) {
err := os.RemoveAll("/tmp/influxdb/test")
c.Assert(err, IsNil)
self.serverProcesses = []*Server{
NewServer("integration/test_rf_1.toml", c),
NewServer("integration/test_rf_2.toml", c),
}
client := self.serverProcesses[0].GetClient("", c)
c.Assert(client.CreateDatabase("test"), IsNil)
for _, s := range self.serverProcesses {
s.WaitForServerToSync()
}
}

func (self *ContinuousQueriesSuite) TearDownSuite(c *C) {
for _, s := range self.serverProcesses {
s.Stop()
}
}

func (self *ContinuousQueriesSuite) TestFirstBackfill(c *C) {
defer self.serverProcesses[0].RemoveAllContinuousQueries("test_cq", c)
client := self.serverProcesses[0].GetClient("", c)
c.Assert(client.CreateDatabase("test_no_backfill"), IsNil)

data := fmt.Sprintf(`[
{
"name": "cqbackfilltest",
"columns": ["time", "reqtime", "url"],
"points": [
[0, 8.0, "/login"],
[0, 3.0, "/list"],
[0, 4.0, "/register"],
[5, 9.0, "/login"],
[5, 4.0, "/list"],
[5, 5.0, "/register"]
]
}
]`)

resp := self.serverProcesses[0].Post("/db/test_no_backfill/series?u=root&p=root&time_precision=s", data, c)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
// wait for the data to get written

for _, s := range self.serverProcesses {
s.WaitForServerToSync()
}

cq := "select count(reqtime), url from cqbackfilltest group by time(10s), url into cqbackfill_off.10s backfill(false)"
self.serverProcesses[0].QueryAsRoot("test_no_backfill", cq, false, c)

// wait for the continuous query to run
time.Sleep(5 * time.Second)

self.serverProcesses[0].RemoveAllContinuousQueries("test_no_backfill", c)

for _, s := range self.serverProcesses {
s.WaitForServerToSync()
}

// check backfill_off query results
body, _ := self.serverProcesses[0].GetErrorBody("test_no_backfill", "select * from cqbackfill_off.10s", "root", "root", false, c)
c.Assert(body, Matches, "Couldn't look up columns.*")
}

0 comments on commit 1272492

Please sign in to comment.