From ae838ef323ce3f17945a70c66ce211af2633c719 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 10 Jan 2017 20:47:26 -0700 Subject: [PATCH 1/8] Simplify Cache.Snapshot This simplifies the cache.Snapshot func to swap the hot cache to the snapshot cache instead of copying and appending entries. This reduces the amount of time the cache is write-locked, which should reduce cache contention for the read-only code paths. --- tsdb/engine/tsm1/cache.go | 30 +++++++++++------------------- tsdb/engine/tsm1/cache_test.go | 4 ++-- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 1afaca5f3f3..0b2d8424b9e 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -345,27 +345,19 @@ func (c *Cache) Snapshot() (*Cache, error) { } } - // Append the current cache values to the snapshot. Because we're accessing - // the Cache we need to call f on each partition in serial. - if err := c.store.applySerial(func(k string, e *entry) error { - e.mu.RLock() - defer e.mu.RUnlock() - snapshotEntry, ok := c.snapshot.store.entry(k) - if ok { - if err := snapshotEntry.add(e.values); err != nil { - return err - } - } else { - c.snapshot.store.add(k, e) - snapshotEntry = e - } - atomic.AddUint64(&c.snapshotSize, uint64(Values(e.values).Size())) - return nil - }); err != nil { - return nil, err + // Did a prior snapshot exist that failed? If so, return the existing + // snapshot to retry. + if c.snapshot.Size() > 0 { + return c.snapshot, nil } - snapshotSize := c.Size() // record the number of bytes written into a snapshot + c.snapshot.store, c.store = c.store, c.snapshot.store + snapshotSize := c.Size() + + // Save the size of the snapshot on the snapshot cache + c.snapshot.size = snapshotSize + // Save the size of the snapshot on the live cache + c.snapshotSize = snapshotSize // Reset the cache's store. c.store.reset() diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 561bfb50012..6b658ca8d8e 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -425,7 +425,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys) } if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") { - t.Fatalf("wrong error writing key bar to cache") + t.Fatalf("wrong error writing key bar to cache: %v", err) } // Grab snapshot, write should still fail since we're still using the memory. @@ -434,7 +434,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { t.Fatalf("failed to snapshot cache: %v", err) } if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") { - t.Fatalf("wrong error writing key bar to cache") + t.Fatalf("wrong error writing key bar to cache: %v", err) } // Clear the snapshot and the write should now succeed. From c433ff331f689e46d244d6073fe2ac2c90d83a1f Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 11 Jan 2017 11:02:22 -0700 Subject: [PATCH 2/8] Encode snapshots concurrently The CacheKeyIterator (used for snapshot compactions) iterated over each key and serially encoded the values for that key as the TSM file was written. With many series, this was slow and used only one CPU core even if more were available. This changes it so that the key space is split amongst a number of goroutines that start encoding all keys in parallel to improve throughput.
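The concurrent encoding introduced here follows a simple pattern: a shared atomic counter hands out fixed-size chunks of the key index, and one goroutine per CPU loops claiming chunks until the key space is exhausted. Below is a minimal, self-contained Go sketch of that work-distribution pattern; the helper names (encodeAll, encodeChunk) and the demo in main are illustrative and not part of this patch.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// encodeAll divides the half-open range [0, n) into fixed-size chunks and
// processes them concurrently, one worker goroutine per CPU.
func encodeAll(n int, encodeChunk func(start, end int)) {
	const chunkSize = 128
	var idx uint64 // next unclaimed index, advanced atomically
	var wg sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// AddUint64 returns the counter's new value, so subtracting
				// chunkSize recovers the start of the chunk just claimed.
				start := int(atomic.AddUint64(&idx, chunkSize)) - chunkSize
				if start >= n {
					return
				}
				end := start + chunkSize
				if end > n {
					end = n
				}
				encodeChunk(start, end)
			}
		}()
	}
	wg.Wait()
}

func main() {
	encodeAll(1000, func(start, end int) {
		fmt.Printf("encoding keys [%d,%d)\n", start, end)
	})
}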
--- tsdb/engine/tsm1/compact.go | 123 ++++++++++++++++++++++++++++-------- 1 file changed, 96 insertions(+), 27 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 2668a931bce..9f0b0810f0f 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -17,8 +17,10 @@ import ( "math" "os" "path/filepath" + "runtime" "sort" "sync" + "sync/atomic" "time" "github.com/influxdata/influxdb/tsdb" @@ -1282,12 +1284,17 @@ func (k *tsmKeyIterator) Close() error { type cacheKeyIterator struct { cache *Cache size int + order []string + i int + blocks [][]cacheBlock + ready []chan struct{} +} + +type cacheBlock struct { k string - order []string - values []Value - block []byte - minTime, maxTime time.Time + minTime, maxTime int64 + b []byte err error } @@ -1295,40 +1302,102 @@ type cacheKeyIterator struct { func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { keys := cache.Keys() - return &cacheKeyIterator{ - size: size, - cache: cache, - order: keys, + chans := make([]chan struct{}, len(keys)) + for i := 0; i < len(keys); i++ { + chans[i] = make(chan struct{}, 1) + } + + cki := &cacheKeyIterator{ + i: -1, + size: size, + cache: cache, + order: keys, + ready: chans, + blocks: make([][]cacheBlock, len(keys)), + } + go cki.encode() + return cki +} + +func (c *cacheKeyIterator) encode() { + concurrency := runtime.NumCPU() + n := len(c.ready) + + // Divide the keyset across each CPU + chunkSize := 128 + idx := uint64(0) + var wg sync.WaitGroup + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + // Run one goroutine per CPU and encode a section of the key space concurrently + go func() { + for { + start := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize + if start >= n { + break + } + end := start + chunkSize + if end > n { + end = n + } + c.encodeRange(start, end) + } + wg.Done() + }() + } + wg.Wait() +} + +func (c *cacheKeyIterator) encodeRange(start, stop int) { + for i := start; i < stop; i++ { + key := c.order[i] + values := c.cache.values(key) + + for len(values) > 0 { + minTime, maxTime := values[0].UnixNano(), values[len(values)-1].UnixNano() + var b []byte + var err error + if len(values) > c.size { + maxTime = values[c.size-1].UnixNano() + b, err = Values(values[:c.size]).Encode(nil) + values = values[c.size:] + } else { + b, err = Values(values).Encode(nil) + values = values[:0] + } + c.blocks[i] = append(c.blocks[i], cacheBlock{ + k: key, + minTime: minTime, + maxTime: maxTime, + b: b, + err: err, + }) + } + // Notify this key is full encoded + c.ready[i] <- struct{}{} } } func (c *cacheKeyIterator) Next() bool { - if len(c.values) > c.size { - c.values = c.values[c.size:] - return true + if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 { + c.blocks[c.i] = c.blocks[c.i][1:] + if len(c.blocks[c.i]) > 0 { + return true + } } + c.i++ - if len(c.order) == 0 { + if c.i >= len(c.order) { return false } - c.k = c.order[0] - c.order = c.order[1:] - c.values = c.cache.values(c.k) - return len(c.values) > 0 + + <-c.ready[c.i] + return true } func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) { - minTime, maxTime := c.values[0].UnixNano(), c.values[len(c.values)-1].UnixNano() - var b []byte - var err error - if len(c.values) > c.size { - maxTime = c.values[c.size-1].UnixNano() - b, err = Values(c.values[:c.size]).Encode(nil) - } else { - b, err = Values(c.values).Encode(nil) - } - - return c.k, minTime, maxTime, b, err + blk := c.blocks[c.i][0] + return blk.k, blk.minTime, blk.maxTime, blk.b, 
blk.err } func (c *cacheKeyIterator) Close() error { From 40b017f4a459693d46e1def5e4c3451db8f8b1a8 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 11 Jan 2017 17:54:51 -0700 Subject: [PATCH 3/8] Fix Cache stats size collection The memory stats as well as the size of the cache were not accurate. There was also a problem where the cache size would be increased optimistically, but if the cache size limit was hit, it would not be decreased. This would cause the cache size to grow without bounds with every failed write. --- tsdb/engine/tsm1/cache.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 0b2d8424b9e..6ce27963050 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -280,9 +280,6 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { addedSize += uint64(Values(v).Size()) } - // Set everything under one RLock. We'll optimistially set size here, and - // then decrement it later if there is a write error. - c.increaseSize(addedSize) limit := c.maxSize n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize @@ -292,6 +289,10 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { return ErrCacheMemorySizeLimitExceeded(n, limit) } + // Set everything under one RLock. We'll optimistially set size here, and + // then decrement it later if there is a write error. + c.increaseSize(addedSize) + var werr error c.mu.RLock() store := c.store @@ -364,8 +365,7 @@ func (c *Cache) Snapshot() (*Cache, error) { atomic.StoreUint64(&c.size, 0) c.lastSnapshot = time.Now() - c.updateMemSize(-int64(snapshotSize)) // decrement the number of bytes in cache - c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot + c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot c.updateSnapshots() return c.snapshot, nil @@ -393,7 +393,7 @@ func (c *Cache) ClearSnapshot(success bool) { if success { c.snapshotAttempts = 0 - atomic.StoreUint64(&c.snapshotSize, 0) + c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache // Reset the snapshot's store, and reset the snapshot to a fresh Cache. c.snapshot.store.reset() @@ -401,6 +401,7 @@ func (c *Cache) ClearSnapshot(success bool) { store: c.snapshot.store, } + atomic.StoreUint64(&c.snapshotSize, 0) c.updateSnapshots() } } From 1e56b5416b35a09472d9086bd53009b2b82c458a Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 11 Jan 2017 17:57:40 -0700 Subject: [PATCH 4/8] Fix compactions sometimes getting stuck I ran into an issue where the cache snapshotting seemed to stop completely, causing the cache to fill up and never recover. I believe this is due to the Timer being reused incorrectly. Instead, use a Ticker that will fire more regularly and not require the resetting logic (which was wrong). --- tsdb/engine/tsm1/engine.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 5cb40784eef..e9abc5635da 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -949,7 +949,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { select { @@ -971,7 +971,6 @@ func (e *Engine) compactCache(quit <-chan struct{}) { atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds()) } } - t.Reset(time.Second) } } @@ -989,7 +988,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1003,12 +1002,11 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { s.Apply() } } - t.Reset(time.Second) } } func (e *Engine) compactTSMFull(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1023,7 +1021,6 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { } } - t.Reset(time.Second) } } From 33be1e1952692d30b82168832c45f5431d283552 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 12 Jan 2017 09:02:59 -0700 Subject: [PATCH 5/8] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d79fffd7aaf..16f8c1fb2ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco - [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV - [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language. - [#7326](https://github.com/influxdata/influxdb/issues/7326): Verbose output for SSL connection errors. +- [#7830](https://github.com/influxdata/influxdb/pull/7830): Cache snapshotting performance improvements ### Bugfixes From 73ed864e1de15ba576d189a75153dd674a2895c4 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 12 Jan 2017 16:02:16 +0000 Subject: [PATCH 6/8] Add cache tests --- tsdb/engine/tsm1/cache.go | 36 ++++++++----- tsdb/engine/tsm1/cache_test.go | 93 ++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 14 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 6ce27963050..0366b3190f8 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -166,16 +166,27 @@ const ( statCacheWriteDropped = "writeDropped" ) +// storer is the interface that describes a cache's store. +type storer interface { + entry(key string) (*entry, bool) // Get an entry by its key. + write(key string, values Values) error // Write an entry to the store. + add(key string, entry *entry) // Add a new entry to the store. + remove(key string) // Remove an entry from the store. + keys(sorted bool) []string // Return an optionally sorted slice of entry keys. + apply(f func(string, *entry) error) error // Apply f to all entries in the store in parallel. + applySerial(f func(string, *entry) error) error // Apply f to all entries in serial. + reset() // Reset the store to an initial unused state. +} + // Cache maintains an in-memory store of Values for a set of keys. type Cache struct { - // TODO(edd): size is protected by mu but due to a bug in atomic size needs - // to be the first word in the struct, as that's the only place where you're - // guaranteed to be 64-bit aligned on a 32 bit system.
See: - // https://golang.org/pkg/sync/atomic/#pkg-note-BUG - size uint64 - commit sync.Mutex + // Due to a bug in atomic size needs to be the first word in the struct, as + // that's the only place where you're guaranteed to be 64-bit aligned on a + // 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG + size uint64 + mu sync.RWMutex - store *ring + store storer maxSize uint64 // snapshots are the cache objects that are currently being written to tsm files @@ -280,24 +291,21 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { addedSize += uint64(Values(v).Size()) } - limit := c.maxSize - n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize - // Enough room in the cache? + limit := c.maxSize // maxSize is safe for reading without a lock. + n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) return ErrCacheMemorySizeLimitExceeded(n, limit) } - // Set everything under one RLock. We'll optimistially set size here, and - // then decrement it later if there is a write error. - c.increaseSize(addedSize) - var werr error c.mu.RLock() store := c.store c.mu.RUnlock() + // We'll optimistially set size here, and then decrement it for write errors. + c.increaseSize(addedSize) for k, v := range values { if err := store.write(k, v); err != nil { // The write failed, hold onto the error and adjust the size delta. diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 6b658ca8d8e..a7bf6db4900 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -1,6 +1,7 @@ package tsm1 import ( + "errors" "fmt" "io/ioutil" "math" @@ -99,6 +100,49 @@ func TestCache_CacheWriteMulti(t *testing.T) { } } +// Tests that the cache stats and size are correctly maintained during writes. +func TestCache_WriteMulti_Stats(t *testing.T) { + limit := uint64(1) + c := NewCache(limit, "") + ms := NewTestStore() + c.store = ms + + // Not enough room in the cache. + v := NewValue(1, 1.0) + values := map[string][]Value{"foo": []Value{v, v}} + if got, exp := c.WriteMulti(values), ErrCacheMemorySizeLimitExceeded(uint64(v.Size()*2), limit); !reflect.DeepEqual(got, exp) { + t.Fatalf("got %q, expected %q", got, exp) + } + + // Fail one of the values in the write. + c = NewCache(50, "") + c.store = ms + + ms.writef = func(key string, v Values) error { + if key == "foo" { + return errors.New("write failed") + } + return nil + } + + values = map[string][]Value{"foo": []Value{v, v}, "bar": []Value{v}} + if got, exp := c.WriteMulti(values), errors.New("write failed"); !reflect.DeepEqual(got, exp) { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Cache size decreased correctly. + if got, exp := c.Size(), uint64(16); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Write stats updated + if got, exp := c.stats.WriteDropped, int64(1); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } else if got, exp := c.stats.WriteErr, int64(1); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } +} + func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) @@ -388,6 +432,32 @@ func TestCache_CacheSnapshot(t *testing.T) { } } +// Tests that Snapshot updates statistics correctly. 
+func TestCache_Snapshot_Stats(t *testing.T) { + limit := uint64(16) + c := NewCache(limit, "") + + values := map[string][]Value{"foo": []Value{NewValue(1, 1.0)}} + if err := c.WriteMulti(values); err != nil { + t.Fatal(err) + } + + _, err := c.Snapshot() + if err != nil { + t.Fatal(err) + } + + // Store size should have been reset. + if got, exp := c.Size(), uint64(0); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // Cached bytes should have been increased. + if got, exp := c.stats.CachedBytes, int64(16); got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } +} + func TestCache_CacheEmptySnapshot(t *testing.T) { c := NewCache(512, "") @@ -695,6 +765,29 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) { return entry.Type(), snappy.Encode(b, b) } +// TestStore implements the storer interface and can be used to mock out a +// Cache's storer implementation. +type TestStore struct { + entryf func(key string) (*entry, bool) + writef func(key string, values Values) error + addf func(key string, entry *entry) + removef func(key string) + keysf func(sorted bool) []string + applyf func(f func(string, *entry) error) error + applySerialf func(f func(string, *entry) error) error + resetf func() +} + +func NewTestStore() *TestStore { return &TestStore{} } +func (s *TestStore) entry(key string) (*entry, bool) { return s.entryf(key) } +func (s *TestStore) write(key string, values Values) error { return s.writef(key, values) } +func (s *TestStore) add(key string, entry *entry) { s.addf(key, entry) } +func (s *TestStore) remove(key string) { s.removef(key) } +func (s *TestStore) keys(sorted bool) []string { return s.keysf(sorted) } +func (s *TestStore) apply(f func(string, *entry) error) error { return s.applyf(f) } +func (s *TestStore) applySerial(f func(string, *entry) error) error { return s.applySerialf(f) } +func (s *TestStore) reset() { s.resetf() } + var fvSize = uint64(NewValue(1, float64(1)).Size()) func BenchmarkCacheFloatEntries(b *testing.B) { From 06a8fd6ca2389428d4ead90a296139f54f2fe684 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 12 Jan 2017 09:55:38 -0700 Subject: [PATCH 7/8] Simplifications and cleanup --- tsdb/engine/tsm1/cache.go | 4 ++-- tsdb/engine/tsm1/compact.go | 10 +++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 0366b3190f8..eb3184b9977 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -364,9 +364,9 @@ func (c *Cache) Snapshot() (*Cache, error) { snapshotSize := c.Size() // Save the size of the snapshot on the snapshot cache - c.snapshot.size = snapshotSize + atomic.StoreUint64(&c.snapshot.size, snapshotSize) // Save the size of the snapshot on the live cache - c.snapshotSize = snapshotSize + atomic.StoreUint64(&c.snapshotSize, snapshotSize) // Reset the cache's store.
c.store.reset() diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 9f0b0810f0f..b43ce866cd4 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1320,14 +1320,12 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { } func (c *cacheKeyIterator) encode() { - concurrency := runtime.NumCPU() + concurrency := runtime.GOMAXPROCS(0) n := len(c.ready) // Divide the keyset across each CPU chunkSize := 128 idx := uint64(0) - var wg sync.WaitGroup - wg.Add(concurrency) for i := 0; i < concurrency; i++ { // Run one goroutine per CPU and encode a section of the key space concurrently go func() { @@ -1342,10 +1340,8 @@ func (c *cacheKeyIterator) encode() { } c.encodeRange(start, end) } - wg.Done() }() } - wg.Wait() } func (c *cacheKeyIterator) encodeRange(start, stop int) { @@ -1373,7 +1369,7 @@ func (c *cacheKeyIterator) encodeRange(start, stop int) { err: err, }) } - // Notify this key is full encoded + // Notify this key is fully encoded c.ready[i] <- struct{}{} } } @@ -1387,7 +1383,7 @@ func (c *cacheKeyIterator) Next() bool { } c.i++ - if c.i >= len(c.order) { + if c.i >= len(c.ready) { return false } From 11f264563a97a03dff8e0b7d900cb15caf09f445 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 12 Jan 2017 12:01:49 -0700 Subject: [PATCH 8/8] Fix 32bit alignment --- tsdb/engine/tsm1/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index eb3184b9977..afe9b629a55 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -183,7 +183,8 @@ type Cache struct { // Due to a bug in atomic size needs to be the first word in the struct, as // that's the only place where you're guaranteed to be 64-bit aligned on a // 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG - size uint64 + size uint64 + snapshotSize uint64 mu sync.RWMutex store storer @@ -193,7 +194,6 @@ type Cache struct { // they're kept in memory while flushing so they can be queried along with the cache. // they are read only and should never be modified snapshot *Cache - snapshotSize uint64 snapshotting bool // This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
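A footnote on the final patch: Go's sync/atomic package documents that on 32-bit platforms the 64-bit atomic functions require their operands to be 64-bit aligned, and only the first word of an allocated struct is guaranteed to be, which is why both size and snapshotSize move to the top of Cache. Below is a minimal sketch of the rule; the counters struct is illustrative, not the real Cache.

package main

import (
	"fmt"
	"sync/atomic"
)

// counters mirrors the layout rule from the patch: fields accessed with
// 64-bit atomics come first so they stay 64-bit aligned even when the
// struct is heap-allocated on a 32-bit system.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type counters struct {
	size         uint64 // offset 0: always 64-bit aligned
	snapshotSize uint64 // offset 8: aligned because size is
	name         string // non-atomic fields follow the atomic ones
}

func main() {
	c := &counters{name: "cache"}
	atomic.AddUint64(&c.size, 42)
	atomic.StoreUint64(&c.snapshotSize, atomic.LoadUint64(&c.size))
	fmt.Println(atomic.LoadUint64(&c.snapshotSize)) // 42
}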