Skip to content

Commit

Permalink
Add stats to tsm1.Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-rushakoff authored and jonseymour committed Feb 29, 2016
1 parent 3c66683 commit 0cff9e0
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 17 deletions.
44 changes: 42 additions & 2 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package tsm1

import (
"expvar"
"fmt"
"log"
"os"
"sort"
"sync"

"github.com/influxdata/influxdb"
)

var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
Expand Down Expand Up @@ -58,6 +61,12 @@ func (e *entry) deduplicate() {
e.needSort = false
}

// Statistics gathered by the Cache.
const (
statCacheMemoryBytes = "memBytes" // Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // Size of on-disk snapshots in bytes
)

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
mu sync.RWMutex
Expand All @@ -70,13 +79,20 @@ type Cache struct {
// they are read only and should never be modified
snapshots []*Cache
snapshotsSize uint64

statMap *expvar.Map

// path is only used to track stats
path string
}

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64) *Cache {
func NewCache(maxSize uint64, path string) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
path: path,
}
}

Expand All @@ -95,6 +111,11 @@ func (c *Cache) Write(key string, values []Value) error {
c.write(key, values)
c.size = newSize

// Update the memory size stat
sizeStat := new(expvar.Int)
sizeStat.Set(int64(c.size))
c.statMap.Set(statCacheMemoryBytes, sizeStat)

return nil
}

Expand Down Expand Up @@ -122,6 +143,11 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
c.size = newSize
c.mu.Unlock()

// Update the memory size stat
sizeStat := new(expvar.Int)
sizeStat.Set(int64(newSize))
c.statMap.Set(statCacheMemoryBytes, sizeStat)

return nil
}

Expand All @@ -131,7 +157,7 @@ func (c *Cache) Snapshot() *Cache {
c.mu.Lock()
defer c.mu.Unlock()

snapshot := NewCache(c.maxSize)
snapshot := NewCache(c.maxSize, c.path+"!snapshot")
snapshot.store = c.store
snapshot.size = c.size

Expand All @@ -141,6 +167,15 @@ func (c *Cache) Snapshot() *Cache {
c.snapshots = append(c.snapshots, snapshot)
c.snapshotsSize += snapshot.size

// Update stats
memSizeStat := new(expvar.Int)
memSizeStat.Set(0)
c.statMap.Set(statCacheMemoryBytes, memSizeStat)

diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

return snapshot
}

Expand All @@ -165,6 +200,11 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
break
}
}

// Update disk stats
diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)
}

// Size returns the number of point-calcuated bytes the cache currently uses.
Expand Down
22 changes: 11 additions & 11 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestCache_NewCache(t *testing.T) {
c := NewCache(100)
c := NewCache(100, "")
if c == nil {
t.Fatalf("failed to create new cache")
}
Expand All @@ -35,7 +35,7 @@ func TestCache_CacheWrite(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")

if err := c.Write("foo", values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
Expand All @@ -59,7 +59,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")

if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
Expand All @@ -85,7 +85,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
v5 := NewValue(time.Unix(5, 0).UTC(), 3.0)
values1 := Values{v3, v4, v5}

c := NewCache(0)
c := NewCache(0, "")

if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCache_CacheValues(t *testing.T) {
v3 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)

c := NewCache(512)
c := NewCache(512, "")
if deduped := c.Values("no such key"); deduped != nil {
t.Fatalf("Values returned for no such key")
}
Expand All @@ -141,7 +141,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)

c := NewCache(512)
c := NewCache(512, "")
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
}

func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512)
c := NewCache(512, "")

// Grab snapshot, and ensure it's as expected.
snapshot := c.Snapshot()
Expand All @@ -209,7 +209,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)

c := NewCache(uint64(v1.Size()))
c := NewCache(uint64(v1.Size()), "")

if err := c.Write("foo", Values{v0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Load the cache using the segment.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
Expand All @@ -288,7 +288,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Reload the cache using the segment.
cache = NewCache(1024)
cache = NewCache(1024, "")
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}

// Load the cache using the segments.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestCompactor_Snapshot(t *testing.T) {
"cpu,host=B#!~#value": []tsm1.Value{v2, v3},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")
for k, v := range points1 {
if err := c.Write(k, v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0, v1},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
fs := NewFileStore(path)
fs.traceLogging = opt.Config.DataLoggingEnabled

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)

c := &Compactor{
Dir: path,
Expand Down

0 comments on commit 0cff9e0

Please sign in to comment.