Skip to content

Commit

Permalink
Revert "StageSenders: --sync.loop.block.limit support" (#10060)
Browse files Browse the repository at this point in the history
Reverts #9982
  • Loading branch information
AskAlexSharov authored Apr 25, 2024
1 parent a700f92 commit cb73f91
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return err
}

cfg := stagedsync.StageSendersCfg(db, chainConfig, sync.Cfg(), false, tmpdir, pm, br, nil, nil)
cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, nil, nil)
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
Expand Down
9 changes: 1 addition & 8 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/eth/ethconfig"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -46,10 +45,9 @@ type SendersCfg struct {
hd *headerdownload.HeaderDownload
blockReader services.FullBlockReader
loopBreakCheck func(int) bool
syncCfg ethconfig.Sync
}

func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
const sendersBatchSize = 10000
const sendersBlockSize = 4096

Expand All @@ -67,7 +65,6 @@ func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync,
hd: hd,
blockReader: blockReader,
loopBreakCheck: loopBreakCheck,
syncCfg: syncCfg,
}
}

Expand Down Expand Up @@ -109,10 +106,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R

startFrom := s.BlockNumber + 1

if to > startFrom && cfg.syncCfg.LoopBlockLimit > 0 && to-startFrom > uint64(cfg.syncCfg.LoopBlockLimit) { // uint underflow protection. preserve global jump limit.
to = startFrom + uint64(cfg.syncCfg.LoopBlockLimit)
}

jobs := make(chan *senderRecoveryJob, cfg.batchSize)
out := make(chan *senderRecoveryJob, cfg.batchSize)
wg := new(sync.WaitGroup)
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_senders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/turbo/stages/mock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -129,7 +128,7 @@ func TestSenders(t *testing.T) {

require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))

cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, ethconfig.Defaults.Sync, false, "", prune.Mode{}, br, nil, nil)
cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, br, nil, nil)
err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New())
require.NoError(err)

Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func (s *Sync) Len() int {
return len(s.stages)
}

func (s *Sync) Cfg() ethconfig.Sync { return s.cfg }

func (s *Sync) UnwindPoint() uint64 {
return *s.unwindPoint
}
Expand Down
2 changes: 1 addition & 1 deletion turbo/stages/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil /* heimdallClient */, mock.BlockReader, nil, nil, nil, recents, signatures),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, cfg.HistoryV3, blockWriter, nil),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,
Expand Down
8 changes: 4 additions & 4 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func NewDefaultStages(ctx context.Context,
stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, blockReader, controlServer.Hd, controlServer.Penalize, loopBreakCheck, recents, signatures),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down Expand Up @@ -581,7 +581,7 @@ func NewPipelineStages(ctx context.Context,
return stagedsync.PipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down Expand Up @@ -616,7 +616,7 @@ func NewPipelineStages(ctx context.Context,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications, loopBreakCheck),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
Expand Down Expand Up @@ -658,7 +658,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil, nil),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, nil),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down

0 comments on commit cb73f91

Please sign in to comment.