Skip to content

Commit

Permalink
turbo-api: add a plug-in mechanism for possible Bloom and other filte…
Browse files Browse the repository at this point in the history
…rs (#1229)

* Add a convenience method to replace stages

* allow more parameters to the execution function

* add a plug-in point for stuff like bloom filters, etc

* add comments

* fixups
  • Loading branch information
mandrigin authored Oct 12, 2020
1 parent c4f7ff2 commit f3ce1ce
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 38 deletions.
11 changes: 10 additions & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,15 @@ func stageExec(ctx context.Context) error {
u := &stagedsync.UnwindState{Stage: stages.Execution, UnwindPoint: stage4.BlockNumber - unwind}
return stagedsync.UnwindExecutionStage(u, stage4, db, false)
}
return stagedsync.SpawnExecuteBlocksStage(stage4, db, bc.Config(), bc, bc.GetVMConfig(), block, ch, sm.Receipts, hdd, nil)
return stagedsync.SpawnExecuteBlocksStage(stage4, db,
bc.Config(), bc, bc.GetVMConfig(),
ch,
stagedsync.ExecuteBlockStageParams{
ToBlock: block, // limit execution to the specified block
WriteReceipts: sm.Receipts,
Hdd: hdd,
})

}

func stageIHash(ctx context.Context) error {
Expand Down Expand Up @@ -480,6 +488,7 @@ func newSync(quitCh <-chan struct{}, db ethdb.Database, tx ethdb.Database, hook
st, err := stagedsync.New(
stagedsync.DefaultStages(),
stagedsync.DefaultUnwindOrder(),
stagedsync.OptionalParameters{},
).Prepare(nil, chainConfig, bc, bc.GetVMConfig(), db, tx, "integration_test", sm, "", false, quitCh, nil, nil, func() error { return nil }, hook)
if err != nil {
panic(err)
Expand Down
12 changes: 11 additions & 1 deletion cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"

"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/common/dbutils"

Expand Down Expand Up @@ -136,7 +137,16 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {

// set block limit of execute stage
st.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder) error {
if err := stagedsync.SpawnExecuteBlocksStage(stageState, tx, bc.Config(), bc, bc.GetVMConfig(), execToBlock, ch, sm.Receipts, hdd, changeSetHook); err != nil {
if err := stagedsync.SpawnExecuteBlocksStage(
stageState, tx,
bc.Config(), bc, bc.GetVMConfig(),
ch,
stagedsync.ExecuteBlockStageParams{
ToBlock: execToBlock, // limit execution to the specified block
WriteReceipts: sm.Receipts,
Hdd: hdd,
ChangeSetHook: changeSetHook,
}); err != nil {
return fmt.Errorf("spawnExecuteBlocksStage: %w", err)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions cmd/tg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func runTurboGeth(cliCtx *cli.Context) {
sync := stagedsync.New(
stagedsync.DefaultStages(),
stagedsync.DefaultUnwindOrder(),
stagedsync.OptionalParameters{},
)

ctx := utils.RootContext()
Expand Down
8 changes: 8 additions & 0 deletions cmd/tgcustom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"os"

"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/turbo/node"

Expand Down Expand Up @@ -72,6 +74,12 @@ func runTurboGeth(ctx *cli.Context) {
sync := stagedsync.New(
syncStages(ctx),
stagedsync.DefaultUnwindOrder(),
stagedsync.OptionalParameters{
StateReaderBuilder: func(getter ethdb.Getter) state.StateReader {
// put your custom caching code here
return state.NewPlainStateReader(getter)
},
},
)

// running a node and initializing a custom bucket with all default settings
Expand Down
1 change: 1 addition & 0 deletions eth/downloader/downloader_stagedsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func newStagedSyncTester() (*stagedSyncTester, func()) {
stagedsync.New(
stagedsync.DefaultStages(),
stagedsync.DefaultUnwindOrder(),
stagedsync.OptionalParameters{},
),
)
clear := func() {
Expand Down
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type ProtocolManager struct {
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb *ethdb.ObjectDatabase, whitelist map[uint64]common.Hash, stagedSync *stagedsync.StagedSync) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
if stagedSync == nil {
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder())
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{})
}
manager := &ProtocolManager{
networkID: networkID,
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/all_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, engine c
if err := SpawnExecuteBlocksStage(&StageState{
Stage: stages.Execution,
BlockNumber: num - 1,
}, db, config, bc, bc.GetVMConfig(), 0, nil, true, false, nil); err != nil {
}, db, config, bc, bc.GetVMConfig(), nil, ExecuteBlockStageParams{WriteReceipts: true}); err != nil {
return err
}

Expand Down
35 changes: 25 additions & 10 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/ledgerwatch/turbo-geth/ethdb/cbor"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
Expand All @@ -34,14 +35,24 @@ type HasChangeSetWriter interface {

type ChangeSetHook func(blockNum uint64, wr *state.ChangeSetWriter)

func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, toBlock uint64, quit <-chan struct{}, writeReceipts bool, hdd bool, changeSetHook ChangeSetHook) error {
type StateReaderBuilder func(ethdb.Getter) state.StateReader

type ExecuteBlockStageParams struct {
ToBlock uint64 // not setting this params means no limit
WriteReceipts bool
Hdd bool
ChangeSetHook ChangeSetHook
ReaderBuilder StateReaderBuilder
}

func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, quit <-chan struct{}, params ExecuteBlockStageParams) error {
prevStageProgress, _, errStart := stages.GetStageProgress(stateDB, stages.Senders)
if errStart != nil {
return errStart
}
var to = prevStageProgress
if toBlock > 0 {
to = min(prevStageProgress, toBlock)
if params.ToBlock > 0 {
to = min(prevStageProgress, params.ToBlock)
}
if to <= s.BlockNumber {
s.Done()
Expand Down Expand Up @@ -85,7 +96,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
stageProgress := s.BlockNumber
logBlock := stageProgress
// Warmup only works for HDD sync, and for long ranges
var warmup = hdd && (to-s.BlockNumber) > 30000
var warmup = params.Hdd && (to-s.BlockNumber) > 30000

for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
if err := common.Stopped(quit); err != nil {
Expand Down Expand Up @@ -122,7 +133,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
var stateReader state.StateReader
var stateWriter state.WriterWithChangeSets

stateReader = state.NewPlainStateReader(batch)
if params.ReaderBuilder != nil {
stateReader = params.ReaderBuilder(batch)
} else {
stateReader = state.NewPlainStateReader(batch)
}
stateWriter = state.NewPlainStateWriter(batch, tx, blockNum)

// where the magic happens
Expand All @@ -131,7 +146,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
return err
}

if writeReceipts {
if params.WriteReceipts {
if err = appendReceipts(tx, receipts, block.NumberU64(), block.Hash()); err != nil {
return err
}
Expand All @@ -149,7 +164,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
return err
}
}
warmup = hdd && (to-blockNum) > 30000
warmup = params.Hdd && (to-blockNum) > 30000
}

if prof {
Expand All @@ -159,9 +174,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
}
}

if changeSetHook != nil {
if params.ChangeSetHook != nil {
if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok {
changeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
params.ChangeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
}
}

Expand Down
48 changes: 41 additions & 7 deletions eth/stagedsync/stagebuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package stagedsync

import (
"fmt"
"strings"
"time"

"github.com/ledgerwatch/turbo-geth/core"
Expand Down Expand Up @@ -28,12 +30,13 @@ type StageParameters struct {
datadir string
// QuitCh is a channel that is closed. This channel is useful to listen to when
// the stage can take significant time and gracefully shutdown at Ctrl+C.
QuitCh <-chan struct{}
headersFetchers []func() error
txPool *core.TxPool
poolStart func() error
changeSetHook ChangeSetHook
prefetchedBlocks *PrefetchedBlocks
QuitCh <-chan struct{}
headersFetchers []func() error
txPool *core.TxPool
poolStart func() error
changeSetHook ChangeSetHook
prefetchedBlocks *PrefetchedBlocks
stateReaderBuilder StateReaderBuilder
}

// StageBuilder represent an object to create a single stage for staged sync
Expand All @@ -47,6 +50,29 @@ type StageBuilder struct {
// StageBuilders represents an ordered list of builders to build different stages. It also contains helper methods to change the list of stages.
type StageBuilders []StageBuilder

// MustReplace finds a stage with a specific ID and then sets the new one instead of that.
// Chainable but panics if it can't find stage to replace.
func (bb StageBuilders) MustReplace(id stages.SyncStage, newBuilder StageBuilder) StageBuilders {
result := make([]StageBuilder, len(bb))

found := false

for i, originalBuilder := range bb {
if strings.EqualFold(string(originalBuilder.ID), string(id)) {
found = true
result[i] = newBuilder
} else {
result[i] = originalBuilder
}
}

if !found {
panic(fmt.Sprintf("StageBuilders#Replace can't find the stage with id %s", string(id)))
}

return result
}

// Build creates sync states out of builders
func (bb StageBuilders) Build(world StageParameters) []*Stage {
stages := make([]*Stage, len(bb))
Expand Down Expand Up @@ -140,7 +166,15 @@ func DefaultStages() StageBuilders {
ID: stages.Execution,
Description: "Execute blocks w/o hash checks",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnExecuteBlocksStage(s, world.TX, world.chainConfig, world.chainContext, world.vmConfig, 0 /* limit (meaning no limit) */, world.QuitCh, world.storageMode.Receipts, world.hdd, world.changeSetHook)
return SpawnExecuteBlocksStage(s, world.TX,
world.chainConfig, world.chainContext, world.vmConfig,
world.QuitCh,
ExecuteBlockStageParams{
WriteReceipts: world.storageMode.Receipts,
Hdd: world.hdd,
ChangeSetHook: world.changeSetHook,
ReaderBuilder: world.stateReaderBuilder,
})
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindExecutionStage(u, s, world.TX, world.storageMode.Receipts)
Expand Down
53 changes: 36 additions & 17 deletions eth/stagedsync/stagedsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stagedsync

import (
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/params"
Expand All @@ -13,13 +14,23 @@ type StagedSync struct {
PrefetchedBlocks *PrefetchedBlocks
stageBuilders StageBuilders
unwindOrder UnwindOrder
params OptionalParameters
}

func New(stages StageBuilders, unwindOrder UnwindOrder) *StagedSync {
// OptionalParameters contains any non-necessary parateres you can specify to fine-tune
// and experiment on StagedSync.
type OptionalParameters struct {
// StateReaderBuilder is a function that returns state reader for the block execution stage.
// It can be used to add someting like bloom filters to figure out non-existing accounts and similar experiments.
StateReaderBuilder StateReaderBuilder
}

func New(stages StageBuilders, unwindOrder UnwindOrder, params OptionalParameters) *StagedSync {
return &StagedSync{
PrefetchedBlocks: NewPrefetchedBlocks(),
stageBuilders: stages,
unwindOrder: unwindOrder,
params: params,
}
}

Expand All @@ -40,24 +51,32 @@ func (stagedSync *StagedSync) Prepare(
poolStart func() error,
changeSetHook ChangeSetHook,
) (*State, error) {
var readerBuilder StateReaderBuilder
if stagedSync.params.StateReaderBuilder != nil {
readerBuilder = stagedSync.params.StateReaderBuilder
} else {
readerBuilder = func(getter ethdb.Getter) state.StateReader { return state.NewPlainStateReader(getter) }
}

stages := stagedSync.stageBuilders.Build(
StageParameters{
d: d,
chainConfig: chainConfig,
chainContext: chainContext,
vmConfig: vmConfig,
db: db,
TX: tx,
pid: pid,
storageMode: storageMode,
datadir: datadir,
QuitCh: quitCh,
headersFetchers: headersFetchers,
txPool: txPool,
poolStart: poolStart,
changeSetHook: changeSetHook,
hdd: hdd,
prefetchedBlocks: stagedSync.PrefetchedBlocks,
d: d,
chainConfig: chainConfig,
chainContext: chainContext,
vmConfig: vmConfig,
db: db,
TX: tx,
pid: pid,
storageMode: storageMode,
datadir: datadir,
QuitCh: quitCh,
headersFetchers: headersFetchers,
txPool: txPool,
poolStart: poolStart,
changeSetHook: changeSetHook,
hdd: hdd,
prefetchedBlocks: stagedSync.PrefetchedBlocks,
stateReaderBuilder: readerBuilder,
},
)
state := NewState(stages)
Expand Down

0 comments on commit f3ce1ce

Please sign in to comment.