From 97c8a61f2aa5696e6ba4ccc6984dfe37208baefd Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 29 Nov 2018 12:36:54 +0100 Subject: [PATCH 1/4] swarm/shed: add metrics to each shed db --- swarm/shed/db.go | 204 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 202 insertions(+), 2 deletions(-) diff --git a/swarm/shed/db.go b/swarm/shed/db.go index e128b8cbc866..0e4a9a533adc 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -23,14 +23,24 @@ package shed import ( + "errors" + "fmt" + "strconv" + "strings" + "sync" + "time" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" ) -// The limit for LevelDB OpenFilesCacheCapacity. -const openFileLimit = 128 +const ( + openFileLimit = 128 // The limit for LevelDB OpenFilesCacheCapacity. + writePauseWarningThrottler = 1 * time.Minute +) // DB provides abstractions over LevelDB in order to // implement complex structures using fields and ordered indexes. @@ -38,6 +48,17 @@ const openFileLimit = 128 // information about naming and types. 
type DB struct { ldb *leveldb.DB + + compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction + compReadMeter metrics.Meter // Meter for measuring the data read during compaction + compWriteMeter metrics.Meter // Meter for measuring the data written during compaction + writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction + writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction + diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read + diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written + + quitLock sync.Mutex // Mutex protecting the quit channel access + quitChan chan chan error // Quit channel to stop the metrics collection before closing the database } // NewDB constructs a new DB and validates the schema @@ -66,6 +87,9 @@ func NewDB(path string) (db *DB, err error) { return nil, err } } + + db.Meter("shed/db") + return db, nil } @@ -128,3 +152,179 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { func (db *DB) Close() (err error) { return db.ldb.Close() } + +// Meter configures the database metrics collectors +func (db *DB) Meter(prefix string) { + // Initialize all the metrics collector at the requested prefix + db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) + db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) + db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) + db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) + db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) + db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) + db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) + + // Create a quit channel for the periodic collector and run it + 
db.quitLock.Lock() + db.quitChan = make(chan chan error) + db.quitLock.Unlock() + + go db.meter(10 * time.Second) +} + +func (db *DB) meter(refresh time.Duration) { + // Create the counters to store current and previous compaction values + compactions := make([][]float64, 2) + for i := 0; i < 2; i++ { + compactions[i] = make([]float64, 3) + } + // Create storage for iostats. + var iostats [2]float64 + + // Create storage and warning log tracer for write delay. + var ( + delaystats [2]int64 + lastWritePaused time.Time + ) + + var ( + errc chan error + merr error + ) + + // Iterate ad infinitum and collect the stats + for i := 1; errc == nil && merr == nil; i++ { + // Retrieve the database stats + stats, err := db.ldb.GetProperty("leveldb.stats") + if err != nil { + log.Error("Failed to read database stats", "err", err) + merr = err + continue + } + // Find the compaction table, skip the header + lines := strings.Split(stats, "\n") + for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" { + lines = lines[1:] + } + if len(lines) <= 3 { + log.Error("Compaction table not found") + merr = errors.New("compaction table not found") + continue + } + lines = lines[3:] + + // Iterate over all the table rows, and accumulate the entries + for j := 0; j < len(compactions[i%2]); j++ { + compactions[i%2][j] = 0 + } + for _, line := range lines { + parts := strings.Split(line, "|") + if len(parts) != 6 { + break + } + for idx, counter := range parts[3:] { + value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) + if err != nil { + log.Error("Compaction entry parsing failed", "err", err) + merr = err + continue + } + compactions[i%2][idx] += value + } + } + // Update all the requested meters + if db.compTimeMeter != nil { + db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) + } + if db.compReadMeter != nil { + db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) + } + if 
db.compWriteMeter != nil { + db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + } + + // Retrieve the write delay statistic + writedelay, err := db.ldb.GetProperty("leveldb.writedelay") + if err != nil { + log.Error("Failed to read database write delay statistic", "err", err) + merr = err + continue + } + var ( + delayN int64 + delayDuration string + duration time.Duration + paused bool + ) + if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil { + log.Error("Write delay statistic not found") + merr = err + continue + } + duration, err = time.ParseDuration(delayDuration) + if err != nil { + log.Error("Failed to parse delay duration", "err", err) + merr = err + continue + } + if db.writeDelayNMeter != nil { + db.writeDelayNMeter.Mark(delayN - delaystats[0]) + } + if db.writeDelayMeter != nil { + db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) + } + // If a warning that db is performing compaction has been displayed, any subsequent + // warnings will be withheld for one minute not to overwhelm the user. + if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 && + time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) { + log.Warn("Database compacting, degraded performance") + lastWritePaused = time.Now() + } + delaystats[0], delaystats[1] = delayN, duration.Nanoseconds() + + // Retrieve the database iostats. 
+ ioStats, err := db.ldb.GetProperty("leveldb.iostats") + if err != nil { + log.Error("Failed to read database iostats", "err", err) + merr = err + continue + } + var nRead, nWrite float64 + parts := strings.Split(ioStats, " ") + if len(parts) < 2 { + log.Error("Bad syntax of ioStats", "ioStats", ioStats) + merr = fmt.Errorf("bad syntax of ioStats %s", ioStats) + continue + } + if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil { + log.Error("Bad syntax of read entry", "entry", parts[0]) + merr = err + continue + } + if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil { + log.Error("Bad syntax of write entry", "entry", parts[1]) + merr = err + continue + } + if db.diskReadMeter != nil { + db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) + } + if db.diskWriteMeter != nil { + db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) + } + iostats[0], iostats[1] = nRead, nWrite + + // Sleep a bit, then repeat the stats collection + select { + case errc = <-db.quitChan: + // Quit requesting, stop hammering the database + case <-time.After(refresh): + // Timeout, gather a new set of stats + } + } + + if errc == nil { + errc = <-db.quitChan + } + errc <- merr +} From 9a90c999ac25e692898994eebb625824ee504c7b Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 29 Nov 2018 12:42:13 +0100 Subject: [PATCH 2/4] swarm/shed: push metrics prefix up --- swarm/shed/db.go | 5 +++-- swarm/shed/db_test.go | 6 +++--- swarm/shed/example_store_test.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/swarm/shed/db.go b/swarm/shed/db.go index 0e4a9a533adc..78c62fdc8441 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -63,7 +63,8 @@ type DB struct { // NewDB constructs a new DB and validates the schema // if it exists in database on the given path. -func NewDB(path string) (db *DB, err error) { +// prefix is used for metrics collection for the given DB. 
+func NewDB(path string, prefix string) (db *DB, err error) { ldb, err := leveldb.OpenFile(path, &opt.Options{ OpenFilesCacheCapacity: openFileLimit, }) @@ -88,7 +89,7 @@ func NewDB(path string) (db *DB, err error) { } } - db.Meter("shed/db") + db.Meter(prefix) return db, nil } diff --git a/swarm/shed/db_test.go b/swarm/shed/db_test.go index 45325beeb812..65fdac4a6186 100644 --- a/swarm/shed/db_test.go +++ b/swarm/shed/db_test.go @@ -55,7 +55,7 @@ func TestDB_persistence(t *testing.T) { } defer os.RemoveAll(dir) - db, err := NewDB(dir) + db, err := NewDB(dir, "") if err != nil { t.Fatal(err) } @@ -73,7 +73,7 @@ func TestDB_persistence(t *testing.T) { t.Fatal(err) } - db2, err := NewDB(dir) + db2, err := NewDB(dir, "") if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) { t.Fatal(err) } cleanupFunc = func() { os.RemoveAll(dir) } - db, err = NewDB(dir) + db, err = NewDB(dir, "") if err != nil { cleanupFunc() t.Fatal(err) diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go index 2ed0be1413b8..908a1e446d64 100644 --- a/swarm/shed/example_store_test.go +++ b/swarm/shed/example_store_test.go @@ -52,7 +52,7 @@ type Store struct { // and possible conflicts with schema from existing database is checked // automatically. 
func New(path string) (s *Store, err error) { - db, err := shed.NewDB(path) + db, err := shed.NewDB(path, "") if err != nil { return nil, err } From 8ba97a02b25e8930b4dae76331acdc8edee6461b Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 29 Nov 2018 15:39:13 +0100 Subject: [PATCH 3/4] swarm/shed: rename prefix to metricsPrefix --- swarm/shed/db.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/swarm/shed/db.go b/swarm/shed/db.go index 78c62fdc8441..d1146198ab54 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -63,8 +63,8 @@ type DB struct { // NewDB constructs a new DB and validates the schema // if it exists in database on the given path. -// prefix is used for metrics collection for the given DB. -func NewDB(path string, prefix string) (db *DB, err error) { +// metricsPrefix is used for metrics collection for the given DB. +func NewDB(path string, metricsPrefix string) (db *DB, err error) { ldb, err := leveldb.OpenFile(path, &opt.Options{ OpenFilesCacheCapacity: openFileLimit, }) @@ -89,7 +89,7 @@ func NewDB(path string, prefix string) (db *DB, err error) { } } - db.Meter(prefix) + db.Meter(metricsPrefix) return db, nil } @@ -151,6 +151,7 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { // Close closes LevelDB database. 
func (db *DB) Close() (err error) { + close(db.quitChan) return db.ldb.Close() } From 3934490e129cceeecc80267327c423b0749e8f60 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 3 Dec 2018 14:24:20 +0100 Subject: [PATCH 4/4] swarm/shed: unexport Meter, remove Mutex for quit channel --- swarm/shed/db.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/swarm/shed/db.go b/swarm/shed/db.go index d1146198ab54..7377e12d2d35 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -27,7 +27,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/metrics" @@ -57,7 +56,6 @@ type DB struct { diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written - quitLock sync.Mutex // Mutex protecting the quit channel access quitChan chan chan error // Quit channel to stop the metrics collection before closing the database } @@ -89,7 +87,13 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) { } } - db.Meter(metricsPrefix) + // Configure meters for DB + db.configure(metricsPrefix) + + // Create a quit channel for the periodic metrics collector and run it + db.quitChan = make(chan chan error) + + go db.meter(10 * time.Second) return db, nil } @@ -155,8 +159,8 @@ func (db *DB) Close() (err error) { return db.ldb.Close() } -// Meter configures the database metrics collectors -func (db *DB) Meter(prefix string) { +// configure configures the database metrics collectors +func (db *DB) configure(prefix string) { // Initialize all the metrics collector at the requested prefix db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) @@ -165,13 +169,6 @@ func (db *DB) Meter(prefix string) { db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) db.writeDelayMeter = 
metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) - - // Create a quit channel for the periodic collector and run it - db.quitLock.Lock() - db.quitChan = make(chan chan error) - db.quitLock.Unlock() - - go db.meter(10 * time.Second) } func (db *DB) meter(refresh time.Duration) {