Skip to content

Commit

Permalink
Fix cache not deduplicating points in some cases
Browse files Browse the repository at this point in the history
The cache had some incorrect logic for determining when a series needed
to be deduplicated.  The logic was checking for unsorted points and
not considering duplicate points.  This would manifest itself as many
duplicate points being returned from the cache, and after a
snapshot compaction run, the points would disappear because snapshot
compaction always deduplicates and sorts the points.

Added a test that reproduces the issue.

Fixes #5719
  • Loading branch information
jwilder committed Feb 22, 2016
1 parent b6a0b6a commit aa2e878
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- [#5724](https://github.com/influxdata/influxdb/issues/5724): influx\_tsm doesn't close file handles properly
- [#5664](https://github.com/influxdata/influxdb/issues/5664): panic in model.Points.scanTo #5664
- [#5716](https://github.com/influxdata/influxdb/pull/5716): models: improve handling of points with empty field names or with no fields.
- [#5719](https://github.com/influxdata/influxdb/issues/5719): Fix cache not deduplicating points

## v0.10.1 [2016-02-18]

Expand Down
26 changes: 11 additions & 15 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ func newEntry() *entry {

// add adds the given values to the entry.
func (e *entry) add(values []Value) {
// See if the new values are sorted or contain duplicate timestamps
var prevTime int64
for _, v := range values {
if v.UnixNano() <= prevTime {
e.needSort = true
break
}
prevTime = v.UnixNano()
}

// if there are existing values make sure they're all less than the first of
// the new values being added
if len(e.values) == 0 {
Expand All @@ -36,20 +46,6 @@ func (e *entry) add(values []Value) {
}
e.values = append(e.values, values...)
}

// if there's only one value, we know it's sorted
if len(values) <= 1 || e.needSort {
return
}

// make sure the new values were in sorted order
min := values[0].UnixNano()
for _, v := range values[1:] {
if min >= v.UnixNano() {
e.needSort = true
break
}
}
}

// deduplicate sorts and orders the entry's values. If values are already deduped and
Expand Down Expand Up @@ -271,7 +267,7 @@ func (c *Cache) merged(key string) Values {
n := 0
for _, e := range entries {
if !needSort && n > 0 {
needSort = values[n-1].UnixNano() > e.values[0].UnixNano()
needSort = values[n-1].UnixNano() >= e.values[0].UnixNano()
}
n += copy(values[n:], e.values)
}
Expand Down
35 changes: 35 additions & 0 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,41 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}

// This tests writing two batches to the same series. The first batch
// is sorted. The second batch is also sorted but contains duplicates.
func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
	// First batch: strictly increasing timestamps, no duplicates.
	a := NewValue(time.Unix(2, 0).UTC(), 1.0)
	b := NewValue(time.Unix(3, 0).UTC(), 1.0)
	firstBatch := Values{a, b}

	// Second batch: sorted, but the last two points share a timestamp.
	c1 := NewValue(time.Unix(4, 0).UTC(), 2.0)
	c2 := NewValue(time.Unix(5, 0).UTC(), 3.0)
	c3 := NewValue(time.Unix(5, 0).UTC(), 3.0)
	secondBatch := Values{c1, c2, c3}

	cache := NewCache(0)

	// Write both batches under the same key so the entry must merge them.
	for _, batch := range []Values{firstBatch, secondBatch} {
		if err := cache.WriteMulti(map[string][]Value{"foo": batch}); err != nil {
			t.Fatalf("failed to write key foo to cache: %s", err.Error())
		}
	}

	if exp, keys := []string{"foo"}, cache.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
	}

	// The duplicate timestamp must collapse to a single point; the later
	// write (c3) wins, so only four values remain in ascending order.
	expAscValues := Values{a, b, c1, c3}
	got := cache.Values("foo")
	if exp := len(expAscValues); exp != len(got) {
		t.Fatalf("value count mismatch: exp: %v, got %v", exp, len(got))
	}
	if !reflect.DeepEqual(expAscValues, got) {
		t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, got)
	}
}

func TestCache_CacheValues(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 0.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
Expand Down

0 comments on commit aa2e878

Please sign in to comment.