From 666ac768850f4f59f8eac811fdf81443ef4c6bcc Mon Sep 17 00:00:00 2001 From: Jon Seymour Date: Sat, 20 Feb 2016 22:18:57 +1100 Subject: [PATCH] tsm: cache: add cache throughput related statistics. Complementing and extending the changes in #5758. Add 2 level statistics: * snapshotCount * cacheAgeMs Add 2 counter statistics * cachedBytes * WALCompactionTimeMs snapshotCount can be used to measure transient write errors that are causing snapshots to accumulate cacheAgeMs can be used to gauge the level of write activity into the cache The differences between cachedBytes stats sampled at different times can be used to calculate cache throughput rates The ratio (cachedBytes-diskBytes)/WALCompactionTimeMs can be used to calculate WAL compaction throughput. The ratio of difference between first and last WAL compaction time over the interval length is an estimate of percentage of cache throughput consumed. Signed-off-by: Jon Seymour --- tsdb/engine/tsm1/cache.go | 46 ++++++++++++++++++++++++++++++++------ tsdb/engine/tsm1/engine.go | 13 +++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index e5b4c060883..d016692bf74 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -7,6 +7,7 @@ import ( "os" "sort" "sync" + "time" "github.com/influxdata/influxdb" ) @@ -63,8 +64,17 @@ func (e *entry) deduplicate() { // Statistics gathered by the Cache. const ( - statCacheMemoryBytes = "memBytes" // Size of in-memory cache in bytes - statCacheDiskBytes = "diskBytes" // Size of on-disk snapshots in bytes + // levels - point in time measures + + statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes + statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes + statSnapshots = "snapshotCount" // level: Number of active snapshots. 
+ statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshotted at sample time + + // counters - accumulative measures + + statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots. + statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots ) // Cache maintains an in-memory store of Values for a set of keys. @@ -80,7 +90,8 @@ type Cache struct { snapshots []*Cache snapshotsSize uint64 - statMap *expvar.Map + statMap *expvar.Map + lastSnapshot time.Time // path is only used to track stats path string @@ -89,10 +100,11 @@ type Cache struct { // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. func NewCache(maxSize uint64, path string) *Cache { return &Cache{ - maxSize: maxSize, - store: make(map[string]*entry), - statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}), - path: path, + maxSize: maxSize, + store: make(map[string]*entry), + statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}), + path: path, + lastSnapshot: time.Now(), } } @@ -163,6 +175,7 @@ func (c *Cache) Snapshot() *Cache { c.store = make(map[string]*entry) c.size = 0 + c.lastSnapshot = time.Now() c.snapshots = append(c.snapshots, snapshot) c.snapshotsSize += snapshot.size @@ -176,6 +189,9 @@ func (c *Cache) Snapshot() *Cache { diskSizeStat.Set(int64(c.snapshotsSize)) c.statMap.Set(statCacheDiskBytes, diskSizeStat) + c.statMap.Add(statCachedBytes, int64(snapshot.size)) + c.statMap.Add(statSnapshots, 1) + return snapshot } @@ -205,6 +221,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) { diskSizeStat := new(expvar.Int) diskSizeStat.Set(int64(c.snapshotsSize)) c.statMap.Set(statCacheDiskBytes, diskSizeStat) + + c.statMap.Add(statSnapshots, -1) } // Size returns the number of point-calcuated bytes the cache currently uses. 
@@ -421,3 +439,17 @@ func (cl *CacheLoader) Load(cache *Cache) error { } return nil } + +// Updates the age statistic +func (c *Cache) UpdateAge() { + c.mu.RLock() + defer c.mu.RUnlock() + ageStat := new(expvar.Int) + ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond)) + c.statMap.Set(statCacheAgeMs, ageStat) +} + +// Updates WAL compaction time statistic +func (c *Cache) UpdateCompactTime(d time.Duration) { + c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond)) +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 67cbd10e192..0c1497de224 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -389,10 +389,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen func (e *Engine) WriteSnapshot() error { // Lock and grab the cache snapshot along with all the closed WAL // filenames associated with the snapshot + + var started *time.Time + + defer func() { + if started != nil { + e.Cache.UpdateCompactTime(time.Now().Sub(*started)) + } + }() + closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { e.mu.Lock() defer e.mu.Unlock() + now := time.Now() + started = &now + if err := e.WAL.CloseSegment(); err != nil { return nil, nil, nil, err } @@ -456,6 +468,7 @@ func (e *Engine) compactCache() { return default: + e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { err := e.WriteSnapshot() if err != nil {