Cache snapshotting performance improvements #7830

Merged
8 commits merged on Jan 12, 2017
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV
- [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language.
- [#7326](https://github.com/influxdata/influxdb/issues/7326): Verbose output for SSL connection errors.
- [#7830](https://github.com/influxdata/influxdb/pull/7830): Cache snapshotting performance improvements

### Bugfixes

73 changes: 37 additions & 36 deletions tsdb/engine/tsm1/cache.go
@@ -166,23 +166,34 @@ const (
statCacheWriteDropped = "writeDropped"
)

// storer is the interface that describes a cache's store.
type storer interface {
entry(key string) (*entry, bool) // Get an entry by its key.
write(key string, values Values) error // Write an entry to the store.
add(key string, entry *entry) // Add a new entry to the store.
remove(key string) // Remove an entry from the store.
keys(sorted bool) []string // Return an optionally sorted slice of entry keys.
apply(f func(string, *entry) error) error // Apply f to all entries in the store in parallel.
applySerial(f func(string, *entry) error) error // Apply f to all entries in serial.
reset() // Reset the store to an initial unused state.
}

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
// TODO(edd): size is protected by mu but due to a bug in atomic size needs
// to be the first word in the struct, as that's the only place where you're
// guaranteed to be 64-bit aligned on a 32 bit system. See:
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
size uint64
commit sync.Mutex
// Due to a bug in atomic, size needs to be the first word in the struct, as
// that's the only place where you're guaranteed to be 64-bit aligned on a
// 32-bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
size uint64
snapshotSize uint64

mu sync.RWMutex
store *ring
store storer
maxSize uint64

// snapshot is the cache object currently being written to TSM files. It is
// kept in memory while flushing so it can be queried along with the cache.
// It is read-only and should never be modified.
snapshot *Cache
snapshotSize uint64
snapshotting bool

// This number is the number of pending or failed WriteSnapshot attempts since the last successful one.
@@ -280,13 +291,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
addedSize += uint64(Values(v).Size())
}

// Set everything under one RLock. We'll optimistically set size here, and
// then decrement it later if there is a write error.
c.increaseSize(addedSize)
limit := c.maxSize
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize

// Enough room in the cache?
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
@@ -297,6 +304,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
store := c.store
c.mu.RUnlock()

// We'll optimistically set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
if err := store.write(k, v); err != nil {
// The write failed, hold onto the error and adjust the size delta.
@@ -345,35 +354,26 @@ func (c *Cache) Snapshot() (*Cache, error) {
}
}

// Append the current cache values to the snapshot. Because we're accessing
// the Cache we need to call f on each partition in serial.
if err := c.store.applySerial(func(k string, e *entry) error {
e.mu.RLock()
defer e.mu.RUnlock()
snapshotEntry, ok := c.snapshot.store.entry(k)
if ok {
if err := snapshotEntry.add(e.values); err != nil {
return err
}
} else {
c.snapshot.store.add(k, e)
snapshotEntry = e
}
atomic.AddUint64(&c.snapshotSize, uint64(Values(e.values).Size()))
return nil
}); err != nil {
return nil, err
// Did a prior snapshot exist that failed? If so, return the existing
// snapshot to retry.
if c.snapshot.Size() > 0 {
return c.snapshot, nil
}

snapshotSize := c.Size() // record the number of bytes written into a snapshot
c.snapshot.store, c.store = c.store, c.snapshot.store
snapshotSize := c.Size()

// Save the size of the snapshot on the snapshot cache
atomic.StoreUint64(&c.snapshot.size, snapshotSize)
// Save the size of the snapshot on the live cache
atomic.StoreUint64(&c.snapshotSize, snapshotSize)

// Reset the cache's store.
c.store.reset()
atomic.StoreUint64(&c.size, 0)
c.lastSnapshot = time.Now()

c.updateMemSize(-int64(snapshotSize)) // decrement the number of bytes in cache
c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateSnapshots()

return c.snapshot, nil
@@ -401,14 +401,15 @@ func (c *Cache) ClearSnapshot(success bool) {

if success {
c.snapshotAttempts = 0
atomic.StoreUint64(&c.snapshotSize, 0)
c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

// Reset the snapshot's store, and reset the snapshot to a fresh Cache.
c.snapshot.store.reset()
c.snapshot = &Cache{
store: c.snapshot.store,
}

atomic.StoreUint64(&c.snapshotSize, 0)
c.updateSnapshots()
}
}
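
The core of this change is visible in Snapshot above: instead of walking every entry and appending it into the snapshot cache, the live store and the snapshot's empty store are swapped, so creating a snapshot becomes a pointer exchange plus a few atomic size updates. Below is a minimal, self-contained sketch of that swap idea; the miniCache type, its map-backed store, and the 8-bytes-per-value accounting are simplified placeholders for illustration, not the actual tsm1 types.

// snapshot_sketch.go — a standalone sketch of swap-based snapshotting.
// All names here are hypothetical; they only mirror the shape of the change above.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type miniCache struct {
	// size is first so 64-bit atomic ops stay aligned on 32-bit platforms.
	size         uint64
	snapshotSize uint64

	mu       sync.Mutex
	store    map[string][]float64
	snapshot *miniCache
}

func newMiniCache() *miniCache {
	return &miniCache{store: make(map[string][]float64)}
}

// Write appends values for key and tracks the cache size atomically.
func (c *miniCache) Write(key string, vals ...float64) {
	c.mu.Lock()
	c.store[key] = append(c.store[key], vals...)
	c.mu.Unlock()
	atomic.AddUint64(&c.size, uint64(8*len(vals)))
}

// Snapshot swaps the live store with the snapshot's empty store rather than
// copying every entry, which is the gist of the optimization in this PR.
func (c *miniCache) Snapshot() *miniCache {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.snapshot == nil {
		c.snapshot = newMiniCache()
	}
	// A previous snapshot that was never cleared is handed back for retry.
	if atomic.LoadUint64(&c.snapshot.size) > 0 {
		return c.snapshot
	}

	// O(1) exchange: the old live store becomes the snapshot's store.
	c.store, c.snapshot.store = c.snapshot.store, c.store

	snapSize := atomic.LoadUint64(&c.size)
	atomic.StoreUint64(&c.snapshot.size, snapSize)
	atomic.StoreUint64(&c.snapshotSize, snapSize)
	atomic.StoreUint64(&c.size, 0)
	return c.snapshot
}

func main() {
	c := newMiniCache()
	c.Write("cpu", 1, 2, 3)
	snap := c.Snapshot()
	fmt.Println(atomic.LoadUint64(&snap.size)) // 24
	fmt.Println(atomic.LoadUint64(&c.size))    // 0
}

In the real Cache the store is partitioned (the ring behind the storer interface) and guarded by an RWMutex, but the accounting follows the same pattern: the snapshot's size moves from c.size to c.snapshotSize and is only released again in ClearSnapshot.
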
97 changes: 95 additions & 2 deletions tsdb/engine/tsm1/cache_test.go
@@ -1,6 +1,7 @@
package tsm1

import (
"errors"
"fmt"
"io/ioutil"
"math"
@@ -99,6 +100,49 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}

// Tests that the cache stats and size are correctly maintained during writes.
func TestCache_WriteMulti_Stats(t *testing.T) {
limit := uint64(1)
c := NewCache(limit, "")
ms := NewTestStore()
c.store = ms

// Not enough room in the cache.
v := NewValue(1, 1.0)
values := map[string][]Value{"foo": []Value{v, v}}
if got, exp := c.WriteMulti(values), ErrCacheMemorySizeLimitExceeded(uint64(v.Size()*2), limit); !reflect.DeepEqual(got, exp) {
t.Fatalf("got %q, expected %q", got, exp)
}

// Fail one of the values in the write.
c = NewCache(50, "")
c.store = ms

ms.writef = func(key string, v Values) error {
if key == "foo" {
return errors.New("write failed")
}
return nil
}

values = map[string][]Value{"foo": []Value{v, v}, "bar": []Value{v}}
if got, exp := c.WriteMulti(values), errors.New("write failed"); !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, expected %v", got, exp)
}

// Cache size decreased correctly.
if got, exp := c.Size(), uint64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Write stats updated
if got, exp := c.stats.WriteDropped, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
} else if got, exp := c.stats.WriteErr, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
@@ -388,6 +432,32 @@ func TestCache_CacheSnapshot(t *testing.T) {
}
}

// Tests that Snapshot updates statistics correctly.
func TestCache_Snapshot_Stats(t *testing.T) {
limit := uint64(16)
c := NewCache(limit, "")

values := map[string][]Value{"foo": []Value{NewValue(1, 1.0)}}
if err := c.WriteMulti(values); err != nil {
t.Fatal(err)
}

_, err := c.Snapshot()
if err != nil {
t.Fatal(err)
}

// Store size should have been reset.
if got, exp := c.Size(), uint64(0); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Cached bytes should have been increased.
if got, exp := c.stats.CachedBytes, int64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512, "")

@@ -425,7 +495,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys)
}
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
t.Fatalf("wrong error writing key bar to cache")
t.Fatalf("wrong error writing key bar to cache: %v", err)
}

// Grab snapshot, write should still fail since we're still using the memory.
@@ -434,7 +504,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
t.Fatalf("failed to snapshot cache: %v", err)
}
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
t.Fatalf("wrong error writing key bar to cache")
t.Fatalf("wrong error writing key bar to cache: %v", err)
}

// Clear the snapshot and the write should now succeed.
@@ -695,6 +765,29 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
return entry.Type(), snappy.Encode(b, b)
}

// TestStore implements the storer interface and can be used to mock out a
// Cache's storer implementation.
type TestStore struct {
entryf func(key string) (*entry, bool)
writef func(key string, values Values) error
addf func(key string, entry *entry)
removef func(key string)
keysf func(sorted bool) []string
applyf func(f func(string, *entry) error) error
applySerialf func(f func(string, *entry) error) error
resetf func()
}

func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key string) (*entry, bool) { return s.entryf(key) }
func (s *TestStore) write(key string, values Values) error { return s.writef(key, values) }
func (s *TestStore) add(key string, entry *entry) { s.addf(key, entry) }
func (s *TestStore) remove(key string) { s.removef(key) }
func (s *TestStore) keys(sorted bool) []string { return s.keysf(sorted) }
func (s *TestStore) apply(f func(string, *entry) error) error { return s.applyf(f) }
func (s *TestStore) applySerial(f func(string, *entry) error) error { return s.applySerialf(f) }
func (s *TestStore) reset() { s.resetf() }

var fvSize = uint64(NewValue(1, float64(1)).Size())

func BenchmarkCacheFloatEntries(b *testing.B) {
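
The TestStore above uses the function-field test-double pattern: each storer method delegates to an assignable func field, so a test can stub exactly the behavior it needs (as TestCache_WriteMulti_Stats does with writef). Here is a standalone sketch of the same pattern, using hypothetical names unrelated to the tsm1 package:

package main

import (
	"errors"
	"fmt"
)

// kvStore is a hypothetical interface standing in for the storer interface above.
type kvStore interface {
	write(key, value string) error
	remove(key string)
}

// fakeStore mirrors the TestStore pattern: one overridable func field per method.
type fakeStore struct {
	writef  func(key, value string) error
	removef func(key string)
}

func (s *fakeStore) write(key, value string) error { return s.writef(key, value) }
func (s *fakeStore) remove(key string)             { s.removef(key) }

func main() {
	// Only the behavior under test is stubbed; writes to "foo" fail.
	s := &fakeStore{
		writef: func(key, value string) error {
			if key == "foo" {
				return errors.New("write failed")
			}
			return nil
		},
		removef: func(key string) {},
	}

	var store kvStore = s
	fmt.Println(store.write("bar", "1")) // <nil>
	fmt.Println(store.write("foo", "1")) // write failed
}

Any method a test forgets to stub panics on first use (calling a nil func), which makes a missing stub show up immediately as a test failure.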