Skip to content

Commit

Permalink
tsm: cache: add cache throughput related statistics.
Browse files Browse the repository at this point in the history
Complementing and extending the changes in influxdata#5758.

Add 2 level statistics:

  * snapshotCount
  * cacheAgeMs

Add 2 counter statistics

  * cachedBytes
  * WALCompactionTimeMs

snapshotCount can be used to measure transient write errors that are causing snapshots to accumulate

cacheAgeMs can be used to gauge the level of write activity into the cache

The differences between cachedBytes stats sampled at different times can be used to calculate cache throughput rates

The ratio (cachedBytes-diskBytes)/WALCompactionTimeMs can be used to calculate WAL compaction throughput.

The ratio of the difference between the first and last WAL compaction times to the interval
length is an estimate of the percentage of cache throughput consumed.

Signed-off-by: Jon Seymour <jon@wildducktheories.com>
  • Loading branch information
jonseymour committed Feb 29, 2016
1 parent 0cff9e0 commit 666ac76
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
46 changes: 39 additions & 7 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"sort"
"sync"
"time"

"github.com/influxdata/influxdb"
)
Expand Down Expand Up @@ -63,8 +64,17 @@ func (e *entry) deduplicate() {

// Statistics gathered by the Cache.
// Each constant is the key under which the value is published in the cache's statistics map.
const (
statCacheMemoryBytes = "memBytes" // Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // Size of on-disk snapshots in bytes
// levels - point-in-time measures (the value as of the moment it is sampled)

statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since the cache was last snapshotted, as of sample time

// counters - accumulated totals; the difference between two samples gives the activity in that interval

statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
)

// Cache maintains an in-memory store of Values for a set of keys.
Expand All @@ -80,7 +90,8 @@ type Cache struct {
snapshots []*Cache
snapshotsSize uint64

statMap *expvar.Map
statMap *expvar.Map
lastSnapshot time.Time

// path is only used to track stats
path string
Expand All @@ -89,10 +100,11 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// path is used to build the key of the cache's statistics map and is stored on the Cache.
func NewCache(maxSize uint64, path string) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
path: path,
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
path: path,
// Start the age reference at construction time so that UpdateAge has a
// meaningful baseline before the first snapshot is taken.
lastSnapshot: time.Now(),
}
}

Expand Down Expand Up @@ -163,6 +175,7 @@ func (c *Cache) Snapshot() *Cache {

c.store = make(map[string]*entry)
c.size = 0
c.lastSnapshot = time.Now()

c.snapshots = append(c.snapshots, snapshot)
c.snapshotsSize += snapshot.size
Expand All @@ -176,6 +189,9 @@ func (c *Cache) Snapshot() *Cache {
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

c.statMap.Add(statCachedBytes, int64(snapshot.size))
c.statMap.Add(statSnapshots, 1)

return snapshot
}

Expand Down Expand Up @@ -205,6 +221,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

c.statMap.Add(statSnapshots, -1)
}

// Size returns the number of point-calculated bytes the cache currently uses.
Expand Down Expand Up @@ -421,3 +439,17 @@ func (cl *CacheLoader) Load(cache *Cache) error {
}
return nil
}

// UpdateAge publishes the cacheAgeMs statistic: the number of whole
// milliseconds elapsed since lastSnapshot, which is set when the cache is
// created and again each time it is snapshotted.
//
// The cache's read lock is held while lastSnapshot is read and the statistic
// is stored.
func (c *Cache) UpdateAge() {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Integer division truncates the elapsed duration to whole milliseconds.
	elapsedMs := int64(time.Since(c.lastSnapshot) / time.Millisecond)

	stat := &expvar.Int{}
	stat.Set(elapsedMs)
	c.statMap.Set(statCacheAgeMs, stat)
}

// UpdateCompactTime adds d, truncated to whole milliseconds, to the
// WALCompactionTimeMs counter statistic.
func (c *Cache) UpdateCompactTime(d time.Duration) {
	elapsedMs := int64(d / time.Millisecond)
	c.statMap.Add(statWALCompactionTimeMs, elapsedMs)
}
13 changes: 13 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen
func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot

var started *time.Time

defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Now().Sub(*started))
}
}()

closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
e.mu.Lock()
defer e.mu.Unlock()

now := time.Now()
started = &now

if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -456,6 +468,7 @@ func (e *Engine) compactCache() {
return

default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
err := e.WriteSnapshot()
if err != nil {
Expand Down

0 comments on commit 666ac76

Please sign in to comment.