Skip to content

Commit

Permalink
Enforce predictable series cursor order
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Aug 25, 2015
1 parent 6193226 commit 405ee97
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
61 changes: 61 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2201,6 +2201,67 @@ func TestServer_Query_Aggregates(t *testing.T) {
}
}

// Test various aggregates when different series only have data for the same timestamp.
func TestServer_Query_AggregatesIdenticalTime(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`series,host=a value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=b value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=c value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=d value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=e value=5 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=f value=5 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=g value=5 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=h value=5 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`series,host=i value=5 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
}

test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")

test.addQueries([]*Query{
&Query{
name: "last from multiple series with identical timestamp",
params: url.Values{"db": []string{"db0"}},
command: `SELECT last(value) FROM "series"`,
exp: `{"results":[{"series":[{"name":"series","columns":["time","last"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
&Query{
name: "first from multiple series with identical timestamp",
params: url.Values{"db": []string{"db0"}},
command: `SELECT first(value) FROM "series"`,
exp: `{"results":[{"series":[{"name":"series","columns":["time","first"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Write_Precision(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
Expand Down
10 changes: 8 additions & 2 deletions tsdb/mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsdb

import (
"bytes"
"container/heap"
"encoding/binary"
"encoding/json"
Expand Down Expand Up @@ -653,8 +654,13 @@ func newPointHeap() *pointHeap {
func (pq pointHeap) Len() int { return len(pq) }

func (pq pointHeap) Less(i, j int) bool {
// We want a min-heap (points in chronological order), so use less than.
return pq[i].timestamp < pq[j].timestamp
if pq[i].timestamp < pq[j].timestamp {
return true
} else if pq[i].timestamp > pq[j].timestamp {
return false
}
// Break the tie in a deterministic way.
return bytes.Compare(pq[i].value, pq[j].value) == -1
}

func (pq pointHeap) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
Expand Down

0 comments on commit 405ee97

Please sign in to comment.