From 25c1d01a3f92477da7d22859cf8437e14c7a9fdb Mon Sep 17 00:00:00 2001 From: Krish Date: Thu, 22 Aug 2024 16:34:42 +0800 Subject: [PATCH 1/8] fix sequencer recover --- core/blockchain.go | 55 +++++++++++++++ eth/catalyst/api.go | 6 ++ eth/catalyst/queue.go | 18 +++++ eth/downloader/downloader.go | 4 ++ eth/downloader/fetchers.go | 14 ++++ eth/sync.go | 2 +- miner/fix_manager.go | 130 +++++++++++++++++++++++++++++++++++ miner/miner.go | 11 ++- miner/payload_building.go | 83 ++++++++++++++++++++++ miner/worker.go | 16 ++++- 10 files changed, 335 insertions(+), 4 deletions(-) create mode 100644 miner/fix_manager.go diff --git a/core/blockchain.go b/core/blockchain.go index 2250ccd6c6..a40364168d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2204,6 +2204,10 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i return 0, nil } +func (bc *BlockChain) RecoverAncestors(block *types.Block) (common.Hash, error) { + return bc.recoverAncestorsWithSethead(block) +} + // recoverAncestors finds the closest ancestor with available state and re-execute // all the ancestor blocks since that. // recoverAncestors is only used post-merge. @@ -2255,6 +2259,57 @@ func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) return block.Hash(), nil } +// recoverAncestors finds the closest ancestor with available state and re-execute +// all the ancestor blocks since that. +// recoverAncestors is only used post-merge. +// We return the hash of the latest block that we could correctly validate. +func (bc *BlockChain) recoverAncestorsWithSethead(block *types.Block) (common.Hash, error) { + // Gather all the sidechain hashes (full blocks may be memory heavy) + var ( + hashes []common.Hash + numbers []uint64 + parent = block + ) + for parent != nil && !bc.HasState(parent.Root()) { + if bc.stateRecoverable(parent.Root()) { + if err := bc.triedb.Recover(parent.Root()); err != nil { + return common.Hash{}, err + } + break + } + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.NumberU64()) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + + // If the chain is terminating, stop iteration + if bc.insertStopped() { + log.Debug("Abort during blocks iteration") + return common.Hash{}, errInsertionInterrupted + } + } + if parent == nil { + return common.Hash{}, errors.New("missing parent") + } + // Import all the pruned blocks to make the state available + for i := len(hashes) - 1; i >= 0; i-- { + // If the chain is terminating, stop processing blocks + if bc.insertStopped() { + log.Debug("Abort during blocks processing") + return common.Hash{}, errInsertionInterrupted + } + var b *types.Block + if i == 0 { + b = block + } else { + b = bc.GetBlock(hashes[i], numbers[i]) + } + if _, err := bc.insertChain(types.Blocks{b}, true); err != nil { + return b.ParentHash(), err + } + } + return block.Hash(), nil +} + // collectLogs collects the logs that were generated or removed during // the processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 056c1c1abc..4e659b8107 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -412,6 +412,12 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl // If we already are busy generating this work, then we do not need // to start a second process. if api.localBlocks.has(id) { + payload := api.localBlocks.getWithoutStatus(id) + if payload != nil { + // fix is running, listening the status of fix routine + log.Info("step into listenFix") + api.eth.Miner().Worker().ListenFixCompletion(id, payload, args) + } return valid(&id), nil } payload, err := api.eth.Miner().BuildPayload(args) diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index d42904843b..882d9d6535 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/miner" ) @@ -92,6 +93,23 @@ func (q *payloadQueue) get(id engine.PayloadID, full bool) *engine.ExecutionPayl return nil } +// getWithoutStatus retrieves a previously stored payload item or nil if it does not exist. +func (q *payloadQueue) getWithoutStatus(id engine.PayloadID) *miner.Payload { + q.lock.RLock() + defer q.lock.RUnlock() + + for _, item := range q.payloads { + if item == nil { + log.Info("getting payload not found", "id", id) + return nil // no more items + } + if item.id == id { + return item.payload + } + } + return nil +} + // waitFull waits until the first full payload has been built for the specified payload id // The method returns immediately if the payload is unknown. func (q *payloadQueue) waitFull(id engine.PayloadID) error { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 3f39b19485..8f67e904cd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1846,3 +1846,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta)) d.syncLogTime = time.Now() } + +func (d *Downloader) GetAllPeers() []*peerConnection { + return d.peers.AllPeers() +} diff --git a/eth/downloader/fetchers.go b/eth/downloader/fetchers.go index cc4279b0da..05898695ca 100644 --- a/eth/downloader/fetchers.go +++ b/eth/downloader/fetchers.go @@ -17,6 +17,7 @@ package downloader import ( + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -113,3 +114,16 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil } } + +func (d *Downloader) GetHeaderByHashFromPeer(peer *peerConnection, blockHash common.Hash) (*types.Header, error) { + headers, _, err := d.fetchHeadersByHash(peer, blockHash, 1, 0, false) + if err != nil { + return nil, fmt.Errorf("failed to fetch header from peer: %v", err) + } + + if len(headers) == 0 { + return nil, fmt.Errorf("no headers returned for hash: %v", blockHash) + } + + return headers[0], nil +} diff --git a/eth/sync.go b/eth/sync.go index a9732147fa..8587854812 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -212,7 +212,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { if !cs.handler.chain.NoTries() && !cs.handler.chain.HasState(head.Root) { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) - log.Info("Reenabled snap sync as chain is stateless") + log.Info("Reenabled snap sync as chain is stateless", "lost block", block.Number.Uint64()) return downloader.SnapSync, td } // Nope, we're really full syncing diff --git a/miner/fix_manager.go b/miner/fix_manager.go new file mode 100644 index 0000000000..dbcd237167 --- /dev/null +++ b/miner/fix_manager.go @@ -0,0 +1,130 @@ +package miner + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/log" +) + +// FixManager manages the fix operation state and notification mechanism. +type FixManager struct { + mutex sync.Mutex // Protects access to fix state + isFixInProgress bool // Tracks if a fix operation is in progress + fixChannels sync.Map // Stores fix state and notification channels + listenerStarted sync.Map // Tracks whether a listener goroutine has started for each payload ID + downloader *downloader.Downloader // Used to trigger BeaconSync operations + +} + +// NewFixManager initializes a FixManager with required dependencies +func NewFixManager(downloader *downloader.Downloader) *FixManager { + return &FixManager{ + downloader: downloader, + } +} + +// StartFix launches a goroutine to manage the fix process and tracks the fix state. +func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) { + fm.mutex.Lock() + defer fm.mutex.Unlock() + + if !fm.isFixInProgress { + fm.isFixInProgress = true + fixChan := make(chan struct{}) + fm.fixChannels.Store(id, fixChan) + + go func() { + defer func() { + fm.mutex.Lock() + fm.isFixInProgress = false + fm.mutex.Unlock() + + // Notify listeners that the fix is complete + if ch, ok := fm.fixChannels.Load(id); ok { + close(ch.(chan struct{})) + } + }() + worker.fix(parentHash) // Execute the fix logic + }() + } +} + +// ListenFixCompletion listens for the completion of the fix process to avoid redundant goroutine starts. +// +// payload: The payload that will be updated after fix completion. +// args: The arguments required to retry the payload update. +func (fm *FixManager) ListenFixCompletion(worker *worker, id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) { + ch, exists := fm.fixChannels.Load(id) + if !exists { + log.Info("payload is not fixing or has been completed") + return + } + + // Check if a listener goroutine has already been started + if _, listenerExists := fm.listenerStarted.LoadOrStore(id, true); listenerExists { + log.Info("Listener already started for payload", "payload", id) + return // If listener goroutine already exists, return immediately + } + + go func() { + log.Info("start waiting") + <-ch.(chan struct{}) // Wait for the fix to complete + log.Info("Fix completed, retrying payload update", "id", id) + worker.retryPayloadUpdate(args, payload) + fm.fixChannels.Delete(id) // Remove the id from fixChannels + fm.listenerStarted.Delete(id) // Remove the listener flag for this id + }() +} + +// RecoverFromLocal attempts to recover the block and MPT data from the local chain. +// +// blockHash: The latest header(unsafe block) hash of the block to recover. +func (fm *FixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { + block := w.chain.GetBlockByHash(blockHash) + if block == nil { + return fmt.Errorf("block not found in local chain") + } + + log.Info("Fixing data for block", "blocknumber", block.NumberU64()) + latestValid, err := w.chain.RecoverAncestors(block) + if err != nil { + return fmt.Errorf("failed to recover ancestors: %v", err) + } + + log.Info("Recovered ancestors up to block", "latestValid", latestValid) + return nil +} + +// RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful. +// +// blockHash: The latest header(unsafe block) hash of the block to recover. +func (fm *FixManager) RecoverFromPeer(blockHash common.Hash) error { + peers := fm.downloader.GetAllPeers() + if len(peers) == 0 { + return fmt.Errorf("no peers available") + } + + var header *types.Header + var err error + for _, peer := range peers { + header, err = fm.downloader.GetHeaderByHashFromPeer(peer, blockHash) + if err == nil && header != nil { + break + } + log.Warn("Failed to retrieve header from peer", "err", err) + } + + if header == nil { + return fmt.Errorf("failed to retrieve header from all valid peers") + } + + log.Info("Successfully retrieved header from peer", "blockHash", blockHash) + + fm.downloader.BeaconSync(downloader.FullSync, header, nil) + return nil +} diff --git a/miner/miner.go b/miner/miner.go index 2174d6ca81..d3d283f83e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/consensus/misc/eip1559" - "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "math/big" "sync" "time" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" @@ -86,6 +87,7 @@ var DefaultMevConfig = MevConfig{ type Backend interface { BlockChain() *core.BlockChain TxPool() *txpool.TxPool + Downloader() *downloader.Downloader } type BackendWithHistoricalState interface { @@ -300,6 +302,11 @@ func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { return miner.worker.buildPayload(args) } +// Worker builds the payload according to the provided parameters. +func (miner *Miner) Worker() *worker { + return miner.worker +} + func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) { env, err := miner.prepareSimulationEnv() diff --git a/miner/payload_building.go b/miner/payload_building.go index f3b8382783..7aff5e19c5 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "errors" "math/big" + "strings" "sync" "sync/atomic" "time" @@ -261,6 +262,38 @@ func (payload *Payload) stopBuilding() { }) } +// fix attempts to recover and repair the block and its associated data (such as MPT) +// either from the local blockchain or from peers. +// +// In most cases, the block can be recovered from the local node's data. However, +// there is a corner case where this may not be possible: If the sequencer +// broadcasts a block but the local node crashes before fully writing the block to its local +// storage, the local chain might be lagging behind by one block compared to peers. +// In such cases, we need to recover the missing block data from peers. +// +// The function first tries to recover the block using the local blockchain via the +// fixManager.RecoverFromLocal method. If local recovery fails (e.g., due to the node +// missing the block), it attempts to retrieve the block header from peers and triggers +// +// blockHash: The hash of the latest block that needs to be recovered and fixed. +func (w *worker) fix(blockHash common.Hash) { + log.Info("Fix operation started") + + err := w.fixManager.RecoverFromLocal(w, blockHash) + if err != nil { + log.Warn("Local recovery failed, trying to recover from peers", "err", err) + + err = w.fixManager.RecoverFromPeer(blockHash) + if err != nil { + log.Error("Failed to recover from peers", "err", err) + return + } + } + + log.Info("Fix operation completed") + +} + // buildPayload builds the payload according to the provided parameters. func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { if args.NoTxPool { // don't start the background payload updating job if there is no tx pool to pull from @@ -348,8 +381,17 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { start := time.Now() // getSealingBlock is interrupted by shared interrupt r := w.getSealingBlock(fullParams) + + // if state missing, init fixing routine + if r.err != nil && strings.Contains(r.err.Error(), "missing trie node") { + log.Info("step into fixing") + w.StartFix(payload.id, fullParams.parentHash) + return 0 + } + dur := time.Since(start) // update handles error case + payload.update(r, dur, func() { w.cacheMiningBlock(r.block, r.env) }) @@ -392,6 +434,47 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { return payload, nil } +// retryPayloadUpdate retries the payload update process after a fix operation. +// +// This function reconstructs the block using the provided BuildPayloadArgs and +// attempts to update the payload in the system. It performs validation of the +// block parameters and updates the payload if the block is successfully built. +func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) { + fullParams := &generateParams{ + timestamp: args.Timestamp, + forceTime: true, + parentHash: args.Parent, + coinbase: args.FeeRecipient, + random: args.Random, + withdrawals: args.Withdrawals, + beaconRoot: args.BeaconRoot, + noTxs: false, + txs: args.Transactions, + gasLimit: args.GasLimit, + } + + // Since we skip building the empty block when using the tx pool, we need to explicitly + // validate the BuildPayloadArgs here. + _, err := w.validateParams(fullParams) + if err != nil { + return + } + + // set shared interrupt + fullParams.interrupt = payload.interrupt + + r := w.getSealingBlock(fullParams) + if r.err != nil { + log.Error("Failed to build full payload after fix", "id", payload.id, "err", r.err) + return + } + + payload.update(r, 0, func() { + w.cacheMiningBlock(r.block, r.env) + }) + log.Info("Successfully updated payload after fix", "id", payload.id) +} + func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { var ( start = time.Now() diff --git a/miner/worker.go b/miner/worker.go index 292a2e9ad0..1e5ab75657 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,12 +20,14 @@ import ( "context" "errors" "fmt" - mapset "github.com/deckarep/golang-set/v2" "math/big" "sync" "sync/atomic" "time" + mapset "github.com/deckarep/golang-set/v2" + + "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -265,6 +267,17 @@ type worker struct { // MEV bundleCache *BundleCache + + // FixManager + fixManager *FixManager +} + +func (w *worker) StartFix(id engine.PayloadID, parentHash common.Hash) { + w.fixManager.StartFix(w, id, parentHash) +} + +func (w *worker) ListenFixCompletion(id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) { + w.fixManager.ListenFixCompletion(w, id, payload, args) } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { @@ -290,6 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), bundleCache: NewBundleCache(), + fixManager: NewFixManager(eth.Downloader()), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) From 5aa9b466eff38c63a60eacb00c7e7f0d8bf9706b Mon Sep 17 00:00:00 2001 From: Krish Date: Mon, 9 Sep 2024 10:53:29 +0800 Subject: [PATCH 2/8] fix ut --- core/blockchain.go | 104 +++++++++++++++++++------------------- miner/fix_manager.go | 51 ++++++++++++------- miner/miner_test.go | 4 ++ miner/payload_building.go | 40 +++++++++++---- miner/worker_test.go | 6 ++- 5 files changed, 122 insertions(+), 83 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index a40364168d..a4d6960f1d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2204,8 +2204,8 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i return 0, nil } -func (bc *BlockChain) RecoverAncestors(block *types.Block) (common.Hash, error) { - return bc.recoverAncestorsWithSethead(block) +func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, error) { + return bc.recoverStateAndSetHead(block) } // recoverAncestors finds the closest ancestor with available state and re-execute @@ -2259,57 +2259,6 @@ func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) return block.Hash(), nil } -// recoverAncestors finds the closest ancestor with available state and re-execute -// all the ancestor blocks since that. -// recoverAncestors is only used post-merge. -// We return the hash of the latest block that we could correctly validate. -func (bc *BlockChain) recoverAncestorsWithSethead(block *types.Block) (common.Hash, error) { - // Gather all the sidechain hashes (full blocks may be memory heavy) - var ( - hashes []common.Hash - numbers []uint64 - parent = block - ) - for parent != nil && !bc.HasState(parent.Root()) { - if bc.stateRecoverable(parent.Root()) { - if err := bc.triedb.Recover(parent.Root()); err != nil { - return common.Hash{}, err - } - break - } - hashes = append(hashes, parent.Hash()) - numbers = append(numbers, parent.NumberU64()) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) - - // If the chain is terminating, stop iteration - if bc.insertStopped() { - log.Debug("Abort during blocks iteration") - return common.Hash{}, errInsertionInterrupted - } - } - if parent == nil { - return common.Hash{}, errors.New("missing parent") - } - // Import all the pruned blocks to make the state available - for i := len(hashes) - 1; i >= 0; i-- { - // If the chain is terminating, stop processing blocks - if bc.insertStopped() { - log.Debug("Abort during blocks processing") - return common.Hash{}, errInsertionInterrupted - } - var b *types.Block - if i == 0 { - b = block - } else { - b = bc.GetBlock(hashes[i], numbers[i]) - } - if _, err := bc.insertChain(types.Blocks{b}, true); err != nil { - return b.ParentHash(), err - } - } - return block.Hash(), nil -} - // collectLogs collects the logs that were generated or removed during // the processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { @@ -2764,6 +2713,55 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) { return 0, err } +// recoverStateAndSetHead attempts to recover the state of the blockchain by re-importing +// missing blocks and advancing the chain head. It ensures the state is available +// for the given block and its ancestors before updating the head. +func (bc *BlockChain) recoverStateAndSetHead(block *types.Block) (common.Hash, error) { + var ( + hashes []common.Hash + numbers []uint64 + parent = block + ) + for parent != nil && !bc.HasState(parent.Root()) { + if bc.stateRecoverable(parent.Root()) { + if err := bc.triedb.Recover(parent.Root()); err != nil { + return common.Hash{}, err + } + break + } + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.NumberU64()) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + + // If the chain is terminating, stop iteration + if bc.insertStopped() { + log.Debug("Abort during blocks iteration") + return common.Hash{}, errInsertionInterrupted + } + } + if parent == nil { + return common.Hash{}, errors.New("missing parent") + } + // Import all the pruned blocks to make the state available + for i := len(hashes) - 1; i >= 0; i-- { + // If the chain is terminating, stop processing blocks + if bc.insertStopped() { + log.Debug("Abort during blocks processing") + return common.Hash{}, errInsertionInterrupted + } + var b *types.Block + if i == 0 { + b = block + } else { + b = bc.GetBlock(hashes[i], numbers[i]) + } + if _, err := bc.insertChain(types.Blocks{b}, true); err != nil { + return b.ParentHash(), err + } + } + return block.Hash(), nil +} + // SetBlockValidatorAndProcessorForTesting sets the current validator and processor. // This method can be used to force an invalid blockchain to be verified for tests. // This method is unsafe and should only be used before block import starts. diff --git a/miner/fix_manager.go b/miner/fix_manager.go index dbcd237167..933a7c040e 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -21,6 +21,12 @@ type FixManager struct { } +// FixResult holds the result of the fix operation +type FixResult struct { + Success bool + Err error +} + // NewFixManager initializes a FixManager with required dependencies func NewFixManager(downloader *downloader.Downloader) *FixManager { return &FixManager{ @@ -35,8 +41,8 @@ func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash c if !fm.isFixInProgress { fm.isFixInProgress = true - fixChan := make(chan struct{}) - fm.fixChannels.Store(id, fixChan) + resultChan := make(chan FixResult, 1) // Channel to capture fix result (success or error) + fm.fixChannels.Store(id, resultChan) go func() { defer func() { @@ -44,12 +50,12 @@ func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash c fm.isFixInProgress = false fm.mutex.Unlock() - // Notify listeners that the fix is complete if ch, ok := fm.fixChannels.Load(id); ok { - close(ch.(chan struct{})) + resultChan := ch.(chan FixResult) + close(resultChan) } }() - worker.fix(parentHash) // Execute the fix logic + worker.fix(parentHash, resultChan) // processing fix logic }() } } @@ -61,23 +67,34 @@ func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash c func (fm *FixManager) ListenFixCompletion(worker *worker, id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) { ch, exists := fm.fixChannels.Load(id) if !exists { - log.Info("payload is not fixing or has been completed") + log.Info("Payload is not fixing or has been completed") return } // Check if a listener goroutine has already been started if _, listenerExists := fm.listenerStarted.LoadOrStore(id, true); listenerExists { log.Info("Listener already started for payload", "payload", id) - return // If listener goroutine already exists, return immediately + return } go func() { - log.Info("start waiting") - <-ch.(chan struct{}) // Wait for the fix to complete - log.Info("Fix completed, retrying payload update", "id", id) - worker.retryPayloadUpdate(args, payload) - fm.fixChannels.Delete(id) // Remove the id from fixChannels - fm.listenerStarted.Delete(id) // Remove the listener flag for this id + log.Info("Start waiting for fix completion") + result := <-ch.(chan FixResult) // Wait for the fix result + + // Check the result and decide whether to retry the payload update + if result.Success { + if err := worker.retryPayloadUpdate(args, payload); err != nil { + log.Error("Failed to retry payload update after fix", "id", id, "err", err) + } else { + log.Info("Payload update after fix succeeded", "id", id) + } + } else { + log.Error("Fix failed, skipping payload update", "id", id, "err", result.Err) + } + + // Clean up the fix state + fm.fixChannels.Delete(id) + fm.listenerStarted.Delete(id) }() } @@ -90,13 +107,13 @@ func (fm *FixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { return fmt.Errorf("block not found in local chain") } - log.Info("Fixing data for block", "blocknumber", block.NumberU64()) - latestValid, err := w.chain.RecoverAncestors(block) + log.Info("Fixing data for block", "block number", block.NumberU64()) + latestValid, err := w.chain.RecoverStateAndSetHead(block) if err != nil { - return fmt.Errorf("failed to recover ancestors: %v", err) + return fmt.Errorf("failed to recover state: %v", err) } - log.Info("Recovered ancestors up to block", "latestValid", latestValid) + log.Info("Recovered states up to block", "latestValid", latestValid) return nil } diff --git a/miner/miner_test.go b/miner/miner_test.go index 411d6026ce..24c626d12b 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -59,6 +59,10 @@ func (m *mockBackend) TxPool() *txpool.TxPool { return m.txPool } +func (m *mockBackend) Downloader() *downloader.Downloader { + return nil +} + func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { return nil, errors.New("not supported") } diff --git a/miner/payload_building.go b/miner/payload_building.go index 7aff5e19c5..22d66117a3 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -20,6 +20,7 @@ import ( "crypto/sha256" "encoding/binary" "errors" + "fmt" "math/big" "strings" "sync" @@ -276,22 +277,32 @@ func (payload *Payload) stopBuilding() { // missing the block), it attempts to retrieve the block header from peers and triggers // // blockHash: The hash of the latest block that needs to be recovered and fixed. -func (w *worker) fix(blockHash common.Hash) { +func (w *worker) fix(blockHash common.Hash, resultChan chan FixResult) { log.Info("Fix operation started") + // Try to recover from local data err := w.fixManager.RecoverFromLocal(w, blockHash) if err != nil { - log.Warn("Local recovery failed, trying to recover from peers", "err", err) - - err = w.fixManager.RecoverFromPeer(blockHash) - if err != nil { - log.Error("Failed to recover from peers", "err", err) + // Only proceed to peer recovery if the error is "block not found in local chain" + if strings.Contains(err.Error(), "block not found") { + log.Warn("Local recovery failed, trying to recover from peers", "err", err) + + // Try to recover from peers + err = w.fixManager.RecoverFromPeer(blockHash) + if err != nil { + log.Error("Failed to recover from peers", "err", err) + resultChan <- FixResult{Success: false, Err: err} + return + } + } else { + log.Error("Failed to recover from local data", "err", err) + resultChan <- FixResult{Success: false, Err: err} return } } - log.Info("Fix operation completed") - + log.Info("Fix operation completed successfully") + resultChan <- FixResult{Success: true, Err: nil} } // buildPayload builds the payload according to the provided parameters. @@ -439,7 +450,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { // This function reconstructs the block using the provided BuildPayloadArgs and // attempts to update the payload in the system. It performs validation of the // block parameters and updates the payload if the block is successfully built. -func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) { +func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) error { fullParams := &generateParams{ timestamp: args.Timestamp, forceTime: true, @@ -457,7 +468,8 @@ func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) { // validate the BuildPayloadArgs here. _, err := w.validateParams(fullParams) if err != nil { - return + log.Error("Failed to validate payload parameters", "id", payload.id, "err", err) + return fmt.Errorf("failed to validate payload parameters: %w", err) } // set shared interrupt @@ -466,13 +478,19 @@ func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) { r := w.getSealingBlock(fullParams) if r.err != nil { log.Error("Failed to build full payload after fix", "id", payload.id, "err", r.err) - return + return fmt.Errorf("failed to build full payload after fix: %w", r.err) } payload.update(r, 0, func() { w.cacheMiningBlock(r.block, r.env) }) + + if r.err == nil { + fullParams.isUpdate = true + } + log.Info("Successfully updated payload after fix", "id", payload.id) + return nil } func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { diff --git a/miner/worker_test.go b/miner/worker_test.go index aa05565301..c84a03cd01 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" @@ -144,8 +145,9 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine } } -func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } -func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } +func (b *testWorkerBackend) Downloader() *downloader.Downloader { return nil } func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { var tx *types.Transaction From 13b52eafcfb5e01ab847611ee2b28ec63a59de2c Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 29 Oct 2024 17:06:02 +0800 Subject: [PATCH 3/8] refine: simplified the fix logic from asynchronous to synchronous --- eth/catalyst/api.go | 6 --- miner/fix_manager.go | 88 +++++++++------------------------------ miner/payload_building.go | 65 ++++++++++++++++++++++------- miner/worker.go | 15 +++---- 4 files changed, 75 insertions(+), 99 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 07dcec5c49..e8703f7532 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -420,12 +420,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl // If we already are busy generating this work, then we do not need // to start a second process. if api.localBlocks.has(id) { - payload := api.localBlocks.getWithoutStatus(id) - if payload != nil { - // fix is running, listening the status of fix routine - log.Info("step into listenFix") - api.eth.Miner().Worker().ListenFixCompletion(id, payload, args) - } return valid(&id), nil } // If the beacon chain is ran by a simulator, then transaction insertion, diff --git a/miner/fix_manager.go b/miner/fix_manager.go index 933a7c040e..afb62840e6 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -11,97 +11,49 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// FixManager manages the fix operation state and notification mechanism. -type FixManager struct { +// StateFixManager manages the fix operation state and notification mechanism. +type StateFixManager struct { mutex sync.Mutex // Protects access to fix state isFixInProgress bool // Tracks if a fix operation is in progress - fixChannels sync.Map // Stores fix state and notification channels - listenerStarted sync.Map // Tracks whether a listener goroutine has started for each payload ID downloader *downloader.Downloader // Used to trigger BeaconSync operations - -} - -// FixResult holds the result of the fix operation -type FixResult struct { - Success bool - Err error } // NewFixManager initializes a FixManager with required dependencies -func NewFixManager(downloader *downloader.Downloader) *FixManager { - return &FixManager{ +func NewFixManager(downloader *downloader.Downloader) *StateFixManager { + return &StateFixManager{ downloader: downloader, } } // StartFix launches a goroutine to manage the fix process and tracks the fix state. -func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) { +func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) error { fm.mutex.Lock() defer fm.mutex.Unlock() - if !fm.isFixInProgress { - fm.isFixInProgress = true - resultChan := make(chan FixResult, 1) // Channel to capture fix result (success or error) - fm.fixChannels.Store(id, resultChan) - - go func() { - defer func() { - fm.mutex.Lock() - fm.isFixInProgress = false - fm.mutex.Unlock() - - if ch, ok := fm.fixChannels.Load(id); ok { - resultChan := ch.(chan FixResult) - close(resultChan) - } - }() - worker.fix(parentHash, resultChan) // processing fix logic - }() + if fm.isFixInProgress { + log.Warn("Fix is already in progress for this block", "id", id) + return nil } -} -// ListenFixCompletion listens for the completion of the fix process to avoid redundant goroutine starts. -// -// payload: The payload that will be updated after fix completion. -// args: The arguments required to retry the payload update. -func (fm *FixManager) ListenFixCompletion(worker *worker, id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) { - ch, exists := fm.fixChannels.Load(id) - if !exists { - log.Info("Payload is not fixing or has been completed") - return - } + defer func() { + fm.isFixInProgress = false + }() - // Check if a listener goroutine has already been started - if _, listenerExists := fm.listenerStarted.LoadOrStore(id, true); listenerExists { - log.Info("Listener already started for payload", "payload", id) - return + log.Info("Starting synchronous fix process", "id", id) + err := worker.fix(parentHash) + if err != nil { + log.Error("Fix process failed", "error", err) + return err } - go func() { - log.Info("Start waiting for fix completion") - result := <-ch.(chan FixResult) // Wait for the fix result - - // Check the result and decide whether to retry the payload update - if result.Success { - if err := worker.retryPayloadUpdate(args, payload); err != nil { - log.Error("Failed to retry payload update after fix", "id", id, "err", err) - } else { - log.Info("Payload update after fix succeeded", "id", id) - } - } else { - log.Error("Fix failed, skipping payload update", "id", id, "err", result.Err) - } - - // Clean up the fix state - fm.fixChannels.Delete(id) - fm.listenerStarted.Delete(id) - }() + log.Info("Fix process completed successfully", "id", id) + return nil } // RecoverFromLocal attempts to recover the block and MPT data from the local chain. // // blockHash: The latest header(unsafe block) hash of the block to recover. -func (fm *FixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { +func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { block := w.chain.GetBlockByHash(blockHash) if block == nil { return fmt.Errorf("block not found in local chain") @@ -120,7 +72,7 @@ func (fm *FixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { // RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful. // // blockHash: The latest header(unsafe block) hash of the block to recover. -func (fm *FixManager) RecoverFromPeer(blockHash common.Hash) error { +func (fm *StateFixManager) RecoverFromPeer(blockHash common.Hash) error { peers := fm.downloader.GetAllPeers() if len(peers) == 0 { return fmt.Errorf("no peers available") diff --git a/miner/payload_building.go b/miner/payload_building.go index dae520cd9f..3aa5164d86 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -17,6 +17,7 @@ package miner import ( + "context" "crypto/sha256" "encoding/binary" "errors" @@ -29,7 +30,9 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -279,32 +282,29 @@ func (payload *Payload) stopBuilding() { // missing the block), it attempts to retrieve the block header from peers and triggers // // blockHash: The hash of the latest block that needs to be recovered and fixed. -func (w *worker) fix(blockHash common.Hash, resultChan chan FixResult) { +func (w *worker) fix(blockHash common.Hash) error { log.Info("Fix operation started") // Try to recover from local data - err := w.fixManager.RecoverFromLocal(w, blockHash) + err := w.stateFixManager.RecoverFromLocal(w, blockHash) if err != nil { // Only proceed to peer recovery if the error is "block not found in local chain" if strings.Contains(err.Error(), "block not found") { log.Warn("Local recovery failed, trying to recover from peers", "err", err) // Try to recover from peers - err = w.fixManager.RecoverFromPeer(blockHash) + err = w.stateFixManager.RecoverFromPeer(blockHash) if err != nil { - log.Error("Failed to recover from peers", "err", err) - resultChan <- FixResult{Success: false, Err: err} - return + return err } } else { log.Error("Failed to recover from local data", "err", err) - resultChan <- FixResult{Success: false, Err: err} - return + return err } } log.Info("Fix operation completed successfully") - resultChan <- FixResult{Success: true, Err: nil} + return nil } // buildPayload builds the payload according to the provided parameters. @@ -362,6 +362,18 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { return nil, err } + //check state of parent block + _, err = w.retrieveParentState(w.chain.CurrentBlock()) + if err != nil && strings.Contains(err.Error(), "missing trie node") { + log.Error("missing parent state when building block, try to fix...") + // fix state data + fixErr := w.StartStateFix(args.Id(), fullParams.parentHash) + if fixErr != nil { + log.Error("fix failed", "err", fixErr) + } + return nil, err + } + payload := newPayload(nil, args.Id()) // set shared interrupt fullParams.interrupt = payload.interrupt @@ -395,13 +407,6 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { // getSealingBlock is interrupted by shared interrupt r := w.getSealingBlock(fullParams) - // if state missing, init fixing routine - if r.err != nil && strings.Contains(r.err.Error(), "missing trie node") { - log.Info("step into fixing") - w.StartFix(payload.id, fullParams.parentHash) - return 0 - } - dur := time.Since(start) // update handles error case @@ -531,3 +536,31 @@ func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { log.Info("Successfully cached sealed new block", "number", block.Number(), "root", block.Root(), "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) } + +func (w *worker) retrieveParentState(parent *types.Header) (state *state.StateDB, err error) { + // Retrieve the parent state to execute on top and start a prefetcher for + // the miner to speed block sealing up a bit. + state, err = w.chain.StateAt(parent.Root) + + // If there is an error and Optimism is enabled in the chainConfig, allow reorg + if err != nil && w.chainConfig.Optimism != nil { + if historicalBackend, ok := w.eth.(BackendWithHistoricalState); ok { + // Attempt to retrieve the historical state + var release tracers.StateReleaseFunc + parentBlock := w.eth.BlockChain().GetBlockByHash(parent.Hash()) + state, release, err = historicalBackend.StateAtBlock( + context.Background(), parentBlock, ^uint64(0), nil, false, false, + ) + + // Copy the state and release the resources + state = state.Copy() + release() + } + } + + // Return the state and any error encountered + if err != nil { + return nil, err + } + return state, nil +} diff --git a/miner/worker.go b/miner/worker.go index 33fb9794b6..42d698d24a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -27,6 +27,8 @@ import ( mapset "github.com/deckarep/golang-set/v2" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -44,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" - "github.com/holiman/uint256" ) const ( @@ -271,15 +272,11 @@ type worker struct { bundleCache *BundleCache // FixManager - fixManager *FixManager -} - -func (w *worker) StartFix(id engine.PayloadID, parentHash common.Hash) { - w.fixManager.StartFix(w, id, parentHash) + stateFixManager *StateFixManager } -func (w *worker) ListenFixCompletion(id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) { - w.fixManager.ListenFixCompletion(w, id, payload, args) +func (w *worker) StartStateFix(id engine.PayloadID, parentHash common.Hash) error { + return w.stateFixManager.StartFix(w, id, parentHash) } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { @@ -306,7 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), bundleCache: NewBundleCache(), - fixManager: NewFixManager(eth.Downloader()), + stateFixManager: NewFixManager(eth.Downloader()), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) From a11dbb4545a4782e995b195bcca1e63e91ba6e17 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 29 Oct 2024 17:13:14 +0800 Subject: [PATCH 4/8] remove unused code --- eth/catalyst/queue.go | 18 ----------- miner/payload_building.go | 68 +++++++++------------------------------ 2 files changed, 16 insertions(+), 70 deletions(-) diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index 882d9d6535..d42904843b 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/miner" ) @@ -93,23 +92,6 @@ func (q *payloadQueue) get(id engine.PayloadID, full bool) *engine.ExecutionPayl return nil } -// getWithoutStatus retrieves a previously stored payload item or nil if it does not exist. -func (q *payloadQueue) getWithoutStatus(id engine.PayloadID) *miner.Payload { - q.lock.RLock() - defer q.lock.RUnlock() - - for _, item := range q.payloads { - if item == nil { - log.Info("getting payload not found", "id", id) - return nil // no more items - } - if item.id == id { - return item.payload - } - } - return nil -} - // waitFull waits until the first full payload has been built for the specified payload id // The method returns immediately if the payload is unknown. func (q *payloadQueue) waitFull(id engine.PayloadID) error { diff --git a/miner/payload_building.go b/miner/payload_building.go index 3aa5164d86..7b93600bb8 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -363,7 +363,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { } //check state of parent block - _, err = w.retrieveParentState(w.chain.CurrentBlock()) + _, err = w.retrieveParentState(fullParams) if err != nil && strings.Contains(err.Error(), "missing trie node") { log.Error("missing parent state when building block, try to fix...") // fix state data @@ -452,54 +452,6 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { return payload, nil } -// retryPayloadUpdate retries the payload update process after a fix operation. -// -// This function reconstructs the block using the provided BuildPayloadArgs and -// attempts to update the payload in the system. It performs validation of the -// block parameters and updates the payload if the block is successfully built. -func (w *worker) retryPayloadUpdate(args *BuildPayloadArgs, payload *Payload) error { - fullParams := &generateParams{ - timestamp: args.Timestamp, - forceTime: true, - parentHash: args.Parent, - coinbase: args.FeeRecipient, - random: args.Random, - withdrawals: args.Withdrawals, - beaconRoot: args.BeaconRoot, - noTxs: false, - txs: args.Transactions, - gasLimit: args.GasLimit, - } - - // Since we skip building the empty block when using the tx pool, we need to explicitly - // validate the BuildPayloadArgs here. - _, err := w.validateParams(fullParams) - if err != nil { - log.Error("Failed to validate payload parameters", "id", payload.id, "err", err) - return fmt.Errorf("failed to validate payload parameters: %w", err) - } - - // set shared interrupt - fullParams.interrupt = payload.interrupt - - r := w.getSealingBlock(fullParams) - if r.err != nil { - log.Error("Failed to build full payload after fix", "id", payload.id, "err", r.err) - return fmt.Errorf("failed to build full payload after fix: %w", r.err) - } - - payload.update(r, 0, func() { - w.cacheMiningBlock(r.block, r.env) - }) - - if r.err == nil { - fullParams.isUpdate = true - } - - log.Info("Successfully updated payload after fix", "id", payload.id) - return nil -} - func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { var ( start = time.Now() @@ -537,9 +489,21 @@ func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { "elapsed", common.PrettyDuration(time.Since(start))) } -func (w *worker) retrieveParentState(parent *types.Header) (state *state.StateDB, err error) { - // Retrieve the parent state to execute on top and start a prefetcher for - // the miner to speed block sealing up a bit. +func (w *worker) retrieveParentState(genParams *generateParams) (state *state.StateDB, err error) { + w.mu.RLock() + defer w.mu.RUnlock() + + log.Info("retrieveParentState validate") + // Find the parent block for sealing task + parent := w.chain.CurrentBlock() + if genParams.parentHash != (common.Hash{}) { + block := w.chain.GetBlockByHash(genParams.parentHash) + if block == nil { + return nil, fmt.Errorf("missing parent") + } + parent = block.Header() + } + state, err = w.chain.StateAt(parent.Root) // If there is an error and Optimism is enabled in the chainConfig, allow reorg From 487b72228944328262788ac2bf54b75ec1f8abb0 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 5 Nov 2024 16:57:56 +0800 Subject: [PATCH 5/8] refine: simplify fix logic --- core/blockchain.go | 68 +++++++++++++++++++++--------------- eth/downloader/downloader.go | 4 --- eth/downloader/fetchers.go | 14 -------- miner/fix_manager.go | 42 +++------------------- miner/miner.go | 6 ---- miner/miner_test.go | 4 --- miner/payload_building.go | 30 ++-------------- miner/worker.go | 2 +- miner/worker_test.go | 9 +++-- 9 files changed, 52 insertions(+), 127 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 561e102abc..c3a969db99 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -242,16 +242,17 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - lastWrite uint64 // Last block when the state was flushed - flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state - triedb *triedb.Database // The database handler for maintaining trie nodes. - stateCache state.Database // State database to reuse between imports (contains state cache) - proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. - txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + lastWrite uint64 // Last block when the state was flushed + flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state + triedb *triedb.Database // The database handler for maintaining trie nodes. + stateCache state.Database // State database to reuse between imports (contains state cache) + proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. + txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + stateRecoveringStatus bool hc *HeaderChain rmLogsFeed event.Feed @@ -337,24 +338,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triedb: triedb, - triegc: prque.New[int64, common.Hash](nil), - quit: make(chan struct{}), - chainmu: syncx.NewClosableMutex(), - bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), - bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), - receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit), - miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit), - futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), - engine: engine, - vmConfig: vmConfig, + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triedb: triedb, + triegc: prque.New[int64, common.Hash](nil), + quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), + bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), + bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), + receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), + miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit), + miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit), + futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), + engine: engine, + vmConfig: vmConfig, + stateRecoveringStatus: false, } bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit)) bc.forker = NewForkChoice(bc, shouldPreserve) @@ -2232,6 +2234,16 @@ func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, e // recoverAncestors is only used post-merge. // We return the hash of the latest block that we could correctly validate. func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) { + if bc.stateRecoveringStatus { + log.Warn("recover is already in progress, skipping", "block", block.Hash()) + return common.Hash{}, errors.New("state recover in progress") + } + + bc.stateRecoveringStatus = true + defer func() { + bc.stateRecoveringStatus = false + }() + // Gather all the sidechain hashes (full blocks may be memory heavy) var ( hashes []common.Hash diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5b637901c8..8cb9f14716 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1847,7 +1847,3 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta)) d.syncLogTime = time.Now() } - -func (d *Downloader) GetAllPeers() []*peerConnection { - return d.peers.AllPeers() -} diff --git a/eth/downloader/fetchers.go b/eth/downloader/fetchers.go index 05898695ca..cc4279b0da 100644 --- a/eth/downloader/fetchers.go +++ b/eth/downloader/fetchers.go @@ -17,7 +17,6 @@ package downloader import ( - "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -114,16 +113,3 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil } } - -func (d *Downloader) GetHeaderByHashFromPeer(peer *peerConnection, blockHash common.Hash) (*types.Header, error) { - headers, _, err := d.fetchHeadersByHash(peer, blockHash, 1, 0, false) - if err != nil { - return nil, fmt.Errorf("failed to fetch header from peer: %v", err) - } - - if len(headers) == 0 { - return nil, fmt.Errorf("no headers returned for hash: %v", blockHash) - } - - return headers[0], nil -} diff --git a/miner/fix_manager.go b/miner/fix_manager.go index afb62840e6..316eb703fc 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -6,23 +6,18 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" ) // StateFixManager manages the fix operation state and notification mechanism. type StateFixManager struct { - mutex sync.Mutex // Protects access to fix state - isFixInProgress bool // Tracks if a fix operation is in progress - downloader *downloader.Downloader // Used to trigger BeaconSync operations + mutex sync.Mutex // Protects access to fix state + isFixInProgress bool // Tracks if a fix operation is in progress } // NewFixManager initializes a FixManager with required dependencies -func NewFixManager(downloader *downloader.Downloader) *StateFixManager { - return &StateFixManager{ - downloader: downloader, - } +func NewFixManager() *StateFixManager { + return &StateFixManager{} } // StartFix launches a goroutine to manage the fix process and tracks the fix state. @@ -68,32 +63,3 @@ func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) er log.Info("Recovered states up to block", "latestValid", latestValid) return nil } - -// RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful. -// -// blockHash: The latest header(unsafe block) hash of the block to recover. -func (fm *StateFixManager) RecoverFromPeer(blockHash common.Hash) error { - peers := fm.downloader.GetAllPeers() - if len(peers) == 0 { - return fmt.Errorf("no peers available") - } - - var header *types.Header - var err error - for _, peer := range peers { - header, err = fm.downloader.GetHeaderByHashFromPeer(peer, blockHash) - if err == nil && header != nil { - break - } - log.Warn("Failed to retrieve header from peer", "err", err) - } - - if header == nil { - return fmt.Errorf("failed to retrieve header from all valid peers") - } - - log.Info("Successfully retrieved header from peer", "blockHash", blockHash) - - fm.downloader.BeaconSync(downloader.FullSync, header, nil) - return nil -} diff --git a/miner/miner.go b/miner/miner.go index 02b2fd5699..53755ad632 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -87,7 +87,6 @@ var DefaultMevConfig = MevConfig{ type Backend interface { BlockChain() *core.BlockChain TxPool() *txpool.TxPool - Downloader() *downloader.Downloader } type BackendWithHistoricalState interface { @@ -308,11 +307,6 @@ func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { return miner.worker.buildPayload(args) } -// Worker builds the payload according to the provided parameters. -func (miner *Miner) Worker() *worker { - return miner.worker -} - func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) { env, err := miner.prepareSimulationEnv() diff --git a/miner/miner_test.go b/miner/miner_test.go index e3fd39fd51..5907fb4464 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -60,10 +60,6 @@ func (m *mockBackend) TxPool() *txpool.TxPool { return m.txPool } -func (m *mockBackend) Downloader() *downloader.Downloader { - return nil -} - func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { return nil, errors.New("not supported") } diff --git a/miner/payload_building.go b/miner/payload_building.go index 7b93600bb8..457e526218 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -269,18 +269,7 @@ func (payload *Payload) stopBuilding() { } // fix attempts to recover and repair the block and its associated data (such as MPT) -// either from the local blockchain or from peers. -// -// In most cases, the block can be recovered from the local node's data. However, -// there is a corner case where this may not be possible: If the sequencer -// broadcasts a block but the local node crashes before fully writing the block to its local -// storage, the local chain might be lagging behind by one block compared to peers. -// In such cases, we need to recover the missing block data from peers. -// -// The function first tries to recover the block using the local blockchain via the -// fixManager.RecoverFromLocal method. If local recovery fails (e.g., due to the node -// missing the block), it attempts to retrieve the block header from peers and triggers -// +// from the local blockchain // blockHash: The hash of the latest block that needs to be recovered and fixed. func (w *worker) fix(blockHash common.Hash) error { log.Info("Fix operation started") @@ -288,19 +277,8 @@ func (w *worker) fix(blockHash common.Hash) error { // Try to recover from local data err := w.stateFixManager.RecoverFromLocal(w, blockHash) if err != nil { - // Only proceed to peer recovery if the error is "block not found in local chain" - if strings.Contains(err.Error(), "block not found") { - log.Warn("Local recovery failed, trying to recover from peers", "err", err) - - // Try to recover from peers - err = w.stateFixManager.RecoverFromPeer(blockHash) - if err != nil { - return err - } - } else { - log.Error("Failed to recover from local data", "err", err) - return err - } + log.Error("Failed to recover from local data", "err", err) + return err } log.Info("Fix operation completed successfully") @@ -406,10 +384,8 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { start := time.Now() // getSealingBlock is interrupted by shared interrupt r := w.getSealingBlock(fullParams) - dur := time.Since(start) // update handles error case - payload.update(r, dur, func() { w.cacheMiningBlock(r.block, r.env) }) diff --git a/miner/worker.go b/miner/worker.go index 7832c1eeb4..b8df413238 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -303,7 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), bundleCache: NewBundleCache(), - stateFixManager: NewFixManager(eth.Downloader()), + stateFixManager: NewFixManager(), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) diff --git a/miner/worker_test.go b/miner/worker_test.go index 035233353b..1c19e60de9 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -34,11 +36,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) const ( @@ -146,9 +146,8 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine } } -func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } -func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } -func (b *testWorkerBackend) Downloader() *downloader.Downloader { return nil } +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { var tx *types.Transaction From 631b962322d6d55e50808332365ee9e02a64e1af Mon Sep 17 00:00:00 2001 From: Krish Date: Mon, 11 Nov 2024 13:43:57 +0800 Subject: [PATCH 6/8] chore: refine as comment --- core/blockchain.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c3a969db99..511d4db8a9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -252,7 +252,7 @@ type BlockChain struct { stateCache state.Database // State database to reuse between imports (contains state cache) proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. txIndexer *txIndexer // Transaction indexer, might be nil if not enabled - stateRecoveringStatus bool + stateRecoveringStatus atomic.Bool hc *HeaderChain rmLogsFeed event.Feed @@ -338,25 +338,24 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triedb: triedb, - triegc: prque.New[int64, common.Hash](nil), - quit: make(chan struct{}), - chainmu: syncx.NewClosableMutex(), - bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), - bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), - receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit), - miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit), - futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), - engine: engine, - vmConfig: vmConfig, - stateRecoveringStatus: false, + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triedb: triedb, + triegc: prque.New[int64, common.Hash](nil), + quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), + bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), + bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), + receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), + miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit), + miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit), + futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), + engine: engine, + vmConfig: vmConfig, } bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit)) bc.forker = NewForkChoice(bc, shouldPreserve) @@ -2234,14 +2233,14 @@ func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, e // recoverAncestors is only used post-merge. // We return the hash of the latest block that we could correctly validate. func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) { - if bc.stateRecoveringStatus { + if bc.stateRecoveringStatus.Load() { log.Warn("recover is already in progress, skipping", "block", block.Hash()) return common.Hash{}, errors.New("state recover in progress") } - bc.stateRecoveringStatus = true + bc.stateRecoveringStatus.Store(true) defer func() { - bc.stateRecoveringStatus = false + bc.stateRecoveringStatus.Store(false) }() // Gather all the sidechain hashes (full blocks may be memory heavy) From 6a5de76a2fdd4d168d61dfb44ff6a4d60ab07bf6 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 12 Nov 2024 14:29:19 +0800 Subject: [PATCH 7/8] fix: set reentry flag --- miner/fix_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/miner/fix_manager.go b/miner/fix_manager.go index 316eb703fc..5e70711a09 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -30,6 +30,7 @@ func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentH return nil } + fm.isFixInProgress = true defer func() { fm.isFixInProgress = false }() From c270b4514005902b86af7168ef4ab45d207f4a46 Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 13 Nov 2024 11:16:48 +0800 Subject: [PATCH 8/8] chore: remove unused code --- miner/fix_manager.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/miner/fix_manager.go b/miner/fix_manager.go index 5e70711a09..b2f097ee85 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -11,8 +11,7 @@ import ( // StateFixManager manages the fix operation state and notification mechanism. type StateFixManager struct { - mutex sync.Mutex // Protects access to fix state - isFixInProgress bool // Tracks if a fix operation is in progress + mutex sync.Mutex // Protects access to fix state } // NewFixManager initializes a FixManager with required dependencies @@ -25,17 +24,8 @@ func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentH fm.mutex.Lock() defer fm.mutex.Unlock() - if fm.isFixInProgress { - log.Warn("Fix is already in progress for this block", "id", id) - return nil - } - - fm.isFixInProgress = true - defer func() { - fm.isFixInProgress = false - }() + log.Info("Fix is in progress for the block", "id", id) - log.Info("Starting synchronous fix process", "id", id) err := worker.fix(parentHash) if err != nil { log.Error("Fix process failed", "error", err)