From c0c639650998e407e58d7267749a576e885506d9 Mon Sep 17 00:00:00 2001 From: user Date: Fri, 25 Mar 2022 15:22:21 +0800 Subject: [PATCH 01/14] fix pruner block tool bug, add some check logic --- cmd/geth/snapshot.go | 3 ++- core/headerchain.go | 3 +++ core/rawdb/chain_iterator.go | 5 ++++- eth/state_accessor.go | 3 +++ internal/ethapi/api.go | 6 ++++++ 5 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 20920a0f94bf..c00334ee9e7c 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -278,7 +278,8 @@ func pruneBlock(ctx *cli.Context) error { var newAncientPath string oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name) if !filepath.IsAbs(oldAncientPath) { - oldAncientPath = stack.ResolvePath(oldAncientPath) + // force absolute paths, which often fail due to the splicing of relative paths + return errors.New("datadir.ancient not abs path") } path, _ := filepath.Split(oldAncientPath) diff --git a/core/headerchain.go b/core/headerchain.go index fe4770a4696a..3e50c1eb07d1 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -250,6 +250,9 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWrit headHeader = hc.GetHeader(headHash, headNumber) ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { + if frozen, _ := hc.chainDb.Ancients(); frozen == headNumber { + break + } rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber) headHash = headHeader.ParentHash headNumber = headHeader.Number.Uint64() - 1 diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 22d5188e91c6..883e17b782f4 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -95,7 +95,10 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool number uint64 rlp rlp.RawValue } - if to == from { + if offset := db.AncientOffSet(); offset > from { + from = offset + } + if to <= from { return nil } threads := to - from diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 24a0e776f6b3..156e3f1dafa1 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -74,6 +74,9 @@ func (eth *Ethereum) stateAtBlock(block *types.Block, reexec uint64, base *state // The optional base statedb is given, mark the start point as parent block statedb, database, report = base, base.Database(), false current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if current == nil { + return nil, fmt.Errorf("missing parent block %v %d", block.ParentHash(), block.NumberU64()-1) + } } else { // Otherwise try to reexec blocks until we find a state or reach our limit current = block diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 091e9e7e82ad..7a3f0ad8bd10 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1774,10 +1774,16 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceiptsByBlockNumber(ctx conte if err != nil { return nil, err } + if receipts == nil { + return nil, fmt.Errorf("block %d receipts not found", blockNumber) + } block, err := s.b.BlockByHash(ctx, blockHash) if err != nil { return nil, err } + if block == nil { + return nil, fmt.Errorf("block %d not found", blockNumber) + } txs := block.Transactions() if len(txs) != len(receipts) { return nil, fmt.Errorf("txs length doesn't equal to receipts' length") From 91d7cd5fd25bda42931cd48ad5d538361f417cb0 Mon Sep 17 00:00:00 2001 From: Leon <316032931@qq.com> Date: Mon, 28 Mar 2022 10:12:38 +0800 Subject: [PATCH 02/14] [R4R]Prefetch state data on mining process (#803) * perf(miner):add mining prefetcher * fix ineffassign * fix comments * fix comments * fix comments: add AsMessagePrefetch to skip nonce check * fix comment:refactor check order of method Forward * fix comments:rename variables * fix comments: rename * rename * fix comments: refactor * update --- core/state_prefetcher.go | 58 ++++++++++++++++++++++++++++ core/types.go | 2 + core/types/transaction.go | 62 ++++++++++++++++++++++++++++++ core/types/transaction_test.go | 69 ++++++++++++++++++++++++++++++++++ miner/worker.go | 12 +++++- 5 files changed, 202 insertions(+), 1 deletion(-) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index d559a03a0f3b..0e18bdc8bbf7 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -27,6 +27,7 @@ import ( ) const prefetchThread = 2 +const checkInterval = 10 // statePrefetcher is a basic Prefetcher, which blindly executes a block on top // of an arbitrary state with the goal of prefetching potentially useful state @@ -88,6 +89,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c } } +// PrefetchMining processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb, but any changes are discarded. The +// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage +func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) { + var signer = types.MakeSigner(p.config, header.Number) + + txCh := make(chan *types.Transaction, 2*prefetchThread) + for i := 0; i < prefetchThread; i++ { + go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) { + idx := 0 + newStatedb := statedb.Copy() + gaspool := new(GasPool).AddGas(gasLimit) + blockContext := NewEVMBlockContext(header, p.bc, nil) + evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // Iterate over and process the individual transactions + for { + select { + case tx := <-startCh: + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessageNoNonceCheck(signer) + if err != nil { + return // Also invalid block, bail out + } + idx++ + newStatedb.Prepare(tx.Hash(), header.Hash(), idx) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + gaspool = new(GasPool).AddGas(gasLimit) + case <-stopCh: + return + } + } + }(txCh, interruptCh) + } + go func(txset *types.TransactionsByPriceAndNonce) { + count := 0 + for { + tx := txset.Peek() + if tx == nil { + return + } + select { + case <-interruptCh: + return + default: + } + if count++; count%checkInterval == 0 { + if *txCurr == nil { + return + } + txset.Forward(*txCurr) + } + txCh <- tx + txset.Shift() + } + }(txs) +} + // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. diff --git a/core/types.go b/core/types.go index 5ed4817e688e..61722aea74cd 100644 --- a/core/types.go +++ b/core/types.go @@ -40,6 +40,8 @@ type Prefetcher interface { // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + // PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage. + PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/transaction.go b/core/types/transaction.go index 74c011544b87..e95cec25a6f4 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa } } +// Copy copys a new TransactionsPriceAndNonce with the same *transaction +func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce { + heads := make([]*Transaction, len(t.heads)) + copy(heads, t.heads) + txs := make(map[common.Address]Transactions, len(t.txs)) + for acc, txsTmp := range t.txs { + txs[acc] = txsTmp + } + return &TransactionsByPriceAndNonce{ + heads: heads, + txs: txs, + signer: t.signer, + } +} + // Peek returns the next transaction by price. func (t *TransactionsByPriceAndNonce) Peek() *Transaction { if len(t.heads) == 0 { @@ -488,6 +503,44 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int { return len(t.heads) } +//Forward moves current transaction to be the one which is one index after tx +func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) { + if tx == nil { + t.heads = t.heads[0:0] + return + } + //check whether target tx exists in t.heads + for _, head := range t.heads { + if tx == head { + //shift t to the position one after tx + txTmp := t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + //get the sender address of tx + acc, _ := Sender(t.signer, tx) + //check whether target tx exists in t.txs + if txs, ok := t.txs[acc]; ok { + for _, txTmp := range txs { + //found the same pointer in t.txs as tx and then shift t to the position one after tx + if txTmp == tx { + txTmp = t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + } +} + // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. @@ -535,6 +588,15 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) { return msg, err } +// AsMessageNoNonceCheck returns the transaction with checkNonce field set to be false. +func (tx *Transaction) AsMessageNoNonceCheck(s Signer) (Message, error) { + msg, err := tx.AsMessage(s) + if err == nil { + msg.checkNonce = false + } + return msg, err +} + func (m Message) From() common.Address { return m.from } func (m Message) To() *common.Address { return m.to } func (m Message) GasPrice() *big.Int { return m.gasPrice } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 3cece9c2358d..c81b7b86478c 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) { } } +func TestTransactionForward(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address]Transactions{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + + tx.time = time.Unix(0, int64(len(keys)-start)) + tx2.time = time.Unix(1, int64(len(keys)-start)) + + groups[addr] = append(groups[addr], tx) + groups[addr] = append(groups[addr], tx2) + + } + // Sort the transactions + txset := NewTransactionsByPriceAndNonce(signer, groups) + txsetCpy := txset.Copy() + + txs := Transactions{} + for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() { + txs = append(txs, tx) + txsetCpy.Shift() + } + + tmp := txset.Copy() + for j := 0; j < 10; j++ { + txset = tmp.Copy() + txsetCpy = tmp.Copy() + i := 0 + for ; i < j; i++ { + txset.Shift() + } + tx := txset.Peek() + txsetCpy.Forward(tx) + txCpy := txsetCpy.Peek() + if txCpy == nil { + if tx == nil { + continue + } + txset.Shift() + if txset.Peek() != nil { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } else { + continue + } + } + txset.Shift() + for ; i < len(txs)-1; i++ { + tx = txset.Peek() + txCpy = txsetCpy.Peek() + if txCpy != tx { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } + txsetCpy.Shift() + txset.Shift() + } + + } +} + // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() diff --git a/miner/worker.go b/miner/worker.go index 28ef170e406c..2224dc007128 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -130,6 +130,7 @@ type intervalAdjust struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { + prefetcher core.Prefetcher config *Config chainConfig *params.ChainConfig engine consensus.Engine @@ -196,6 +197,7 @@ type worker struct { func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { worker := &worker{ + prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), config: config, chainConfig: chainConfig, engine: engine, @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) + interruptCh := make(chan struct{}) + defer close(interruptCh) + tx := &types.Transaction{} + txCurr := &tx + //prefetch txs from all pending txs + txsPrefetch := txs.Copy() + w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) + LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -814,7 +824,7 @@ LOOP: } } // Retrieve the next transaction and abort if all done - tx := txs.Peek() + tx = txs.Peek() if tx == nil { break } From 58f3b2cd8419d63390a644844be4f0838db4fafc Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Mon, 28 Mar 2022 10:51:53 +0800 Subject: [PATCH 03/14] fix code of difflayer not assgin before return (#808) --- core/state/statedb.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 5ea84f403292..c523de6a92b7 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1333,19 +1333,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er }() } - if s.snap != nil { - for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { - if obj.code != nil && obj.dirtyCode { - diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ - Hash: common.BytesToHash(obj.CodeHash()), - Code: obj.code, - }) - } - } - } - } - for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object @@ -1422,6 +1409,12 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if obj.code != nil && obj.dirtyCode { rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false + if s.snap != nil { + diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ + Hash: common.BytesToHash(obj.CodeHash()), + Code: obj.code, + }) + } if codeWriter.ValueSize() > ethdb.IdealBatchSize { if err := codeWriter.Write(); err != nil { return err From 5f1aabeb38a21bff9ffe25e1dd138b2b9d335007 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Mon, 28 Mar 2022 10:52:05 +0800 Subject: [PATCH 04/14] fix race condition on preimage (#797) --- trie/database.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/trie/database.go b/trie/database.go index b6a3154d483d..649af6dbf9fe 100644 --- a/trie/database.go +++ b/trie/database.go @@ -722,17 +722,18 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H batch := db.diskdb.NewBatch() // Move all of the accumulated preimages into a write batch + db.lock.RLock() if db.preimages != nil { rawdb.WritePreimages(batch, db.preimages) // Since we're going to replay trie node writes into the clean cache, flush out // any batched pre-images before continuing. if err := batch.Write(); err != nil { + db.lock.RUnlock() return err } batch.Reset() } // Move the trie itself into the batch, flushing if enough data is accumulated - db.lock.RLock() nodes, storage := len(db.dirties), db.dirtiesSize db.lock.RUnlock() From 4ff96978cbbcea9e49c98dccb1fdc7199cc85ee1 Mon Sep 17 00:00:00 2001 From: flywukong <2229306838@qq.com> Date: Mon, 28 Mar 2022 11:59:41 +0800 Subject: [PATCH 05/14] [R4R]add sharedStorage for prefetching to L1 (#792) * add sharedStorage for prefetching to L1 * remote originStorage in stateObjects * fix core * fix bug of sync map * remove read lock when get & set keys * statedb copy use CopyWithSharedStorage * reduce lock access * fix comment * avoid sharedPool effects on other modules * remove tryPreload * fix comment * fix var name * fix lint * fix L1 miss data && data condition * fix comment --- core/blockchain.go | 2 +- core/state/shared_pool.go | 39 +++++++++++++++ core/state/state_object.go | 56 +++++++++++++++++----- core/state/statedb.go | 98 +++++++++----------------------------- core/state_prefetcher.go | 1 + core/state_processor.go | 1 - 6 files changed, 108 insertions(+), 89 deletions(-) create mode 100644 core/state/shared_pool.go diff --git a/core/blockchain.go b/core/blockchain.go index 6c87ffc70889..fbf9af9db786 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2101,7 +2101,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) + statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps) if err != nil { return it.index, err } diff --git a/core/state/shared_pool.go b/core/state/shared_pool.go new file mode 100644 index 000000000000..ba96c2c27d98 --- /dev/null +++ b/core/state/shared_pool.go @@ -0,0 +1,39 @@ +package state + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" +) + +// sharedPool is used to store maps of originStorage of stateObjects +type StoragePool struct { + sync.RWMutex + sharedMap map[common.Address]*sync.Map +} + +func NewStoragePool() *StoragePool { + sharedMap := make(map[common.Address]*sync.Map) + return &StoragePool{ + sync.RWMutex{}, + sharedMap, + } +} + +// getStorage Check whether the storage exist in pool, +// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState() +func (s *StoragePool) getStorage(address common.Address) *sync.Map { + s.RLock() + storageMap, ok := s.sharedMap[address] + s.RUnlock() + if !ok { + s.Lock() + defer s.Unlock() + if storageMap, ok = s.sharedMap[address]; !ok { + m := new(sync.Map) + s.sharedMap[address] = m + return m + } + } + return storageMap +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 298f4305baae..c5212e91cc24 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -79,7 +80,9 @@ type StateObject struct { trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded - originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction + originStorage Storage + pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block dirtyStorage Storage // Storage entries that have been modified in the current transaction execution fakeStorage Storage // Fake storage which constructed by caller for debugging purpose. @@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject { if data.Root == (common.Hash{}) { data.Root = emptyRoot } + var storageMap *sync.Map + // Check whether the storage exist in pool, new originStorage if not exist + if db != nil && db.storagePool != nil { + storageMap = db.GetStorage(address) + } + return &StateObject{ - db: db, - address: address, - addrHash: crypto.Keccak256Hash(address[:]), - data: data, - originStorage: make(Storage), - pendingStorage: make(Storage), - dirtyStorage: make(Storage), + db: db, + address: address, + addrHash: crypto.Keccak256Hash(address[:]), + data: data, + sharedOriginStorage: storageMap, + originStorage: make(Storage), + pendingStorage: make(Storage), + dirtyStorage: make(Storage), } } @@ -194,6 +204,29 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash { return s.GetCommittedState(db, key) } +func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) { + if value, cached := s.originStorage[key]; cached { + return value, true + } + // if L1 cache miss, try to get it from shared pool + if s.sharedOriginStorage != nil { + val, ok := s.sharedOriginStorage.Load(key) + if !ok { + return common.Hash{}, false + } + s.originStorage[key] = val.(common.Hash) + return val.(common.Hash), true + } + return common.Hash{}, false +} + +func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) { + if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil { + s.sharedOriginStorage.Store(key, value) + } + s.originStorage[key] = value +} + // GetCommittedState retrieves a value from the committed account storage trie. func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash { // If the fake storage is set, only lookup the state here(in the debugging mode) @@ -204,7 +237,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has if value, pending := s.pendingStorage[key]; pending { return value } - if value, cached := s.originStorage[key]; cached { + + if value, cached := s.getOriginStorage(key); cached { return value } // If no live objects are available, attempt to use snapshots @@ -263,7 +297,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has } value.SetBytes(content) } - s.originStorage[key] = value + s.setOriginStorage(key, value) return value } @@ -320,6 +354,7 @@ func (s *StateObject) finalise(prefetch bool) { slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure } } + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) } @@ -356,7 +391,6 @@ func (s *StateObject) updateTrie(db Database) Trie { continue } s.originStorage[key] = value - var v []byte if (value == common.Hash{}) { s.setError(tr.TryDelete(key[:])) diff --git a/core/state/statedb.go b/core/state/statedb.go index c523de6a92b7..c8cb410b0e3d 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -39,10 +39,7 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -const ( - preLoadLimit = 128 - defaultNumOfSlots = 100 -) +const defaultNumOfSlots = 100 type revision struct { id int @@ -101,6 +98,8 @@ type StateDB struct { stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects + writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer. // DB error. // State objects are used by the consensus core and VM which are // unable to deal with database-level errors. Any error that occurs @@ -147,6 +146,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return newStateDB(root, db, snaps) } +// NewWithSharedPool creates a new state with sharedStorge on layer 1.5 +func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { + statedb, err := newStateDB(root, db, snaps) + if err != nil { + return nil, err + } + statedb.storagePool = NewStoragePool() + return statedb, nil +} + func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { sdb := &StateDB{ db: db, @@ -178,6 +187,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, return sdb, nil } +func (s *StateDB) EnableWriteOnSharedStorage() { + s.writeOnSharedStorage = true +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. @@ -591,78 +604,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject { return nil } -func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) { - accounts := make(map[common.Address]bool, block.Transactions().Len()) - accountsSlice := make([]common.Address, 0, block.Transactions().Len()) - for _, tx := range block.Transactions() { - from, err := types.Sender(signer, tx) - if err != nil { - break - } - accounts[from] = true - if tx.To() != nil { - accounts[*tx.To()] = true - } - } - for account := range accounts { - accountsSlice = append(accountsSlice, account) - } - if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() { - objsChan := make(chan []*StateObject, runtime.NumCPU()) - for i := 0; i < runtime.NumCPU(); i++ { - start := i * len(accountsSlice) / runtime.NumCPU() - end := (i + 1) * len(accountsSlice) / runtime.NumCPU() - if i+1 == runtime.NumCPU() { - end = len(accountsSlice) - } - go func(start, end int) { - objs := s.preloadStateObject(accountsSlice[start:end]) - objsChan <- objs - }(start, end) - } - for i := 0; i < runtime.NumCPU(); i++ { - objs := <-objsChan - for _, obj := range objs { - s.SetStateObject(obj) - } - } - } -} - -func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject { - // Prefer live objects if any is available - if s.snap == nil { - return nil - } - hasher := crypto.NewKeccakState() - objs := make([]*StateObject, 0, len(address)) - for _, addr := range address { - // If no live objects are available, attempt to use snapshots - if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil { - if acc == nil { - continue - } - data := &Account{ - Nonce: acc.Nonce, - Balance: acc.Balance, - CodeHash: acc.CodeHash, - Root: common.BytesToHash(acc.Root), - } - if len(data.CodeHash) == 0 { - data.CodeHash = emptyCodeHash - } - if data.Root == (common.Hash{}) { - data.Root = emptyRoot - } - // Insert into the live set - obj := newObject(s, addr, *data) - objs = append(objs, obj) - } - // Do not enable this feature when snapshot is not enabled. - } - return objs -} - // getDeletedStateObject is similar to getStateObject, but instead of returning // nil for a deleted state object, it returns the actual object with the deleted // flag set. This is needed by the state journal to revert to the correct s- @@ -828,6 +769,7 @@ func (s *StateDB) Copy() *StateDB { stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)), stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), + storagePool: s.storagePool, refund: s.refund, logs: make(map[common.Hash][]*types.Log, len(s.logs)), logSize: s.logSize, @@ -1626,3 +1568,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address { } return accounts } + +func (s *StateDB) GetStorage(address common.Address) *sync.Map { + return s.storagePool.getStorage(address) +} diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 0e18bdc8bbf7..e45a6306df79 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -68,6 +68,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c for i := 0; i < prefetchThread; i++ { go func(idx int) { newStatedb := statedb.Copy() + newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) diff --git a/core/state_processor.go b/core/state_processor.go index 14fe9b4b9213..cf2275e72266 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -385,7 +385,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg gp = new(GasPool).AddGas(block.GasLimit()) ) signer := types.MakeSigner(p.bc.chainConfig, block.Number()) - statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { From d8949871fb623373f8908c9ab2018838ad0f26a9 Mon Sep 17 00:00:00 2001 From: WayToFuture <61674316+forcodedancing@users.noreply.github.com> Date: Mon, 28 Mar 2022 16:46:39 +0800 Subject: [PATCH 06/14] [R4R] state verification pipeline (#795) * pipeline state verification * update codes and add logs for debug * refactor * update and add logs * refactor * refactor * remove unneeded logs * fix a blocking issue * fix sync issue when force kill * remove logs * refactor based on comments * refactor based on comments * refactor based on comments * refactor based on comments * refactor based on comments * fix a deadlock issue * fix merkle root mismatch issue during sync * refactor based on review comments * remove unnecessary code * remove unnecessary code * refactor based on review comments * change based on comments * refactor * uew dummyRoot to replace emptyRoot * add nil check * add comments * remove unneeded codes * format comments Co-authored-by: forcodedancing --- core/block_validator.go | 8 ++- core/blockchain.go | 2 +- core/blockchain_test.go | 2 +- core/state/snapshot/difflayer.go | 40 +++++++++++- core/state/snapshot/disklayer.go | 13 ++++ core/state/snapshot/journal.go | 1 + core/state/snapshot/snapshot.go | 22 ++++++- core/state/state_object.go | 21 +++++-- core/state/statedb.go | 102 ++++++++++++++++++++++++++++--- core/state_processor.go | 2 +- core/types.go | 2 +- 11 files changed, 190 insertions(+), 25 deletions(-) diff --git a/core/block_validator.go b/core/block_validator.go index b109c1e54b3b..c6a35f1fdfef 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // transition, such as amount of used gas, the receipt roots and the state root // itself. ValidateState returns a database batch if the validation was a success // otherwise nil and an error is returned. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -135,13 +135,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return nil }, } - if skipHeavyVerify { + if statedb.IsPipeCommit() { validateFuns = append(validateFuns, func() error { if err := statedb.WaitPipeVerification(); err != nil { return err } + statedb.CorrectAccountsRoot() statedb.Finalise(v.config.IsEIP158(header.Number)) - statedb.AccountsIntermediateRoot() + // State verification pipeline - accounts root are not calculated here, just populate needed fields for process + statedb.PopulateSnapAccountAndStorage() return nil }) } else { diff --git a/core/blockchain.go b/core/blockchain.go index fbf9af9db786..47ac859441fc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Validate the state using the default validator substart = time.Now() if !statedb.IsLightProcessed() { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) return it.index, err diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 50d02e0acc74..07cb51933a0b 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index 65b2729d9cb8..d2b1b2778bb3 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,8 +118,9 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) - verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed - valid bool // mark the difflayer is valid or not. + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. + accountCorrected bool // mark the accountData has been corrected ort not diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s storageList: make(map[common.Hash][]common.Hash), verifiedCh: verified, } + switch parent := parent.(type) { case *diskLayer: dl.rebloom(parent) @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s default: panic("unknown parent type") } + // Sanity check that accounts or storage slots are never nil for accountHash, blob := range accounts { if blob == nil { @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool { } } +func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) { + dl.lock.Lock() + defer dl.lock.Unlock() + + dl.accountData = accounts + dl.accountCorrected = true +} + +func (dl *diffLayer) AccountsCorrected() bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.accountCorrected +} + // Parent returns the subsequent layer of a diff layer. func (dl *diffLayer) Parent() snapshot { return dl.parent @@ -314,6 +332,24 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) { return account, nil } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diffLayer) Accounts() (map[common.Hash]*Account, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + accounts := make(map[common.Hash]*Account, len(dl.accountData)) + for hash, data := range dl.accountData { + account := new(Account) + if err := rlp.DecodeBytes(data, account); err != nil { + return nil, err + } + accounts[hash] = account + } + + return accounts, nil +} + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. // diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index c1de41782ca7..6d46496a7179 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool { return true } +func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) { +} + +func (dl *diskLayer) AccountsCorrected() bool { + return true +} + // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil @@ -73,6 +80,12 @@ func (dl *diskLayer) Stale() bool { return dl.stale } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diskLayer) Accounts() (map[common.Hash]*Account, error) { + return nil, nil +} + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. func (dl *diskLayer) Account(hash common.Hash) (*Account, error) { diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 35c69cfd6bd5..587f78a474ee 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -288,6 +288,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { if dl.Stale() { return common.Hash{}, ErrSnapshotStale } + // Everything below was journalled, persist this layer too if err := rlp.Encode(buffer, dl.root); err != nil { return common.Hash{}, err diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 8ac93f28e4c0..7ad4bcc91b3c 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -107,13 +107,23 @@ type Snapshot interface { // Verified returns whether the snapshot is verified Verified() bool - // Store the verification result + // MarkValid stores the verification result MarkValid() + // CorrectAccounts updates account data for storing the correct data during pipecommit + CorrectAccounts(map[common.Hash][]byte) + + // AccountsCorrected checks whether the account data has been corrected during pipecommit + AccountsCorrected() bool + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) + // Accounts directly retrieves all accounts in current snapshot in + // the snapshot slim data format. + Accounts() (map[common.Hash]*Account, error) + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. AccountRLP(hash common.Hash) ([]byte, error) @@ -240,6 +250,11 @@ func (t *Tree) waitBuild() { } } +// Layers returns the number of layers +func (t *Tree) Layers() int { + return len(t.layers) +} + // Disable interrupts any pending snapshot generator, deletes all the snapshot // layers in memory and marks snapshots disabled globally. In order to resume // the snapshot functionality, the caller must invoke Rebuild. @@ -666,6 +681,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) { if snap == nil { return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root) } + // Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data + if !snap.WaitAndGetVerifyRes() { + return common.Hash{}, ErrSnapshotStale + } + // Run the journaling t.lock.Lock() defer t.lock.Unlock() diff --git a/core/state/state_object.go b/core/state/state_object.go index c5212e91cc24..a89feebf2814 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -64,10 +64,11 @@ func (s Storage) Copy() Storage { // Account values can be accessed and modified through the object. // Finally, call CommitTrie to write the modified storage trie into a database. type StateObject struct { - address common.Address - addrHash common.Hash // hash of ethereum address of the account - data Account - db *StateDB + address common.Address + addrHash common.Hash // hash of ethereum address of the account + data Account + db *StateDB + rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode // DB error. // State objects are used by the consensus core and VM which are @@ -355,7 +356,17 @@ func (s *StateObject) finalise(prefetch bool) { } } - if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { + // The account root need to be updated before prefetch, otherwise the account root is empty + if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() { + if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil { + if acc != nil && len(acc.Root) != 0 { + s.data.Root = common.BytesToHash(acc.Root) + s.rootCorrected = true + } + } + } + + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot { s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) } if len(s.dirtyStorage) > 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index c8cb410b0e3d..2621907f6c95 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -50,6 +50,10 @@ var ( // emptyRoot is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + // dummyRoot is the dummy account root before corrected in pipecommit sync mode, + // the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28 + dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root")) + emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes()) ) @@ -229,11 +233,16 @@ func (s *StateDB) MarkLightProcessed() { // Enable the pipeline commit function of statedb func (s *StateDB) EnablePipeCommit() { - if s.snap != nil { + if s.snap != nil && s.snaps.Layers() > 1 { s.pipeCommit = true } } +// IsPipeCommit checks whether pipecommit is enabled on the statedb or not +func (s *StateDB) IsPipeCommit() bool { + return s.pipeCommit +} + // Mark that the block is full processed func (s *StateDB) MarkFullProcessed() { s.fullProcessed = true @@ -965,6 +974,65 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { return s.StateIntermediateRoot() } +//CorrectAccountsRoot will fix account roots in pipecommit mode +func (s *StateDB) CorrectAccountsRoot() { + if accounts, err := s.snap.Accounts(); err == nil && accounts != nil { + for _, obj := range s.stateObjects { + if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot { + if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist && len(account.Root) != 0 { + obj.data.Root = common.BytesToHash(account.Root) + } + } + } + } +} + +//PopulateSnapAccountAndStorage tries to populate required accounts and storages for pipecommit +func (s *StateDB) PopulateSnapAccountAndStorage() { + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + if s.snap != nil && !obj.deleted { + root := obj.data.Root + storageChanged := s.populateSnapStorage(obj) + if storageChanged { + root = dummyRoot + } + s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash) + } + } + } +} + +//populateSnapStorage tries to populate required storages for pipecommit, and returns a flag to indicate whether the storage root changed or not +func (s *StateDB) populateSnapStorage(obj *StateObject) bool { + for key, value := range obj.dirtyStorage { + obj.pendingStorage[key] = value + } + if len(obj.pendingStorage) == 0 { + return false + } + var storage map[string][]byte + for key, value := range obj.pendingStorage { + var v []byte + if (value != common.Hash{}) { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + } + // If state snapshotting is active, cache the data til commit + if obj.db.snap != nil { + if storage == nil { + // Retrieve the old storage map, if available, create a new one otherwise + if storage = obj.db.snapStorage[obj.address]; storage == nil { + storage = make(map[string][]byte) + obj.db.snapStorage[obj.address] = storage + } + } + storage[string(key[:])] = v // v will be nil if value is 0x00 + } + } + return true +} + func (s *StateDB) AccountsIntermediateRoot() { tasks := make(chan func()) finishCh := make(chan struct{}) @@ -1050,6 +1118,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { } s.trie = tr } + usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; obj.deleted { @@ -1062,6 +1131,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { if prefetcher != nil { prefetcher.used(s.originalRoot, usedAddrs) } + if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) } @@ -1239,6 +1309,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er var diffLayer *types.DiffLayer var verified chan struct{} var snapUpdated chan struct{} + if s.snap != nil { diffLayer = &types.DiffLayer{} } @@ -1250,9 +1321,24 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er commmitTrie := func() error { commitErr := func() error { + if s.pipeCommit { + <-snapUpdated + // Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct, capture the correct data here + s.AccountsIntermediateRoot() + if parent := s.snap.Root(); parent != s.expectedRoot { + accountData := make(map[common.Hash][]byte) + for k, v := range s.snapAccounts { + accountData[crypto.Keccak256Hash(k[:])] = v + } + s.snaps.Snapshot(s.expectedRoot).CorrectAccounts(accountData) + } + } + if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot { + log.Error("Invalid merkle root", "remote", s.expectedRoot, "local", s.stateRoot) return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) } + tasks := make(chan func()) taskResults := make(chan error, len(s.stateObjectsDirty)) tasksNum := 0 @@ -1328,12 +1414,10 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { if commitErr == nil { - <-snapUpdated s.snaps.Snapshot(s.stateRoot).MarkValid() } else { // The blockchain will do the further rewind if write block not finish yet if failPostCommitFunc != nil { - <-snapUpdated failPostCommitFunc() } log.Error("state verification failed", "err", commitErr) @@ -1383,11 +1467,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { defer close(snapUpdated) } + diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != s.expectedRoot { - if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil { + err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified) + + if err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) } + // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state @@ -1401,12 +1489,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er } return nil }, - func() error { - if s.snap != nil { - diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() - } - return nil - }, } if s.pipeCommit { go commmitTrie() diff --git a/core/state_processor.go b/core/state_processor.go index cf2275e72266..26f5f84fb9e1 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -337,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } // Do validate in advance so that we can fall back to full process - if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil { + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } diff --git a/core/types.go b/core/types.go index 61722aea74cd..c9061233e6f3 100644 --- a/core/types.go +++ b/core/types.go @@ -31,7 +31,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error } // Prefetcher is an interface for pre-caching transaction signatures and state. From cde35b0b3692fca263841620e142b7cbde4f876a Mon Sep 17 00:00:00 2001 From: flywukong <2229306838@qq.com> Date: Tue, 29 Mar 2022 14:22:10 +0800 Subject: [PATCH 07/14] add sharedStorage to the prefetcher of miner (#818) * add sharedStorage to the prefetcher of miner * fix miner prefetcher copy --- core/blockchain.go | 5 +++++ core/state/state_object.go | 4 ++-- core/state_prefetcher.go | 1 + miner/worker.go | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 47ac859441fc..eeaf1b7e0aaa 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -842,6 +842,11 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { return state.New(root, bc.stateCache, bc.snaps) } +// StateAtWithSharedPool returns a new mutable state based on a particular point in time with sharedStorage +func (bc *BlockChain) StateAtWithSharedPool(root common.Hash) (*state.StateDB, error) { + return state.NewWithSharedPool(root, bc.stateCache, bc.snaps) +} + // StateCache returns the caching database underpinning the blockchain instance. func (bc *BlockChain) StateCache() state.Database { return bc.stateCache diff --git a/core/state/state_object.go b/core/state/state_object.go index a89feebf2814..b40a8a2f85b4 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -81,8 +81,8 @@ type StateObject struct { trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded - sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction - originStorage Storage + sharedOriginStorage *sync.Map // Point to the entry of the stateObject in sharedPool + originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block dirtyStorage Storage // Storage entries that have been modified in the current transaction execution diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index e45a6306df79..4909cc30815e 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -101,6 +101,7 @@ func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) { idx := 0 newStatedb := statedb.Copy() + newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(gasLimit) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) diff --git a/miner/worker.go b/miner/worker.go index 2224dc007128..34b3c2fc8945 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -661,7 +661,7 @@ func (w *worker) resultLoop() { func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { // Retrieve the parent state to execute on top and start a prefetcher for // the miner to speed block sealing up a bit - state, err := w.chain.StateAt(parent.Root()) + state, err := w.chain.StateAtWithSharedPool(parent.Root()) if err != nil { return err } From 4598334a9a0710a811fc9250ade6573cf443ad07 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Wed, 30 Mar 2022 11:22:00 +0800 Subject: [PATCH 08/14] disable diffsync when pipecommit is enabled (#820) --- cmd/utils/flags.go | 2 +- eth/backend.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8f5141907fe9..85d1cef88737 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -129,7 +129,7 @@ var ( } PipeCommitFlag = cli.BoolFlag{ Name: "pipecommit", - Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)", + Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false), diffsync will be disable if pipeline commit is enabled", } RangeLimitFlag = cli.BoolFlag{ Name: "rangelimit", diff --git a/eth/backend.go b/eth/backend.go index ab93006437d1..3f782ff6a81c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -200,7 +200,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } ) bcOps := make([]core.BlockChainOption, 0) - if config.DiffSync { + // TODO diffsync performance is not as expected, disable it when pipecommit is enabled for now + if config.DiffSync && !config.PipeCommit { bcOps = append(bcOps, core.EnableLightProcessor) } if config.PipeCommit { From c57b02c293abeee794c48edd14539a3f2f5387e3 Mon Sep 17 00:00:00 2001 From: flywukong <2229306838@qq.com> Date: Fri, 1 Apr 2022 21:56:23 +0800 Subject: [PATCH 09/14] change prefetch thread num (#830) --- core/state_prefetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 4909cc30815e..bf0a6b80c64b 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -26,7 +26,7 @@ import ( "github.com/ethereum/go-ethereum/params" ) -const prefetchThread = 2 +const prefetchThread = 3 const checkInterval = 10 // statePrefetcher is a basic Prefetcher, which blindly executes a block on top From f5a1c073bc9ac786652457719e5540f08abf87ec Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Sat, 2 Apr 2022 11:52:03 +0800 Subject: [PATCH 10/14] fix deadlock when failed to verify state root (#834) --- core/state/statedb.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 2621907f6c95..2cdd757a22e8 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1415,14 +1415,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { if commitErr == nil { s.snaps.Snapshot(s.stateRoot).MarkValid() + close(verified) } else { // The blockchain will do the further rewind if write block not finish yet + close(verified) if failPostCommitFunc != nil { failPostCommitFunc() } log.Error("state verification failed", "err", commitErr) } - close(verified) } return commitErr } From 05925da696fdfbcfa97a42e3f14b18361d237938 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Sat, 2 Apr 2022 17:53:19 +0800 Subject: [PATCH 11/14] fix deadlock on miner module when failed to commit trie (#835) --- core/state/statedb.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 2cdd757a22e8..76983b2a9c79 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1366,10 +1366,8 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er // Write any contract code associated with the state object tasks <- func() { // Write any storage changes in the state object to its storage trie - if err := obj.CommitTrie(s.db); err != nil { - taskResults <- err - } - taskResults <- nil + err := obj.CommitTrie(s.db) + taskResults <- err } tasksNum++ } From 1ff47214341cd83ae0cf8b58b5b633d78e345c38 Mon Sep 17 00:00:00 2001 From: qinglin89 <316032931@qq.com> Date: Wed, 6 Apr 2022 13:29:41 +0800 Subject: [PATCH 12/14] fix:replace fake transaction with first peek for txCurr --- miner/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 34b3c2fc8945..6715213e94c0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -782,10 +782,10 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin interruptCh := make(chan struct{}) defer close(interruptCh) - tx := &types.Transaction{} - txCurr := &tx //prefetch txs from all pending txs txsPrefetch := txs.Copy() + tx := txsPrefetch.Peek() + txCurr := &tx w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) LOOP: From 343b3150303f1768940efdc161cde49fa38a2552 Mon Sep 17 00:00:00 2001 From: WayToFuture <61674316+forcodedancing@users.noreply.github.com> Date: Wed, 6 Apr 2022 18:21:54 +0800 Subject: [PATCH 13/14] put error check to the correct location (#842) Co-authored-by: forcodedancing --- core/state_processor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/state_processor.go b/core/state_processor.go index 26f5f84fb9e1..8e9422a8e873 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -123,13 +123,13 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps) + if err != nil { + return statedb, nil, nil, 0, err + } statedb.SetExpectedStateRoot(block.Root()) if p.bc.pipeCommit { statedb.EnablePipeCommit() } - if err != nil { - return statedb, nil, nil, 0, err - } // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") } From 1aeadc143aa876a64ab6783e2cce8b62c66b14f7 Mon Sep 17 00:00:00 2001 From: WayToFuture <61674316+forcodedancing@users.noreply.github.com> Date: Fri, 8 Apr 2022 15:59:40 +0800 Subject: [PATCH 14/14] [R4R] prepare for release of v1.1.9 (#849) * prepare release of v1.1.9 * prepare release of v1.1.9 * prepare release of v1.1.9 Co-authored-by: forcodedancing --- .github/release.env | 4 ++-- CHANGELOG.md | 19 +++++++++++++++++++ params/version.go | 2 +- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/.github/release.env b/.github/release.env index 2034c1d3fa7a..62e94fa4bd5b 100644 --- a/.github/release.env +++ b/.github/release.env @@ -1,2 +1,2 @@ -MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/mainnet.zip" -TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/testnet.zip" +MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.8/mainnet.zip" +TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.8/testnet.zip" diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dfa55dfb8a3..9100249fa08d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,24 @@ # Changelog +## v1.1.9 + +IMPROVEMENT +* [\#792](https://github.com/binance-chain/bsc/pull/792) add shared storage for prefetching state data +* [\#795](https://github.com/binance-chain/bsc/pull/795) implement state verification pipeline in pipecommit +* [\#803](https://github.com/binance-chain/bsc/pull/803) prefetch state data during the mining process +* [\#812](https://github.com/bnb-chain/bsc/pull/812) skip verification on account storage root to tolerate with fastnode when doing diffsync +* [\#818](https://github.com/bnb-chain/bsc/pull/818) add shared storage to the prefetcher of miner +* [\#820](https://github.com/bnb-chain/bsc/pull/820) disable diffsync when pipecommit is enabled +* [\#830](https://github.com/bnb-chain/bsc/pull/830) change the number of prefetch threads + +BUGFIX +* [\#797](https://github.com/bnb-chain/bsc/pull/797) fix race condition on preimage in pipecommit +* [\#808](https://github.com/bnb-chain/bsc/pull/808) fix code of difflayer not assign when new smart contract created +* [\#817](https://github.com/bnb-chain/bsc/pull/817) fix bugs of prune block tool +* [\#834](https://github.com/bnb-chain/bsc/pull/834) fix deadlock when failed to verify state root in pipecommit +* [\#835](https://github.com/bnb-chain/bsc/pull/835) fix deadlock on miner module when failed to commit trie +* [\#842](https://github.com/bnb-chain/bsc/pull/842) fix invalid nil check of statedb in diffsync + ## v1.1.8 FEATURES * [\#668](https://github.com/binance-chain/bsc/pull/668) implement State Verification && Snapshot Commit pipeline diff --git a/params/version.go b/params/version.go index 8faa4bb6448d..634f527876f2 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 1 // Minor version component of the current release - VersionPatch = 8 // Patch version component of the current release + VersionPatch = 9 // Patch version component of the current release VersionMeta = "" // Version metadata to append to the version string )