Merge pull request #5751 from influxdata/jw-5719
Fix cache not deduplicating points in some cases
jwilder committed Feb 22, 2016
2 parents b6a0b6a + aa2e878 commit e25b5ab
Showing 3 changed files with 47 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
 - [#5724](https://github.com/influxdata/influxdb/issues/5724): influx\_tsm doesn't close file handles properly
 - [#5664](https://github.com/influxdata/influxdb/issues/5664): panic in model.Points.scanTo #5664
 - [#5716](https://github.com/influxdata/influxdb/pull/5716): models: improve handling of points with empty field names or with no fields.
+- [#5719](https://github.com/influxdata/influxdb/issues/5719): Fix cache not deduplicating points
 
 ## v0.10.1 [2016-02-18]
 
26 changes: 11 additions & 15 deletions tsdb/engine/tsm1/cache.go
@@ -24,6 +24,16 @@ func newEntry() *entry {
 
 // add adds the given values to the entry.
 func (e *entry) add(values []Value) {
+	// See if the new values are sorted or contain duplicate timestamps
+	var prevTime int64
+	for _, v := range values {
+		if v.UnixNano() <= prevTime {
+			e.needSort = true
+			break
+		}
+		prevTime = v.UnixNano()
+	}
+
 	// if there are existing values make sure they're all less than the first of
 	// the new values being added
 	if len(e.values) == 0 {
@@ -36,20 +46,6 @@ func (e *entry) add(values []Value) {
 		}
 		e.values = append(e.values, values...)
 	}
-
-	// if there's only one value, we know it's sorted
-	if len(values) <= 1 || e.needSort {
-		return
-	}
-
-	// make sure the new values were in sorted order
-	min := values[0].UnixNano()
-	for _, v := range values[1:] {
-		if min >= v.UnixNano() {
-			e.needSort = true
-			break
-		}
-	}
 }
 
 // deduplicate sorts and orders the entry's values. If values are already deduped and
@@ -271,7 +267,7 @@ func (c *Cache) merged(key string) Values {
 	n := 0
 	for _, e := range entries {
 		if !needSort && n > 0 {
-			needSort = values[n-1].UnixNano() > e.values[0].UnixNano()
+			needSort = values[n-1].UnixNano() >= e.values[0].UnixNano()
 		}
 		n += copy(values[n:], e.values)
 	}
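The cache.go changes do two things: the ordering check now runs at the top of entry.add, before any values are appended, and it uses <= so that duplicate timestamps, not just out-of-order ones, set needSort; and in merged() the boundary comparison between consecutive entries switches from > to >=, so entries that meet at the same timestamp are also sorted and deduplicated. Below is a minimal standalone sketch of that ordering check; point and needsSort are hypothetical stand-ins for illustration, not the tsm1 Value type or API.

```go
// Standalone sketch of the ordering check the patch moves to the top of
// entry.add. The types here are hypothetical stand-ins, not the tsm1 API.
package main

import "fmt"

type point struct {
	unixNano int64
	value    float64
}

// needsSort reports whether a batch is out of order or contains duplicate
// timestamps. Using <= (rather than <) is what makes duplicates count.
func needsSort(batch []point) bool {
	var prevTime int64
	for _, p := range batch {
		if p.unixNano <= prevTime {
			return true
		}
		prevTime = p.unixNano
	}
	return false
}

func main() {
	fmt.Println(needsSort([]point{{2, 1.0}, {3, 1.0}}))           // false: sorted, unique timestamps
	fmt.Println(needsSort([]point{{4, 2.0}, {5, 3.0}, {5, 3.0}})) // true: duplicate timestamp at 5
}
```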
35 changes: 35 additions & 0 deletions tsdb/engine/tsm1/cache_test.go
@@ -73,6 +73,41 @@ func TestCache_CacheWriteMulti(t *testing.T) {
 	}
 }
 
+// This tests writing two batches to the same series. The first batch
+// is sorted. The second batch is also sorted but contains duplicates.
+func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
+	v0 := NewValue(time.Unix(2, 0).UTC(), 1.0)
+	v1 := NewValue(time.Unix(3, 0).UTC(), 1.0)
+	values0 := Values{v0, v1}
+
+	v3 := NewValue(time.Unix(4, 0).UTC(), 2.0)
+	v4 := NewValue(time.Unix(5, 0).UTC(), 3.0)
+	v5 := NewValue(time.Unix(5, 0).UTC(), 3.0)
+	values1 := Values{v3, v4, v5}
+
+	c := NewCache(0)
+
+	if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil {
+		t.Fatalf("failed to write key foo to cache: %s", err.Error())
+	}
+
+	if err := c.WriteMulti(map[string][]Value{"foo": values1}); err != nil {
+		t.Fatalf("failed to write key foo to cache: %s", err.Error())
+	}
+
+	if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
+		t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
+	}
+
+	expAscValues := Values{v0, v1, v3, v5}
+	if exp, got := len(expAscValues), len(c.Values("foo")); exp != got {
+		t.Fatalf("value count mismatch: exp: %v, got %v", exp, got)
+	}
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expAscValues, deduped) {
+		t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, deduped)
+	}
+}
+
 func TestCache_CacheValues(t *testing.T) {
 	v0 := NewValue(time.Unix(1, 0).UTC(), 0.0)
 	v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
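For context, the new test writes two sorted batches to the same key, where the second batch repeats the timestamp at time.Unix(5, 0), and expects c.Values("foo") to return four points, one per timestamp. The sketch below illustrates that expectation in isolation; pt and sortAndDedupe are hypothetical helpers using a plain sort-then-compact pass, not the tsm1 implementation.

```go
// Self-contained illustration of the expectation in
// TestCache_CacheWriteMulti_Duplicates: after both batches are written, the
// cache should hand back one value per timestamp. Hypothetical types, not
// the tsm1 package.
package main

import (
	"fmt"
	"sort"
)

type pt struct {
	t int64 // timestamp (seconds here; nanoseconds in the real cache)
	v float64
}

// sortAndDedupe stable-sorts by timestamp and keeps the last value seen for
// each timestamp (the duplicates in the test are identical, so either choice
// would satisfy it).
func sortAndDedupe(points []pt) []pt {
	sort.SliceStable(points, func(i, j int) bool { return points[i].t < points[j].t })
	var out []pt
	for _, p := range points {
		if len(out) > 0 && out[len(out)-1].t == p.t {
			out[len(out)-1] = p // duplicate timestamp: overwrite
			continue
		}
		out = append(out, p)
	}
	return out
}

func main() {
	batch0 := []pt{{2, 1.0}, {3, 1.0}}
	batch1 := []pt{{4, 2.0}, {5, 3.0}, {5, 3.0}} // duplicate at t=5
	merged := append(append([]pt{}, batch0...), batch1...)
	fmt.Println(sortAndDedupe(merged)) // [{2 1} {3 1} {4 2} {5 3}] — four points, as the test asserts
}
```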
