diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 60aba1262db..e6abc44bbdf 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -265,7 +265,15 @@ func stageExec(ctx context.Context) error { u := &stagedsync.UnwindState{Stage: stages.Execution, UnwindPoint: stage4.BlockNumber - unwind} return stagedsync.UnwindExecutionStage(u, stage4, db, false) } - return stagedsync.SpawnExecuteBlocksStage(stage4, db, bc.Config(), bc, bc.GetVMConfig(), block, ch, sm.Receipts, hdd, nil) + return stagedsync.SpawnExecuteBlocksStage(stage4, db, + bc.Config(), bc, bc.GetVMConfig(), + ch, + stagedsync.ExecuteBlockStageParams{ + ToBlock: block, // limit execution to the specified block + WriteReceipts: sm.Receipts, + Hdd: hdd, + }) + } func stageIHash(ctx context.Context) error { @@ -480,6 +488,7 @@ func newSync(quitCh <-chan struct{}, db ethdb.Database, tx ethdb.Database, hook st, err := stagedsync.New( stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), + stagedsync.OptionalParameters{}, ).Prepare(nil, chainConfig, bc, bc.GetVMConfig(), db, tx, "integration_test", sm, "", false, quitCh, nil, nil, func() error { return nil }, hook) if err != nil { panic(err) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 61dd0a64d50..65ea7d44bd8 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/ledgerwatch/turbo-geth/cmd/utils" "github.com/ledgerwatch/turbo-geth/common/dbutils" @@ -136,7 +137,16 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error { // set block limit of execute stage st.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder) error { - if err := stagedsync.SpawnExecuteBlocksStage(stageState, tx, bc.Config(), bc, bc.GetVMConfig(), execToBlock, ch, sm.Receipts, hdd, changeSetHook); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage( + stageState, tx, + bc.Config(), bc, bc.GetVMConfig(), + ch, + stagedsync.ExecuteBlockStageParams{ + ToBlock: execToBlock, // limit execution to the specified block + WriteReceipts: sm.Receipts, + Hdd: hdd, + ChangeSetHook: changeSetHook, + }); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } return nil diff --git a/cmd/tg/main.go b/cmd/tg/main.go index 3cb6d19235c..da827102e05 100644 --- a/cmd/tg/main.go +++ b/cmd/tg/main.go @@ -32,6 +32,7 @@ func runTurboGeth(cliCtx *cli.Context) { sync := stagedsync.New( stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), + stagedsync.OptionalParameters{}, ) ctx := utils.RootContext() diff --git a/cmd/tgcustom/main.go b/cmd/tgcustom/main.go index 016e0542635..b4d8f02d5b6 100644 --- a/cmd/tgcustom/main.go +++ b/cmd/tgcustom/main.go @@ -5,8 +5,10 @@ import ( "os" "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/state" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/turbo/node" @@ -72,6 +74,12 @@ func runTurboGeth(ctx *cli.Context) { sync := stagedsync.New( syncStages(ctx), stagedsync.DefaultUnwindOrder(), + stagedsync.OptionalParameters{ + StateReaderBuilder: func(getter ethdb.Getter) state.StateReader { + // put your custom caching code here + return state.NewPlainStateReader(getter) + }, + }, ) // running a node and initializing a custom bucket with all default settings diff --git a/eth/downloader/downloader_stagedsync_test.go b/eth/downloader/downloader_stagedsync_test.go index f86b28e455c..26cdf0b7cd7 100644 --- a/eth/downloader/downloader_stagedsync_test.go +++ b/eth/downloader/downloader_stagedsync_test.go @@ -46,6 +46,7 @@ func newStagedSyncTester() (*stagedSyncTester, func()) { stagedsync.New( stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), + stagedsync.OptionalParameters{}, ), ) clear := func() { diff --git a/eth/handler.go b/eth/handler.go index bb61e5de05e..7ab804202b4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -112,7 +112,7 @@ type ProtocolManager struct { func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb *ethdb.ObjectDatabase, whitelist map[uint64]common.Hash, stagedSync *stagedsync.StagedSync) (*ProtocolManager, error) { // Create the protocol manager with the base fields if stagedSync == nil { - stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder()) + stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{}) } manager := &ProtocolManager{ networkID: networkID, diff --git a/eth/stagedsync/all_stages.go b/eth/stagedsync/all_stages.go index 264209defe8..4837afc27ae 100644 --- a/eth/stagedsync/all_stages.go +++ b/eth/stagedsync/all_stages.go @@ -73,7 +73,7 @@ func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, engine c if err := SpawnExecuteBlocksStage(&StageState{ Stage: stages.Execution, BlockNumber: num - 1, - }, db, config, bc, bc.GetVMConfig(), 0, nil, true, false, nil); err != nil { + }, db, config, bc, bc.GetVMConfig(), nil, ExecuteBlockStageParams{WriteReceipts: true}); err != nil { return err } diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 76e8cd6e2e5..cae2bb918f9 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/ledgerwatch/turbo-geth/ethdb/cbor" "os" "runtime" "runtime/pprof" "time" + "github.com/ledgerwatch/turbo-geth/ethdb/cbor" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" @@ -34,14 +35,24 @@ type HasChangeSetWriter interface { type ChangeSetHook func(blockNum uint64, wr *state.ChangeSetWriter) -func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, toBlock uint64, quit <-chan struct{}, writeReceipts bool, hdd bool, changeSetHook ChangeSetHook) error { +type StateReaderBuilder func(ethdb.Getter) state.StateReader + +type ExecuteBlockStageParams struct { + ToBlock uint64 // not setting this params means no limit + WriteReceipts bool + Hdd bool + ChangeSetHook ChangeSetHook + ReaderBuilder StateReaderBuilder +} + +func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, quit <-chan struct{}, params ExecuteBlockStageParams) error { prevStageProgress, _, errStart := stages.GetStageProgress(stateDB, stages.Senders) if errStart != nil { return errStart } var to = prevStageProgress - if toBlock > 0 { - to = min(prevStageProgress, toBlock) + if params.ToBlock > 0 { + to = min(prevStageProgress, params.ToBlock) } if to <= s.BlockNumber { s.Done() @@ -85,7 +96,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig stageProgress := s.BlockNumber logBlock := stageProgress // Warmup only works for HDD sync, and for long ranges - var warmup = hdd && (to-s.BlockNumber) > 30000 + var warmup = params.Hdd && (to-s.BlockNumber) > 30000 for blockNum := stageProgress + 1; blockNum <= to; blockNum++ { if err := common.Stopped(quit); err != nil { @@ -122,7 +133,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig var stateReader state.StateReader var stateWriter state.WriterWithChangeSets - stateReader = state.NewPlainStateReader(batch) + if params.ReaderBuilder != nil { + stateReader = params.ReaderBuilder(batch) + } else { + stateReader = state.NewPlainStateReader(batch) + } stateWriter = state.NewPlainStateWriter(batch, tx, blockNum) // where the magic happens @@ -131,7 +146,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig return err } - if writeReceipts { + if params.WriteReceipts { if err = appendReceipts(tx, receipts, block.NumberU64(), block.Hash()); err != nil { return err } @@ -149,7 +164,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig return err } } - warmup = hdd && (to-blockNum) > 30000 + warmup = params.Hdd && (to-blockNum) > 30000 } if prof { @@ -159,9 +174,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig } } - if changeSetHook != nil { + if params.ChangeSetHook != nil { if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok { - changeSetHook(blockNum, hasChangeSet.ChangeSetWriter()) + params.ChangeSetHook(blockNum, hasChangeSet.ChangeSetWriter()) } } diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index c3890b0ac16..bfbb7889aed 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -1,6 +1,8 @@ package stagedsync import ( + "fmt" + "strings" "time" "github.com/ledgerwatch/turbo-geth/core" @@ -28,12 +30,13 @@ type StageParameters struct { datadir string // QuitCh is a channel that is closed. This channel is useful to listen to when // the stage can take significant time and gracefully shutdown at Ctrl+C. - QuitCh <-chan struct{} - headersFetchers []func() error - txPool *core.TxPool - poolStart func() error - changeSetHook ChangeSetHook - prefetchedBlocks *PrefetchedBlocks + QuitCh <-chan struct{} + headersFetchers []func() error + txPool *core.TxPool + poolStart func() error + changeSetHook ChangeSetHook + prefetchedBlocks *PrefetchedBlocks + stateReaderBuilder StateReaderBuilder } // StageBuilder represent an object to create a single stage for staged sync @@ -47,6 +50,29 @@ type StageBuilder struct { // StageBuilders represents an ordered list of builders to build different stages. It also contains helper methods to change the list of stages. type StageBuilders []StageBuilder +// MustReplace finds a stage with a specific ID and then sets the new one instead of that. +// Chainable but panics if it can't find stage to replace. +func (bb StageBuilders) MustReplace(id stages.SyncStage, newBuilder StageBuilder) StageBuilders { + result := make([]StageBuilder, len(bb)) + + found := false + + for i, originalBuilder := range bb { + if strings.EqualFold(string(originalBuilder.ID), string(id)) { + found = true + result[i] = newBuilder + } else { + result[i] = originalBuilder + } + } + + if !found { + panic(fmt.Sprintf("StageBuilders#Replace can't find the stage with id %s", string(id))) + } + + return result +} + // Build creates sync states out of builders func (bb StageBuilders) Build(world StageParameters) []*Stage { stages := make([]*Stage, len(bb)) @@ -140,7 +166,15 @@ func DefaultStages() StageBuilders { ID: stages.Execution, Description: "Execute blocks w/o hash checks", ExecFunc: func(s *StageState, u Unwinder) error { - return SpawnExecuteBlocksStage(s, world.TX, world.chainConfig, world.chainContext, world.vmConfig, 0 /* limit (meaning no limit) */, world.QuitCh, world.storageMode.Receipts, world.hdd, world.changeSetHook) + return SpawnExecuteBlocksStage(s, world.TX, + world.chainConfig, world.chainContext, world.vmConfig, + world.QuitCh, + ExecuteBlockStageParams{ + WriteReceipts: world.storageMode.Receipts, + Hdd: world.hdd, + ChangeSetHook: world.changeSetHook, + ReaderBuilder: world.stateReaderBuilder, + }) }, UnwindFunc: func(u *UnwindState, s *StageState) error { return UnwindExecutionStage(u, s, world.TX, world.storageMode.Receipts) diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go index 01f53721f8c..5556fc1e9a6 100644 --- a/eth/stagedsync/stagedsync.go +++ b/eth/stagedsync/stagedsync.go @@ -2,6 +2,7 @@ package stagedsync import ( "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/core/state" "github.com/ledgerwatch/turbo-geth/core/vm" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/params" @@ -13,13 +14,23 @@ type StagedSync struct { PrefetchedBlocks *PrefetchedBlocks stageBuilders StageBuilders unwindOrder UnwindOrder + params OptionalParameters } -func New(stages StageBuilders, unwindOrder UnwindOrder) *StagedSync { +// OptionalParameters contains any non-necessary parateres you can specify to fine-tune +// and experiment on StagedSync. +type OptionalParameters struct { + // StateReaderBuilder is a function that returns state reader for the block execution stage. + // It can be used to add someting like bloom filters to figure out non-existing accounts and similar experiments. + StateReaderBuilder StateReaderBuilder +} + +func New(stages StageBuilders, unwindOrder UnwindOrder, params OptionalParameters) *StagedSync { return &StagedSync{ PrefetchedBlocks: NewPrefetchedBlocks(), stageBuilders: stages, unwindOrder: unwindOrder, + params: params, } } @@ -40,24 +51,32 @@ func (stagedSync *StagedSync) Prepare( poolStart func() error, changeSetHook ChangeSetHook, ) (*State, error) { + var readerBuilder StateReaderBuilder + if stagedSync.params.StateReaderBuilder != nil { + readerBuilder = stagedSync.params.StateReaderBuilder + } else { + readerBuilder = func(getter ethdb.Getter) state.StateReader { return state.NewPlainStateReader(getter) } + } + stages := stagedSync.stageBuilders.Build( StageParameters{ - d: d, - chainConfig: chainConfig, - chainContext: chainContext, - vmConfig: vmConfig, - db: db, - TX: tx, - pid: pid, - storageMode: storageMode, - datadir: datadir, - QuitCh: quitCh, - headersFetchers: headersFetchers, - txPool: txPool, - poolStart: poolStart, - changeSetHook: changeSetHook, - hdd: hdd, - prefetchedBlocks: stagedSync.PrefetchedBlocks, + d: d, + chainConfig: chainConfig, + chainContext: chainContext, + vmConfig: vmConfig, + db: db, + TX: tx, + pid: pid, + storageMode: storageMode, + datadir: datadir, + QuitCh: quitCh, + headersFetchers: headersFetchers, + txPool: txPool, + poolStart: poolStart, + changeSetHook: changeSetHook, + hdd: hdd, + prefetchedBlocks: stagedSync.PrefetchedBlocks, + stateReaderBuilder: readerBuilder, }, ) state := NewState(stages)