diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go
index b7398f2138..2b177acb7f 100644
--- a/core/state/pruner/pruner.go
+++ b/core/state/pruner/pruner.go
@@ -28,13 +28,16 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/consensus"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
+	"github.com/prometheus/tsdb/fileutil"
 )
 
 const (
@@ -332,6 +335,228 @@ func (p *Pruner) Prune(root common.Hash) error {
 	return prune(p.snaptree, root, p.db, p.stateBloom, filterName, middleRoots, start)
 }
 
+// BlockPruner backs up the most recent blocks of the ancient store into a new
+// freezer directory and then swaps that directory in for the old one.
+type BlockPruner struct {
+	db                  ethdb.Database
+	oldAncientPath      string
+	newAncientPath      string
+	node                *node.Node
+	BlockAmountReserved uint64
+}
+
+// NewBlockPruner returns a BlockPruner that keeps the newest BlockAmountReserved
+// items of the freezer at oldAncientPath, staging them under newAncientPath.
+func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner {
+	return &BlockPruner{
+		db:                  db,
+		oldAncientPath:      oldAncientPath,
+		newAncientPath:      newAncientPath,
+		node:                n,
+		BlockAmountReserved: BlockAmountReserved,
+	}
+}
+
+// backUpOldDb copies the newest BlockAmountReserved items of the old ancient
+// store into a fresh freezer at newAncientPath and records the old and new
+// offsets in the key-value store. interrupt selects which stored offset
+// describes the old ancient store (last version when true, current otherwise).
+func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
+	// Open old db wrapper.
+	chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt)
+	if err != nil {
+		log.Error("Failed to open ancient database", "err", err)
+		return err
+	}
+	defer chainDb.Close()
+	log.Info("chainDB opened successfully")
+
+	// Get the number of items in old ancient db. If we can't access the
+	// freezer or it's empty, abort.
+	itemsOfAncient, err := chainDb.ItemAmountInAncient()
+	if err != nil || itemsOfAncient == 0 {
+		log.Error("can't access the freezer or it's empty, abort", "err", err)
+		return errors.New("can't access the freezer or it's empty, abort")
+	}
+	log.Info("the number of items in ancientDB is ", "itemsOfAncient", itemsOfAncient)
+
+	// If the items in freezer is less than the block amount that we want to reserve, it is not enough, should stop.
+	if itemsOfAncient < p.BlockAmountReserved {
+		log.Error("the number of old blocks is not enough to reserve,", "ancient items", itemsOfAncient, "the amount specified", p.BlockAmountReserved)
+		return errors.New("the number of old blocks is not enough to reserve")
+	}
+
+	var oldOffSet uint64
+	if interrupt {
+		// The interrupt scenario within this function is specific to old and new ancientDB existing concurrently,
+		// should use last version of offset for oldAncientDB, because current offset is
+		// actually of the new ancientDB_Backup, but what we want is the offset of ancientDB being backed up.
+		oldOffSet = rawdb.ReadOffSetOfLastAncientFreezer(chainDb)
+	} else {
+		// Using current version of ancientDB for oldOffSet because the db for backup is current version.
+		oldOffSet = rawdb.ReadOffSetOfCurrentAncientFreezer(chainDb)
+	}
+	log.Info("the oldOffSet is ", "oldOffSet", oldOffSet)
+
+	// Get the start BlockNumber for pruning.
+	startBlockNumber := oldOffSet + itemsOfAncient - p.BlockAmountReserved
+	log.Info("new offset/new startBlockNumber is ", "new offset", startBlockNumber)
+
+	// Create new ancientdb backup and record the new and last version of offset in kvDB as well.
+	// For every round, newoffset actually equals to the startBlockNumber in ancient backup db.
+	frdbBack, err := rawdb.NewFreezerDb(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber)
+	if err != nil {
+		log.Error("Failed to create ancient freezer backup", "err", err)
+		return err
+	}
+	defer frdbBack.Close()
+
+	offsetBatch := chainDb.NewBatch()
+	rawdb.WriteOffSetOfCurrentAncientFreezer(offsetBatch, startBlockNumber)
+	rawdb.WriteOffSetOfLastAncientFreezer(offsetBatch, oldOffSet)
+	if err := offsetBatch.Write(); err != nil {
+		// Return the error rather than calling log.Crit, which exits the
+		// process and would skip the deferred Close calls above.
+		log.Error("Failed to write offset into disk", "err", err)
+		return err
+	}
+
+	// It's guaranteed that the old/new offsets are updated as well as the new ancientDB are created if this flock exist.
+	lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
+	if err != nil {
+		log.Error("file lock error", "err", err)
+		return err
+	}
+	// Release on every exit path, including the error returns inside the copy loop.
+	defer lock.Release()
+
+	log.Info("prune info", "old offset", oldOffSet, "number of items in ancientDB", itemsOfAncient, "amount to reserve", p.BlockAmountReserved)
+	log.Info("new offset/new startBlockNumber recorded successfully ", "new offset", startBlockNumber)
+
+	start := time.Now()
+	// All ancient data after and including startBlockNumber should write into new ancientDB ancient_back.
+	for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffSet; blockNumber++ {
+		blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber)
+		block := rawdb.ReadBlock(chainDb, blockHash, blockNumber)
+		if block == nil {
+			log.Error("failed to read block from old ancient", "number", blockNumber, "hash", blockHash)
+			return errors.New("block not found in old ancient database")
+		}
+		receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber)
+		// NOTE(review): the ReadBorReceipt result is wrapped without a nil check;
+		// confirm WriteAncientBlocks accepts a nil entry for blocks without one.
+		borReceipts := []*types.Receipt{rawdb.ReadBorReceipt(chainDb, blockHash, blockNumber)}
+
+		// Calculate the total difficulty of the block
+		td := rawdb.ReadTd(chainDb, blockHash, blockNumber)
+		if td == nil {
+			return consensus.ErrUnknownAncestor
+		}
+		// Write into new ancient_back db.
+		if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, []types.Receipts{borReceipts}, td); err != nil {
+			log.Error("failed to write new ancient", "error", err)
+			return err
+		}
+		// Print the log every 5s for better trace.
+		if time.Since(start) > 5*time.Second {
+			log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber)
+			start = time.Now()
+		}
+	}
+	log.Info("block back up done", "current start blockNumber in ancientDB", startBlockNumber)
+	return nil
+}
+
+// BlockPruneBackUp backs up the newest BlockAmountReserved items of the old
+// ancient db into the new ancient db and logs how long the backup took.
+func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
+	start := time.Now()
+
+	if err := p.backUpOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil {
+		return err
+	}
+
+	log.Info("Block pruning BackUp successfully", "time duration since start is", common.PrettyDuration(time.Since(start)))
+	return nil
+}
+
+// RecoverInterruption resumes a block prune that stopped midway: it discards
+// any partially written new ancient db, redoes the backup and finally replaces
+// the old ancient db with the new one.
+func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error {
+	log.Info("RecoverInterruption for block prune")
+	newExist, err := CheckFileExist(p.newAncientPath)
+	if err != nil {
+		log.Error("newAncientDb path error")
+		return err
+	}
+
+	// interrupt stays false unless the flock file shows that the old/new
+	// offsets were already updated in kvDB before the interruption.
+	interrupt := false
+	if newExist {
+		log.Info("New ancientDB_backup existed in interruption scenario")
+		flockOfAncientBack, err := CheckFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
+		if err != nil {
+			log.Error("Failed to check flock of ancientDB_Back", "err", err)
+			return err
+		}
+
+		// Indicating both old and new ancientDB existed concurrently.
+		// Delete directly for the new ancientdb to prune from start, e.g.: path ../chaindb/ancient_backup
+		if err := os.RemoveAll(p.newAncientPath); err != nil {
+			log.Error("Failed to remove new ancient directory", "err", err)
+			return err
+		}
+		interrupt = flockOfAncientBack
+	} else {
+		// Indicating the new ancientDB was not even created, just prune starting at backup from startBlockNumber as usual,
+		// in this case, the new offset has not been written into kvDB.
+		log.Info("New ancientDB_backup did not exist in interruption scenario")
+	}
+
+	if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, interrupt); err != nil {
+		log.Error("Failed to prune")
+		return err
+	}
+	if err := p.AncientDbReplacer(); err != nil {
+		log.Error("Failed to replace ancientDB")
+		return err
+	}
+	return nil
+}
+
+// CheckFileExist reports whether path exists. A missing path yields
+// (false, nil); any other os.Stat failure yields (true, err), so callers must
+// check the error before trusting the boolean.
+func CheckFileExist(path string) (bool, error) {
+	if _, err := os.Stat(path); err != nil {
+		if os.IsNotExist(err) {
+			// Indicating the file didn't exist.
+			return false, nil
+		}
+		return true, err
+	}
+	return true, nil
+}
+
+// AncientDbReplacer removes the old ancient directory and renames the new
+// ancient directory to the old path.
+func (p *BlockPruner) AncientDbReplacer() error {
+	// Delete directly for the old ancientdb, e.g.: path ../chaindb/ancient
+	if err := os.RemoveAll(p.oldAncientPath); err != nil {
+		log.Error("Failed to remove old ancient directory", "err", err)
+		return err
+	}
+
+	// Rename the new ancientdb path same to the old
+	if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil {
+		log.Error("Failed to rename new ancient directory", "err", err)
+		return err
+	}
+	return nil
+}
+
 // RecoverPruning will resume the pruning procedure during the system restart.
 // This function is used in this case: user tries to prune state data, but the
 // system was interrupted midway because of crash or manual-kill. In this case
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 22af24789a..03a3a07d8d 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -640,8 +640,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 				d.ancientLimit = 0
 			}
 		}
-
-		frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
+		frozen, _ := d.stateDB.ItemAmountInAncient() // Ignore the error here since light client can also hit here.
 
 		// If a part of blockchain data has already been written into active store,
 		// disable the ancient style insertion explicitly.