diff --git a/CHANGELOG.md b/CHANGELOG.md index d79fffd7aaf..16f8c1fb2ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco - [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV - [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language. - [#7326](https://github.com/influxdata/influxdb/issues/7326): Verbose output for SSL connection errors. +- [#7830](https://github.com/influxdata/influxdb/pull/7830): Cache snapshotting performance improvements ### Bugfixes diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 1afaca5f3f3..afe9b629a55 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -166,23 +166,34 @@ const ( statCacheWriteDropped = "writeDropped" ) +// storer is the interface that describes a cache's store. +type storer interface { + entry(key string) (*entry, bool) // Get an entry by its key. + write(key string, values Values) error // Write an entry to the store. + add(key string, entry *entry) // Add a new entry to the store. + remove(key string) // Remove an entry from the store. + keys(sorted bool) []string // Return an optionally sorted slice of entry keys. + apply(f func(string, *entry) error) error // Apply f to all entries in the store in parallel. + applySerial(f func(string, *entry) error) error // Apply f to all entries in serial. + reset() // Reset the store to an initial unused state. +} + // Cache maintains an in-memory store of Values for a set of keys. type Cache struct { - // TODO(edd): size is protected by mu but due to a bug in atomic size needs - // to be the first word in the struct, as that's the only place where you're - // guaranteed to be 64-bit aligned on a 32 bit system. 
See: - // https://golang.org/pkg/sync/atomic/#pkg-note-BUG - size uint64 - commit sync.Mutex + // Due to a bug in atomic size needs to be the first word in the struct, as + // that's the only place where you're guaranteed to be 64-bit aligned on a + // 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG + size uint64 + snapshotSize uint64 + mu sync.RWMutex - store *ring + store storer maxSize uint64 // snapshots are the cache objects that are currently being written to tsm files // they're kept in memory while flushing so they can be queried along with the cache. // they are read only and should never be modified snapshot *Cache - snapshotSize uint64 snapshotting bool // This number is the number of pending or failed WriteSnaphot attempts since the last successful one. @@ -280,13 +291,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { addedSize += uint64(Values(v).Size()) } - // Set everything under one RLock. We'll optimistially set size here, and - // then decrement it later if there is a write error. - c.increaseSize(addedSize) - limit := c.maxSize - n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize - // Enough room in the cache? + limit := c.maxSize // maxSize is safe for reading without a lock. + n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) return ErrCacheMemorySizeLimitExceeded(n, limit) @@ -297,6 +304,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { store := c.store c.mu.RUnlock() + // We'll optimistically set size here, and then decrement it for write errors. + c.increaseSize(addedSize) for k, v := range values { if err := store.write(k, v); err != nil { // The write failed, hold onto the error and adjust the size delta. @@ -345,35 +354,26 @@ func (c *Cache) Snapshot() (*Cache, error) { } } - // Append the current cache values to the snapshot. 
Because we're accessing - // the Cache we need to call f on each partition in serial. - if err := c.store.applySerial(func(k string, e *entry) error { - e.mu.RLock() - defer e.mu.RUnlock() - snapshotEntry, ok := c.snapshot.store.entry(k) - if ok { - if err := snapshotEntry.add(e.values); err != nil { - return err - } - } else { - c.snapshot.store.add(k, e) - snapshotEntry = e - } - atomic.AddUint64(&c.snapshotSize, uint64(Values(e.values).Size())) - return nil - }); err != nil { - return nil, err + // Did a prior snapshot exist that failed? If so, return the existing + // snapshot to retry. + if c.snapshot.Size() > 0 { + return c.snapshot, nil } - snapshotSize := c.Size() // record the number of bytes written into a snapshot + c.snapshot.store, c.store = c.store, c.snapshot.store + snapshotSize := c.Size() + + // Save the size of the snapshot on the snapshot cache + atomic.StoreUint64(&c.snapshot.size, snapshotSize) + // Save the size of the snapshot on the live cache + atomic.StoreUint64(&c.snapshotSize, snapshotSize) // Reset the cache's store. c.store.reset() atomic.StoreUint64(&c.size, 0) c.lastSnapshot = time.Now() - c.updateMemSize(-int64(snapshotSize)) // decrement the number of bytes in cache - c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot + c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot c.updateSnapshots() return c.snapshot, nil @@ -401,7 +401,7 @@ func (c *Cache) ClearSnapshot(success bool) { if success { c.snapshotAttempts = 0 - atomic.StoreUint64(&c.snapshotSize, 0) + c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache // Reset the snapshot's store, and reset the snapshot to a fresh Cache. 
c.snapshot.store.reset() @@ -409,6 +409,7 @@ func (c *Cache) ClearSnapshot(success bool) { store: c.snapshot.store, } + atomic.StoreUint64(&c.snapshotSize, 0) c.updateSnapshots() } } diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 561bfb50012..a7bf6db4900 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -1,6 +1,7 @@ package tsm1 import ( + "errors" "fmt" "io/ioutil" "math" @@ -99,6 +100,49 @@ func TestCache_CacheWriteMulti(t *testing.T) { } } +// Tests that the cache stats and size are correctly maintained during writes. +func TestCache_WriteMulti_Stats(t *testing.T) { + limit := uint64(1) + c := NewCache(limit, "") + ms := NewTestStore() + c.store = ms + + // Not enough room in the cache. + v := NewValue(1, 1.0) + values := map[string][]Value{"foo": []Value{v, v}} + if got, exp := c.WriteMulti(values), ErrCacheMemorySizeLimitExceeded(uint64(v.Size()*2), limit); !reflect.DeepEqual(got, exp) { + t.Fatalf("got %q, expected %q", got, exp) + } + + // Fail one of the values in the write. + c = NewCache(50, "") + c.store = ms + + ms.writef = func(key string, v Values) error { + if key == "foo" { + return errors.New("write failed") + } + return nil + } + + values = map[string][]Value{"foo": []Value{v, v}, "bar": []Value{v}} + if got, exp := c.WriteMulti(values), errors.New("write failed"); !reflect.DeepEqual(got, exp) { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Cache size decreased correctly. 
+ if got, exp := c.Size(), uint64(16); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Write stats updated + if got, exp := c.stats.WriteDropped, int64(1); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } else if got, exp := c.stats.WriteErr, int64(1); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } +} + func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) @@ -388,6 +432,32 @@ func TestCache_CacheSnapshot(t *testing.T) { } } +// Tests that Snapshot updates statistics correctly. +func TestCache_Snapshot_Stats(t *testing.T) { + limit := uint64(16) + c := NewCache(limit, "") + + values := map[string][]Value{"foo": []Value{NewValue(1, 1.0)}} + if err := c.WriteMulti(values); err != nil { + t.Fatal(err) + } + + _, err := c.Snapshot() + if err != nil { + t.Fatal(err) + } + + // Store size should have been reset. + if got, exp := c.Size(), uint64(0); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Cached bytes should have been increased. + if got, exp := c.stats.CachedBytes, int64(16); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } +} + func TestCache_CacheEmptySnapshot(t *testing.T) { c := NewCache(512, "") @@ -425,7 +495,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys) } if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") { - t.Fatalf("wrong error writing key bar to cache") + t.Fatalf("wrong error writing key bar to cache: %v", err) } // Grab snapshot, write should still fail since we're still using the memory. 
@@ -434,7 +504,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { t.Fatalf("failed to snapshot cache: %v", err) } if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") { - t.Fatalf("wrong error writing key bar to cache") + t.Fatalf("wrong error writing key bar to cache: %v", err) } // Clear the snapshot and the write should now succeed. @@ -695,6 +765,29 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) { return entry.Type(), snappy.Encode(b, b) } +// TestStore implements the storer interface and can be used to mock out a +// Cache's storer implementation. +type TestStore struct { + entryf func(key string) (*entry, bool) + writef func(key string, values Values) error + addf func(key string, entry *entry) + removef func(key string) + keysf func(sorted bool) []string + applyf func(f func(string, *entry) error) error + applySerialf func(f func(string, *entry) error) error + resetf func() +} + +func NewTestStore() *TestStore { return &TestStore{} } +func (s *TestStore) entry(key string) (*entry, bool) { return s.entryf(key) } +func (s *TestStore) write(key string, values Values) error { return s.writef(key, values) } +func (s *TestStore) add(key string, entry *entry) { s.addf(key, entry) } +func (s *TestStore) remove(key string) { s.removef(key) } +func (s *TestStore) keys(sorted bool) []string { return s.keysf(sorted) } +func (s *TestStore) apply(f func(string, *entry) error) error { return s.applyf(f) } +func (s *TestStore) applySerial(f func(string, *entry) error) error { return s.applySerialf(f) } +func (s *TestStore) reset() { s.resetf() } + var fvSize = uint64(NewValue(1, float64(1)).Size()) func BenchmarkCacheFloatEntries(b *testing.B) { diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 2668a931bce..b43ce866cd4 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -17,8 +17,10 @@ import ( "math" "os" "path/filepath" + "runtime" "sort" 
"sync" + "sync/atomic" "time" "github.com/influxdata/influxdb/tsdb" @@ -1282,12 +1284,17 @@ func (k *tsmKeyIterator) Close() error { type cacheKeyIterator struct { cache *Cache size int + order []string + i int + blocks [][]cacheBlock + ready []chan struct{} +} + +type cacheBlock struct { k string - order []string - values []Value - block []byte - minTime, maxTime time.Time + minTime, maxTime int64 + b []byte err error } @@ -1295,40 +1302,98 @@ type cacheKeyIterator struct { func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { keys := cache.Keys() - return &cacheKeyIterator{ - size: size, - cache: cache, - order: keys, + chans := make([]chan struct{}, len(keys)) + for i := 0; i < len(keys); i++ { + chans[i] = make(chan struct{}, 1) + } + + cki := &cacheKeyIterator{ + i: -1, + size: size, + cache: cache, + order: keys, + ready: chans, + blocks: make([][]cacheBlock, len(keys)), + } + go cki.encode() + return cki +} + +func (c *cacheKeyIterator) encode() { + concurrency := runtime.GOMAXPROCS(0) + n := len(c.ready) + + // Divide the keyset across each CPU + chunkSize := 128 + idx := uint64(0) + for i := 0; i < concurrency; i++ { + // Run one goroutine per CPU and encode a section of the key space concurrently + go func() { + for { + start := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize + if start >= n { + break + } + end := start + chunkSize + if end > n { + end = n + } + c.encodeRange(start, end) + } + }() + } +} + +func (c *cacheKeyIterator) encodeRange(start, stop int) { + for i := start; i < stop; i++ { + key := c.order[i] + values := c.cache.values(key) + + for len(values) > 0 { + minTime, maxTime := values[0].UnixNano(), values[len(values)-1].UnixNano() + var b []byte + var err error + if len(values) > c.size { + maxTime = values[c.size-1].UnixNano() + b, err = Values(values[:c.size]).Encode(nil) + values = values[c.size:] + } else { + b, err = Values(values).Encode(nil) + values = values[:0] + } + c.blocks[i] = append(c.blocks[i], 
cacheBlock{ + k: key, + minTime: minTime, + maxTime: maxTime, + b: b, + err: err, + }) + } + // Notify this key is fully encoded + c.ready[i] <- struct{}{} } } func (c *cacheKeyIterator) Next() bool { - if len(c.values) > c.size { - c.values = c.values[c.size:] - return true + if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 { + c.blocks[c.i] = c.blocks[c.i][1:] + if len(c.blocks[c.i]) > 0 { + return true + } } + c.i++ - if len(c.order) == 0 { + if c.i >= len(c.ready) { return false } - c.k = c.order[0] - c.order = c.order[1:] - c.values = c.cache.values(c.k) - return len(c.values) > 0 + + <-c.ready[c.i] + return true } func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) { - minTime, maxTime := c.values[0].UnixNano(), c.values[len(c.values)-1].UnixNano() - var b []byte - var err error - if len(c.values) > c.size { - maxTime = c.values[c.size-1].UnixNano() - b, err = Values(c.values[:c.size]).Encode(nil) - } else { - b, err = Values(c.values).Encode(nil) - } - - return c.k, minTime, maxTime, b, err + blk := c.blocks[c.i][0] + return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err } func (c *cacheKeyIterator) Close() error { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 5cb40784eef..e9abc5635da 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -949,7 +949,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk. 
func (e *Engine) compactCache(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { select { @@ -971,7 +971,6 @@ func (e *Engine) compactCache(quit <-chan struct{}) { atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds()) } } - t.Reset(time.Second) } } @@ -989,7 +988,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1003,12 +1002,11 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { s.Apply() } } - t.Reset(time.Second) } } func (e *Engine) compactTSMFull(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1023,7 +1021,6 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { } } - t.Reset(time.Second) } }