Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: recover node buffer list trie nodes for graceful kill #224

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (c *CacheConfig) triedbConfig(keepFunc pathdb.NotifyKeepFunc) *triedb.Confi
NotifyKeep: keepFunc,
JournalFilePath: c.JournalFilePath,
JournalFile: c.JournalFile,
UseBase: c.UseBase,
}
}
return config
Expand Down
30 changes: 22 additions & 8 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
engine = ethash.NewFullFaker()
)
chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(basic.scheme), gspec, nil, engine, vm.Config{}, nil, nil)
cacheConfig := DefaultCacheConfigWithScheme(basic.scheme)
cacheConfig.UseBase = true
chain, err := NewBlockChain(db, cacheConfig, gspec, nil, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
Expand Down Expand Up @@ -180,11 +182,11 @@ func (basic *snapshotTestBasic) dump() string {
}
fmt.Fprint(buffer, "\n")

//if crash {
// if crash {
krish-nr marked this conversation as resolved.
Show resolved Hide resolved
// fmt.Fprintf(buffer, "\nCRASH\n\n")
//} else {
// } else {
// fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", basic.setHead)
//}
// }
fmt.Fprintf(buffer, "------------------------------\n\n")

fmt.Fprint(buffer, "Expected in leveldb:\n G")
Expand Down Expand Up @@ -228,7 +230,10 @@ func (snaptest *snapshotTest) test(t *testing.T) {

// Restart the chain normally
chain.Stop()
newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)

cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
cacheConfig.UseBase = true
newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down Expand Up @@ -313,6 +318,7 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {
SnapshotLimit: 0,
StateScheme: snaptest.scheme,
}
cacheConfig.UseBase = true
newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
Expand All @@ -321,7 +327,9 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {
newchain.Stop()

// Restart the chain with enabling the snapshot
newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
config := DefaultCacheConfigWithScheme(snaptest.scheme)
config.UseBase = true
newchain, err = NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down Expand Up @@ -349,7 +357,9 @@ func (snaptest *setHeadSnapshotTest) test(t *testing.T) {
chain.SetHead(snaptest.setHead)
chain.Stop()

newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
cacheConfig.UseBase = true
newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down Expand Up @@ -385,6 +395,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
SnapshotLimit: 0,
StateScheme: snaptest.scheme,
}
config.UseBase = true
newchain, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
Expand All @@ -402,6 +413,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
SnapshotWait: false, // Don't wait rebuild
StateScheme: snaptest.scheme,
}
config.UseBase = true
tmp, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
Expand All @@ -411,7 +423,9 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
tmp.triedb.Close()
tmp.stopWithoutSaving()

newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
cacheConfig.UseBase = true
newchain, err = NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewFreezer(datadir string, namespace string, readonly, writeTrieNode bool,
// Create the tables.
for name, disableSnappy := range tables {
if name == stateHistoryTrieNodesData && !writeTrieNode {
log.Info("Not create trie node data")
log.Info("Not create trie node data in freezer db")
continue
}
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly)
Expand Down
27 changes: 14 additions & 13 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/txpool/bundlepool"
"math/big"
"runtime"
"sync"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/bundlepool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
Expand Down Expand Up @@ -133,6 +133,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
}

// Assemble the Ethereum object
chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles,
config.DatabaseFreezer)
if err != nil {
return nil, err
}
config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb)
if err != nil {
return nil, err
}

if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 {
if config.SnapshotCache > 0 {
config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
Expand All @@ -152,21 +163,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxBufferSize/1024/1024
config.TrieDirtyCache = pathdb.MaxBufferSize / 1024 / 1024
}
log.Info("Allocated memory caches",
"state_scheme", config.StateScheme,
log.Info("Allocated memory caches", "state_scheme", config.StateScheme,
"trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024,
"trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024)
// Assemble the Ethereum object
chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles,
config.DatabaseFreezer)
if err != nil {
return nil, err
}
config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb)
if err != nil {
return nil, err
}

// Try to recover offline state pruning only in hash-based.
if config.StateScheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (db *Database) Enable(root common.Hash) error {
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval,
db.config.NotifyKeep, nil, false, false)
db.config.NotifyKeep, db.freezer, false, false)
if err != nil {
log.Error("Failed to new trie node buffer", "error", err)
return err
Expand Down
1 change: 0 additions & 1 deletion triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,6 @@ func TestJournal(t *testing.T) {
}
tester.db.Close()
pathConfig := Defaults
pathConfig.UseBase = true
tester.db = New(tester.db.diskdb, pathConfig)

// Verify states including disk layer and all diff on top.
Expand Down
41 changes: 17 additions & 24 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ func (db *Database) loadLayers() layer {
if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery &&
db.config.TrieNodeBufferType == NodeBufferList && !db.useBase {
start := time.Now()
if db.freezer == nil {
log.Crit("Use unopened freezer db to recover node buffer list")
}
log.Info("Recover node buffer list from ancient db")

nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0,
Expand Down Expand Up @@ -333,23 +330,13 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
if stored > id {
return nil, fmt.Errorf("invalid state id: stored %d resolved %d", stored, id)
}

// Resolve nodes cached in node buffer
var encoded []journalNodes
if err := journalBuf.Decode(&encoded); err != nil {
return nil, fmt.Errorf("failed to load disk nodes: %v", err)
}
nodes := make(map[common.Hash]map[string]*trienode.Node)
for _, entry := range encoded {
subset := make(map[string]*trienode.Node)
for _, n := range entry.Nodes {
if len(n.Blob) > 0 {
subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob)
} else {
subset[string(n.Path)] = trienode.NewDeleted()
}
}
nodes[entry.Owner] = subset
}
nodes := flattenTrieNodes(encoded)

if journalTypeForReader == JournalFileType {
var shaSum [32]byte
Expand All @@ -365,11 +352,24 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp

// Calculate the internal state transitions by id difference.
nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval,
db.config.NotifyKeep, nil, false, db.useBase)
db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase)
if err != nil {
log.Error("Failed to new trie node buffer", "error", err)
return nil, err
}

if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase && db.fastRecovery {
recoveredRoot, recoveredStateID, _ := nb.getLatestStatus()
if recoveredRoot != root && recoveredStateID != id {
log.Error("Recovered state root and state id are different from recording ones",
"recovered_root", recoveredRoot, "root", root, "recovered_state_id", recoveredStateID, "id", id)
return nil, errors.New("Unmatched root and state id with recovered")
}

log.Info("Disk layer finishes recovering node buffer list", "latest root hash", recoveredRoot.String(),
"latest state_id", recoveredStateID)
}

base := newDiskLayer(root, id, db, nil, nb)
nb.setClean(base.cleans)
return base, nil
Expand Down Expand Up @@ -486,14 +486,7 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
}
// Step three, write all unwritten nodes into the journal
bufferNodes := dl.buffer.getAllNodes()
nodes := make([]journalNodes, 0, len(bufferNodes))
for owner, subset := range bufferNodes {
entry := journalNodes{Owner: owner}
for path, node := range subset {
entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
}
nodes = append(nodes, entry)
}
nodes := compressTrieNodes(bufferNodes)
if err := rlp.Encode(journalBuf, nodes); err != nil {
return err
}
Expand Down
Loading