From 127249229de97b366e02915036e76d93135772b7 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 27 Aug 2014 15:51:05 -0400 Subject: [PATCH] Don't accidentally trigger continuous query backfilling --- coordinator/raft_server.go | 8 +++ integration/continuous_queries_test.go | 82 ++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 integration/continuous_queries_test.go diff --git a/coordinator/raft_server.go b/coordinator/raft_server.go index a3d1fd7ec3c..d78e2f480ab 100644 --- a/coordinator/raft_server.go +++ b/coordinator/raft_server.go @@ -498,6 +498,14 @@ func (s *RaftServer) checkContinuousQueries() { currentBoundary := runTime.Truncate(*duration) lastRun := s.clusterConfig.LastContinuousQueryRunTime() + if lastRun.IsZero() && !query.GetIntoClause().Backfill { + // don't backfill for this continuous query, leave it for next + // iteration. Otherwise, we're going to run the continuous + // query from lastRun which is the beginning of time until + // now. This is equivalent to backfilling. + continue + } + lastBoundary := lastRun.Truncate(*duration) if currentBoundary.After(lastRun) { diff --git a/integration/continuous_queries_test.go b/integration/continuous_queries_test.go new file mode 100644 index 00000000000..4224f97e462 --- /dev/null +++ b/integration/continuous_queries_test.go @@ -0,0 +1,82 @@ +package integration + +import ( + "fmt" + "net/http" + "os" + "time" + + . "github.com/influxdb/influxdb/integration/helpers" + . "launchpad.net/gocheck" +) + +type ContinuousQueriesSuite struct { + serverProcesses []*Server +} + +var _ = Suite(&ContinuousQueriesSuite{}) + +func (self *ContinuousQueriesSuite) SetUpSuite(c *C) { + err := os.RemoveAll("/tmp/influxdb/test") + c.Assert(err, IsNil) + self.serverProcesses = []*Server{ + NewServer("integration/test_rf_1.toml", c), + NewServer("integration/test_rf_2.toml", c), + } + client := self.serverProcesses[0].GetClient("", c) + c.Assert(client.CreateDatabase("test"), IsNil) + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } +} + +func (self *ContinuousQueriesSuite) TearDownSuite(c *C) { + for _, s := range self.serverProcesses { + s.Stop() + } +} + +func (self *ContinuousQueriesSuite) TestFirstBackfill(c *C) { + defer self.serverProcesses[0].RemoveAllContinuousQueries("test_cq", c) + client := self.serverProcesses[0].GetClient("", c) + c.Assert(client.CreateDatabase("test_no_backfill"), IsNil) + + data := fmt.Sprintf(`[ + { + "name": "cqbackfilltest", + "columns": ["time", "reqtime", "url"], + "points": [ + [0, 8.0, "/login"], + [0, 3.0, "/list"], + [0, 4.0, "/register"], + [5, 9.0, "/login"], + [5, 4.0, "/list"], + [5, 5.0, "/register"] + ] + } + ]`) + + resp := self.serverProcesses[0].Post("/db/test_no_backfill/series?u=root&p=root&time_precision=s", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + // wait for the data to get written + + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } + + cq := "select count(reqtime), url from cqbackfilltest group by time(10s), url into cqbackfill_off.10s backfill(false)" + self.serverProcesses[0].QueryAsRoot("test_no_backfill", cq, false, c) + + // wait for the continuous query to run + time.Sleep(5 * time.Second) + + self.serverProcesses[0].RemoveAllContinuousQueries("test_no_backfill", c) + + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } + + // check backfill_off query results + body, _ := self.serverProcesses[0].GetErrorBody("test_no_backfill", "select * from cqbackfill_off.10s", "root", "root", false, c) + c.Assert(body, Matches, "Couldn't look up columns.*") +}