diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go
index 3b0887761f2..2028298b163 100644
--- a/cmd/integration/commands/state_stages.go
+++ b/cmd/integration/commands/state_stages.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	stateLib "github.com/erigontech/erigon-lib/state"
 	"os"
 	"time"
@@ -165,6 +166,12 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 	}
 	defer tx.Rollback()
+	sd, err := stateLib.NewSharedDomains(tx, logger1)
+	if err != nil {
+		return err
+	}
+	defer sd.Close()
+
 	quit := ctx.Done()
 	var batchSize datasize.ByteSize
@@ -270,7 +277,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 		stateStages.MockExecFunc(stages.Execution, execUntilFunc(execToBlock))
 		_ = stateStages.SetCurrentStage(stages.Execution)
-		if _, err := stateStages.Run(db, wrap.TxContainer{Tx: tx}, false /* firstCycle */, false); err != nil {
+		if _, err := stateStages.Run(db, wrap.TxContainer{Tx: tx, Doms: sd}, false /* firstCycle */, false); err != nil {
 			return err
 		}
@@ -300,7 +307,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 			miner.MiningConfig.Etherbase = nextBlock.Coinbase()
 			miner.MiningConfig.ExtraData = nextBlock.Extra()
 			miningStages.MockExecFunc(stages.MiningCreateBlock, func(badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error {
-				err = stagedsync.SpawnMiningCreateBlockStage(s, txc.Tx,
+				err = stagedsync.SpawnMiningCreateBlockStage(s, txc,
 					stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, dirs.Tmp, br),
 					quit, logger)
 				if err != nil {
@@ -322,7 +329,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 			//})
 			_ = miningStages.SetCurrentStage(stages.MiningCreateBlock)
-			if _, err := miningStages.Run(db, wrap.TxContainer{Tx: tx}, false /* firstCycle */, false); err != nil {
+			if _, err := miningStages.Run(db, wrap.TxContainer{Tx: tx, Doms: sd}, false /* firstCycle */, false); err != nil {
 				return err
 			}
 			tx.Rollback()
diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index cee07fdfcad..f0c1b5175cc 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -69,9 +69,11 @@ type Worker struct {
 	vmCfg vm.Config
 	dirs datadir.Dirs
+
+	isMining bool
 }
-func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker {
+func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
 	w := &Worker{
 		lock: lock,
 		logger: logger,
@@ -91,6 +93,8 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
 		taskGasPool: new(core.GasPool),
 		dirs: dirs,
+
+		isMining: isMining,
 	}
 	w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
 	w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
@@ -126,7 +130,7 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {
 func (rw *Worker) Run() error {
 	for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
-		rw.RunTxTask(txTask)
+		rw.RunTxTask(txTask, rw.isMining)
 		if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
 			return err
 		}
@@ -134,10 +138,10 @@ func (rw *Worker) Run() error {
 	return nil
 }
-func (rw *Worker) RunTxTask(txTask *state.TxTask) {
+func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
 	rw.lock.Lock()
 	defer rw.lock.Unlock()
-	rw.RunTxTaskNoLock(txTask)
+	rw.RunTxTaskNoLock(txTask, isMining)
 }
 // Needed to set history reader when need to offset few txs from block beginning and does not break processing,
@@ -159,7 +163,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
 	}
 }
-func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
+func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
 	if txTask.HistoryExecution && !rw.historyMode {
 		// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
 		// from the beginning until committed txNum and only then disable history mode.
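The hunks above fix the mining mode when the Worker is built and thread it into every task run; the hunk that follows uses the flag to pick between Finalize (block import) and FinalizeAndAssemble (block production). Below is a minimal, self-contained sketch of that plumbing; engine, task and worker here are toy stand-ins, not the real consensus.Engine, state.TxTask or exec3.Worker.

```go
package main

import "fmt"

// Toy engine: verify-only vs. assemble-while-executing, mirroring Finalize vs. FinalizeAndAssemble.
type engine struct{}

func (engine) finalize(txs []string) ([]string, error)            { return txs, nil }
func (engine) finalizeAndAssemble(txs []string) ([]string, error) { return append(txs, "reward"), nil }

type task struct {
	txs []string
	err error
}

type worker struct {
	eng      engine
	isMining bool // decided once, at construction time
}

func newWorker(isMining bool) *worker { return &worker{isMining: isMining} }

func (w *worker) run(in <-chan *task, out chan<- *task) {
	for t := range in {
		w.runTask(t, w.isMining) // every task sees the same mode
		out <- t
	}
}

func (w *worker) runTask(t *task, isMining bool) {
	if isMining {
		// block production: the task may add or reshape transactions
		t.txs, t.err = w.eng.finalizeAndAssemble(t.txs)
	} else {
		// block import: only verify, leave the task's payload untouched
		_, t.err = w.eng.finalize(t.txs)
	}
}

func main() {
	in, out := make(chan *task, 1), make(chan *task, 1)
	go newWorker(true /* isMining */).run(in, out)
	in <- &task{txs: []string{"tx1"}}
	fmt.Println((<-out).txs) // [tx1 reward]
	close(in)
}
```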
@@ -228,7 +232,11 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
 				return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
 			}
-			_, _, _, err := rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
+			if isMining {
+				_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
+			} else {
+				_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
+			}
 			if err != nil {
 				txTask.Error = err
 			} else {
@@ -291,7 +299,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
 	}
 }
-func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
+func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
 	reconWorkers = make([]*Worker, workerCount)
 	resultChSize := workerCount * 8
@@ -302,7 +310,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 	ctx, cancel := context.WithCancel(ctx)
 	g, ctx := errgroup.WithContext(ctx)
 	for i := 0; i < workerCount; i++ {
-		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs)
+		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
 		reconWorkers[i].ResetState(rs, accumulator)
 	}
 	if background {
@@ -329,7 +337,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 			//applyWorker.ResetTx(nil)
 		}
 	}
-	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs)
+	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
 	return reconWorkers, applyWorker, rws, clear, wait
 }
diff --git a/core/blockchain.go b/core/blockchain.go
index f7eca6cc3b9..2033c627067 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -334,7 +334,7 @@ func FinalizeBlockExecution(
 		newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
 	} else {
 		var rss types.Requests
-		_, _, rss, err = engine.Finalize(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, logger)
+		newTxs, newReceipt, rss, err = engine.Finalize(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, logger)
 		if !reflect.DeepEqual(rss, requests) {
 			return nil, nil, nil, fmt.Errorf("invalid requests for block %d", header.Number.Uint64())
@@ -367,7 +367,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
 	return nil
 }
-func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header) error {
+func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
 	if gasUsed != h.GasUsed {
 		return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
 			gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
@@ -383,6 +383,10 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
 	}
 	receiptHash := types.DeriveSha(receipts)
 	if receiptHash != h.ReceiptHash {
+		if isMining {
+			h.ReceiptHash = receiptHash
+			return nil
+		}
 		return fmt.Errorf("receiptHash mismatch: %x != %x, headerNum=%d, %x", receiptHash, h.ReceiptHash, h.Number.Uint64(), h.Hash())
 	}
diff --git a/core/state/intra_block_state.go b/core/state/intra_block_state.go
index 600a1241d4e..4ab67f19f28 100644
--- a/core/state/intra_block_state.go
+++ b/core/state/intra_block_state.go
@@ -23,10 +23,6 @@ package state
 import (
 	"errors"
 	"fmt"
-	"sort"
-
-	"github.com/holiman/uint256"
-
 	"github.com/erigontech/erigon-lib/chain"
 	libcommon "github.com/erigontech/erigon-lib/common"
 	types2 "github.com/erigontech/erigon-lib/types"
@@ -37,6 +33,8 @@ import (
 	"github.com/erigontech/erigon/core/vm/evmtypes"
 	"github.com/erigontech/erigon/crypto"
 	"github.com/erigontech/erigon/turbo/trie"
+	"github.com/holiman/uint256"
+	"sort"
 )
 var _ evmtypes.IntraBlockState = new(IntraBlockState) // compile-time interface-check
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 3eb76a514f1..f625af0617f 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -118,7 +118,6 @@ func newObject(db *IntraBlockState, address libcommon.Address, data, original *a
 		so.data.Root = trie.EmptyRoot
 	}
 	so.original.Copy(original)
-
 	return &so
 }
diff --git a/core/state_processor.go b/core/state_processor.go
index b4a1c9a371c..0ebd3cbafe1 100644
--- a/core/state_processor.go
+++ b/core/state_processor.go
@@ -61,7 +61,6 @@ func applyTransaction(config *chain.Config, engine consensus.EngineReader, gp *G
 	// Update the evm with the new transaction context.
 	evm.Reset(txContext, ibs)
-
 	result, err := ApplyMessage(evm, msg, gp, true /* refunds */, false /* gasBailout */)
 	if err != nil {
 		return nil, nil, err
diff --git a/core/state_transition.go b/core/state_transition.go
index 511c3b91a7b..f0aae712387 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -315,7 +315,7 @@ func (st *StateTransition) preCheck(gasBailout bool) error {
 // nil evm execution result.
 func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtypes.ExecutionResult, error) {
 	coinbase := st.evm.Context.Coinbase
-
 	senderInitBalance := st.state.GetBalance(st.msg.From()).Clone()
 	coinbaseInitBalance := st.state.GetBalance(coinbase).Clone()
diff --git a/erigon-lib/kv/membatchwithdb/memory_mutation.go b/erigon-lib/kv/membatchwithdb/memory_mutation.go
index 49d1c8d45e4..61f6ba656eb 100644
--- a/erigon-lib/kv/membatchwithdb/memory_mutation.go
+++ b/erigon-lib/kv/membatchwithdb/memory_mutation.go
@@ -731,28 +731,35 @@ func (m *MemoryMutation) AggTx() any {
 }
 func (m *MemoryMutation) DomainGet(name kv.Domain, k, k2 []byte) (v []byte, step uint64, err error) {
-	return m.db.(kv.TemporalTx).DomainGet(name, k, k2)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).DomainGet(name, k, k2)
 }
 func (m *MemoryMutation) DomainGetAsOf(name kv.Domain, k, k2 []byte, ts uint64) (v []byte, ok bool, err error) {
-	return m.db.(kv.TemporalTx).DomainGetAsOf(name, k, k2, ts)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).DomainGetAsOf(name, k, k2, ts)
 }
 func (m *MemoryMutation) HistorySeek(name kv.History, k []byte, ts uint64) (v []byte, ok bool, err error) {
-	return m.db.(kv.TemporalTx).HistorySeek(name, k, ts)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).HistorySeek(name, k, ts)
 }
 func (m *MemoryMutation) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int) (timestamps stream.U64, err error) {
-	return m.db.(kv.TemporalTx).IndexRange(name, k, fromTs, toTs, asc, limit)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).IndexRange(name, k, fromTs, toTs, asc, limit)
 }
 func (m *MemoryMutation) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limit int) (it stream.KV, err error) {
-	return m.db.(kv.TemporalTx).HistoryRange(name, fromTs, toTs, asc, limit)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).HistoryRange(name, fromTs, toTs, asc, limit)
 }
 func (m *MemoryMutation) DomainRange(name kv.Domain, fromKey, toKey []byte, ts uint64, asc order.By, limit int) (it stream.KV, err error) {
-	return m.db.(kv.TemporalTx).DomainRange(name, fromKey, toKey, ts, asc, limit)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).DomainRange(name, fromKey, toKey, ts, asc, limit)
 }
 func (m *MemoryMutation) AppendableGet(name kv.Appendable, ts kv.TxnId) ([]byte, bool, error) {
-	return m.db.(kv.TemporalTx).AppendableGet(name, ts)
+	panic("not supported")
+	//return m.db.(kv.TemporalTx).AppendableGet(name, ts)
 }
diff --git a/eth/backend.go b/eth/backend.go
index ed6eb63d72f..ade2f57ef06 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -1211,6 +1211,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
 	defer streamCancel()
 	mineEvery := time.NewTicker(miner.MiningConfig.Recommit)
+	defer mineEvery.Stop()
 	s.logger.Info("Starting to mine", "etherbase", eb)
@@ -1221,9 +1222,6 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
 	hasWork := true // Start mining immediately
 	errc := make(chan error, 1)
-	workCtx, workCancel := context.WithCancel(ctx)
-	defer workCancel()
-
 	for {
 		// Only reset if some work was done previously as we'd like to rely
 		// on the `miner.recommit` as backup.
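For context on the StartMining hunks that follow: the recommit ticker is now stopped via defer, the per-step goroutine always reports into errc from a deferred function, and a waiting flag suppresses recommit ticks while results are still being drained. Below is a sketch of that loop shape under assumed placeholder inputs; miningStep and the one-second recommit interval are stand-ins, not Erigon APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// miningLoop mirrors the structure above: ticker + defer Stop, select over
// new-tx notifications / recommit ticks / ctx, and a worker goroutine that
// always reports back through a buffered error channel.
func miningLoop(ctx context.Context, newTxs <-chan struct{}, miningStep func(context.Context) error) error {
	recommit := time.NewTicker(time.Second)
	defer recommit.Stop()

	var waiting atomic.Bool
	errc := make(chan error, 1)
	working := false
	hasWork := true // start immediately

	for {
		select {
		case <-newTxs:
			hasWork = true
		case <-recommit.C:
			if !(working || waiting.Load()) { // skip ticks while a step is in flight
				hasWork = true
			}
		case err := <-errc:
			working = false
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}

		if hasWork && !working {
			working, hasWork = true, false
			recommit.Reset(time.Second)
			go func() {
				err := miningStep(ctx)
				waiting.Store(true) // hold off further ticks while results are drained
				defer func() {
					waiting.Store(false)
					errc <- err // always report back, success or failure
				}()
				// ...in the real loop the mined block is read from a channel here...
			}()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	err := miningLoop(ctx, make(chan struct{}), func(context.Context) error {
		fmt.Println("sealed a block")
		return nil
	})
	fmt.Println("loop exited:", err)
}
```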
@@ -1243,12 +1241,15 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
 			hasWork = true
 		case <-s.notifyMiningAboutNewTxs:
+			//log.Warn("[dbg] notifyMiningAboutNewTxs")
+			// Skip mining based on new txn notif for bor consensus
 			hasWork = s.chainConfig.Bor == nil
 			if hasWork {
 				s.logger.Debug("Start mining based on txpool notif")
 			}
 		case <-mineEvery.C:
+			//log.Warn("[dbg] mineEvery", "working", working, "waiting", waiting.Load())
 			if !(working || waiting.Load()) {
 				s.logger.Debug("Start mining based on miner.recommit", "duration", miner.MiningConfig.Recommit)
 			}
@@ -1272,12 +1273,13 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
 			hasWork = false
 			mineEvery.Reset(miner.MiningConfig.Recommit)
 			go func() {
-				err := stages2.MiningStep(ctx, db, mining, tmpDir, logger)
+				err = stages2.MiningStep(ctx, db, mining, tmpDir, logger)
 				waiting.Store(true)
-				defer waiting.Store(false)
-
-				errc <- err
+				defer func() {
+					waiting.Store(false)
+					errc <- err
+				}()
 				if err != nil {
 					return
@@ -1291,8 +1293,8 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
 					s.minedBlocks <- block.Block
 				}
 				return
-			case <-workCtx.Done():
-				errc <- workCtx.Err()
+			case <-ctx.Done():
+				errc <- ctx.Err()
 				return
 			}
 		}
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 13f7f4d1a29..1b2aded217c 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -378,7 +378,7 @@ func ExecV3(ctx context.Context,
 	rwsConsumed := make(chan struct{}, 1)
 	defer close(rwsConsumed)
-	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs)
+	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, isMining)
 	defer stopWorkers()
 	applyWorker := cfg.applyWorker
 	if isMining {
@@ -422,7 +422,7 @@ func ExecV3(ctx context.Context,
 				return err
 			}
-			processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false)
+			processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, isMining)
 			if err != nil {
 				return err
 			}
@@ -523,7 +523,7 @@ func ExecV3(ctx context.Context,
 			rws.DrainNonBlocking()
 			applyWorker.ResetTx(tx)
-			processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true)
+			processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, isMining)
 			if err != nil {
 				return err
 			}
@@ -834,7 +834,7 @@ Loop:
 			if txTask.Error != nil {
 				break Loop
 			}
-			applyWorker.RunTxTaskNoLock(txTask)
+			applyWorker.RunTxTaskNoLock(txTask, isMining)
 			if err := func() error {
 				if errors.Is(txTask.Error, context.Canceled) {
 					return err
@@ -853,7 +853,7 @@ Loop:
 				if txTask.Final {
 					checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts
 					if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
-						if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header); err != nil {
+						if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, isMining); err != nil {
 							return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 						}
 					}
@@ -875,12 +875,16 @@ Loop:
 					return err
 				}
 				if errors.Is(err, consensus.ErrInvalidBlock) {
-					if err := u.UnwindTo(blockNum-1, BadBlock(header.Hash(), err), applyTx); err != nil {
-						return err
+					if u != nil {
+						if err := u.UnwindTo(blockNum-1, BadBlock(header.Hash(), err), applyTx); err != nil {
+							return err
+						}
 					}
 				} else {
-					if err := u.UnwindTo(blockNum-1, ExecUnwind, applyTx); err != nil {
-						return err
+					if u != nil {
+						if err := u.UnwindTo(blockNum-1, ExecUnwind, applyTx); err != nil {
+							return err
+						}
 					}
 				}
 				break Loop
@@ -1119,6 +1123,11 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
 			return false, fmt.Errorf("writing plain state version: %w", err)
 		}
 	}
+
+	if header == nil {
+		return false, errors.New("header is nil")
+	}
+
 	if dbg.DiscardCommitment() {
 		return true, nil
 	}
@@ -1126,11 +1135,12 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
 		panic(fmt.Errorf("%d != %d", doms.BlockNum(), header.Number.Uint64()))
 	}
-	rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), u.LogPrefix())
+	rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix())
 	if err != nil {
 		return false, fmt.Errorf("StateV3.Apply: %w", err)
 	}
 	if cfg.blockProduction {
+		header.Root = common.BytesToHash(rh)
 		return true, nil
 	}
 	if bytes.Equal(rh, header.Root.Bytes()) {
@@ -1176,8 +1186,10 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
 		return false, fmt.Errorf("%w: requested=%d, minAllowed=%d", ErrTooDeepUnwind, unwindTo, allowedUnwindTo)
 	}
 	logger.Warn("Unwinding due to incorrect root hash", "to", unwindTo)
-	if err := u.UnwindTo(allowedUnwindTo, BadBlock(header.Hash(), ErrInvalidStateRootHash), applyTx); err != nil {
-		return false, err
+	if u != nil {
+		if err := u.UnwindTo(allowedUnwindTo, BadBlock(header.Hash(), ErrInvalidStateRootHash), applyTx); err != nil {
+			return false, err
+		}
 	}
 	return false, nil
 }
@@ -1203,7 +1215,7 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser
 	return b, err
 }
-func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
+func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, isMining bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
 	rwsIt := rws.Iter()
 	defer rwsIt.Close()
@@ -1221,7 +1233,7 @@ func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *stat
 		}
 		// resolve first conflict right here: it's faster and conflict-free
-		applyWorker.RunTxTask(txTask)
+		applyWorker.RunTxTask(txTask, isMining)
 		if txTask.Error != nil {
 			return outputTxNum, conflicts, triggers, processedBlockNum, false, fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, txTask.Error)
 		}
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index e1bb7c9cf3f..79cc30fad60 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -128,8 +128,8 @@ func StageExecuteBlocksCfg(
 		historyV3: true,
 		syncCfg: syncCfg,
 		silkworm: silkworm,
-		applyWorker: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs),
-		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs),
+		applyWorker: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, false),
+		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, true),
 	}
 }
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index 60fbbabcd6c..bd6fdb3dc6d 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -162,7 +162,7 @@ func HeadersPOW(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg
 		return nil
 	}
-	logger.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", startProgress)
+	logger.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", startProgress, "hash", hash.Hex())
 	diagnostics.Send(diagnostics.HeadersWaitingUpdate{From: startProgress})
@@ -173,8 +173,8 @@ func HeadersPOW(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg
 	/* TEMP TESTING
 	if localTd == nil {
 		return fmt.Errorf("localTD is nil: %d, %x", startProgress, hash)
-	}
-	TEMP TESTING */
+	}*/
+
 	headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, startProgress, cfg.blockReader)
 	cfg.hd.SetHeaderReader(&ChainReaderImpl{
 		config: &cfg.chainConfig,
diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go
index f05433808d6..7e9c4e1727d 100644
--- a/eth/stagedsync/stage_mining_create_block.go
+++ b/eth/stagedsync/stage_mining_create_block.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/erigontech/erigon-lib/wrap"
 	"math/big"
 	"time"
@@ -97,7 +98,7 @@ var maxTransactions uint16 = 1000
 // SpawnMiningCreateBlockStage
 // TODO:
 // - resubmitAdjustCh - variable is not implemented
-func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBlockCfg, quit <-chan struct{}, logger log.Logger) (err error) {
+func SpawnMiningCreateBlockStage(s *StageState, txc wrap.TxContainer, cfg MiningCreateBlockCfg, quit <-chan struct{}, logger log.Logger) (err error) {
 	current := cfg.miner.MiningBlock
 	txPoolLocals := []libcommon.Address{} //txPoolV2 has no concept of local addresses (yet?)
 	coinbase := cfg.miner.MiningConfig.Etherbase
@@ -108,11 +109,11 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
 	)
 	logPrefix := s.LogPrefix()
-	executionAt, err := s.ExecutionAt(tx)
+	executionAt, err := s.ExecutionAt(txc.Tx)
 	if err != nil {
 		return fmt.Errorf("getting last executed block: %w", err)
 	}
-	parent := rawdb.ReadHeaderByNumber(tx, executionAt)
+	parent := rawdb.ReadHeaderByNumber(txc.Tx, executionAt)
 	if parent == nil { // todo: how to return error and don't stop Erigon?
return fmt.Errorf("empty block %d", executionAt) } @@ -131,18 +132,18 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc blockNum := executionAt + 1 - localUncles, remoteUncles, err := readNonCanonicalHeaders(tx, blockNum, cfg.engine, coinbase, txPoolLocals) + localUncles, remoteUncles, err := readNonCanonicalHeaders(txc.Tx, blockNum, cfg.engine, coinbase, txPoolLocals) if err != nil { return err } - chain := ChainReader{Cfg: cfg.chainConfig, Db: tx, BlockReader: cfg.blockReader, Logger: logger} + chain := ChainReader{Cfg: cfg.chainConfig, Db: txc.Tx, BlockReader: cfg.blockReader, Logger: logger} var GetBlocksFromHash = func(hash libcommon.Hash, n int) (blocks []*types.Block) { - number, _ := cfg.blockReader.HeaderNumber(context.Background(), tx, hash) + number, _ := cfg.blockReader.HeaderNumber(context.Background(), txc.Tx, hash) if number == nil { return nil } for i := 0; i < n; i++ { - block, _, _ := cfg.blockReader.BlockWithSenders(context.Background(), tx, hash, *number) + block, _, _ := cfg.blockReader.BlockWithSenders(context.Background(), txc.Tx, hash, *number) if block == nil { break } @@ -183,9 +184,7 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc header.Extra = cfg.miner.MiningConfig.ExtraData logger.Info(fmt.Sprintf("[%s] Start mine", logPrefix), "block", executionAt+1, "baseFee", header.BaseFee, "gasLimit", header.GasLimit) - - stateReader := state.NewPlainStateReader(tx) - ibs := state.New(stateReader) + ibs := state.New(state.NewReaderV3(txc.Doms)) if err = cfg.engine.Prepare(chain, header, ibs); err != nil { logger.Error("Failed to prepare header for mining", diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index eea4b2f55e2..381bca90880 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -96,23 +96,23 @@ func StageMiningExecCfg( // SpawnMiningExecStage // TODO: // - resubmitAdjustCh - variable is not implemented -func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg, sendersCfg SendersCfg, execCfg ExecuteBlockCfg, ctx context.Context, logger log.Logger) error { +func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg, sendersCfg SendersCfg, execCfg ExecuteBlockCfg, ctx context.Context, logger log.Logger, u Unwinder) error { cfg.vmConfig.NoReceipts = false chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) logPrefix := s.LogPrefix() current := cfg.miningState.MiningBlock txs := current.PreparedTxs noempty := true - var domains *state2.SharedDomains var ( stateReader state.StateReader ) stateReader = state.NewReaderV3(txc.Doms) ibs := state.New(stateReader) // Clique consensus needs forced author in the evm context - if cfg.chainConfig.Consensus == chain.CliqueConsensus { - execCfg.author = &cfg.miningState.MiningConfig.Etherbase - } + //if cfg.chainConfig.Consensus == chain.CliqueConsensus { + // execCfg.author = &cfg.miningState.MiningConfig.Etherbase + //} + execCfg.author = &cfg.miningState.MiningConfig.Etherbase // Create an empty block based on temporary copied state for // sealing in advance without waiting block execution finished. 
@@ -138,18 +138,18 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg
 	yielded := mapset.NewSet[[32]byte]()
 	var simStateReader state.StateReader
 	var simStateWriter state.StateWriter
-	m := membatchwithdb.NewMemoryBatch(txc.Tx, cfg.tmpdir, logger)
-	defer m.Rollback()
-	var err error
-	domains, err = state2.NewSharedDomains(m, logger)
+
+	mb := membatchwithdb.NewMemoryBatch(txc.Tx, cfg.tmpdir, logger)
+	defer mb.Close()
+	sd, err := state2.NewSharedDomains(mb, logger)
 	if err != nil {
 		return err
 	}
-	defer domains.Close()
-	simStateReader = state.NewReaderV3(domains)
-	simStateWriter = state.NewWriterV4(domains)
+	defer sd.Close()
+	simStateWriter = state.NewWriterV4(sd)
+	simStateReader = state.NewReaderV3(sd)
-	executionAt, err := s.ExecutionAt(txc.Tx)
+	executionAt, err := s.ExecutionAt(mb)
 	if err != nil {
 		return err
 	}
@@ -208,38 +208,39 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg
 	current.Requests = block.Requests()
 	// Simulate the block execution to get the final state root
-	if err := rawdb.WriteHeader(txc.Tx, block.Header()); err != nil {
+	if err = rawdb.WriteHeader(txc.Tx, block.Header()); err != nil {
 		return fmt.Errorf("cannot write header: %s", err)
 	}
 	blockHeight := block.NumberU64()
-	if err := rawdb.WriteCanonicalHash(txc.Tx, block.Hash(), blockHeight); err != nil {
+	if err = rawdb.WriteCanonicalHash(txc.Tx, block.Hash(), blockHeight); err != nil {
 		return fmt.Errorf("cannot write canonical hash: %s", err)
 	}
-	if err := rawdb.WriteHeadHeaderHash(txc.Tx, block.Hash()); err != nil {
+	if err = rawdb.WriteHeadHeaderHash(txc.Tx, block.Hash()); err != nil {
 		return err
 	}
 	if _, err = rawdb.WriteRawBodyIfNotExists(txc.Tx, block.Hash(), blockHeight, block.RawBody()); err != nil {
 		return fmt.Errorf("cannot write body: %s", err)
 	}
-	if err := rawdb.AppendCanonicalTxNums(txc.Tx, blockHeight); err != nil {
+	if err = rawdb.AppendCanonicalTxNums(txc.Tx, blockHeight); err != nil {
 		return err
 	}
-	if err := stages.SaveStageProgress(txc.Tx, kv.Headers, blockHeight); err != nil {
+	if err = stages.SaveStageProgress(txc.Tx, kv.Headers, blockHeight); err != nil {
 		return err
 	}
-	if err := stages.SaveStageProgress(txc.Tx, stages.Bodies, blockHeight); err != nil {
+	if err = stages.SaveStageProgress(txc.Tx, stages.Bodies, blockHeight); err != nil {
 		return err
 	}
 	senderS := &StageState{state: s.state, ID: stages.Senders, BlockNumber: blockHeight - 1}
-	if err := SpawnRecoverSendersStage(sendersCfg, senderS, nil, txc.Tx, blockHeight, ctx, logger); err != nil {
+	if err = SpawnRecoverSendersStage(sendersCfg, senderS, nil, txc.Tx, blockHeight, ctx, logger); err != nil {
 		return err
 	}
 	// This flag will skip checking the state root
 	execCfg.blockProduction = true
 	execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1}
-	if err := ExecBlockV3(execS, nil, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil {
+	if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil {
+		logger.Error("cannot execute block execution", "err", err)
 		return err
 	}
@@ -532,7 +533,7 @@ LOOP:
 			txs.Pop()
 		} else if errors.Is(err, core.ErrNonceTooLow) {
 			// New head notification data race between the transaction pool and miner, shift
-			logger.Debug(fmt.Sprintf("[%s] Skipping transaction with low nonce", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce())
+			logger.Debug(fmt.Sprintf("[%s] Skipping transaction with low nonce", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "err", err)
"hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "err", err) txs.Shift() } else if errors.Is(err, core.ErrNonceTooHigh) { // Reorg notification data race between the transaction pool and miner, skip account = diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index e35f78919ef..d74f6dfcffa 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -99,6 +99,7 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit "gasUsed", block.GasUsed(), "gasLimit", block.GasLimit(), "difficulty", block.Difficulty(), + "header", block.Header(), ) } // interrupt aborts the in-flight sealing task. diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index f04acdb0c3b..cbde166e0ea 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -49,7 +49,7 @@ func MiningStages( ID: stages.MiningCreateBlock, Description: "Mining: construct new block from txn pool", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnMiningCreateBlockStage(s, txc.Tx, createBlockCfg, ctx.Done(), logger) + return SpawnMiningCreateBlockStage(s, txc, createBlockCfg, ctx.Done(), logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return nil @@ -76,7 +76,7 @@ func MiningStages( ID: stages.MiningExecution, Description: "Mining: execute new block from txn pool", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnMiningExecStage(s, txc, execCfg, sendersCfg, executeBlockCfg, ctx, logger) + return SpawnMiningExecStage(s, txc, execCfg, sendersCfg, executeBlockCfg, ctx, logger, nil) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return nil diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index 83541f975a0..5f3b20ce3ed 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -365,9 +365,8 @@ func (s *Sync) Run(db kv.RwDB, txc wrap.TxContainer, initialCycle, firstCycle bo s.timings = s.timings[:0] hasMore := false - + var badBlockUnwind bool for !s.IsDone() { - var badBlockUnwind bool if s.unwindPoint != nil { for j := 0; j < len(s.unwindOrder); j++ { if s.unwindOrder[j] == nil || s.unwindOrder[j].Disabled || s.unwindOrder[j].Unwind == nil { @@ -405,11 +404,9 @@ func (s *Sync) Run(db kv.RwDB, txc wrap.TxContainer, initialCycle, firstCycle bo if stage.Disabled || stage.Forward == nil { s.logger.Trace(fmt.Sprintf("%s disabled. 
%s", stage.ID, stage.DisabledDescription)) - s.NextStage() continue } - if err := s.runStage(stage, db, txc, initialCycle, firstCycle, badBlockUnwind); err != nil { return false, err } @@ -450,6 +447,11 @@ func (s *Sync) Run(db kv.RwDB, txc wrap.TxContainer, initialCycle, firstCycle bo } s.currentStage = 0 + + if badBlockUnwind { + return false, errors.New("bad block unwinding") + } + return hasMore, nil } diff --git a/tests/bor/helper/miner.go b/tests/bor/helper/miner.go index eac225c4938..6a8e95563d3 100644 --- a/tests/bor/helper/miner.go +++ b/tests/bor/helper/miner.go @@ -78,16 +78,15 @@ func NewNodeConfig() *nodecfg.Config { } // InitNode initializes a node with the given genesis file and config -func InitMiner(ctx context.Context, genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdall bool, minerID int) (*node.Node, *eth.Ethereum, error) { +func InitMiner(ctx context.Context, dirName string, genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdall bool, minerID int) (*node.Node, *eth.Ethereum, error) { // Define the basic configurations for the Ethereum node - ddir, _ := os.MkdirTemp("", "") logger := log.New() nodeCfg := &nodecfg.Config{ Name: "erigon", Version: params.Version, - Dirs: datadir.New(ddir), + Dirs: datadir.New(dirName), P2P: p2p.Config{ ListenAddr: ":30303", ProtocolVersion: []uint{direct.ETH68, direct.ETH67}, // No need to specify direct.ETH66, because 1 sentry is used for both 66 and 67 @@ -125,13 +124,13 @@ func InitMiner(ctx context.Context, genesis *types.Genesis, privKey *ecdsa.Priva return nil, nil, err } - downloaderConfig, err := downloadercfg.New(datadir.New(ddir), nodeCfg.Version, torrentLogLevel, downloadRate, uploadRate, utils.TorrentPortFlag.Value, utils.TorrentConnsPerFileFlag.Value, utils.TorrentDownloadSlotsFlag.Value, []string{}, []string{}, "", true, utils.DbWriteMapFlag.Value) + downloaderConfig, err := downloadercfg.New(datadir.New(dirName), nodeCfg.Version, torrentLogLevel, downloadRate, uploadRate, utils.TorrentPortFlag.Value, utils.TorrentConnsPerFileFlag.Value, utils.TorrentDownloadSlotsFlag.Value, []string{}, []string{}, "", true, utils.DbWriteMapFlag.Value) if err != nil { return nil, nil, err } ethCfg := ðconfig.Config{ - Dirs: datadir.New(ddir), + Dirs: datadir.New(dirName), Genesis: genesis, NetworkID: genesis.Config.ChainID.Uint64(), TxPool: txpoolcfg.DefaultConfig, diff --git a/tests/bor/mining_test.go b/tests/bor/mining_test.go index 8a43b96b057..28ec1b76804 100644 --- a/tests/bor/mining_test.go +++ b/tests/bor/mining_test.go @@ -14,7 +14,6 @@ import ( "github.com/holiman/uint256" "github.com/erigontech/erigon-lib/chain/networkname" - "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/gointerfaces" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" txpool "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto" @@ -56,15 +55,14 @@ var ( // Example : CGO_CFLAGS="-D__BLST_PORTABLE__" go test -run ^TestMiningBenchmark$ github.com/erigontech/erigon/tests/bor -v -count=1 // In TestMiningBenchmark, we will test the mining performance. We will initialize a single node devnet and fire 5000 txs. We will measure the time it takes to include all the txs. This can be made more advcanced by increasing blockLimit and txsInTxpool. 
diff --git a/tests/bor/helper/miner.go b/tests/bor/helper/miner.go
index eac225c4938..6a8e95563d3 100644
--- a/tests/bor/helper/miner.go
+++ b/tests/bor/helper/miner.go
@@ -78,16 +78,15 @@ func NewNodeConfig() *nodecfg.Config {
 }
 // InitNode initializes a node with the given genesis file and config
-func InitMiner(ctx context.Context, genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdall bool, minerID int) (*node.Node, *eth.Ethereum, error) {
+func InitMiner(ctx context.Context, dirName string, genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdall bool, minerID int) (*node.Node, *eth.Ethereum, error) {
 	// Define the basic configurations for the Ethereum node
-	ddir, _ := os.MkdirTemp("", "")
 	logger := log.New()
 	nodeCfg := &nodecfg.Config{
 		Name: "erigon",
 		Version: params.Version,
-		Dirs: datadir.New(ddir),
+		Dirs: datadir.New(dirName),
 		P2P: p2p.Config{
 			ListenAddr: ":30303",
 			ProtocolVersion: []uint{direct.ETH68, direct.ETH67}, // No need to specify direct.ETH66, because 1 sentry is used for both 66 and 67
@@ -125,13 +124,13 @@ func InitMiner(ctx context.Context, genesis *types.Genesis, privKey *ecdsa.Priva
 		return nil, nil, err
 	}
-	downloaderConfig, err := downloadercfg.New(datadir.New(ddir), nodeCfg.Version, torrentLogLevel, downloadRate, uploadRate, utils.TorrentPortFlag.Value, utils.TorrentConnsPerFileFlag.Value, utils.TorrentDownloadSlotsFlag.Value, []string{}, []string{}, "", true, utils.DbWriteMapFlag.Value)
+	downloaderConfig, err := downloadercfg.New(datadir.New(dirName), nodeCfg.Version, torrentLogLevel, downloadRate, uploadRate, utils.TorrentPortFlag.Value, utils.TorrentConnsPerFileFlag.Value, utils.TorrentDownloadSlotsFlag.Value, []string{}, []string{}, "", true, utils.DbWriteMapFlag.Value)
 	if err != nil {
 		return nil, nil, err
 	}
 	ethCfg := &ethconfig.Config{
-		Dirs: datadir.New(ddir),
+		Dirs: datadir.New(dirName),
 		Genesis: genesis,
 		NetworkID: genesis.Config.ChainID.Uint64(),
 		TxPool: txpoolcfg.DefaultConfig,
diff --git a/tests/bor/mining_test.go b/tests/bor/mining_test.go
index 8a43b96b057..28ec1b76804 100644
--- a/tests/bor/mining_test.go
+++ b/tests/bor/mining_test.go
@@ -14,7 +14,6 @@ import (
 	"github.com/holiman/uint256"
 	"github.com/erigontech/erigon-lib/chain/networkname"
-	"github.com/erigontech/erigon-lib/config3"
 	"github.com/erigontech/erigon-lib/gointerfaces"
 	remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
 	txpool "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto"
@@ -56,15 +55,14 @@ var (
 // Example : CGO_CFLAGS="-D__BLST_PORTABLE__" go test -run ^TestMiningBenchmark$ github.com/erigontech/erigon/tests/bor -v -count=1
 // In TestMiningBenchmark, we will test the mining performance. We will initialize a single node devnet and fire 5000 txs. We will measure the time it takes to include all the txs. This can be made more advcanced by increasing blockLimit and txsInTxpool.
 func TestMiningBenchmark(t *testing.T) {
-	if config3.EnableHistoryV4InTest {
-		t.Skip("TODO: [e4] implement me")
-	}
+	//if config3.EnableHistoryV4InTest {
+	//	t.Skip("TODO: [e4] implement me")
+	//}
 	log.Root().SetHandler(log.LvlFilterHandler(log.LvlWarn, log.StreamHandler(os.Stderr, log.TerminalFormat())))
 	fdlimit.Raise(2048)
 	genesis := helper.InitGenesis("./testdata/genesis_2val.json", 64, networkname.BorE2ETestChain2ValName)
-
 	var stacks []*node.Node
 	var ethbackends []*eth.Ethereum
 	var enodes []string
@@ -72,7 +70,7 @@ func TestMiningBenchmark(t *testing.T) {
 	var txs []*types.Transaction
 	for i := 0; i < 1; i++ {
-		stack, ethBackend, err := helper.InitMiner(context.Background(), &genesis, pkeys[i], true, i)
+		stack, ethBackend, err := helper.InitMiner(context.Background(), t.TempDir(), &genesis, pkeys[i], true, i)
 		if err != nil {
 			panic(err)
 		}
@@ -105,8 +103,11 @@ func TestMiningBenchmark(t *testing.T) {
 	initNonce := uint64(0)
 	for i := 0; i < txInTxpool; i++ {
-		txn := *newRandomTxWithNonce(false, initNonce+uint64(i), ethbackends[0].TxpoolServer())
-		txs = append(txs, &txn)
+		txn, err := newRandomTxWithNonce(false, initNonce+uint64(i), ethbackends[0].TxpoolServer())
+		if err != nil {
+			panic(err)
+		}
+		txs = append(txs, txn)
 	}
 	start := time.Now()
@@ -118,7 +119,10 @@ func TestMiningBenchmark(t *testing.T) {
 		if err != nil {
 			panic(err)
 		}
-		ethbackends[0].TxpoolServer().Add(context.Background(), &txpool.AddRequest{RlpTxs: [][]byte{buf.Bytes()}})
+		_, err = ethbackends[0].TxpoolServer().Add(context.Background(), &txpool.AddRequest{RlpTxs: [][]byte{buf.Bytes()}})
+		if err != nil {
+			panic(err)
+		}
 	}
 	for {
@@ -126,7 +130,6 @@ func TestMiningBenchmark(t *testing.T) {
 		if err != nil {
 			panic(err)
 		}
-
 		if pendingReply.PendingCount == 0 {
 			break
 		}
@@ -141,17 +144,27 @@ func TestMiningBenchmark(t *testing.T) {
 	}
 // newRandomTxWithNonce creates a new transaction with the given nonce.
-func newRandomTxWithNonce(creation bool, nonce uint64, txPool txpool_proto.TxpoolServer) *types.Transaction {
+func newRandomTxWithNonce(creation bool, nonce uint64, txPool txpool_proto.TxpoolServer) (tx *types.Transaction, err error) {
 	var txn types.Transaction
 	gasPrice := uint256.NewInt(100 * params.InitialBaseFee)
 	if creation {
-		nonce, _ := txPool.Nonce(context.Background(), &txpool_proto.NonceRequest{Address: gointerfaces.ConvertAddressToH160(addr1)})
-		txn, _ = types.SignTx(types.NewContractCreation(nonce.Nonce, uint256.NewInt(0), testGas, gasPrice, common.FromHex(testCode)), *types.LatestSignerForChainID(nil), pkey1)
+		var nonceReply *txpool_proto.NonceReply
+		nonceReply, err = txPool.Nonce(context.Background(), &txpool_proto.NonceRequest{Address: gointerfaces.ConvertAddressToH160(addr1)})
+		if err != nil {
+			return nil, err
+		}
+		txn, err = types.SignTx(types.NewContractCreation(nonceReply.Nonce, uint256.NewInt(0), testGas, gasPrice, common.FromHex(testCode)), *types.LatestSignerForChainID(nil), pkey1)
+		if err != nil {
+			return nil, err
+		}
 	} else {
-		txn, _ = types.SignTx(types.NewTransaction(nonce, addr2, uint256.NewInt(1000), params.TxGas, gasPrice, nil), *types.LatestSignerForChainID(nil), pkey1)
+		txn, err = types.SignTx(types.NewTransaction(nonce, addr2, uint256.NewInt(1000), params.TxGas, gasPrice, nil), *types.LatestSignerForChainID(nil), pkey1)
+		if err != nil {
+			return nil, err
+		}
 	}
-	return &txn
+	return &txn, nil
 }
diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go
index 1e0e57a3cbf..b7e6fef56f2 100644
--- a/turbo/shards/state_change_accumulator.go
+++ b/turbo/shards/state_change_accumulator.go
@@ -18,7 +18,6 @@ package shards
 import (
 	"context"
-
 	libcommon "github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon-lib/gointerfaces"
 	remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go
index 784e38a854a..3332710a049 100644
--- a/turbo/stages/headerdownload/header_algos.go
+++ b/turbo/stages/headerdownload/header_algos.go
@@ -393,6 +393,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ
 	defer hd.lock.Unlock()
 	var penalties []PenaltyItem
 	var req *HeaderRequest
+
 	hd.anchorTree.Ascend(func(anchor *Anchor) bool {
 		if anchor.blockHeight == 0 { //has no parent
 			return true
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 5fe9fafd5e1..73bbeb61217 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/erigontech/erigon-lib/kv/membatchwithdb"
 	"runtime"
 	"time"
@@ -32,7 +33,6 @@ import (
 	"github.com/erigontech/erigon-lib/direct"
 	proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
 	"github.com/erigontech/erigon-lib/kv"
-	"github.com/erigontech/erigon-lib/kv/membatchwithdb"
 	"github.com/erigontech/erigon-lib/kv/rawdbv3"
 	"github.com/erigontech/erigon-lib/log/v3"
 	"github.com/erigontech/erigon-lib/state"
@@ -98,7 +98,8 @@ func StageLoop(
 		t := time.Now()
 		// Estimate the current top height seen from the peer
-		err := StageLoopIteration(ctx, db, wrap.TxContainer{}, sync, initialCycle, false, logger, blockReader, hook)
+
+		err := StageLoopIteration(ctx, db, wrap.TxContainer{ /*Tx: tx, Doms: sd*/ }, sync, initialCycle, false, logger, blockReader, hook)
 		if err != nil {
 			if errors.Is(err, libcommon.ErrStopped) || errors.Is(err, context.Canceled) {
 				return
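The StageLoopIteration hunk below opens SharedDomains alongside the RwTx and hands both to the stages through wrap.TxContainer, dropping the Doms reference once the tx is committed. Below is a sketch of that ownership pattern, assuming the constructors and the Sync.Run signature shown in this diff; runOneCycle and its package name are illustrative.

```go
package stageloopsketch

import (
	"context"

	"github.com/erigontech/erigon-lib/kv"
	"github.com/erigontech/erigon-lib/log/v3"
	"github.com/erigontech/erigon-lib/state"
	"github.com/erigontech/erigon-lib/wrap"
	"github.com/erigontech/erigon/eth/stagedsync"
)

// runOneCycle: the code that opens the RwTx also opens SharedDomains over it,
// both travel together inside wrap.TxContainer, and Doms is dropped after commit
// because it is tied to that transaction.
func runOneCycle(ctx context.Context, db kv.RwDB, stageSync *stagedsync.Sync, logger log.Logger) error {
	tx, err := db.BeginRw(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	txc := wrap.TxContainer{Tx: tx}
	txc.Doms, err = state.NewSharedDomains(tx, logger) // commitment/domain view shared by all stages in this cycle
	if err != nil {
		return err
	}
	defer txc.Doms.Close()

	if _, err = stageSync.Run(db, txc, false /* initialCycle */, false /* firstCycle */); err != nil {
		return err
	}
	if err = tx.Commit(); err != nil {
		return err
	}
	txc.Doms = nil // domains belong to the committed tx; a new cycle opens fresh ones
	return nil
}
```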
@@ -136,6 +137,7 @@ func ProcessFrozenBlocks(ctx context.Context, db kv.RwDB, blockReader services.F
 	sawZeroBlocksTimes := 0
 	initialCycle, firstCycle := true, true
 	for {
+		// run stages first time - it will download blocks
 		if hook != nil {
 			if err := db.View(ctx, func(tx kv.Tx) (err error) {
 				err = hook.BeforeRun(tx, false)
@@ -198,6 +200,7 @@ func ProcessFrozenBlocks(ctx context.Context, db kv.RwDB, blockReader services.F
 func StageLoopIteration(ctx context.Context, db kv.RwDB, txc wrap.TxContainer, sync *stagedsync.Sync, initialCycle, firstCycle bool, logger log.Logger, blockReader services.FullBlockReader, hook *Hook) (err error) {
 	defer func() {
 		if rec := recover(); rec != nil {
+			logger.Error("panic", "err", dbg.Stack())
 			err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
 		}
 	}() // avoid crash because Erigon's core does many things
@@ -238,10 +241,17 @@ func StageLoopIteration(ctx context.Context, db kv.RwDB, txc wrap.TxContainer, s
 			return err
 		}
 		defer txc.Tx.Rollback()
+		txc.Doms, err = state.NewSharedDomains(txc.Tx, logger)
+		if err != nil {
+			logger.Error("NewSharedDomains err", "err", err)
+			return err
+		}
+		defer txc.Doms.Close()
 	}
 	if hook != nil {
 		if err = hook.BeforeRun(txc.Tx, isSynced); err != nil {
+			logger.Error("BeforeRun err", "err", err)
 			return err
 		}
 	}
@@ -260,6 +270,7 @@ func StageLoopIteration(ctx context.Context, db kv.RwDB, txc wrap.TxContainer, s
 		if errTx != nil {
 			return errTx
 		}
+		txc.Doms = nil
 		commitTime = time.Since(commitStart)
 	}
@@ -429,20 +440,16 @@ func MiningStep(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, tmpDir
 			err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
 		}
 	}() // avoid crash because Erigon's core does many things
-
 	tx, err := db.BeginRo(ctx)
 	if err != nil {
 		return err
 	}
 	defer tx.Rollback()
-	var miningBatch kv.RwTx
 	mb := membatchwithdb.NewMemoryBatch(tx, tmpDir, logger)
-	defer mb.Rollback()
-	miningBatch = mb
+	defer mb.Close()
-	txc := wrap.TxContainer{Tx: miningBatch}
+	txc := wrap.TxContainer{Tx: mb}
 	sd, err := state.NewSharedDomains(mb, logger)
 	if err != nil {
 		return err
@@ -453,7 +460,6 @@ func MiningStep(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, tmpDir
 	if _, err = mining.Run(nil, txc, false /* firstCycle */, false); err != nil {
 		return err
 	}
-	tx.Rollback()
 	return nil
 }
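Recapping the core/blockchain.go change earlier in this diff: BlockPostValidation now adopts the computed receipt hash while mining, instead of rejecting the header, because during block production the header is still being assembled from the execution results. A toy sketch of that behaviour (hash and header are stand-ins for the real types.Header fields, not Erigon types):

```go
package main

import "fmt"

type hash [4]byte

type header struct{ receiptHash hash }

// postValidate: on import a receipt-hash mismatch is fatal; while producing a
// block the computed hash is written back into the header instead.
func postValidate(computed hash, h *header, isMining bool) error {
	if computed == h.receiptHash {
		return nil
	}
	if isMining {
		h.receiptHash = computed // the header follows the execution result
		return nil
	}
	return fmt.Errorf("receiptHash mismatch: %x != %x", computed, h.receiptHash)
}

func main() {
	h := &header{}
	fmt.Println(postValidate(hash{1, 2, 3, 4}, h, true), h.receiptHash) // <nil> [1 2 3 4]
	fmt.Println(postValidate(hash{9, 9, 9, 9}, &header{}, false))       // mismatch error
}
```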