From 9dd3b43a03b4ea8361458097955446b02a23a0e6 Mon Sep 17 00:00:00 2001 From: fudongbai <296179868@qq.com> Date: Sun, 22 Aug 2021 18:28:27 +0800 Subject: [PATCH] implement block process part of light sync --- cmd/evm/internal/t8ntool/execution.go | 4 +- cmd/utils/flags.go | 15 +- core/blockchain.go | 113 ++++++++++-- core/chain_makers.go | 2 +- core/rawdb/accessors_chain.go | 38 ++++ core/rawdb/database.go | 25 +++ core/rawdb/freezer.go | 3 + core/rawdb/schema.go | 8 + core/rawdb/table.go | 8 + core/state/dump.go | 8 +- core/state/iterator.go | 2 +- core/state/journal.go | 2 +- core/state/snapshot/snapshot.go | 30 +++- core/state/state_object.go | 13 +- core/state/state_test.go | 4 +- core/state/statedb.go | 238 +++++++++++++++++++------- core/state/statedb_test.go | 20 +-- core/state_processor.go | 161 +++++++++++++++++ core/types/block.go | 27 +++ eth/backend.go | 5 +- eth/ethconfig/config.go | 2 + eth/ethconfig/gen_config.go | 12 ++ eth/state_accessor.go | 2 +- ethdb/database.go | 10 +- node/node.go | 42 +++++ tests/state_test_util.go | 2 +- 26 files changed, 690 insertions(+), 106 deletions(-) diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index c3f1b16efc..0471037ed8 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -223,7 +223,7 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, statedb.AddBalance(pre.Env.Coinbase, minerReward) } // Commit block - root, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber)) + root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber)) if err != nil { fmt.Fprintf(os.Stderr, "Could not commit state: %v", err) return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err)) @@ -252,7 +252,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB } } // Commit and re-open to start with a clean state. - root, _ := statedb.Commit(false) + root, _, _ := statedb.Commit(false) statedb, _ = state.New(root, sdb, nil) return statedb } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b4d93fc1f1..a049547ba5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -125,6 +125,10 @@ var ( Name: "datadir.ancient", Usage: "Data directory for ancient chain segments (default = inside chaindata)", } + DiffFlag = DirectoryFlag{ + Name: "datadir.diff", + Usage: "Data directory for difflayer segments (default = inside chaindata)", + } MinFreeDiskSpaceFlag = DirectoryFlag{ Name: "datadir.minfreedisk", Usage: "Minimum free disk space in MB, once reached triggers auto shut down (default = --cache.gc converted to MB, 0 = disabled)", @@ -425,6 +429,10 @@ var ( Name: "cache.preimages", Usage: "Enable recording the SHA3/keccak preimages of trie keys", } + PersistDiffFlag = cli.BoolFlag{ + Name: "persistdiff", + Usage: "Enable persist the difflayer", + } // Miner settings MiningEnabledFlag = cli.BoolFlag{ Name: "mine", @@ -1564,7 +1572,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(AncientFlag.Name) { cfg.DatabaseFreezer = ctx.GlobalString(AncientFlag.Name) } - + if ctx.GlobalIsSet(DiffFlag.Name) { + cfg.DatabaseDiff = ctx.GlobalString(DiffFlag.Name) + } + if ctx.GlobalIsSet(PersistDiffFlag.Name) { + cfg.PersistDiff = ctx.GlobalBool(PersistDiffFlag.Name) + } if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" { Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name) } diff --git a/core/blockchain.go b/core/blockchain.go index be0b0f04ae..a23575c653 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -82,6 +82,7 @@ var ( const ( bodyCacheLimit = 256 blockCacheLimit = 256 + diffLayerCacheLimit = 1024 receiptsCacheLimit = 10000 txLookupCacheLimit = 1024 maxFutureBlocks = 256 @@ -89,6 +90,9 @@ const ( badBlockLimit = 10 maxBeyondBlocks = 2048 + diffLayerfreezerRecheckInterval = 3 * time.Second + diffLayerfreezerBlockLimit = 864000 // 1 month for 3 second blocking time + // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // // Changelog: @@ -188,13 +192,15 @@ type BlockChain struct { currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) - stateCache state.Database // State database to reuse between imports (contains state cache) - bodyCache *lru.Cache // Cache for the most recent block bodies - bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format - receiptsCache *lru.Cache // Cache for the most recent receipts per block - blockCache *lru.Cache // Cache for the most recent entire blocks - txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. - futureBlocks *lru.Cache // future blocks are blocks added for later processing + stateCache state.Database // State database to reuse between imports (contains state cache) + bodyCache *lru.Cache // Cache for the most recent block bodies + bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format + receiptsCache *lru.Cache // Cache for the most recent receipts per block + blockCache *lru.Cache // Cache for the most recent entire blocks + diffLayerCache *lru.Cache // Cache for the diffLayers + txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. + futureBlocks *lru.Cache // future blocks are blocks added for later processing + diffQueue *prque.Prque quit chan struct{} // blockchain quit channel wg sync.WaitGroup // chain processing wait group for shutting down @@ -223,6 +229,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) receiptsCache, _ := lru.New(receiptsCacheLimit) + diffLayerCache, _ := lru.New(diffLayerCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) @@ -244,10 +251,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, blockCache: blockCache, + diffLayerCache: diffLayerCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, engine: engine, vmConfig: vmConfig, + diffQueue: prque.New(nil), } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) @@ -396,6 +405,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit) }() } + if bc.db.DiffStore() != nil { + go bc.diffLayerFreeze() + } return bc, nil } @@ -408,6 +420,13 @@ func (bc *BlockChain) CacheReceipts(hash common.Hash, receipts types.Receipts) { bc.receiptsCache.Add(hash, receipts) } +func (bc *BlockChain) CacheDiffLayer(hash common.Hash, num uint64, diffLayer *types.DiffLayer) { + bc.diffLayerCache.Add(hash, diffLayer) + if bc.db.DiffStore() != nil { + bc.diffQueue.Push(diffLayer, -(int64(num))) + } +} + func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) { bc.blockCache.Add(hash, block) } @@ -1506,10 +1525,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. wg.Done() }() // Commit all cached state changes into underlying memory database. - root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + root, diffLayer, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { return NonStatTy, err } + + if diffLayer != nil && block.Header().TxHash == types.EmptyRootHash { + diffLayer.Receipts = receipts + diffLayer.Hash = block.Hash() + bc.CacheDiffLayer(block.Hash(), block.Number().Uint64(), diffLayer) + } triedb := bc.stateCache.TrieDB() // If we're running an archive node, always flush @@ -1895,8 +1920,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er bc.reportBlock(block, receipts, err) return it.index, err } - bc.CacheReceipts(block.Hash(), receipts) - bc.CacheBlock(block.Hash(), block) // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them @@ -1916,6 +1939,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er log.Error("validate state failed", "error", err) return it.index, err } + bc.CacheReceipts(block.Hash(), receipts) + bc.CacheBlock(block.Hash(), block) proctime := time.Since(start) // Update the metrics touched during block validation @@ -2292,6 +2317,74 @@ func (bc *BlockChain) update() { } } +func (bc *BlockChain) diffLayerFreeze() { + recheck := time.Tick(diffLayerfreezerRecheckInterval) + for { + Loop: + select { + case <-bc.quit: + // persist all diffLayers + var batch ethdb.Batch + for !bc.diffQueue.Empty() { + diff, _ := bc.diffQueue.Pop() + diffLayer := diff.(*types.DiffLayer) + rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer) + if batch != nil && batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Error("Failed to write diff layer", "err", err) + return + } + batch.Reset() + } + } + if batch != nil { + if err := batch.Write(); err != nil { + log.Error("Failed to write diff layer", "err", err) + return + } + batch.Reset() + } + return + case <-recheck: + currentHeight := bc.CurrentBlock().NumberU64() + var batch ethdb.Batch + for !bc.diffQueue.Empty() { + diff, prio := bc.diffQueue.Pop() + diffLayer := diff.(*types.DiffLayer) + + if int64(currentHeight)+prio > int64(bc.triesInMemory) { + canonicalHash := bc.GetCanonicalHash(uint64(-prio)) + if canonicalHash == diffLayer.Hash { + if batch == nil { + batch = bc.db.DiffStore().NewBatch() + } + rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer) + staleHash := bc.GetCanonicalHash(uint64(-prio) - diffLayerfreezerBlockLimit) + rawdb.DeleteDiffLayer(batch, staleHash) + } + } else { + bc.diffQueue.Push(diffLayer, prio) + break + } + if batch != nil && batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Error("Failed to write diff layer", "err", err) + break Loop + } + batch.Reset() + } + } + if batch != nil { + if err := batch.Write(); err != nil { + log.Error("Failed to write diff layer", "err", err) + break Loop + } + batch.Reset() + } + } + } +} + // maintainTxIndex is responsible for the construction and deletion of the // transaction index. // diff --git a/core/chain_makers.go b/core/chain_makers.go index 6cb74d51be..9ded01a433 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -223,7 +223,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse block, _, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts) // Write state changes to db - root, err := statedb.Commit(config.IsEIP158(b.header.Number)) + root, _, err := statedb.Commit(config.IsEIP158(b.header.Number)) if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 76132bf37e..6e14cbbd8b 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -447,6 +447,44 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t WriteBodyRLP(db, hash, number, data) } +func WriteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash, layer *types.DiffLayer) { + data, err := rlp.EncodeToBytes(layer) + if err != nil { + log.Crit("Failed to RLP encode diff layer", "err", err) + } + WriteDiffLayerRLP(db, hash, data) +} + +func WriteDiffLayerRLP(db ethdb.KeyValueWriter, hash common.Hash, rlp rlp.RawValue) { + if err := db.Put(diffLayerKey(hash), rlp); err != nil { + log.Crit("Failed to store block body", "err", err) + } +} + +func ReadDiffLayer(db ethdb.Reader, hash common.Hash) *types.DiffLayer { + data := ReadDiffLayerRLP(db, hash) + if len(data) == 0 { + return nil + } + diff := new(types.DiffLayer) + if err := rlp.Decode(bytes.NewReader(data), diff); err != nil { + log.Error("Invalid diff layer RLP", "hash", hash, "err", err) + return nil + } + return diff +} + +func ReadDiffLayerRLP(db ethdb.Reader, hash common.Hash) rlp.RawValue { + data, _ := db.Get(diffLayerKey(hash)) + return data +} + +func DeleteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(diffLayerKey(hash)); err != nil { + log.Crit("Failed to delete diffLayer", "err", err) + } +} + // DeleteBody removes all block body data associated with a hash. func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(blockBodyKey(number, hash)); err != nil { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 725972f9ba..89be8c16e8 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -36,6 +36,7 @@ import ( type freezerdb struct { ethdb.KeyValueStore ethdb.AncientStore + diffStore ethdb.KeyValueStore } // Close implements io.Closer, closing both the fast key-value store as well as @@ -48,12 +49,28 @@ func (frdb *freezerdb) Close() error { if err := frdb.KeyValueStore.Close(); err != nil { errs = append(errs, err) } + if frdb.diffStore != nil { + if err := frdb.diffStore.Close(); err != nil { + errs = append(errs, err) + } + } if len(errs) != 0 { return fmt.Errorf("%v", errs) } return nil } +func (frdb *freezerdb) DiffStore() ethdb.KeyValueStore { + return frdb.diffStore +} + +func (frdb *freezerdb) SetDiffStore(diff ethdb.KeyValueStore) { + if frdb.diffStore != nil { + frdb.diffStore.Close() + } + frdb.diffStore = diff +} + // Freeze is a helper method used for external testing to trigger and block until // a freeze cycle completes, without having to sleep for a minute to trigger the // automatic background run. @@ -114,6 +131,14 @@ func (db *nofreezedb) Sync() error { return errNotSupported } +func (db *nofreezedb) DiffStore() ethdb.KeyValueStore { + return nil +} + +func (db *nofreezedb) SetDiffStore(diff ethdb.KeyValueStore) { + // not implement +} + // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 94b99a64eb..61067ea0a2 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -61,6 +61,9 @@ const ( // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. freezerBatchLimit = 30000 + + // the number of diffLayer that can be persisted, default value is for 1 month + diffLayerLimit = 30 * 24 * 3600 / 3 ) // freezer is an memory mapped append-only database to store immutable chain data diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 2505ce90b9..36c53fb0cd 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -90,6 +90,9 @@ var ( SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code + // difflayer database + diffLayerPrefix = []byte("d") // diffLayerPrefix + num (uint64 big endian) -> diffLayer + preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -177,6 +180,11 @@ func blockReceiptsKey(number uint64, hash common.Hash) []byte { return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) } +// diffLayerKey = diffLayerKeyPrefix + hash +func diffLayerKey(hash common.Hash) []byte { + return append(append(diffLayerPrefix, hash.Bytes()...)) +} + // txLookupKey = txLookupPrefix + hash func txLookupKey(hash common.Hash) []byte { return append(txLookupPrefix, hash.Bytes()...) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 323ef6293c..47e99bffe5 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -159,6 +159,14 @@ func (t *table) NewBatch() ethdb.Batch { return &tableBatch{t.db.NewBatch(), t.prefix} } +func (t *table) DiffStore() ethdb.KeyValueStore { + return nil +} + +func (t *table) SetDiffStore(diff ethdb.KeyValueStore) { + // not implement +} + // tableBatch is a wrapper around a database batch that prefixes each key access // with a pre-configured string. type tableBatch struct { diff --git a/core/state/dump.go b/core/state/dump.go index b25da714fd..1bc79433eb 100644 --- a/core/state/dump.go +++ b/core/state/dump.go @@ -113,10 +113,10 @@ func (d iterativeDump) OnRoot(root common.Hash) { func (s *StateDB) DumpToCollector(c DumpCollector, excludeCode, excludeStorage, excludeMissingPreimages bool, start []byte, maxResults int) (nextKey []byte) { missingPreimages := 0 - c.OnRoot(s.trie.Hash()) + c.OnRoot(s.Trie.Hash()) var count int - it := trie.NewIterator(s.trie.NodeIterator(start)) + it := trie.NewIterator(s.Trie.NodeIterator(start)) for it.Next() { var data Account if err := rlp.DecodeBytes(it.Value, &data); err != nil { @@ -128,7 +128,7 @@ func (s *StateDB) DumpToCollector(c DumpCollector, excludeCode, excludeStorage, Root: common.Bytes2Hex(data.Root[:]), CodeHash: common.Bytes2Hex(data.CodeHash), } - addrBytes := s.trie.GetKey(it.Key) + addrBytes := s.Trie.GetKey(it.Key) if addrBytes == nil { // Preimage missing missingPreimages++ @@ -151,7 +151,7 @@ func (s *StateDB) DumpToCollector(c DumpCollector, excludeCode, excludeStorage, log.Error("Failed to decode the value returned by iterator", "error", err) continue } - account.Storage[common.BytesToHash(s.trie.GetKey(storageIt.Key))] = common.Bytes2Hex(content) + account.Storage[common.BytesToHash(s.Trie.GetKey(storageIt.Key))] = common.Bytes2Hex(content) } } c.OnAccount(addr, account) diff --git a/core/state/iterator.go b/core/state/iterator.go index 6a5c73d3d1..2bab5eaa72 100644 --- a/core/state/iterator.go +++ b/core/state/iterator.go @@ -74,7 +74,7 @@ func (it *NodeIterator) step() error { } // Initialize the iterator if we've just started if it.stateIt == nil { - it.stateIt = it.state.trie.NodeIterator(nil) + it.stateIt = it.state.Trie.NodeIterator(nil) } // If we had data nodes previously, we surely have at least state nodes if it.dataIt != nil { diff --git a/core/state/journal.go b/core/state/journal.go index 366e0c9c26..05db5a5c80 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -153,7 +153,7 @@ func (ch createObjectChange) dirtied() *common.Address { func (ch resetObjectChange) revert(s *StateDB) { s.SetStateObject(ch.prev) if !ch.prevdestruct && s.snap != nil { - delete(s.snapDestructs, ch.prev.addrHash) + delete(s.SnapDestructs, ch.prev.address) } } diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 1b0d883439..20713197b1 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -24,6 +24,8 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" @@ -324,7 +326,7 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot { // Update adds a new snapshot into the tree, if that can be linked to an existing // old parent. It is disallowed to insert a disk layer (the origin of all). -func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error { +func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) error { // Reject noop updates to avoid self-loops in the snapshot tree. This is a // special case that can only happen for Clique networks where empty blocks // don't modify the state (0 block subsidy). @@ -339,7 +341,9 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs m if parent == nil { return fmt.Errorf("parent [%#x] snapshot missing", parentRoot) } - snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage) + + hashdestructs, hashAccounts, hashStorage := transformSnap(destructs, accounts, storage) + snap := parent.(snapshot).Update(blockRoot, hashdestructs, hashAccounts, hashStorage) // Save the new snapshot for later t.lock.Lock() @@ -836,3 +840,25 @@ func (t *Tree) DiskRoot() common.Hash { return t.diskRoot() } + +// TODO need improve +func transformSnap(destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) (map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte) { + hasher := crypto.NewKeccakState() + hashDestructs := make(map[common.Hash]struct{}, len(destructs)) + hashAccounts := make(map[common.Hash][]byte, len(accounts)) + hashStorages := make(map[common.Hash]map[common.Hash][]byte, len(storage)) + for addr := range destructs { + hashDestructs[crypto.Keccak256Hash(addr[:])] = struct{}{} + } + for addr, account := range accounts { + hashAccounts[crypto.Keccak256Hash(addr[:])] = account + } + for addr, accountStore := range storage { + hashStorage := make(map[common.Hash][]byte, len(accountStore)) + for k, v := range accountStore { + hashStorage[crypto.HashData(hasher, []byte(k))] = v + } + hashStorages[crypto.Keccak256Hash(addr[:])] = hashStorage + } + return hashDestructs, hashAccounts, hashStorages +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 623d07ac13..d4e356b977 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -234,7 +234,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has // 1) resurrect happened, and new slot values were set -- those should // have been handles via pendingStorage above. // 2) we don't have new values, and can deliver empty response back - if _, destructed := s.db.snapDestructs[s.addrHash]; destructed { + if _, destructed := s.db.SnapDestructs[s.address]; destructed { return common.Hash{} } enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes())) @@ -345,10 +345,9 @@ func (s *StateObject) updateTrie(db Database) Trie { }(time.Now()) } // The snapshot storage map for the object - var storage map[common.Hash][]byte + var storage map[string][]byte // Insert all the pending updates into the trie tr := s.getTrie(db) - hasher := s.db.hasher usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { @@ -371,12 +370,12 @@ func (s *StateObject) updateTrie(db Database) Trie { s.db.snapMux.Lock() if storage == nil { // Retrieve the old storage map, if available, create a new one otherwise - if storage = s.db.snapStorage[s.addrHash]; storage == nil { - storage = make(map[common.Hash][]byte) - s.db.snapStorage[s.addrHash] = storage + if storage = s.db.SnapStorage[s.address]; storage == nil { + storage = make(map[string][]byte) + s.db.SnapStorage[s.address] = storage } } - storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00 + storage[string(key[:])] = v // v will be nil if value is 0x00 s.db.snapMux.Unlock() } usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure diff --git a/core/state/state_test.go b/core/state/state_test.go index 9f003fefb5..d97377c0c5 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -167,8 +167,8 @@ func TestSnapshot2(t *testing.T) { so0.deleted = false state.SetStateObject(so0) - root, _ := state.Commit(false) - state, _ = New(root, state.db, state.snaps) + root, _, _ := state.Commit(false) + state, _ = New(root, state.db, state.Snaps) // and one with deleted == true so1 := state.getStateObject(stateobjaddr1) diff --git a/core/state/statedb.go b/core/state/statedb.go index 7940613cd6..fc7be765b8 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -74,16 +74,20 @@ func (n *proofList) Delete(key []byte) error { type StateDB struct { db Database prefetcher *triePrefetcher - originalRoot common.Hash // The pre-state root, before any changes were made - trie Trie + OriginalRoot common.Hash // The pre-state root, before any changes were made + Trie Trie hasher crypto.KeccakState + DiffLayer *types.DiffLayer + DiffTries map[common.Address]Trie + DiffCode map[common.Hash][]byte + DiffEnabled bool snapMux sync.Mutex - snaps *snapshot.Tree + Snaps *snapshot.Tree snap snapshot.Snapshot - snapDestructs map[common.Hash]struct{} - snapAccounts map[common.Hash][]byte - snapStorage map[common.Hash]map[common.Hash][]byte + SnapDestructs map[common.Address]struct{} + SnapAccounts map[common.Address][]byte + SnapStorage map[common.Address]map[string][]byte // This map holds 'live' objects, which will get modified while processing a state transition. stateObjects map[common.Address]*StateObject @@ -139,8 +143,8 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { sdb := &StateDB{ db: db, - originalRoot: root, - snaps: snaps, + OriginalRoot: root, + Snaps: snaps, stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots), stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots), stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots), @@ -153,12 +157,12 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, if err != nil { return nil, err } - sdb.trie = tr - if sdb.snaps != nil { - if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { - sdb.snapDestructs = make(map[common.Hash]struct{}) - sdb.snapAccounts = make(map[common.Hash][]byte) - sdb.snapStorage = make(map[common.Hash]map[common.Hash][]byte) + sdb.Trie = tr + if sdb.Snaps != nil { + if sdb.snap = sdb.Snaps.Snapshot(root); sdb.snap != nil { + sdb.SnapDestructs = make(map[common.Address]struct{}) + sdb.SnapAccounts = make(map[common.Address][]byte) + sdb.SnapStorage = make(map[common.Address]map[string][]byte) } } return sdb, nil @@ -173,7 +177,7 @@ func (s *StateDB) StartPrefetcher(namespace string) { s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.OriginalRoot, namespace) } } @@ -333,7 +337,7 @@ func (s *StateDB) GetProof(addr common.Address) ([][]byte, error) { // GetProofByHash returns the Merkle proof for a given account. func (s *StateDB) GetProofByHash(addrHash common.Hash) ([][]byte, error) { var proof proofList - err := s.trie.Prove(addrHash[:], 0, &proof) + err := s.Trie.Prove(addrHash[:], 0, &proof) return proof, err } @@ -491,7 +495,7 @@ func (s *StateDB) updateStateObject(obj *StateObject) { panic(fmt.Errorf("can't encode object at %x: %v", addr[:], err)) } } - if err = s.trie.TryUpdate(addr[:], data); err != nil { + if err = s.Trie.TryUpdate(addr[:], data); err != nil { s.setError(fmt.Errorf("updateStateObject (%x) error: %v", addr[:], err)) } } @@ -504,7 +508,7 @@ func (s *StateDB) deleteStateObject(obj *StateObject) { } // Delete the account from the trie addr := obj.Address() - if err := s.trie.TryDelete(addr[:]); err != nil { + if err := s.Trie.TryDelete(addr[:]); err != nil { s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err)) } } @@ -632,18 +636,18 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject { } // If snapshot unavailable or reading from it failed, load from the database if s.snap == nil || err != nil { - if s.trie == nil { - tr, err := s.db.OpenTrie(s.originalRoot) + if s.Trie == nil { + tr, err := s.db.OpenTrie(s.OriginalRoot) if err != nil { s.setError(fmt.Errorf("failed to open trie tree")) return nil } - s.trie = tr + s.Trie = tr } if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountReads += time.Since(start) }(time.Now()) } - enc, err := s.trie.TryGet(addr.Bytes()) + enc, err := s.Trie.TryGet(addr.Bytes()) if err != nil { s.setError(fmt.Errorf("getDeleteStateObject (%x) error: %v", addr.Bytes(), err)) return nil @@ -683,9 +687,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject) var prevdestruct bool if s.snap != nil && prev != nil { - _, prevdestruct = s.snapDestructs[prev.addrHash] + _, prevdestruct = s.SnapDestructs[prev.address] if !prevdestruct { - s.snapDestructs[prev.addrHash] = struct{}{} + s.SnapDestructs[prev.address] = struct{}{} } } newobj = newObject(s, addr, Account{}) @@ -727,7 +731,7 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common it := trie.NewIterator(so.getTrie(db.db).NodeIterator(nil)) for it.Next() { - key := common.BytesToHash(db.trie.GetKey(it.Key)) + key := common.BytesToHash(db.Trie.GetKey(it.Key)) if value, dirty := so.dirtyStorage[key]; dirty { if !cb(key, value) { return nil @@ -754,7 +758,7 @@ func (s *StateDB) Copy() *StateDB { // Copy all the basic fields, initialize the memory ones state := &StateDB{ db: s.db, - trie: s.db.CopyTrie(s.trie), + Trie: s.db.CopyTrie(s.Trie), stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)), stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), @@ -822,29 +826,29 @@ func (s *StateDB) Copy() *StateDB { if s.prefetcher != nil { state.prefetcher = s.prefetcher.copy() } - if s.snaps != nil { + if s.Snaps != nil { // In order for the miner to be able to use and make additions // to the snapshot tree, we need to copy that aswell. // Otherwise, any block mined by ourselves will cause gaps in the tree, // and force the miner to operate trie-backed only - state.snaps = s.snaps + state.Snaps = s.Snaps state.snap = s.snap // deep copy needed - state.snapDestructs = make(map[common.Hash]struct{}) - for k, v := range s.snapDestructs { - state.snapDestructs[k] = v + state.SnapDestructs = make(map[common.Address]struct{}) + for k, v := range s.SnapDestructs { + state.SnapDestructs[k] = v } - state.snapAccounts = make(map[common.Hash][]byte) - for k, v := range s.snapAccounts { - state.snapAccounts[k] = v + state.SnapAccounts = make(map[common.Address][]byte) + for k, v := range s.SnapAccounts { + state.SnapAccounts[k] = v } - state.snapStorage = make(map[common.Hash]map[common.Hash][]byte) - for k, v := range s.snapStorage { - temp := make(map[common.Hash][]byte) + state.SnapStorage = make(map[common.Address]map[string][]byte) + for k, v := range s.SnapStorage { + temp := make(map[string][]byte) for kk, vv := range v { temp[kk] = vv } - state.snapStorage[k] = temp + state.SnapStorage[k] = temp } } return state @@ -903,9 +907,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { // transactions within the same block might self destruct and then // ressurrect an account; but the snapshotter needs both events. if s.snap != nil { - s.snapDestructs[obj.addrHash] = struct{}{} // We need to maintain account deletions explicitly (will remain set indefinitely) - delete(s.snapAccounts, obj.addrHash) // Clear out any previously updated account data (may be recreated via a ressurrect) - delete(s.snapStorage, obj.addrHash) // Clear out any previously updated storage data (may be recreated via a ressurrect) + s.SnapDestructs[obj.address] = struct{}{} // We need to maintain account deletions explicitly (will remain set indefinitely) + delete(s.SnapAccounts, obj.address) // Clear out any previously updated account data (may be recreated via a ressurrect) + delete(s.SnapStorage, obj.address) // Clear out any previously updated storage data (may be recreated via a ressurrect) } } else { obj.finalise(true) // Prefetch slots in the background @@ -922,7 +926,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { } } if s.prefetcher != nil && len(addressesToPrefetch) > 0 { - s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + s.prefetcher.prefetch(s.OriginalRoot, addressesToPrefetch, emptyAddr) } // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() @@ -932,6 +936,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { // It is called in between transactions to get the root hash that // goes into transaction receipts. func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { + if s.DiffEnabled { + return s.Trie.Hash() + } // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) @@ -983,7 +990,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // at transaction boundary level to ensure we capture state clearing. if s.snap != nil && !obj.deleted { s.snapMux.Lock() - s.snapAccounts[obj.addrHash] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash) + s.SnapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash) s.snapMux.Unlock() } data, err := rlp.EncodeToBytes(obj) @@ -1000,16 +1007,16 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. if prefetcher != nil { - if trie := prefetcher.trie(s.originalRoot); trie != nil { - s.trie = trie + if trie := prefetcher.trie(s.OriginalRoot); trie != nil { + s.Trie = trie } } - if s.trie == nil { - tr, err := s.db.OpenTrie(s.originalRoot) + if s.Trie == nil { + tr, err := s.db.OpenTrie(s.OriginalRoot) if err != nil { panic(fmt.Sprintf("Failed to open trie tree")) } - s.trie = tr + s.Trie = tr } usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { @@ -1021,7 +1028,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure } if prefetcher != nil { - prefetcher.used(s.originalRoot, usedAddrs) + prefetcher.used(s.OriginalRoot, usedAddrs) } if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) @@ -1030,7 +1037,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now()) } - root := s.trie.Hash() + root := s.Trie.Hash() return root } @@ -1051,14 +1058,76 @@ func (s *StateDB) clearJournalAndRefund() { s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires } +func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) { + codeWriter := s.db.TrieDB().DiskDB().NewBatch() + for codeHash, code := range s.DiffCode { + rawdb.WriteCode(codeWriter, codeHash, code) + if codeWriter.ValueSize() >= ethdb.IdealBatchSize { + if err := codeWriter.Write(); err != nil { + return common.Hash{}, nil, err + } + codeWriter.Reset() + } + } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + } + } + for account, diff := range s.DiffTries { + root, err := diff.Commit(nil) + if err != nil { + return common.Hash{}, nil, err + } + s.db.CacheStorage(crypto.Keccak256Hash(account[:]), root, diff) + } + // Write the account trie changes, measuing the amount of wasted time + var start time.Time + if metrics.EnabledExpensive { + start = time.Now() + } + // The onleaf func is called _serially_, so we can reuse the same account + // for unmarshalling every time. + var account Account + root, err := s.Trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + if err := rlp.DecodeBytes(leaf, &account); err != nil { + return nil + } + // TODO, not sure here + if account.Root != emptyRoot { + s.db.TrieDB().Reference(account.Root, parent) + } + return nil + }) + if err != nil { + return common.Hash{}, nil, err + } + if metrics.EnabledExpensive { + s.AccountCommits += time.Since(start) + } + if root != emptyRoot { + s.db.CacheAccount(root, s.Trie) + } + + s.snap, s.SnapDestructs, s.SnapAccounts, s.SnapStorage = nil, nil, nil, nil + s.DiffTries, s.DiffCode = nil, nil + return root, s.DiffLayer, nil +} + // Commit writes the state to the underlying in-memory trie database. -func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { +func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) { if s.dbErr != nil { - return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) + return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } // Finalize any pending changes and merge everything into the tries root := s.IntermediateRoot(deleteEmptyObjects) - + if s.DiffEnabled { + return s.LightCommit() + } + var diffLayer *types.DiffLayer + if s.snap != nil { + diffLayer = &types.DiffLayer{} + } commitFuncs := []func() error{ func() error { // Commit objects to the trie, measuring the elapsed time @@ -1086,6 +1155,19 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { }() } + if s.snap != nil { + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + if obj.code != nil && obj.dirtyCode { + diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ + Hash: common.BytesToHash(obj.CodeHash()), + Code: obj.code, + }) + } + } + } + } + for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object @@ -1122,7 +1204,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. var account Account - root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + root, err := s.Trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { if err := rlp.DecodeBytes(leaf, &account); err != nil { return nil } @@ -1138,7 +1220,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { s.AccountCommits += time.Since(start) } if root != emptyRoot { - s.db.CacheAccount(root, s.trie) + s.db.CacheAccount(root, s.Trie) } return nil }, @@ -1150,18 +1232,23 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { } // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { + if err := s.Snaps.Update(root, parent, s.SnapDestructs, s.SnapAccounts, s.SnapStorage); err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) } // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state // - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state - if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err) + if err := s.Snaps.Cap(root, s.Snaps.CapLimit()); err != nil { + log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.Snaps.CapLimit(), "err", err) } } - s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil + } + return nil + }, + func() error { + if s.snap != nil { + diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() } return nil }, @@ -1176,11 +1263,40 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { for i := 0; i < len(commitFuncs); i++ { r := <-commitRes if r != nil { - return common.Hash{}, r + return common.Hash{}, nil, r } } + s.snap, s.SnapDestructs, s.SnapAccounts, s.SnapStorage = nil, nil, nil, nil + return root, diffLayer, nil +} - return root, nil +func (s *StateDB) SnapToDiffLayer() ([]common.Address, []types.DiffAccount, []types.DiffStorage) { + destructs := make([]common.Address, 0, len(s.SnapDestructs)) + for account := range s.SnapDestructs { + destructs = append(destructs, account) + } + accounts := make([]types.DiffAccount, 0, len(s.SnapAccounts)) + for accountHash, account := range s.SnapAccounts { + accounts = append(accounts, types.DiffAccount{ + Account: accountHash, + Blob: account, + }) + } + storages := make([]types.DiffStorage, 0, len(s.SnapStorage)) + for accountHash, storage := range s.SnapStorage { + keys := make([]string, 0, len(storage)) + values := make([][]byte, 0, len(storage)) + for k, v := range storage { + keys = append(keys, k) + values = append(values, v) + } + storages = append(storages, types.DiffStorage{ + Account: accountHash, + Keys: keys, + Vals: values, + }) + } + return destructs, accounts, storages } // PrepareAccessList handles the preparatory steps for executing a state transition with diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 9524e3730d..3b423ef67f 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -102,7 +102,7 @@ func TestIntermediateLeaks(t *testing.T) { } // Commit and cross check the databases. - transRoot, err := transState.Commit(false) + transRoot, _, err := transState.Commit(false) if err != nil { t.Fatalf("failed to commit transition state: %v", err) } @@ -110,7 +110,7 @@ func TestIntermediateLeaks(t *testing.T) { t.Errorf("can not commit trie %v to persistent database", transRoot.Hex()) } - finalRoot, err := finalState.Commit(false) + finalRoot, _, err := finalState.Commit(false) if err != nil { t.Fatalf("failed to commit final state: %v", err) } @@ -473,8 +473,8 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error { func TestTouchDelete(t *testing.T) { s := newStateTest() s.state.GetOrNewStateObject(common.Address{}) - root, _ := s.state.Commit(false) - s.state, _ = New(root, s.state.db, s.state.snaps) + root, _, _ := s.state.Commit(false) + s.state, _ = New(root, s.state.db, s.state.Snaps) snapshot := s.state.Snapshot() s.state.AddBalance(common.Address{}, new(big.Int)) @@ -675,8 +675,8 @@ func TestDeleteCreateRevert(t *testing.T) { addr := common.BytesToAddress([]byte("so")) state.SetBalance(addr, big.NewInt(1)) - root, _ := state.Commit(false) - state, _ = New(root, state.db, state.snaps) + root, _, _ := state.Commit(false) + state, _ = New(root, state.db, state.Snaps) // Simulate self-destructing in one transaction, then create-reverting in another state.Suicide(addr) @@ -687,8 +687,8 @@ func TestDeleteCreateRevert(t *testing.T) { state.RevertToSnapshot(id) // Commit the entire state and make sure we don't crash and have the correct state - root, _ = state.Commit(true) - state, _ = New(root, state.db, state.snaps) + root, _, _ = state.Commit(true) + state, _ = New(root, state.db, state.Snaps) if state.getStateObject(addr) != nil { t.Fatalf("self-destructed contract came alive") @@ -712,7 +712,7 @@ func TestMissingTrieNodes(t *testing.T) { a2 := common.BytesToAddress([]byte("another")) state.SetBalance(a2, big.NewInt(100)) state.SetCode(a2, []byte{1, 2, 4}) - root, _ = state.Commit(false) + root, _, _ = state.Commit(false) t.Logf("root: %x", root) // force-flush state.Database().TrieDB().Cap(0) @@ -736,7 +736,7 @@ func TestMissingTrieNodes(t *testing.T) { } // Modify the state state.SetBalance(addr, big.NewInt(2)) - root, err := state.Commit(false) + root, _, err := state.Commit(false) if err == nil { t.Fatalf("expected error, got root :%x", root) } diff --git a/core/state_processor.go b/core/state_processor.go index 858796b67a..8b9ea352f8 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -17,8 +17,18 @@ package core import ( + "bytes" + "errors" "fmt" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/core/rawdb" + + "github.com/ethereum/go-ethereum/core/state/snapshot" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -49,6 +59,157 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen } } +type LightStateProcessor struct { + StateProcessor +} + +func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { + // TODO fetch differ from somewhere else + var diffLayer *types.DiffLayer + if diffLayer == nil { + return p.StateProcessor.Process(block, statedb, cfg) + } + diffCode := make(map[common.Hash][]byte, len(diffLayer.Codes)) + + for _, des := range diffLayer.Destructs { + statedb.SnapDestructs[des] = struct{}{} + } + for _, account := range diffLayer.Accounts { + statedb.SnapAccounts[account.Account] = account.Blob + } + for _, storage := range diffLayer.Storages { + if len(storage.Keys) != len(storage.Vals) { + return nil, nil, 0, errors.New("invalid diffLayer: length of keys and values mismatch") + } + statedb.SnapStorage[storage.Account] = make(map[string][]byte, len(storage.Keys)) + n := len(storage.Keys) + for i := 0; i < n; i++ { + statedb.SnapStorage[storage.Account][storage.Keys[i]] = storage.Vals[i] + } + } + for _, c := range diffLayer.Codes { + // Verify code hash + diffCode[c.Hash] = c.Code + } + + for des := range statedb.SnapDestructs { + statedb.Trie.TryDelete(des[:]) + } + + // TODO need improve + for diffAccount, blob := range statedb.SnapAccounts { + addrHash := crypto.Keccak256Hash(diffAccount[:]) + latestAccount, err := snapshot.FullAccount(blob) + if err != nil { + return nil, nil, 0, err + } + + // fetch previous state + var previousAccount state.Account + enc, err := statedb.Trie.TryGet(diffAccount[:]) + if err != nil { + return nil, nil, 0, err + } + if len(enc) != 0 { + if err := rlp.DecodeBytes(enc, &previousAccount); err != nil { + return nil, nil, 0, err + } + } + + if previousAccount.Nonce == latestAccount.Nonce && + bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) && + previousAccount.Balance.Cmp(latestAccount.Balance) == 0 && + previousAccount.Root == common.BytesToHash(latestAccount.Root) { + log.Warn("receive redundant account change in diff layer") + delete(statedb.SnapAccounts, diffAccount) + delete(statedb.SnapStorage, diffAccount) + continue + } + + // update code + codeHash := common.BytesToHash(latestAccount.CodeHash) + if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) && len(latestAccount.CodeHash) != 0 { + if code, exist := diffCode[codeHash]; exist { + if crypto.Keccak256Hash(code) == codeHash { + return nil, nil, 0, errors.New("code and codeHash mismatch") + } + statedb.DiffCode[codeHash] = code + } else { + rawCode := rawdb.ReadCode(p.bc.db, codeHash) + if len(rawCode) == 0 { + return nil, nil, 0, errors.New("missing code in difflayer") + } + } + } + + //update storage + latestRoot := common.BytesToHash(latestAccount.Root) + if latestRoot != previousAccount.Root && latestRoot != (common.Hash{}) { + accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root) + if err != nil { + return nil, nil, 0, err + } + if storageChange, exist := statedb.SnapStorage[diffAccount]; exist { + for k, v := range storageChange { + if len(v) != 0 { + accountTrie.TryUpdate([]byte(k), v) + } else { + accountTrie.TryDelete([]byte(k)) + } + } + } else { + return nil, nil, 0, errors.New("missing storage change in difflayer") + } + // check storage root + accountRootHash := accountTrie.Hash() + if latestRoot != accountRootHash { + return nil, nil, 0, errors.New("account storage root mismatch") + } + statedb.DiffTries[diffAccount] = accountTrie + } else { + delete(statedb.SnapStorage, diffAccount) + } + + // update state trie + err = statedb.Trie.TryUpdate(diffAccount[:], blob) + if err != nil { + return nil, nil, 0, err + } + } + + // remove redundant storage change + for account, _ := range statedb.SnapStorage { + if _, exist := statedb.SnapAccounts[account]; !exist { + log.Warn("receive redundant storage change in diff layer") + delete(statedb.SnapStorage, account) + } + } + + // prune DiffLayer + if len(diffCode) != len(diffLayer.Codes) { + diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode)) + for hash, code := range diffCode { + diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ + Hash: hash, + Code: code, + }) + } + } + if len(statedb.SnapAccounts) != len(diffLayer.Accounts) || len(statedb.SnapStorage) != len(diffLayer.Storages) { + diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer() + } + statedb.DiffLayer = diffLayer + + var allLogs []*types.Log + var gasUsed uint64 + for _, receipt := range diffLayer.Receipts { + allLogs = append(allLogs, receipt.Logs...) + gasUsed += receipt.GasUsed + } + + return diffLayer.Receipts, allLogs, gasUsed, nil +} + // Process processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb and applying any rewards to both // the processor (coinbase) and any included uncles. diff --git a/core/types/block.go b/core/types/block.go index b33493ef7d..cf0d7caa04 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -366,3 +366,30 @@ func (b *Block) Hash() common.Hash { } type Blocks []*Block + +// journalDestruct is an account deletion entry in a diffLayer's disk journal. +type DiffLayer struct { + Hash common.Hash + StateRoot common.Hash + Receipts Receipts // Receipts are duplicated stored to simplify the logic + Codes []DiffCode + Destructs []common.Address + Accounts []DiffAccount + Storages []DiffStorage +} + +type DiffCode struct { + Hash common.Hash + Code []byte +} + +type DiffAccount struct { + Account common.Address + Blob []byte +} + +type DiffStorage struct { + Account common.Address + Keys []string + Vals [][]byte +} diff --git a/eth/backend.go b/eth/backend.go index b52591fd71..273de40b6e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -128,9 +128,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { ethashConfig.NotifyFull = config.Miner.NotifyFull // Assemble the Ethereum object - chainDb, err := stack.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/", false) + chainDb, err := stack.OpenAndMergeDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, config.DatabaseDiff, "eth/db/chaindata/", false, config.PersistDiff) if err != nil { return nil, err + } + if config.PersistDiff { + } chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.OverrideBerlin) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 40dece429a..3166a8866a 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -159,6 +159,8 @@ type Config struct { DatabaseHandles int `toml:"-"` DatabaseCache int DatabaseFreezer string + DatabaseDiff string + PersistDiff bool TrieCleanCache int TrieCleanCacheJournal string `toml:",omitempty"` // Disk journal directory for trie cache to survive node restarts diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index fa31b78335..3cc9759787 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -40,6 +40,7 @@ func (c Config) MarshalTOML() (interface{}, error) { DatabaseHandles int `toml:"-"` DatabaseCache int DatabaseFreezer string + DatabaseDiff string TrieCleanCache int TrieCleanCacheJournal string `toml:",omitempty"` TrieCleanCacheRejournal time.Duration `toml:",omitempty"` @@ -48,6 +49,7 @@ func (c Config) MarshalTOML() (interface{}, error) { TriesInMemory uint64 `toml:",omitempty"` SnapshotCache int Preimages bool + PersistDiff bool Miner miner.Config Ethash ethash.Config TxPool core.TxPoolConfig @@ -84,6 +86,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseCache = c.DatabaseCache enc.DatabaseFreezer = c.DatabaseFreezer + enc.DatabaseDiff = c.DatabaseDiff enc.TrieCleanCache = c.TrieCleanCache enc.TrieCleanCacheJournal = c.TrieCleanCacheJournal enc.TrieCleanCacheRejournal = c.TrieCleanCacheRejournal @@ -92,6 +95,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TriesInMemory = c.TriesInMemory enc.SnapshotCache = c.SnapshotCache enc.Preimages = c.Preimages + enc.PersistDiff = c.PersistDiff enc.Miner = c.Miner enc.Ethash = c.Ethash enc.TxPool = c.TxPool @@ -133,6 +137,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { DatabaseHandles *int `toml:"-"` DatabaseCache *int DatabaseFreezer *string + DatabaseDiff *string + PersistDiff *bool TrieCleanCache *int TrieCleanCacheJournal *string `toml:",omitempty"` TrieCleanCacheRejournal *time.Duration `toml:",omitempty"` @@ -224,6 +230,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DatabaseFreezer != nil { c.DatabaseFreezer = *dec.DatabaseFreezer } + if dec.DatabaseDiff != nil { + c.DatabaseDiff = *dec.DatabaseDiff + } + if dec.PersistDiff != nil { + c.PersistDiff = *dec.PersistDiff + } if dec.TrieCleanCache != nil { c.TrieCleanCache = *dec.TrieCleanCache } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 84cfaf4d73..3367378831 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -117,7 +117,7 @@ func (eth *Ethereum) stateAtBlock(block *types.Block, reexec uint64, base *state return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie - root, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) + root, _, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) if err != nil { return nil, err } diff --git a/ethdb/database.go b/ethdb/database.go index 0dc14624b9..21d52062ff 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -17,7 +17,9 @@ // Package ethdb defines the interfaces for an Ethereum data store. package ethdb -import "io" +import ( + "io" +) // KeyValueReader wraps the Has and Get method of a backing data store. type KeyValueReader interface { @@ -118,11 +120,17 @@ type AncientStore interface { io.Closer } +type DiffStore interface { + DiffStore() KeyValueStore + SetDiffStore(diff KeyValueStore) +} + // Database contains all the methods required by the high level database to not // only access the key-value data store but also the chain freezer. type Database interface { Reader Writer + DiffStore Batcher Iteratee Stater diff --git a/node/node.go b/node/node.go index f2602dee47..720ed8fb34 100644 --- a/node/node.go +++ b/node/node.go @@ -27,6 +27,8 @@ import ( "strings" "sync" + "github.com/ethereum/go-ethereum/ethdb/leveldb" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" @@ -578,6 +580,22 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r return db, err } +func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, diff, namespace string, readonly, persistDiff bool) (ethdb.Database, error) { + chainDB, err := n.OpenDatabaseWithFreezer(name, cache, handles, freezer, namespace, readonly) + if err != nil { + return nil, err + } + if persistDiff { + diffStore, err := n.OpenDiffDatabase(name, handles, diff, namespace, readonly) + if err != nil { + chainDB.Close() + return nil, err + } + chainDB.SetDiffStore(diffStore) + } + return chainDB, nil +} + // OpenDatabaseWithFreezer opens an existing database with the given name (or // creates one if no previous can be found) from within the node's data directory, // also attaching a chain freezer to it that moves ancient chain data from the @@ -611,6 +629,30 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, return db, err } +func (n *Node) OpenDiffDatabase(name string, handles int, diff, namespace string, readonly bool) (*leveldb.Database, error) { + n.lock.Lock() + defer n.lock.Unlock() + if n.state == closedState { + return nil, ErrNodeStopped + } + + var db *leveldb.Database + var err error + if n.config.DataDir == "" { + panic("datadir is missing") + } else { + root := n.ResolvePath(name) + switch { + case diff == "": + diff = filepath.Join(root, "diff") + case !filepath.IsAbs(diff): + diff = n.ResolvePath(diff) + } + db, err = leveldb.New(diff, 0, handles, namespace, readonly) + } + return db, err +} + // ResolvePath returns the absolute path of a resource in the instance directory. func (n *Node) ResolvePath(x string) string { return n.config.ResolvePath(x) diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 19c79b6eed..77d4fd08d4 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -226,7 +226,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo } } // Commit and re-open to start with a clean state. - root, _ := statedb.Commit(false) + root, _, _ := statedb.Commit(false) var snaps *snapshot.Tree if snapshotter {