Revert node DB cache (#3581) (#3674) (#3675)
Revert "Prevent frequent commits to the node DB in sentries (#2505)".
This reverts commit 65a9a26.
battlmonstr authored Mar 10, 2022
1 parent 6fd8f98 · commit a52e53a
Showing 1 changed file with 38 additions and 218 deletions.
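For context, the code being removed buffered node-DB writes in an in-memory btree (kvCache) and flushed them in batches once the cache grew large or on Close(), while the restored code commits every write to MDBX immediately via db.kv.Update. Below is a minimal, self-contained sketch of that trade-off only, not code from this repository: it uses plain maps in place of MDBX and made-up names (directStore, cachedStore, flushThreshold).

// Sketch (assumed simplification): cachedStore batches writes in memory and
// flushes them past a threshold; directStore persists every write immediately.
package main

import (
	"fmt"
	"sync"
)

// directStore: one committed write per Put, the behaviour this revert restores.
type directStore struct {
	mu sync.Mutex
	db map[string][]byte // stands in for the MDBX-backed kv.RwDB
}

func (s *directStore) Put(k string, v []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.db[k] = v
}

// cachedStore: the reverted behaviour, accumulating writes in memory and
// flushing them in one batch once the cache grows past flushThreshold.
type cachedStore struct {
	mu             sync.Mutex
	cache          map[string][]byte
	db             map[string][]byte
	flushThreshold int
}

func (s *cachedStore) Put(k string, v []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cache[k] = v
	if len(s.cache) > s.flushThreshold {
		s.flushLocked()
	}
}

// flushLocked moves all buffered entries into the backing store; the caller must hold s.mu.
func (s *cachedStore) flushLocked() {
	for k, v := range s.cache {
		s.db[k] = v
	}
	s.cache = make(map[string][]byte)
}

func main() {
	d := &directStore{db: make(map[string][]byte)}
	d.Put("n1", []byte{1}) // persisted immediately

	c := &cachedStore{cache: make(map[string][]byte), db: make(map[string][]byte), flushThreshold: 2}
	c.Put("n1", []byte{1})
	c.Put("n2", []byte{2}) // still only in the cache
	fmt.Println("direct:", len(d.db), "cached (before flush):", len(c.db))
	c.Put("n3", []byte{3}) // crosses the threshold and triggers a flush
	fmt.Println("cached (after flush):", len(c.db))
}

Batching reduces commit frequency at the cost of keeping recent writes only in memory until a flush; this revert trades that back for immediate durability of every update.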
256 changes: 38 additions & 218 deletions p2p/enode/nodedb.go
@@ -29,7 +29,6 @@ import (
"time"

"github.com/c2h5oh/datasize"
"github.com/google/btree"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"

@@ -74,23 +73,9 @@ var zeroIP = make(net.IP, 16)
// DB is the node database, storing previously seen nodes and any collected metadata about
// them for QoS purposes.
type DB struct {
kvCache *btree.BTree
kvCacheLock sync.RWMutex
path string // Remember path for log messages
kv kv.RwDB // Interface to the database itself
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop
}

// DbItem is type of items stored in the kvCache's btrees
type DbItem struct {
key []byte
val []byte
}

func (di *DbItem) Less(than btree.Item) bool {
i := than.(*DbItem)
return bytes.Compare(di.key, i.key) < 0
kv kv.RwDB // Interface to the database itself
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop
}

// OpenDB opens a node database for storing and retrieving infos about known peers in the
Expand All @@ -117,7 +102,6 @@ func newMemoryDB(logger log.Logger) (*DB, error) {
if err != nil {
return nil, err
}
db.kvCache = btree.New(32)
return db, nil
}

@@ -162,7 +146,7 @@ func newPersistentDB(logger log.Logger, path string) (*DB, error) {
}
return newPersistentDB(logger, path)
}
return &DB{path: path, kvCache: btree.New(32), kv: db, quit: make(chan struct{})}, nil
return &DB{kv: db, quit: make(chan struct{})}, nil
}

// nodeKey returns the database key for a node record.
@@ -229,54 +213,9 @@ func localItemKey(id ID, field string) []byte {
return key
}

func (db *DB) getFromCache(key []byte) []byte {
db.kvCacheLock.RLock()
defer db.kvCacheLock.RUnlock()
i := db.kvCache.Get(&DbItem{key: key})
if i != nil {
di := i.(*DbItem)
return di.val
}
return nil
}

func (db *DB) setToCache(key, val []byte) {
db.kvCacheLock.Lock()
defer db.kvCacheLock.Unlock()
db.kvCache.ReplaceOrInsert(&DbItem{key: key, val: val})
if db.kvCache.Len() > 16*1024 {
db.commitCache(false /* logit */)
}
}

func (db *DB) searchCache(key []byte) (foundKey, foundVal, nextKey []byte) {
if key == nil {
return nil, nil, nil
}
db.kvCacheLock.RLock()
defer db.kvCacheLock.RUnlock()
db.kvCache.AscendGreaterOrEqual(&DbItem{key: key}, func(i btree.Item) bool {
di := i.(*DbItem)
if foundKey == nil {
foundKey = di.key
foundVal = di.val
return true
}
nextKey = di.key
return false
})
return
}

// fetchInt64 retrieves an integer associated with a particular key.
func (db *DB) fetchInt64(key []byte) int64 {
var val int64
if blob := db.getFromCache(key); blob != nil {
if v, read := binary.Varint(blob); read > 0 {
return v
}
return 0
}
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
blob, errGet := tx.GetOne(kv.Inodes, key)
if errGet != nil {
@@ -299,17 +238,14 @@ func (db *DB) fetchInt64(key []byte) int64 {
func (db *DB) storeInt64(key []byte, n int64) error {
blob := make([]byte, binary.MaxVarintLen64)
blob = blob[:binary.PutVarint(blob, n)]
db.setToCache(common.CopyBytes(key), blob)
return nil
return db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.Inodes, common.CopyBytes(key), blob)
})
}

// fetchUint64 retrieves an integer associated with a particular key.
func (db *DB) fetchUint64(key []byte) uint64 {
var val uint64
if blob := db.getFromCache(key); blob != nil {
val, _ = binary.Uvarint(blob)
return val
}
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
blob, errGet := tx.GetOne(kv.Inodes, key)
if errGet != nil {
@@ -329,28 +265,26 @@ func (db *DB) fetchUint64(key []byte) uint64 {
func (db *DB) storeUint64(key []byte, n uint64) error {
blob := make([]byte, binary.MaxVarintLen64)
blob = blob[:binary.PutUvarint(blob, n)]
db.setToCache(common.CopyBytes(key), blob)
return nil
return db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.Inodes, common.CopyBytes(key), blob)
})
}

// Node retrieves a node with a given id from the database.
func (db *DB) Node(id ID) *Node {
var blob []byte
blob = db.getFromCache(nodeKey(id))
if blob == nil {
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
v, errGet := tx.GetOne(kv.Inodes, nodeKey(id))
if errGet != nil {
return errGet
}
if v != nil {
blob = make([]byte, len(v))
copy(blob, v)
}
return nil
}); err != nil {
return nil
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
v, errGet := tx.GetOne(kv.Inodes, nodeKey(id))
if errGet != nil {
return errGet
}
if v != nil {
blob = make([]byte, len(v))
copy(blob, v)
}
return nil
}); err != nil {
return nil
}
if blob == nil {
return nil
@@ -377,7 +311,11 @@ func (db *DB) UpdateNode(node *Node) error {
if err != nil {
return err
}
db.setToCache(nodeKey(node.ID()), blob)
if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.Inodes, nodeKey(node.ID()), blob)
}); err != nil {
return err
}
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
}

@@ -397,34 +335,21 @@ func (db *DB) Resolve(n *Node) *Node {

// DeleteNode deletes all information associated with a node.
func (db *DB) DeleteNode(id ID) {
deleteRange(db, nodeKey(id))
deleteRange(db.kv, nodeKey(id))
}

func deleteRange(db *DB, prefix []byte) {
db.kvCacheLock.Lock()
defer db.kvCacheLock.Unlock()
// First delete relevant entries from the cache
db.kvCache.AscendGreaterOrEqual(&DbItem{key: prefix}, func(i btree.Item) bool {
di := i.(*DbItem)
if !bytes.HasPrefix(di.key, prefix) {
return false
}
di.val = nil // Mark for deletion
return true
})
// Now mark all other entries for deletion
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
c, err := tx.Cursor(kv.Inodes)
func deleteRange(db kv.RwDB, prefix []byte) {
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
c, err := tx.RwCursor(kv.Inodes)
if err != nil {
return err
}
for k, _, err := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _, err = c.Next() {
if err != nil {
return err
}
if f := db.kvCache.Get(&DbItem{key: k}); f == nil {
// Only copy key if item is missing in the cache
db.kvCache.ReplaceOrInsert(&DbItem{key: common.CopyBytes(k), val: nil})
if err := c.Delete(k, nil); err != nil {
return nil
}
}
return nil
@@ -477,8 +402,7 @@ func (db *DB) expireNodes() {
p := []byte(dbNodePrefix)
var prevId ID
var empty = true
ci := cachedIter{c: c, db: db}
for k, v, err := ci.Seek(p); bytes.HasPrefix(k, p); k, v, err = ci.Next() {
for k, v, err := c.Seek(p); bytes.HasPrefix(k, p); k, v, err = c.Next() {
if err != nil {
return err
}
@@ -513,7 +437,7 @@ func (db *DB) expireNodes() {
log.Warn("nodeDB.expireNodes failed", "err", err)
}
for _, td := range toDelete {
deleteRange(db, td)
deleteRange(db.kv, td)
}
}

@@ -594,72 +518,6 @@ func (db *DB) storeLocalSeq(id ID, n uint64) {
db.storeUint64(localItemKey(id, dbLocalSeq), n)
}

type cachedIter struct {
c kv.Cursor
db *DB
cKey, cVal []byte
cacheKey, cacheVal, cacheNextKey []byte
}

func (ci *cachedIter) Seek(searchKey []byte) (k, v []byte, err error) {
ci.cKey, ci.cVal, err = ci.c.Seek(searchKey)
if err != nil {
return nil, nil, err
}
ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(searchKey)
return ci.Next()
}

func (ci *cachedIter) Next() (k, v []byte, err error) {
for {
if ci.cKey == nil && ci.cacheKey == nil {
k = nil
v = nil
return
}
if ci.cKey == nil {
k = ci.cacheKey
v = ci.cacheVal
ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey)
if v != nil {
// if v == nil, it is deleted entry and we try the next record
return
}
continue
}
if ci.cacheKey == nil {
k = ci.cKey
v = ci.cVal
ci.cKey, ci.cVal, err = ci.c.Next()
return
}
switch bytes.Compare(ci.cKey, ci.cacheKey) {
case -1:
k = ci.cKey
v = ci.cVal
ci.cKey, ci.cVal, err = ci.c.Next()
return
case 0:
k = ci.cacheKey
v = ci.cacheVal
ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey)
ci.cKey, ci.cVal, err = ci.c.Next()
if v != nil {
// if v == nil, it is deleted entry and we try the next record
return
}
case 1:
k = ci.cacheKey
v = ci.cacheVal
ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey)
if v != nil {
// if v == nil, it is deleted entry and we try the next record
return
}
}
}
}

// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
@@ -674,7 +532,6 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
if err != nil {
return err
}
ci := &cachedIter{db: db, c: c}
seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
// Seek to a random entry. The first byte is incremented by a
@@ -684,7 +541,7 @@
rand.Read(id[:])
id[0] = ctr + id[0]%16
var n *Node
for k, v, err := ci.Seek(nodeKey(id)); k != nil && n == nil; k, v, err = ci.Next() {
for k, v, err := c.Seek(nodeKey(id)); k != nil && n == nil; k, v, err = c.Next() {
if err != nil {
return err
}
@@ -700,12 +557,9 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
db.ensureExpirer()
pongKey := nodeItemKey(n.ID(), n.IP(), dbNodePong)
var lastPongReceived int64
blob := db.getFromCache(pongKey)
if blob == nil {
var errGet error
if blob, errGet = tx.GetOne(kv.Inodes, pongKey); errGet != nil {
return errGet
}
blob, errGet := tx.GetOne(kv.Inodes, pongKey)
if errGet != nil {
return errGet
}
if blob != nil {
if v, read := binary.Varint(blob); read > 0 {
@@ -729,37 +583,6 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
return nodes
}

func (db *DB) commitCache(logit bool) {
entriesUpdated := 0
entriesDeleted := 0
if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
var err error
db.kvCache.Ascend(func(i btree.Item) bool {
di := i.(*DbItem)
if di.val == nil {
if err = tx.Delete(kv.Inodes, di.key, nil); err != nil {
return false
}
entriesUpdated++
} else {
if err = tx.Put(kv.Inodes, di.key, di.val); err != nil {
return false
}
entriesDeleted++
}
return true
})
return err
}); err != nil {
log.Warn("p2p node database update failed", "path", db.path, "err", err)
} else {
if logit {
log.Info("Successfully update p2p node database", "path", db.path, "updated", entriesUpdated, "deleted", entriesDeleted)
}
db.kvCache.Clear(true)
}
}

// close flushes and closes the database files.
func (db *DB) Close() {
select {
@@ -771,8 +594,5 @@ func (db *DB) Close() {
return
}
close(db.quit)
db.kvCacheLock.Lock()
defer db.kvCacheLock.Unlock()
db.commitCache(true /* logit */)
db.kv.Close()
}
