diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 85cb740ac52..f7e34414f16 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -24,6 +24,16 @@ func newEntry() *entry { // add adds the given values to the entry. func (e *entry) add(values []Value) { + // See if the new values are sorted or contain duplicate timestamps + var prevTime int64 + for _, v := range values { + if v.UnixNano() <= prevTime { + e.needSort = true + break + } + prevTime = v.UnixNano() + } + // if there are existing values make sure they're all less than the first of // the new values being added if len(e.values) == 0 { @@ -36,20 +46,6 @@ func (e *entry) add(values []Value) { } e.values = append(e.values, values...) } - - // if there's only one value, we know it's sorted - if len(values) <= 1 || e.needSort { - return - } - - // make sure the new values were in sorted order - min := values[0].UnixNano() - for _, v := range values[1:] { - if min >= v.UnixNano() { - e.needSort = true - break - } - } } // deduplicate sorts and orders the entry's values. If values are already deduped and @@ -271,7 +267,7 @@ func (c *Cache) merged(key string) Values { n := 0 for _, e := range entries { if !needSort && n > 0 { - needSort = values[n-1].UnixNano() > e.values[0].UnixNano() + needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() } n += copy(values[n:], e.values) } diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 21512704bfa..9a7a4a06e49 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -73,6 +73,41 @@ func TestCache_CacheWriteMulti(t *testing.T) { } } +// This tests writing two batches to the same series. The first batch +// is sorted. The second batch is also sorted but contains duplicates. +func TestCache_CacheWriteMulti_Duplicates(t *testing.T) { + v0 := NewValue(time.Unix(2, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(3, 0).UTC(), 1.0) + values0 := Values{v0, v1} + + v3 := NewValue(time.Unix(4, 0).UTC(), 2.0) + v4 := NewValue(time.Unix(5, 0).UTC(), 3.0) + v5 := NewValue(time.Unix(5, 0).UTC(), 3.0) + values1 := Values{v3, v4, v5} + + c := NewCache(0) + + if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + + if err := c.WriteMulti(map[string][]Value{"foo": values1}); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + + if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + } + + expAscValues := Values{v0, v1, v3, v5} + if exp, got := len(expAscValues), len(c.Values("foo")); exp != got { + t.Fatalf("value count mismatch: exp: %v, got %v", exp, got) + } + if deduped := c.Values("foo"); !reflect.DeepEqual(expAscValues, deduped) { + t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, deduped) + } +} + func TestCache_CacheValues(t *testing.T) { v0 := NewValue(time.Unix(1, 0).UTC(), 0.0) v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)