diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b9c4a49160..7273f3fbe21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#5336](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev - [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup - [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped. +- [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour ### Bugfixes diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 85cb740ac52..574cf50030e 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -1,11 +1,15 @@ package tsm1 import ( + "expvar" "fmt" "log" "os" "sort" "sync" + "time" + + "github.com/influxdata/influxdb" ) var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded") @@ -62,6 +66,21 @@ func (e *entry) deduplicate() { e.needSort = false } +// Statistics gathered by the Cache. +const ( + // levels - point in time measures + + statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes + statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes + statSnapshots = "snapshotCount" // level: Number of active snapshots. + statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time + + // counters - accumulative measures + + statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots. + statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots +) + // Cache maintains an in-memory store of Values for a set of keys. type Cache struct { mu sync.RWMutex @@ -74,30 +93,47 @@ type Cache struct { // they are read only and should never be modified snapshots []*Cache snapshotsSize uint64 + + statMap *expvar.Map // nil for snapshots. + lastSnapshot time.Time } // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. -func NewCache(maxSize uint64) *Cache { - return &Cache{ - maxSize: maxSize, - store: make(map[string]*entry), +// Only used for engine caches, never for snapshots +func NewCache(maxSize uint64, path string) *Cache { + c := &Cache{ + maxSize: maxSize, + store: make(map[string]*entry), + statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}), + lastSnapshot: time.Now(), } + c.UpdateAge() + c.UpdateCompactTime(0) + c.updateCachedBytes(0) + c.updateMemSize(0) + c.updateSnapshots() + return c } // Write writes the set of values for the key to the cache. This function is goroutine-safe. // It returns an error if the cache has exceeded its max size. func (c *Cache) Write(key string, values []Value) error { c.mu.Lock() - defer c.mu.Unlock() // Enough room in the cache? - newSize := c.size + uint64(Values(values).Size()) + addedSize := Values(values).Size() + newSize := c.size + uint64(addedSize) if c.maxSize > 0 && newSize+c.snapshotsSize > c.maxSize { + c.mu.Unlock() return ErrCacheMemoryExceeded } c.write(key, values) c.size = newSize + c.mu.Unlock() + + // Update the memory size stat + c.updateMemSize(int64(addedSize)) return nil } @@ -126,6 +162,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { c.size = newSize c.mu.Unlock() + // Update the memory size stat + c.updateMemSize(int64(totalSz)) + return nil } @@ -135,16 +174,22 @@ func (c *Cache) Snapshot() *Cache { c.mu.Lock() defer c.mu.Unlock() - snapshot := NewCache(c.maxSize) - snapshot.store = c.store - snapshot.size = c.size + snapshot := &Cache{ + store: c.store, + size: c.size, + } c.store = make(map[string]*entry) c.size = 0 + c.lastSnapshot = time.Now() c.snapshots = append(c.snapshots, snapshot) c.snapshotsSize += snapshot.size + c.updateMemSize(-int64(snapshot.size)) + c.updateCachedBytes(snapshot.size) + c.updateSnapshots() + return snapshot } @@ -169,6 +214,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) { break } } + + c.updateSnapshots() } // Size returns the number of point-calcuated bytes the cache currently uses. @@ -385,3 +432,39 @@ func (cl *CacheLoader) Load(cache *Cache) error { } return nil } + +// Updates the age statistic +func (c *Cache) UpdateAge() { + c.mu.RLock() + defer c.mu.RUnlock() + ageStat := new(expvar.Int) + ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond)) + c.statMap.Set(statCacheAgeMs, ageStat) +} + +// Updates WAL compaction time statistic +func (c *Cache) UpdateCompactTime(d time.Duration) { + c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond)) +} + +// Update the cachedBytes counter +func (c *Cache) updateCachedBytes(b uint64) { + c.statMap.Add(statCachedBytes, int64(b)) +} + +// Update the memSize level +func (c *Cache) updateMemSize(b int64) { + c.statMap.Add(statCacheMemoryBytes, b) +} + +// Update the snapshotsCount and the diskSize levels +func (c *Cache) updateSnapshots() { + // Update disk stats + diskSizeStat := new(expvar.Int) + diskSizeStat.Set(int64(c.snapshotsSize)) + c.statMap.Set(statCacheDiskBytes, diskSizeStat) + + snapshotsStat := new(expvar.Int) + snapshotsStat.Set(int64(len(c.snapshots))) + c.statMap.Set(statSnapshots, snapshotsStat) +} diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index d74b7a9055b..0f5b48a9da5 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -12,7 +12,7 @@ import ( ) func TestCache_NewCache(t *testing.T) { - c := NewCache(100) + c := NewCache(100, "") if c == nil { t.Fatalf("failed to create new cache") } @@ -35,7 +35,7 @@ func TestCache_CacheWrite(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3 * valuesSize) + c := NewCache(3*valuesSize, "") if err := c.Write("foo", values); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -59,7 +59,7 @@ func TestCache_CacheWriteMulti(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3 * valuesSize) + c := NewCache(3*valuesSize, "") if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -80,7 +80,7 @@ func TestCache_CacheValues(t *testing.T) { v3 := NewValue(time.Unix(1, 0).UTC(), 1.0) v4 := NewValue(time.Unix(4, 0).UTC(), 4.0) - c := NewCache(512) + c := NewCache(512, "") if deduped := c.Values("no such key"); deduped != nil { t.Fatalf("Values returned for no such key") } @@ -106,7 +106,7 @@ func TestCache_CacheSnapshot(t *testing.T) { v4 := NewValue(time.Unix(6, 0).UTC(), 5.0) v5 := NewValue(time.Unix(1, 0).UTC(), 5.0) - c := NewCache(512) + c := NewCache(512, "") if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil { t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error()) } @@ -150,7 +150,7 @@ func TestCache_CacheSnapshot(t *testing.T) { } func TestCache_CacheEmptySnapshot(t *testing.T) { - c := NewCache(512) + c := NewCache(512, "") // Grab snapshot, and ensure it's as expected. snapshot := c.Snapshot() @@ -174,7 +174,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) - c := NewCache(uint64(v1.Size())) + c := NewCache(uint64(v1.Size()), "") if err := c.Write("foo", Values{v0}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -230,7 +230,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Load the cache using the segment. - cache := NewCache(1024) + cache := NewCache(1024, "") loader := NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -253,7 +253,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Reload the cache using the segment. - cache = NewCache(1024) + cache = NewCache(1024, "") loader = NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -312,7 +312,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) { } // Load the cache using the segments. - cache := NewCache(1024) + cache := NewCache(1024, "") loader := NewCacheLoader([]string{f1.Name(), f2.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -362,7 +362,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) { func BenchmarkCacheFloatEntries(b *testing.B) { for i := 0; i < b.N; i++ { - cache := NewCache(10000) + cache := NewCache(10000, "") for j := 0; j < 10000; j++ { v := NewValue(time.Unix(1, 0), float64(j)) cache.Write("test", []Value{v}) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index e0c24a65185..3c6cbf3bd57 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -25,7 +25,7 @@ func TestCompactor_Snapshot(t *testing.T) { "cpu,host=B#!~#value": []tsm1.Value{v2, v3}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, "") for k, v := range points1 { if err := c.Write(k, v); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -497,7 +497,7 @@ func TestCacheKeyIterator_Single(t *testing.T) { "cpu,host=A#!~#value": []tsm1.Value{v0}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, "") for k, v := range writes { if err := c.Write(k, v); err != nil { @@ -545,7 +545,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { "cpu,host=A#!~#value": []tsm1.Value{v0, v1}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, "") for k, v := range writes { if err := c.Write(k, v); err != nil { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b01e51c361c..2319e062c61 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -70,7 +70,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine fs := NewFileStore(path) fs.traceLogging = opt.Config.DataLoggingEnabled - cache := NewCache(uint64(opt.Config.CacheMaxMemorySize)) + cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path) c := &Compactor{ Dir: path, @@ -410,10 +410,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen func (e *Engine) WriteSnapshot() error { // Lock and grab the cache snapshot along with all the closed WAL // filenames associated with the snapshot + + var started *time.Time + + defer func() { + if started != nil { + e.Cache.UpdateCompactTime(time.Now().Sub(*started)) + } + }() + closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { e.mu.Lock() defer e.mu.Unlock() + now := time.Now() + started = &now + if err := e.WAL.CloseSegment(); err != nil { return nil, nil, nil, err } @@ -477,6 +489,7 @@ func (e *Engine) compactCache() { return default: + e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { err := e.WriteSnapshot() if err != nil { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index a2fd019c21a..2b39ef64dd3 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -1,6 +1,7 @@ package tsm1 import ( + "expvar" "fmt" "log" "os" @@ -10,6 +11,8 @@ import ( "strings" "sync" "time" + + "github.com/influxdata/influxdb" ) type TSMFile interface { @@ -79,6 +82,11 @@ type TSMFile interface { BlockIterator() *BlockIterator } +// Statistics gathered by the FileStore. +const ( + statFileStoreBytes = "diskBytes" +) + type FileStore struct { mu sync.RWMutex lastModified time.Time @@ -90,6 +98,8 @@ type FileStore struct { Logger *log.Logger traceLogging bool + + statMap *expvar.Map } type FileStat struct { @@ -118,7 +128,8 @@ func NewFileStore(dir string) *FileStore { return &FileStore{ dir: dir, lastModified: time.Now(), - Logger: log.New(os.Stderr, "[filestore]", log.LstdFlags), + Logger: log.New(os.Stderr, "[filestore] ", log.LstdFlags), + statMap: influxdb.NewStatistics("tsm1_filestore:"+dir, "tsm1_filestore", map[string]string{"path": dir}), } } @@ -154,6 +165,9 @@ func (f *FileStore) NextGeneration() int { func (f *FileStore) Add(files ...TSMFile) { f.mu.Lock() defer f.mu.Unlock() + for _, file := range files { + f.statMap.Add(statFileStoreBytes, int64(file.Size())) + } f.files = append(f.files, files...) sort.Sort(tsmReaders(f.files)) } @@ -175,6 +189,9 @@ func (f *FileStore) Remove(paths ...string) { if keep { active = append(active, file) + } else { + // Removing the file, remove the file size from the total file store bytes + f.statMap.Add(statFileStoreBytes, -int64(file.Size())) } } f.files = active @@ -263,6 +280,11 @@ func (f *FileStore) Open() error { return fmt.Errorf("error opening file %s: %v", fn, err) } + // Accumulate file store size stat + if fi, err := file.Stat(); err == nil { + f.statMap.Add(statFileStoreBytes, fi.Size()) + } + go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReaderWithOptions(TSMReaderOptions{ @@ -412,6 +434,15 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { f.files = active sort.Sort(tsmReaders(f.files)) + // Recalculate the disk size stat + var totalSize int64 + for _, file := range f.files { + totalSize += int64(file.Size()) + } + sizeStat := new(expvar.Int) + sizeStat.Set(totalSize) + f.statMap.Set(statFileStoreBytes, sizeStat) + return nil } diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index fcde1c42afc..755fd395e4c 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -3,6 +3,7 @@ package tsm1 import ( "encoding/binary" "errors" + "expvar" "fmt" "io" "log" @@ -16,6 +17,7 @@ import ( "time" "github.com/golang/snappy" + "github.com/influxdata/influxdb" ) const ( @@ -54,6 +56,12 @@ const ( var ErrWALClosed = fmt.Errorf("WAL closed") +// Statistics gathered by the WAL. +const ( + statWALOldBytes = "oldSegmentsDiskBytes" + statWALCurrentBytes = "currentSegmentDiskBytes" +) + type WAL struct { mu sync.RWMutex lastWriteTime time.Time @@ -76,6 +84,8 @@ type WAL struct { // LoggingEnabled specifies if detailed logs should be output LoggingEnabled bool + + statMap *expvar.Map } func NewWAL(path string) *WAL { @@ -87,6 +97,8 @@ func NewWAL(path string) *WAL { SegmentSize: DefaultSegmentSize, logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags), closing: make(chan struct{}), + + statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}), } } @@ -130,12 +142,26 @@ func (l *WAL) Open() error { if stat.Size() == 0 { os.Remove(lastSegment) + segments = segments[:len(segments)-1] } if err := l.newSegmentFile(); err != nil { return err } } + var totalOldDiskSize int64 + for _, seg := range segments { + stat, err := os.Stat(seg) + if err != nil { + return err + } + + totalOldDiskSize += stat.Size() + } + sizeStat := new(expvar.Int) + sizeStat.Set(totalOldDiskSize) + l.statMap.Set(statWALOldBytes, sizeStat) + l.closing = make(chan struct{}) l.lastWriteTime = time.Now() @@ -196,6 +222,26 @@ func (l *WAL) Remove(files []string) error { for _, fn := range files { os.RemoveAll(fn) } + + // Refresh the on-disk size stats + segments, err := segmentFileNames(l.path) + if err != nil { + return err + } + + var totalOldDiskSize int64 + for _, seg := range segments { + stat, err := os.Stat(seg) + if err != nil { + return err + } + + totalOldDiskSize += stat.Size() + } + sizeStat := new(expvar.Int) + sizeStat.Set(totalOldDiskSize) + l.statMap.Set(statWALOldBytes, sizeStat) + return nil } @@ -240,6 +286,11 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) { return -1, fmt.Errorf("error writing WAL entry: %v", err) } + // Update stats for current segment size + curSize := new(expvar.Int) + curSize.Set(int64(l.currentSegmentWriter.size)) + l.statMap.Set(statWALCurrentBytes, curSize) + l.lastWriteTime = time.Now() return l.currentSegmentID, l.currentSegmentWriter.sync() @@ -324,6 +375,7 @@ func (l *WAL) newSegmentFile() error { if err := l.currentSegmentWriter.close(); err != nil { return err } + l.statMap.Add(statWALOldBytes, int64(l.currentSegmentWriter.size)) } fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension)) @@ -333,6 +385,11 @@ func (l *WAL) newSegmentFile() error { } l.currentSegmentWriter = NewWALSegmentWriter(fd) + // Reset the current segment size stat + curSize := new(expvar.Int) + curSize.Set(0) + l.statMap.Set(statWALCurrentBytes, curSize) + return nil }