ethdb/pebble: several updates for pebble (ethereum#49)
* ethdb/pebble: fix size count in batch

* ethdb/pebble: disable seek compaction

* ethdb/pebble: more fixes
rjl493456442 committed Jan 31, 2023
1 parent 556cfef commit 080eb52
Showing 1 changed file with 87 additions and 82 deletions.
169 changes: 87 additions & 82 deletions ethdb/pebble/pebble.go
@@ -36,15 +36,15 @@ import (
)

const (
// minCache is the minimum amount of memory in megabytes to allocate to leveldb
// minCache is the minimum amount of memory in megabytes to allocate to pebble
// read and write caching, split half and half.
minCache = 16

// minHandles is the minimum number of files handles to allocate to the open
// database files.
minHandles = 16

// metricsGatheringInterval specifies the interval to retrieve leveldb database
// metricsGatheringInterval specifies the interval to retrieve pebble database
// compaction, io and pause stats to report to the user.
metricsGatheringInterval = 3 * time.Second
)
@@ -78,7 +78,6 @@ type Database struct {
activeComp int // current number of active compactions
compStartTime time.Time // the start time of the earliest currently-active compaction
compTime int64 // total time spent in compaction in ns
seekCompCount int64 // total number of compactions caused by reads
level0Comp uint32 // total number of level-zero compactions
nonLevel0Comp uint32 // total number of non level-zero compactions
writeDelayStartTime time.Time // the start time of the latest write stall
@@ -90,16 +89,11 @@ func (d *Database) onCompactionBegin(info pebble.CompactionInfo) {
if d.activeComp == 0 {
d.compStartTime = time.Now()
}
if info.Reason == "read" {
atomic.AddInt64(&d.seekCompCount, 1)
}

for _, level := range info.Input {
if level.Level == 0 {
atomic.AddUint32(&d.level0Comp, 1)
} else {
atomic.AddUint32(&d.nonLevel0Comp, 1)
}
l0 := info.Input[0]
if l0.Level == 0 {
atomic.AddUint32(&d.level0Comp, 1)
} else {
atomic.AddUint32(&d.nonLevel0Comp, 1)
}
d.activeComp++
}
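For context, a standalone sketch of the same counting logic wired through pebble's event hooks (the compactionCounter type below is hypothetical, simplified from this file's listener setup; in the real code the hooks are the Database methods onCompactionBegin/onCompactionEnd):

package counters

import (
	"sync/atomic"

	"github.com/cockroachdb/pebble"
)

// compactionCounter mirrors the classification above: a compaction is bucketed
// by its first input level, so anything starting at level 0 counts as a
// level-0 compaction and everything else as a non-level-0 one.
type compactionCounter struct {
	level0    uint32
	nonLevel0 uint32
}

// listener returns an event listener that updates the counters on every
// compaction begin notification.
func (c *compactionCounter) listener() pebble.EventListener {
	return pebble.EventListener{
		CompactionBegin: func(info pebble.CompactionInfo) {
			if info.Input[0].Level == 0 {
				atomic.AddUint32(&c.level0, 1)
			} else {
				atomic.AddUint32(&c.nonLevel0, 1)
			}
		},
	}
}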
@@ -110,7 +104,6 @@ func (d *Database) onCompactionEnd(info pebble.CompactionInfo) {
} else if d.activeComp == 0 {
panic("should not happen")
}

d.activeComp--
}

@@ -154,25 +147,33 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
// Taken from https://github.com/cockroachdb/pebble/blob/master/open.go#L38
maxMemTableSize := 4 << 30 // 4 GB
memTableSize := cache * 1024 * 1024 / 4

// Two memory tables are configured, which is identical to leveldb:
// a frozen memory table plus a live one.
memTableLimit := 2
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
if memTableSize > maxMemTableSize {
memTableSize = maxMemTableSize
}

// Open the db and recover any potential corruptions
db, err := pebble.Open(file, &pebble.Options{
opt := &pebble.Options{
// Pebble has a single combined cache area and the write
// buffers are taken from this too. Assign all available
// memory allowance for cache.
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
MaxOpenFiles: handles,

// The size of the memory table (as well as the write buffer).
// Note, there may be more than two memory tables in the system.
// MemTableStopWritesThreshold can be configured to avoid memory abuse.
MemTableSize: memTableSize,

// MemTableStopWritesThreshold places a hard limit on the size
// of the existing MemTables (including the frozen one).
MemTableStopWritesThreshold: memTableLimit * memTableSize,

// The default compaction concurrency is 1 thread;
// use all available CPUs here for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
@@ -186,7 +187,13 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
},
ReadOnly: readonly,
EventListener: eventListener,
})
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1

// Open the db and recover any potential corruptions
db, err := pebble.Open(file, opt)
if err != nil {
return nil, err
}
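A worked example of the sizing above, as a standalone sketch (memTableSize below is a hypothetical helper, not part of this file): with cache = 1024 MB, each of the two memtables gets 256 MB, and the options then pass memTableLimit * memTableSize, here 512 MB, as MemTableStopWritesThreshold.

package sizing

// memTableSize reproduces the rule above: half of the cache allowance is
// reserved for memtables, split across memTableLimit tables (two, matching
// leveldb's live + frozen pair), and capped at pebble's 4 GB per-table limit.
func memTableSize(cacheMB int) int {
	const (
		maxMemTableSize = 4 << 30 // 4 GB, as in the diff
		memTableLimit   = 2
	)
	size := cacheMB * 1024 * 1024 / 2 / memTableLimit
	if size > maxMemTableSize {
		size = maxMemTableSize
	}
	return size
}

The same hunk also sets opt.Experimental.ReadSamplingMultiplier to -1; a negative multiplier turns off pebble's read sampling and, with it, read-triggered ("seek") compactions, in the spirit of the leveldb change referenced in the linked pull request.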
@@ -218,24 +225,24 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (

// Close stops the metrics collection, flushes any pending data to disk and closes
// all io accesses to the underlying key-value store.
func (db *Database) Close() error {
db.quitLock.Lock()
defer db.quitLock.Unlock()
func (d *Database) Close() error {
d.quitLock.Lock()
defer d.quitLock.Unlock()

if db.quitChan != nil {
if d.quitChan != nil {
errc := make(chan error)
db.quitChan <- errc
d.quitChan <- errc
if err := <-errc; err != nil {
db.log.Error("Metrics collection failed", "err", err)
d.log.Error("Metrics collection failed", "err", err)
}
db.quitChan = nil
d.quitChan = nil
}
return db.db.Close()
return d.db.Close()
}

// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
_, closer, err := db.db.Get(key)
func (d *Database) Has(key []byte) (bool, error) {
_, closer, err := d.db.Get(key)
if err == pebble.ErrNotFound {
return false, nil
} else if err != nil {
@@ -246,8 +253,8 @@ func (db *Database) Has(key []byte) (bool, error) {
}

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
dat, closer, err := db.db.Get(key)
func (d *Database) Get(key []byte) ([]byte, error) {
dat, closer, err := d.db.Get(key)
if err != nil {
return nil, err
}
@@ -258,28 +265,28 @@ func (db *Database) Get(key []byte) ([]byte, error) {
}

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
return db.db.Set(key, value, pebble.NoSync)
func (d *Database) Put(key []byte, value []byte) error {
return d.db.Set(key, value, pebble.NoSync)
}

// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
return db.db.Delete(key, nil)
func (d *Database) Delete(key []byte) error {
return d.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
func (d *Database) NewBatch() ethdb.Batch {
return &batch{
b: db.db.NewBatch(),
b: d.db.NewBatch(),
}
}

// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
// TODO can't do this with pebble. Batches are allocated in a pool so maybe this doesn't matter?
func (db *Database) NewBatchWithSize(_ int) ethdb.Batch {
func (d *Database) NewBatchWithSize(_ int) ethdb.Batch {
return &batch{
b: db.db.NewBatch(),
b: d.db.NewBatch(),
}
}

@@ -327,17 +334,17 @@ func (snap *snapshot) Release() {
// happened on the database.
// Note don't forget to release the snapshot once it's used up, otherwise
// the stale data will never be cleaned up by the underlying compactor.
func (db *Database) NewSnapshot() (ethdb.Snapshot, error) {
snap := db.db.NewSnapshot()
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
snap := d.db.NewSnapshot()
return &snapshot{db: snap}, nil
}

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
iterRange := bytesPrefixRange(prefix, start)
iter := db.db.NewIter(&pebble.IterOptions{
iter := d.db.NewIter(&pebble.IterOptions{
LowerBound: iterRange.Start,
UpperBound: iterRange.Limit,
})
@@ -346,7 +353,7 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
}

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
func (d *Database) Stat(property string) (string, error) {
return "", nil
}

@@ -357,18 +364,18 @@ func (db *Database) Stat(property string) (string, error) {
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil then it
// will compact the entire data store.
func (db *Database) Compact(start []byte, limit []byte) error {
return db.db.Compact(start, limit, false)
func (d *Database) Compact(start []byte, limit []byte) error {
return d.db.Compact(start, limit, false)
}

// Path returns the path to the database directory.
func (db *Database) Path() string {
return db.fn
func (d *Database) Path() string {
return d.fn
}

// meter periodically retrieves internal pebble counters and reports them to
// the metrics subsystem.
func (db *Database) meter(refresh time.Duration) {
func (d *Database) meter(refresh time.Duration) {
var errc chan error
timer := time.NewTimer(refresh)
defer timer.Stop()
Expand All @@ -392,14 +399,12 @@ func (db *Database) meter(refresh time.Duration) {
nWrite int64
)

metrics := db.db.Metrics()

compTime := atomic.LoadInt64(&db.compTime)
writeDelayCount := atomic.LoadInt64(&db.writeDelayCount)
writeDelayTime := atomic.LoadInt64(&db.writeDelayTime)
seekCompCount := atomic.LoadInt64(&db.seekCompCount)
nonLevel0CompCount := int64(atomic.LoadUint32(&db.nonLevel0Comp))
level0CompCount := int64(atomic.LoadUint32(&db.level0Comp))
metrics := d.db.Metrics()
compTime := atomic.LoadInt64(&d.compTime)
writeDelayCount := atomic.LoadInt64(&d.writeDelayCount)
writeDelayTime := atomic.LoadInt64(&d.writeDelayTime)
nonLevel0CompCount := int64(atomic.LoadUint32(&d.nonLevel0Comp))
level0CompCount := int64(atomic.LoadUint32(&d.level0Comp))

writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
@@ -418,41 +423,41 @@ func (db *Database) meter(refresh time.Duration) {
compReads[i%2] = compRead
nWrites[i%2] = nWrite

if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
if d.writeDelayNMeter != nil {
d.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
if d.writeDelayMeter != nil {
d.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
}
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
if d.compTimeMeter != nil {
d.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
if d.compReadMeter != nil {
d.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
if d.compWriteMeter != nil {
d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
}
if db.diskSizeGauge != nil {
db.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage()))
if d.diskSizeGauge != nil {
d.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage()))
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
if d.diskReadMeter != nil {
d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
if d.diskWriteMeter != nil {
d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
}
// See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054
manuallyAllocated := metrics.BlockCache.Size + int64(metrics.MemTable.Size) + int64(metrics.MemTable.ZombieSize)
db.manualMemAllocGauge.Update(manuallyAllocated)
db.memCompGauge.Update(metrics.Flush.Count)
db.nonlevel0CompGauge.Update(nonLevel0CompCount)
db.level0CompGauge.Update(level0CompCount)
db.seekCompGauge.Update(seekCompCount)
d.manualMemAllocGauge.Update(manuallyAllocated)
d.memCompGauge.Update(metrics.Flush.Count)
d.nonlevel0CompGauge.Update(nonLevel0CompCount)
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(metrics.Compact.ReadCount)

// Sleep a bit, then repeat the stats collection
select {
case errc = <-db.quitChan:
case errc = <-d.quitChan:
// Quit requesting, stop hammering the database
case <-timer.C:
timer.Reset(refresh)
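With the seekCompCount field gone, the read-triggered compaction count (and the other figures above) comes straight from pebble's own metrics; a minimal standalone sketch of that sampling (sample below is a hypothetical helper, limited to the fields the diff itself reads):

package dbmetrics

import "github.com/cockroachdb/pebble"

// sample pulls the figures the meter loop above feeds into its gauges:
// total disk usage, flush count and read-triggered compaction count.
func sample(db *pebble.DB) (diskUsage uint64, flushes, readCompactions int64) {
	m := db.Metrics()
	return m.DiskSpaceUsage(), m.Flush.Count, m.Compact.ReadCount
}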
Expand All @@ -472,14 +477,14 @@ type batch struct {
// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
b.b.Set(key, value, nil)
b.size += len(value)
b.size += len(key) + len(value)
return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
b.b.Delete(key, nil)
b.size++
b.size += len(key)
return nil
}
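The accounting fix above counts keys as well as values (and gives deletions their key's weight), which matters because callers flush a batch based on this figure; a hedged sketch of the usual go-ethereum pattern (writeAll below is a hypothetical helper):

package example

import "github.com/ethereum/go-ethereum/ethdb"

// writeAll flushes the batch whenever its tracked size reaches the ideal
// batch size; with the old len(value)-only accounting, key bytes and
// deletions were invisible here and batches could grow well past the target.
func writeAll(db ethdb.KeyValueStore, kvs map[string][]byte) error {
	batch := db.NewBatch()
	for k, v := range kvs {
		if err := batch.Put([]byte(k), v); err != nil {
			return err
		}
		if batch.ValueSize() >= ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				return err
			}
			batch.Reset()
		}
	}
	return batch.Write()
}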
