Track cache, WAL, filestore stats within tsm1 engine #5758

Merged · 12 commits · Feb 22, 2016
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
- [#5366](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
- [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour

### Bugfixes

101 changes: 92 additions & 9 deletions tsdb/engine/tsm1/cache.go
@@ -1,11 +1,15 @@
package tsm1

import (
"expvar"
"fmt"
"log"
"os"
"sort"
"sync"
"time"

"github.com/influxdata/influxdb"
)

var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@@ -62,6 +66,21 @@ func (e *entry) deduplicate() {
e.needSort = false
}

// Statistics gathered by the Cache.
const (
// levels - point in time measures

statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since the cache was last snapshotted, at sample time

// counters - accumulative measures

statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
)

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
mu sync.RWMutex
@@ -74,30 +93,47 @@ type Cache struct {
// they are read only and should never be modified
snapshots []*Cache
snapshotsSize uint64

statMap *expvar.Map // nil for snapshots.
lastSnapshot time.Time
}

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
// Only used for engine caches, never for snapshots
func NewCache(maxSize uint64, path string) *Cache {
c := &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
lastSnapshot: time.Now(),
}
c.UpdateAge()
c.UpdateCompactTime(0)
c.updateCachedBytes(0)
c.updateMemSize(0)
c.updateSnapshots()
return c
}

// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache has exceeded its max size.
func (c *Cache) Write(key string, values []Value) error {
c.mu.Lock()
defer c.mu.Unlock()

// Enough room in the cache?
newSize := c.size + uint64(Values(values).Size())
addedSize := Values(values).Size()
newSize := c.size + uint64(addedSize)
if c.maxSize > 0 && newSize+c.snapshotsSize > c.maxSize {
c.mu.Unlock()
return ErrCacheMemoryExceeded
}

c.write(key, values)
c.size = newSize
c.mu.Unlock()

// Update the memory size stat
c.updateMemSize(int64(addedSize))

return nil
}
@@ -126,6 +162,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
c.size = newSize
c.mu.Unlock()

// Update the memory size stat
c.updateMemSize(int64(totalSz))

return nil
}

@@ -135,16 +174,22 @@ func (c *Cache) Snapshot() *Cache {
c.mu.Lock()
defer c.mu.Unlock()

snapshot := NewCache(c.maxSize)
snapshot.store = c.store
snapshot.size = c.size
snapshot := &Cache{
store: c.store,
size: c.size,
}

c.store = make(map[string]*entry)
c.size = 0
c.lastSnapshot = time.Now()

c.snapshots = append(c.snapshots, snapshot)
c.snapshotsSize += snapshot.size

c.updateMemSize(-int64(snapshot.size))
c.updateCachedBytes(snapshot.size)
c.updateSnapshots()

return snapshot
}

@@ -169,6 +214,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
break
}
}

c.updateSnapshots()
}

// Size returns the number of point-calculated bytes the cache currently uses.
@@ -385,3 +432,39 @@ func (cl *CacheLoader) Load(cache *Cache) error {
}
return nil
}

// UpdateAge updates the cacheAgeMs statistic with the time elapsed since the last snapshot.
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
ageStat := new(expvar.Int)
ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond))
c.statMap.Set(statCacheAgeMs, ageStat)
}

// UpdateCompactTime adds d to the WAL compaction time statistic.
func (c *Cache) UpdateCompactTime(d time.Duration) {
c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond))
}

// updateCachedBytes adds b to the cachedBytes counter.
func (c *Cache) updateCachedBytes(b uint64) {
c.statMap.Add(statCachedBytes, int64(b))
}

// updateMemSize adjusts the memBytes level by b.
func (c *Cache) updateMemSize(b int64) {
c.statMap.Add(statCacheMemoryBytes, b)
}

// updateSnapshots updates the snapshotCount and diskBytes levels.
func (c *Cache) updateSnapshots() {
// Update disk stats
diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

snapshotsStat := new(expvar.Int)
snapshotsStat.Set(int64(len(c.snapshots)))
c.statMap.Set(statSnapshots, snapshotsStat)
}
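
The statistics added above fall into two kinds: point-in-time levels (memBytes, diskBytes, snapshotCount, cacheAgeMs) that are overwritten with expvar's Set, and accumulating counters (cachedBytes, WALCompactionTimeMs) that grow with Add. Below is a minimal, standalone sketch of that expvar pattern; it is not InfluxDB code, and the map name and values are made up:

```go
package main

import (
	"expvar"
	"fmt"
)

func main() {
	// Hypothetical map name; the real Cache registers its map via influxdb.NewStatistics.
	stats := expvar.NewMap("tsm1_cache_example")

	// Level: a point-in-time measure, overwritten on every update.
	mem := new(expvar.Int)
	mem.Set(4096)
	stats.Set("memBytes", mem)

	// Counter: an accumulating measure, incremented by deltas.
	stats.Add("cachedBytes", 1024)
	stats.Add("cachedBytes", 2048)

	fmt.Println(stats.Get("memBytes"))    // 4096
	fmt.Println(stats.Get("cachedBytes")) // 3072
}
```

The same split shows up in the Cache methods: UpdateAge and updateSnapshots build a fresh expvar.Int and Set it, while updateCachedBytes, updateMemSize, and UpdateCompactTime call Add with a delta.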
22 changes: 11 additions & 11 deletions tsdb/engine/tsm1/cache_test.go
@@ -12,7 +12,7 @@ import (
)

func TestCache_NewCache(t *testing.T) {
c := NewCache(100)
c := NewCache(100, "")
if c == nil {
t.Fatalf("failed to create new cache")
}
@@ -35,7 +35,7 @@ func TestCache_CacheWrite(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")

if err := c.Write("foo", values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -59,7 +59,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")

if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -80,7 +80,7 @@ func TestCache_CacheValues(t *testing.T) {
v3 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)

c := NewCache(512)
c := NewCache(512, "")
if deduped := c.Values("no such key"); deduped != nil {
t.Fatalf("Values returned for no such key")
}
@@ -106,7 +106,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)

c := NewCache(512)
c := NewCache(512, "")
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
@@ -150,7 +150,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
}

func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512)
c := NewCache(512, "")

// Grab snapshot, and ensure it's as expected.
snapshot := c.Snapshot()
@@ -174,7 +174,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)

c := NewCache(uint64(v1.Size()))
c := NewCache(uint64(v1.Size()), "")

if err := c.Write("foo", Values{v0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -230,7 +230,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Load the cache using the segment.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -253,7 +253,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Reload the cache using the segment.
cache = NewCache(1024)
cache = NewCache(1024, "")
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -312,7 +312,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}

// Load the cache using the segments.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -362,7 +362,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {

func BenchmarkCacheFloatEntries(b *testing.B) {
for i := 0; i < b.N; i++ {
cache := NewCache(10000)
cache := NewCache(10000, "")
for j := 0; j < 10000; j++ {
v := NewValue(time.Unix(1, 0), float64(j))
cache.Write("test", []Value{v})
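
The test changes above only adapt to the new NewCache(maxSize, path) signature. A hypothetical extra test, not part of this PR, could check the accounting that Snapshot performs; it assumes it sits in the tsm1 package next to cache.go so it can reach the unexported statMap field and the stat name constants:

```go
func TestCache_SnapshotStats(t *testing.T) {
	v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)

	c := NewCache(512, "")
	if err := c.Write("foo", Values{v0}); err != nil {
		t.Fatalf("failed to write key foo to cache: %s", err.Error())
	}
	c.Snapshot()

	// Snapshot moves the written bytes out of the in-memory level and into the
	// cachedBytes counter, so memBytes should drop back to zero.
	if got := c.statMap.Get(statCacheMemoryBytes).String(); got != "0" {
		t.Fatalf("unexpected memBytes after snapshot: %s", got)
	}
	if got := c.statMap.Get(statCachedBytes).String(); got == "0" {
		t.Fatalf("expected non-zero cachedBytes after snapshot, got %s", got)
	}
}
```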
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/compact_test.go
@@ -25,7 +25,7 @@ func TestCompactor_Snapshot(t *testing.T) {
"cpu,host=B#!~#value": []tsm1.Value{v2, v3},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")
for k, v := range points1 {
if err := c.Write(k, v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -497,7 +497,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
@@ -545,7 +545,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0, v1},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
15 changes: 14 additions & 1 deletion tsdb/engine/tsm1/engine.go
@@ -70,7 +70,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
fs := NewFileStore(path)
fs.traceLogging = opt.Config.DataLoggingEnabled

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)

c := &Compactor{
Dir: path,
@@ -410,10 +410,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen
func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot

var started *time.Time

defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Now().Sub(*started))
}
}()

closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
e.mu.Lock()
defer e.mu.Unlock()

now := time.Now()
started = &now

if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, nil, err
}
@@ -477,6 +489,7 @@ func (e *Engine) compactCache() {
return

default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
err := e.WriteSnapshot()
if err != nil {
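
WriteSnapshot times its own work with a deferred closure that calls Cache.UpdateCompactTime only if the inner function ran far enough to record a start time. Below is a stripped-down, standalone sketch of that pattern, with a stand-in callback in place of Cache.UpdateCompactTime and a sleep standing in for the real compaction work:

```go
package main

import (
	"fmt"
	"time"
)

// writeSnapshot mirrors the shape of Engine.WriteSnapshot: the deferred closure
// reports elapsed time only if the inner function recorded a start time.
func writeSnapshot(recordCompactTime func(time.Duration)) error {
	var started *time.Time

	defer func() {
		if started != nil {
			recordCompactTime(time.Now().Sub(*started))
		}
	}()

	// Inner function: grab state under lock and note the start time.
	if err := func() error {
		now := time.Now()
		started = &now
		// ... close the WAL segment and take the cache snapshot here ...
		return nil
	}(); err != nil {
		return err
	}

	// The slow part (writing the snapshot out) is still covered by the defer.
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	if err := writeSnapshot(func(d time.Duration) {
		fmt.Printf("snapshot compaction took %v\n", d)
	}); err != nil {
		fmt.Println("snapshot failed:", err)
	}
}
```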