From 3c66683a1d11f54f31ef4e41ba58fd3846cc3b9b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 19 Feb 2016 09:59:33 -0700 Subject: [PATCH] Fix cache not deduplicating points in some cases The cache had some incorrect logic for determining when a series needed to be deduplicated. The logic was checking for unsorted points and not considering duplicate points. This would manifest itself as many duplicate points being returned from the cache and after a snapshot compaction run, the points would disappear because snapshot compaction always deduplicates and sorts the points. Added a test that reproduces the issue. Fixes #5719 --- tsdb/engine/tsm1/cache.go | 26 +++++++++++-------------- tsdb/engine/tsm1/cache_test.go | 35 ++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 85cb740ac52..f7e34414f16 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -24,6 +24,16 @@ func newEntry() *entry { // add adds the given values to the entry. func (e *entry) add(values []Value) { + // See if the new values are sorted or contain duplicate timestamps + var prevTime int64 + for _, v := range values { + if v.UnixNano() <= prevTime { + e.needSort = true + break + } + prevTime = v.UnixNano() + } + // if there are existing values make sure they're all less than the first of // the new values being added if len(e.values) == 0 { @@ -36,20 +46,6 @@ func (e *entry) add(values []Value) { } e.values = append(e.values, values...) } - - // if there's only one value, we know it's sorted - if len(values) <= 1 || e.needSort { - return - } - - // make sure the new values were in sorted order - min := values[0].UnixNano() - for _, v := range values[1:] { - if min >= v.UnixNano() { - e.needSort = true - break - } - } } // deduplicate sorts and orders the entry's values. 
If values are already deduped and @@ -271,7 +267,7 @@ func (c *Cache) merged(key string) Values { n := 0 for _, e := range entries { if !needSort && n > 0 { - needSort = values[n-1].UnixNano() > e.values[0].UnixNano() + needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() } n += copy(values[n:], e.values) } diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 21512704bfa..9a7a4a06e49 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -73,6 +73,41 @@ func TestCache_CacheWriteMulti(t *testing.T) { } } +// This tests writing two batches to the same series. The first batch +// is sorted. The second batch is also sorted but contains duplicates. +func TestCache_CacheWriteMulti_Duplicates(t *testing.T) { + v0 := NewValue(time.Unix(2, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(3, 0).UTC(), 1.0) + values0 := Values{v0, v1} + + v3 := NewValue(time.Unix(4, 0).UTC(), 2.0) + v4 := NewValue(time.Unix(5, 0).UTC(), 3.0) + v5 := NewValue(time.Unix(5, 0).UTC(), 3.0) + values1 := Values{v3, v4, v5} + + c := NewCache(0) + + if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + + if err := c.WriteMulti(map[string][]Value{"foo": values1}); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + + if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + } + + expAscValues := Values{v0, v1, v3, v5} + if exp, got := len(expAscValues), len(c.Values("foo")); exp != got { + t.Fatalf("value count mismatch: exp: %v, got %v", exp, got) + } + if deduped := c.Values("foo"); !reflect.DeepEqual(expAscValues, deduped) { + t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, deduped) + } +} + func TestCache_CacheValues(t *testing.T) { v0 := NewValue(time.Unix(1, 0).UTC(), 0.0) v1 := 
NewValue(time.Unix(2, 0).UTC(), 2.0)