Skip to content

Commit

Permalink
Test for correct sequence number assignment when interpolating series…
Browse files Browse the repository at this point in the history
… names in continuous queries with a group by clause.
  • Loading branch information
toddboom committed Apr 21, 2014
1 parent c330208 commit 03435de
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ func (self ServerSuite) RemoveAllContinuousQueries(db string, c *C) {
}
}

func (self ServerSuite) AssertContinuousQueryCount(db string, count int, c *C) {
client := self.serverProcesses[0].GetClient(db, c)
queries, err := client.GetContinuousQueries()
c.Assert(err, IsNil)
c.Assert(queries, HasLen, count)
}

func (self *ServerSuite) TestContinuousQueryManagement(c *C) {
defer self.RemoveAllContinuousQueries("test_cq", c)

Expand Down Expand Up @@ -771,6 +778,77 @@ func (self *ServerSuite) TestContinuousQueryInterpolation(c *C) {
/* self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 4;", false, c) */
}

func (self *ServerSuite) TestContinuousQuerySequenceNumberAssignmentWithInterpolation(c *C) {
defer self.RemoveAllContinuousQueries("test_cq", c)

currentTime := time.Now()
t0 := currentTime.Truncate(10 * time.Second)
t1 := time.Unix(t0.Unix()-5, 0).Unix()
t2 := time.Unix(t0.Unix()-10, 0).Unix()

data := fmt.Sprintf(`[
{ "name": "points",
"columns": ["c1", "c2", "time"],
"points": [
[1, "a", %d],
[2, "a", %d],
[3, "a", %d],
[7, "b", %d],
[8, "b", %d],
[9, "b", %d]
]}
]`, t1, t1, t1, t2, t2, t2)

self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)

self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
self.AssertContinuousQueryCount("test_cq", 1, c)
self.serverProcesses[0].WaitForServerToSync()
time.Sleep(time.Second)

self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)

data = fmt.Sprintf(`[
{ "name": "points",
"columns": ["c1", "c2", "time"],
"points": [
[1, "aa", %d],
[2, "aa", %d],
[3, "aa", %d],
[7, "bb", %d],
[8, "bb", %d],
[9, "bb", %d]
]}
]`, t1, t1, t1, t2, t2, t2)

self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)

self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
self.AssertContinuousQueryCount("test_cq", 1, c)
self.serverProcesses[0].WaitForServerToSync()
time.Sleep(time.Second)

collection := self.serverProcesses[0].Query("test_cq", "select * from points;", false, c)
series := collection.GetSeries("points", c)
c.Assert(series.Points, HasLen, 12)

collection = self.serverProcesses[0].Query("test_cq", "select * from points.count.a;", false, c)
series = collection.GetSeries("points.count.a", c)
c.Assert(series.Points, HasLen, 1)

collection = self.serverProcesses[0].Query("test_cq", "select * from points.count.aa;", false, c)
series = collection.GetSeries("points.count.aa", c)
c.Assert(series.Points, HasLen, 1)

collection = self.serverProcesses[0].Query("test_cq", "select * from points.count.b;", false, c)
series = collection.GetSeries("points.count.b", c)
c.Assert(series.Points, HasLen, 1)

collection = self.serverProcesses[0].Query("test_cq", "select * from points.count.bb;", false, c)
series = collection.GetSeries("points.count.bb", c)
c.Assert(series.Points, HasLen, 1)
}

func (self *ServerSuite) TestGetServers(c *C) {
body := self.serverProcesses[0].Get("/cluster/servers?u=root&p=root", c)

Expand Down

0 comments on commit 03435de

Please sign in to comment.