diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go
index f7e34414f16..e5b4c060883 100644
--- a/tsdb/engine/tsm1/cache.go
+++ b/tsdb/engine/tsm1/cache.go
@@ -1,11 +1,14 @@
 package tsm1
 
 import (
+	"expvar"
 	"fmt"
 	"log"
 	"os"
 	"sort"
 	"sync"
+
+	"github.com/influxdata/influxdb"
 )
 
 var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@@ -58,6 +61,12 @@ func (e *entry) deduplicate() {
 	e.needSort = false
 }
 
+// Statistics gathered by the Cache.
+const (
+	statCacheMemoryBytes = "memBytes"  // Size of in-memory cache in bytes
+	statCacheDiskBytes   = "diskBytes" // Size of on-disk snapshots in bytes
+)
+
 // Cache maintains an in-memory store of Values for a set of keys.
 type Cache struct {
 	mu      sync.RWMutex
@@ -70,13 +79,20 @@ type Cache struct {
 	// they are read only and should never be modified
 	snapshots     []*Cache
 	snapshotsSize uint64
+
+	statMap *expvar.Map
+
+	// path is only used to track stats
+	path string
 }
 
 // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
-func NewCache(maxSize uint64) *Cache {
+func NewCache(maxSize uint64, path string) *Cache {
 	return &Cache{
 		maxSize: maxSize,
 		store:   make(map[string]*entry),
+		statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
+		path:    path,
 	}
 }
 
@@ -95,6 +111,11 @@ func (c *Cache) Write(key string, values []Value) error {
 	c.write(key, values)
 	c.size = newSize
 
+	// Update the memory size stat
+	sizeStat := new(expvar.Int)
+	sizeStat.Set(int64(c.size))
+	c.statMap.Set(statCacheMemoryBytes, sizeStat)
+
 	return nil
 }
 
@@ -122,6 +143,11 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
 	c.size = newSize
 	c.mu.Unlock()
 
+	// Update the memory size stat
+	sizeStat := new(expvar.Int)
+	sizeStat.Set(int64(newSize))
+	c.statMap.Set(statCacheMemoryBytes, sizeStat)
+
 	return nil
 }
 
@@ -131,7 +157,7 @@ func (c *Cache) Snapshot() *Cache {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	snapshot := NewCache(c.maxSize)
+	snapshot := NewCache(c.maxSize, c.path+"!snapshot")
 	snapshot.store = c.store
 	snapshot.size = c.size
 
@@ -141,6 +167,15 @@
 	c.snapshots = append(c.snapshots, snapshot)
 	c.snapshotsSize += snapshot.size
 
+	// Update stats
+	memSizeStat := new(expvar.Int)
+	memSizeStat.Set(0)
+	c.statMap.Set(statCacheMemoryBytes, memSizeStat)
+
+	diskSizeStat := new(expvar.Int)
+	diskSizeStat.Set(int64(c.snapshotsSize))
+	c.statMap.Set(statCacheDiskBytes, diskSizeStat)
+
 	return snapshot
 }
 
@@ -165,6 +200,11 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
 			break
 		}
 	}
+
+	// Update disk stats
+	diskSizeStat := new(expvar.Int)
+	diskSizeStat.Set(int64(c.snapshotsSize))
+	c.statMap.Set(statCacheDiskBytes, diskSizeStat)
 }
 
 // Size returns the number of point-calcuated bytes the cache currently uses.
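As a minimal sketch (not part of the patch) of how the new expvar-backed statistics could be read back for debugging: the stat keys are the constants introduced above, while the `dumpCacheStats` helper is purely illustrative and assumes it lives in package `tsm1` so it can reach the unexported `statMap`.

```go
// dumpCacheStats is an illustrative helper (not in the patch) that prints the
// cache's expvar-backed statistics. It assumes it is defined in package tsm1,
// next to Cache, so it can access the statMap registered by NewCache above.
func dumpCacheStats(c *Cache) {
	for _, key := range []string{statCacheMemoryBytes, statCacheDiskBytes} {
		if v := c.statMap.Get(key); v != nil {
			// expvar.Var's String() returns the value encoded as JSON.
			fmt.Printf("tsm1 cache %s=%s\n", key, v.String())
		}
	}
}
```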
diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go
index 9a7a4a06e49..fffb83e7b69 100644
--- a/tsdb/engine/tsm1/cache_test.go
+++ b/tsdb/engine/tsm1/cache_test.go
@@ -12,7 +12,7 @@ import (
 )
 
 func TestCache_NewCache(t *testing.T) {
-	c := NewCache(100)
+	c := NewCache(100, "")
 	if c == nil {
 		t.Fatalf("failed to create new cache")
 	}
@@ -35,7 +35,7 @@ func TestCache_CacheWrite(t *testing.T) {
 	values := Values{v0, v1, v2}
 	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
 
-	c := NewCache(3 * valuesSize)
+	c := NewCache(3*valuesSize, "")
 
 	if err := c.Write("foo", values); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -59,7 +59,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
 	values := Values{v0, v1, v2}
 	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
 
-	c := NewCache(3 * valuesSize)
+	c := NewCache(3*valuesSize, "")
 
 	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -85,7 +85,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
 	v5 := NewValue(time.Unix(5, 0).UTC(), 3.0)
 	values1 := Values{v3, v4, v5}
 
-	c := NewCache(0)
+	c := NewCache(0, "")
 
 	if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -115,7 +115,7 @@ func TestCache_CacheValues(t *testing.T) {
 	v3 := NewValue(time.Unix(1, 0).UTC(), 1.0)
 	v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)
 
-	c := NewCache(512)
+	c := NewCache(512, "")
 	if deduped := c.Values("no such key"); deduped != nil {
 		t.Fatalf("Values returned for no such key")
 	}
@@ -141,7 +141,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
 	v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
 	v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
 
-	c := NewCache(512)
+	c := NewCache(512, "")
 	if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
 		t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
 	}
@@ -185,7 +185,7 @@
 }
 
 func TestCache_CacheEmptySnapshot(t *testing.T) {
-	c := NewCache(512)
+	c := NewCache(512, "")
 
 	// Grab snapshot, and ensure it's as expected.
 	snapshot := c.Snapshot()
@@ -209,7 +209,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
 	v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
 	v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
 
-	c := NewCache(uint64(v1.Size()))
+	c := NewCache(uint64(v1.Size()), "")
 
 	if err := c.Write("foo", Values{v0}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -265,7 +265,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
 	}
 	// Load the cache using the segment.
-	cache := NewCache(1024)
+	cache := NewCache(1024, "")
 	loader := NewCacheLoader([]string{f.Name()})
 	if err := loader.Load(cache); err != nil {
 		t.Fatalf("failed to load cache: %s", err.Error())
 	}
@@ -288,7 +288,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
 	}
 	// Reload the cache using the segment.
-	cache = NewCache(1024)
+	cache = NewCache(1024, "")
 	loader = NewCacheLoader([]string{f.Name()})
 	if err := loader.Load(cache); err != nil {
 		t.Fatalf("failed to load cache: %s", err.Error())
 	}
@@ -347,7 +347,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
 	}
 
 	// Load the cache using the segments.
-	cache := NewCache(1024)
+	cache := NewCache(1024, "")
 	loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
 	if err := loader.Load(cache); err != nil {
 		t.Fatalf("failed to load cache: %s", err.Error())
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index 76e4bd1ddfe..d658ea3016f 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -25,7 +25,7 @@ func TestCompactor_Snapshot(t *testing.T) {
 		"cpu,host=B#!~#value": []tsm1.Value{v2, v3},
 	}
 
-	c := tsm1.NewCache(0)
+	c := tsm1.NewCache(0, "")
 	for k, v := range points1 {
 		if err := c.Write(k, v); err != nil {
 			t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -495,7 +495,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
 		"cpu,host=A#!~#value": []tsm1.Value{v0},
 	}
 
-	c := tsm1.NewCache(0)
+	c := tsm1.NewCache(0, "")
 
 	for k, v := range writes {
 		if err := c.Write(k, v); err != nil {
@@ -543,7 +543,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
 		"cpu,host=A#!~#value": []tsm1.Value{v0, v1},
 	}
 
-	c := tsm1.NewCache(0)
+	c := tsm1.NewCache(0, "")
 
 	for k, v := range writes {
 		if err := c.Write(k, v); err != nil {
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 77d0c1a62e6..67cbd10e192 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -65,7 +65,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
 	fs := NewFileStore(path)
 	fs.traceLogging = opt.Config.DataLoggingEnabled
 
-	cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
+	cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
 
 	c := &Compactor{
 		Dir: path,
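For callers outside the engine, the only visible change is the extra path argument, which is used solely to label the statistics (tests pass ""). A rough usage sketch follows; the shard path and cache size are illustrative values, and the stat behaviour noted in the comments follows the hunks above.

```go
package main

import (
	"time"

	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

func main() {
	// The path only labels the stats; this value is illustrative.
	c := tsm1.NewCache(64*1024*1024, "/var/lib/influxdb/data/db0/autogen/1")

	v := tsm1.NewValue(time.Unix(1, 0).UTC(), 1.0)
	if err := c.Write("cpu,host=A#!~#value", []tsm1.Value{v}); err != nil {
		panic(err)
	}
	// memBytes now tracks the in-memory size of the cache.

	snap := c.Snapshot()  // memBytes is reset to 0; diskBytes covers the snapshot
	c.ClearSnapshot(snap) // diskBytes is updated once the snapshot is released
}
```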