diff --git a/eth/backend.go b/eth/backend.go index 230ce464b7f..eadd8c39b3b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -560,7 +560,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger heimdallClient = heimdall.NewHeimdallClient(config.HeimdallURL, logger) } - if config.PolygonSync || config.PolygonSyncStage { + if config.PolygonSync { polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, consensusConfig.(*borcfg.BorConfig), heimdallClient.FetchStateSyncEvents, bor.GenesisContractStateReceiverABI()) } @@ -884,7 +884,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger p2pConfig.MaxPeers, statusDataProvider, backend.stopNode, - polygonBridge, ) backend.syncUnwindOrder = stagedsync.PolygonSyncUnwindOrder backend.syncPruneOrder = stagedsync.PolygonSyncPruneOrder diff --git a/eth/stagedsync/stage_polygon_sync.go b/eth/stagedsync/stage_polygon_sync.go index 1e03d3102c3..18409f5798d 100644 --- a/eth/stagedsync/stage_polygon_sync.go +++ b/eth/stagedsync/stage_polygon_sync.go @@ -18,9 +18,12 @@ package stagedsync import ( "context" + "encoding/binary" + "encoding/json" "errors" "fmt" "math/big" + "slices" "time" "golang.org/x/sync/errgroup" @@ -37,13 +40,14 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/p2p/sentry" "github.com/ledgerwatch/erigon/polygon/bor/borcfg" - "github.com/ledgerwatch/erigon/polygon/bridge" "github.com/ledgerwatch/erigon/polygon/heimdall" "github.com/ledgerwatch/erigon/polygon/p2p" polygonsync "github.com/ledgerwatch/erigon/polygon/sync" "github.com/ledgerwatch/erigon/turbo/services" ) +var updateForkChoiceSuccessErr = errors.New("update fork choice success") + func NewPolygonSyncStageCfg( logger log.Logger, chainConfig *chain.Config, @@ -56,25 +60,39 @@ func NewPolygonSyncStageCfg( stopNode func() error, stateReceiverABI abi.ABI, blockLimit uint, - polygonBridge bridge.Service, ) PolygonSyncStageCfg { - dataStream := make(chan polygonSyncStageDataItem) - storage := &polygonSyncStageStorage{ - db: db, - blockReader: blockReader, - dataStream: dataStream, - } + txActionStream := make(chan polygonSyncStageTxAction) executionEngine := &polygonSyncStageExecutionEngine{ - db: db, - blockReader: blockReader, - dataStream: dataStream, + blockReader: blockReader, + txActionStream: txActionStream, + logger: logger, + heimdallClient: heimdallClient, + stateReceiverABI: stateReceiverABI, + chainConfig: chainConfig, } + heimdallStore := &polygonSyncStageHeimdallStore{ + checkpoints: &polygonSyncStageCheckpointStore{ + checkpointReader: blockReader, + txActionStream: txActionStream, + }, + milestones: &polygonSyncStageMilestoneStore{ + milestoneReader: blockReader, + txActionStream: txActionStream, + }, + spans: &polygonSyncStageSpanStore{ + spanReader: blockReader, + txActionStream: txActionStream, + }, + } + heimdallService := heimdall.NewService(heimdallClient, heimdallStore, logger) + borConfig := chainConfig.Bor.(*borcfg.BorConfig) p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData) checkpointVerifier := polygonsync.VerifyCheckpointHeaders milestoneVerifier := polygonsync.VerifyMilestoneHeaders blocksVerifier := polygonsync.VerifyBlocks - heimdallService := heimdall.NewHeimdall(heimdallClient, logger, heimdall.WithStore(storage)) - borConfig := chainConfig.Bor.(*borcfg.BorConfig) + syncStore := &polygonSyncStageSyncStore{ + executionEngine: executionEngine, + } blockDownloader := polygonsync.NewBlockDownloader( logger, p2pService, @@ -82,13 +100,13 @@ func NewPolygonSyncStageCfg( checkpointVerifier, milestoneVerifier, blocksVerifier, - storage, + syncStore, blockLimit, ) spansCache := polygonsync.NewSpansCache() events := polygonsync.NewTipEvents(logger, p2pService, heimdallService) sync := polygonsync.NewSync( - storage, + syncStore, executionEngine, milestoneVerifier, blocksVerifier, @@ -101,17 +119,14 @@ func NewPolygonSyncStageCfg( logger, ) syncService := &polygonSyncStageService{ - logger: logger, - chainConfig: chainConfig, - blockReader: blockReader, - bridge: polygonBridge, - sync: sync, - events: events, - p2p: p2pService, - heimdallClient: heimdallClient, - stateReceiverABI: stateReceiverABI, - dataStream: dataStream, - stopNode: stopNode, + logger: logger, + sync: sync, + events: events, + p2p: p2pService, + executionEngine: executionEngine, + heimdall: heimdallService, + txActionStream: txActionStream, + stopNode: stopNode, } return PolygonSyncStageCfg{ db: db, @@ -166,52 +181,41 @@ func PrunePolygonSyncStage() error { return nil } -type polygonSyncStageDataItem struct { - updateForkChoice *types.Header - insertBlocks []*types.Block - span *heimdall.Span - milestone *heimdall.Milestone - checkpoint *heimdall.Checkpoint +type polygonSyncStageTxAction struct { + apply func(tx kv.RwTx) error } type polygonSyncStageService struct { - logger log.Logger - chainConfig *chain.Config - blockReader services.FullBlockReader - bridge bridge.Service - sync *polygonsync.Sync - events *polygonsync.TipEvents - p2p p2p.Service - heimdallClient heimdall.HeimdallClient - stateReceiverABI abi.ABI - dataStream <-chan polygonSyncStageDataItem - stopNode func() error + logger log.Logger + sync *polygonsync.Sync + events *polygonsync.TipEvents + p2p p2p.Service + executionEngine *polygonSyncStageExecutionEngine + heimdall heimdall.Service + txActionStream <-chan polygonSyncStageTxAction + stopNode func() error // internal - appendLogPrefix func(string) string - stageState *StageState - unwinder Unwinder - cachedForkChoice *types.Header - lastStateSyncEventId uint64 - lastStateSyncEventIdInit bool - bgComponentsRun bool - bgComponentsErr chan error + appendLogPrefix func(string) string + bgComponentsRun bool + bgComponentsErr chan error } func (s *polygonSyncStageService) Run(ctx context.Context, tx kv.RwTx, stageState *StageState, unwinder Unwinder) error { s.appendLogPrefix = newAppendLogPrefix(stageState.LogPrefix()) - s.stageState = stageState - s.unwinder = unwinder + s.executionEngine.appendLogPrefix = s.appendLogPrefix + s.executionEngine.stageState = stageState + s.executionEngine.unwinder = unwinder s.logger.Info(s.appendLogPrefix("begin..."), "progress", stageState.BlockNumber) s.runBgComponentsOnce(ctx) - if s.cachedForkChoice != nil { - err := s.handleUpdateForkChoice(tx, s.cachedForkChoice) + if s.executionEngine.cachedForkChoice != nil { + err := s.executionEngine.UpdateForkChoice(ctx, s.executionEngine.cachedForkChoice, nil) if err != nil { return err } - s.cachedForkChoice = nil + s.executionEngine.cachedForkChoice = nil return nil } @@ -231,21 +235,10 @@ func (s *polygonSyncStageService) Run(ctx context.Context, tx kv.RwTx, stageStat // use ErrStopped to exit the stage loop return fmt.Errorf("%w: %w", common.ErrStopped, err) - case data := <-s.dataStream: - var err error - if data.updateForkChoice != nil { - // exit stage upon update fork choice - return s.handleUpdateForkChoice(tx, data.updateForkChoice) - } else if len(data.insertBlocks) > 0 { - err = s.handleInsertBlocks(ctx, tx, data.insertBlocks) - } else if data.span != nil { - err = s.handleSpan(ctx, tx, data.span) - } else if data.checkpoint != nil { - err = s.handleCheckpoint(ctx, tx, data.checkpoint) - } else if data.milestone != nil { - err = s.handleMilestone(ctx, tx, data.milestone) - } else { - err = errors.New("unrecognized data") + case txAction := <-s.txActionStream: + err := txAction.apply(tx) + if errors.Is(err, updateForkChoiceSuccessErr) { + return nil } if err != nil { return err @@ -270,11 +263,9 @@ func (s *polygonSyncStageService) runBgComponentsOnce(ctx context.Context) { return s.events.Run(ctx) }) - if s.bridge != nil { - eg.Go(func() error { - return s.bridge.Run(ctx) - }) - } + eg.Go(func() error { + return s.heimdall.Run(ctx) + }) eg.Go(func() error { s.p2p.Run(ctx) @@ -296,7 +287,445 @@ func (s *polygonSyncStageService) runBgComponentsOnce(ctx context.Context) { }() } -func (s *polygonSyncStageService) handleInsertBlocks(ctx context.Context, tx kv.RwTx, blocks []*types.Block) error { +type polygonSyncStageSyncStore struct { + executionEngine *polygonSyncStageExecutionEngine +} + +func (s *polygonSyncStageSyncStore) InsertBlocks(ctx context.Context, blocks []*types.Block) error { + return s.executionEngine.InsertBlocks(ctx, blocks) +} + +func (s *polygonSyncStageSyncStore) Flush(context.Context, *types.Header) error { + return nil +} + +func (s *polygonSyncStageSyncStore) Run(context.Context) error { + return nil +} + +type polygonSyncStageHeimdallStore struct { + checkpoints *polygonSyncStageCheckpointStore + milestones *polygonSyncStageMilestoneStore + spans *polygonSyncStageSpanStore +} + +func (s polygonSyncStageHeimdallStore) Checkpoints() heimdall.EntityStore[*heimdall.Checkpoint] { + return s.checkpoints +} + +func (s polygonSyncStageHeimdallStore) Milestones() heimdall.EntityStore[*heimdall.Milestone] { + return s.milestones +} + +func (s polygonSyncStageHeimdallStore) Spans() heimdall.EntityStore[*heimdall.Span] { + return s.spans +} + +func (s polygonSyncStageHeimdallStore) Prepare(_ context.Context) error { + return nil +} + +func (s polygonSyncStageHeimdallStore) Close() { + // no-op +} + +type polygonSyncStageCheckpointStore struct { + checkpointReader services.BorCheckpointReader + txActionStream chan<- polygonSyncStageTxAction +} + +func (s polygonSyncStageCheckpointStore) GetLastEntityId(ctx context.Context) (uint64, bool, error) { + type response struct { + id uint64 + ok bool + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + id, ok, err := s.checkpointReader.LastCheckpointId(ctx, tx) + responseStream <- response{id: id, ok: ok, err: err} + return nil + }) + if err != nil { + return 0, false, err + } + + return r.id, r.ok, r.err +} + +func (s polygonSyncStageCheckpointStore) GetLastEntity(ctx context.Context) (cp *heimdall.Checkpoint, err error) { + id, ok, err := s.GetLastEntityId(ctx) + if err != nil { + return nil, err + } + if !ok { + return nil, errors.New("last checkpoint not found") + } + + return s.GetEntity(ctx, id) +} + +func (s polygonSyncStageCheckpointStore) GetEntity(ctx context.Context, id uint64) (*heimdall.Checkpoint, error) { + type response struct { + v []byte + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + v, err := s.checkpointReader.Checkpoint(ctx, tx, id) + responseStream <- response{v: v, err: err} + return nil + }) + if err != nil { + return nil, err + } + if r.err != nil { + return nil, r.err + } + + var c heimdall.Checkpoint + err = json.Unmarshal(r.v, &c) + return &c, err +} + +func (s polygonSyncStageCheckpointStore) PutEntity(ctx context.Context, id uint64, entity *heimdall.Checkpoint) error { + entity.Id = heimdall.CheckpointId(id) + + var k [8]byte + binary.BigEndian.PutUint64(k[:], id) + + v, err := json.Marshal(entity) + if err != nil { + return err + } + + type response struct { + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + responseStream <- response{err: tx.Put(kv.BorCheckpoints, k[:], v)} + return nil + }) + if err != nil { + return err + } + + return r.err +} + +func (s polygonSyncStageCheckpointStore) RangeFromBlockNum(ctx context.Context, blockNum uint64) ([]*heimdall.Checkpoint, error) { + type response struct { + result []*heimdall.Checkpoint + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + makeEntity := func() *heimdall.Checkpoint { return &heimdall.Checkpoint{} } + r, err := blockRangeEntitiesFromBlockNum(tx, kv.BorCheckpoints, makeEntity, blockNum) + responseStream <- response{result: r, err: err} + return nil + }) + if err != nil { + return nil, err + } + + return r.result, r.err +} + +func (s polygonSyncStageCheckpointStore) Prepare(_ context.Context) error { + return nil +} + +func (s polygonSyncStageCheckpointStore) Close() { + // no-op +} + +type polygonSyncStageMilestoneStore struct { + milestoneReader services.BorMilestoneReader + txActionStream chan<- polygonSyncStageTxAction +} + +func (s polygonSyncStageMilestoneStore) GetLastEntityId(ctx context.Context) (uint64, bool, error) { + type response struct { + id uint64 + ok bool + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + id, ok, err := s.milestoneReader.LastMilestoneId(ctx, tx) + responseStream <- response{id: id, ok: ok, err: err} + return nil + }) + if err != nil { + return 0, false, err + } + + return r.id, r.ok, r.err +} + +func (s polygonSyncStageMilestoneStore) GetLastEntity(ctx context.Context) (*heimdall.Milestone, error) { + id, ok, err := s.GetLastEntityId(ctx) + if err != nil { + return nil, err + } + if !ok { + return nil, errors.New("last milestone not found") + } + + return s.GetEntity(ctx, id) +} + +func (s polygonSyncStageMilestoneStore) GetEntity(ctx context.Context, id uint64) (*heimdall.Milestone, error) { + type response struct { + v []byte + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + v, err := s.milestoneReader.Milestone(ctx, tx, id) + responseStream <- response{v: v, err: err} + return nil + }) + if err != nil { + return nil, err + } + if r.err != nil { + return nil, r.err + } + + var m heimdall.Milestone + err = json.Unmarshal(r.v, &m) + return &m, err +} + +func (s polygonSyncStageMilestoneStore) PutEntity(ctx context.Context, id uint64, entity *heimdall.Milestone) error { + var k [8]byte + binary.BigEndian.PutUint64(k[:], id) + + v, err := json.Marshal(entity) + if err != nil { + return err + } + + type response struct { + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + responseStream <- response{err: tx.Put(kv.BorMilestones, k[:], v)} + return nil + }) + if err != nil { + return err + } + + return r.err +} + +func (s polygonSyncStageMilestoneStore) RangeFromBlockNum(ctx context.Context, blockNum uint64) ([]*heimdall.Milestone, error) { + type response struct { + result []*heimdall.Milestone + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + makeEntity := func() *heimdall.Milestone { return &heimdall.Milestone{} } + r, err := blockRangeEntitiesFromBlockNum(tx, kv.BorMilestones, makeEntity, blockNum) + responseStream <- response{result: r, err: err} + return nil + }) + if err != nil { + return nil, err + } + + return r.result, r.err +} + +func (s polygonSyncStageMilestoneStore) Prepare(_ context.Context) error { + return nil +} + +func (s polygonSyncStageMilestoneStore) Close() { + // no-op +} + +type polygonSyncStageSpanStore struct { + spanReader services.BorSpanReader + txActionStream chan<- polygonSyncStageTxAction +} + +func (s polygonSyncStageSpanStore) GetLastEntityId(ctx context.Context) (id uint64, ok bool, err error) { + type response struct { + id uint64 + ok bool + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + id, ok, err := s.spanReader.LastSpanId(ctx, tx) + responseStream <- response{id: id, ok: ok, err: err} + return nil + }) + if err != nil { + return 0, false, err + } + + return r.id, r.ok, r.err +} + +func (s polygonSyncStageSpanStore) GetLastEntity(ctx context.Context) (*heimdall.Span, error) { + id, ok, err := s.GetLastEntityId(ctx) + if err != nil { + return nil, err + } + if !ok { + return nil, errors.New("last span not found") + } + + return s.GetEntity(ctx, id) +} + +func (s polygonSyncStageSpanStore) GetEntity(ctx context.Context, id uint64) (*heimdall.Span, error) { + type response struct { + v []byte + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + v, err := s.spanReader.Span(ctx, tx, id) + responseStream <- response{v: v, err: err} + return nil + }) + if err != nil { + return nil, err + } + if r.err != nil { + return nil, r.err + } + + var span heimdall.Span + err = json.Unmarshal(r.v, &span) + return &span, err +} + +func (s polygonSyncStageSpanStore) PutEntity(ctx context.Context, id uint64, entity *heimdall.Span) error { + var k [8]byte + binary.BigEndian.PutUint64(k[:], id) + + v, err := json.Marshal(entity) + if err != nil { + return err + } + + type response struct { + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + responseStream <- response{err: tx.Put(kv.BorSpans, k[:], v)} + return nil + }) + if err != nil { + return err + } + + return r.err +} + +func (s polygonSyncStageSpanStore) RangeFromBlockNum(ctx context.Context, blockNum uint64) ([]*heimdall.Span, error) { + type response struct { + result []*heimdall.Span + err error + } + + r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + makeEntity := func() *heimdall.Span { return &heimdall.Span{} } + r, err := blockRangeEntitiesFromBlockNum(tx, kv.BorSpans, makeEntity, blockNum) + responseStream <- response{result: r, err: err} + return nil + }) + if err != nil { + return nil, err + } + + return r.result, r.err +} + +func (s polygonSyncStageSpanStore) Prepare(_ context.Context) error { + return nil +} + +func (s polygonSyncStageSpanStore) Close() { + // no-op +} + +type blockRangeComparator interface { + CmpRange(blockNum uint64) int +} + +func blockRangeEntitiesFromBlockNum[T blockRangeComparator](tx kv.Tx, table string, makeEntity func() T, blockNum uint64) ([]T, error) { + cur, err := tx.Cursor(table) + if err != nil { + return nil, err + } + + defer cur.Close() + var k, v []byte + var entities []T + for k, v, err = cur.Last(); err == nil && k != nil; _, v, err = cur.Prev() { + entity := makeEntity() + err = json.Unmarshal(v, entity) + if err != nil { + return nil, err + } + if entity.CmpRange(blockNum) == 1 { + break + } + entities = append(entities, entity) + } + if err != nil { + return nil, err + } + + slices.Reverse(entities) + return entities, nil +} + +type polygonSyncStageExecutionEngine struct { + blockReader services.FullBlockReader + txActionStream chan<- polygonSyncStageTxAction + logger log.Logger + heimdallClient heimdall.HeimdallClient + stateReceiverABI abi.ABI + chainConfig *chain.Config + // internal + appendLogPrefix func(string) string + stageState *StageState + unwinder Unwinder + cachedForkChoice *types.Header + lastStateSyncEventIdInit bool + lastStateSyncEventId uint64 +} + +func (e *polygonSyncStageExecutionEngine) InsertBlocks(ctx context.Context, blocks []*types.Block) error { + type response struct { + err error + } + + r, err := awaitTxAction(ctx, e.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + responseStream <- response{err: e.insertBlocks(ctx, blocks, tx)} + return nil + }) + if err != nil { + return err + } + + return r.err +} + +func (e *polygonSyncStageExecutionEngine) insertBlocks(ctx context.Context, blocks []*types.Block, tx kv.RwTx) error { stateSyncEventsLogTicker := time.NewTicker(logInterval) defer stateSyncEventsLogTicker.Stop() @@ -305,8 +734,8 @@ func (s *polygonSyncStageService) handleInsertBlocks(ctx context.Context, tx kv. header := block.Header() body := block.Body() - metrics.UpdateBlockConsumerHeaderDownloadDelay(header.Time, height-1, s.logger) - metrics.UpdateBlockConsumerBodyDownloadDelay(header.Time, height-1, s.logger) + metrics.UpdateBlockConsumerHeaderDownloadDelay(header.Time, height-1, e.logger) + metrics.UpdateBlockConsumerBodyDownloadDelay(header.Time, height-1, e.logger) var parentTd *big.Int var err error @@ -338,40 +767,53 @@ func (s *polygonSyncStageService) handleInsertBlocks(ctx context.Context, tx kv. return err } - if err := s.downloadStateSyncEvents(ctx, tx, header, stateSyncEventsLogTicker); err != nil { + if err := e.downloadStateSyncEvents(ctx, tx, header, stateSyncEventsLogTicker); err != nil { return err } } - if s.bridge != nil { - err := s.bridge.ProcessNewBlocks(ctx, blocks) - if err != nil { - return err + return nil +} + +func (e *polygonSyncStageExecutionEngine) UpdateForkChoice(ctx context.Context, tip *types.Header, _ *types.Header) error { + type response struct { + err error + } + + r, err := awaitTxAction(ctx, e.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + err := e.updateForkChoice(tx, tip) + responseStream <- response{err: err} + if err == nil { + return updateForkChoiceSuccessErr } + return nil + }) + if err != nil { + return err } - return nil + return r.err } -func (s *polygonSyncStageService) handleUpdateForkChoice(tx kv.RwTx, tip *types.Header) error { +func (e *polygonSyncStageExecutionEngine) updateForkChoice(tx kv.RwTx, tip *types.Header) error { tipBlockNum := tip.Number.Uint64() tipHash := tip.Hash() - s.logger.Info(s.appendLogPrefix("handle update fork choice"), "block", tipBlockNum, "hash", tipHash) + e.logger.Info(e.appendLogPrefix("update fork choice"), "block", tipBlockNum, "hash", tipHash) - logPrefix := s.stageState.LogPrefix() + logPrefix := e.stageState.LogPrefix() logTicker := time.NewTicker(logInterval) defer logTicker.Stop() - newNodes, badNodes, err := fixCanonicalChain(logPrefix, logTicker, tipBlockNum, tipHash, tx, s.blockReader, s.logger) + newNodes, badNodes, err := fixCanonicalChain(logPrefix, logTicker, tipBlockNum, tipHash, tx, e.blockReader, e.logger) if err != nil { return err } if len(badNodes) > 0 { badNode := badNodes[len(badNodes)-1] - s.cachedForkChoice = tip - return s.unwinder.UnwindTo(badNode.number, ForkReset(badNode.hash), tx) + e.cachedForkChoice = tip + return e.unwinder.UnwindTo(badNode.number, ForkReset(badNode.hash), tx) } if len(newNodes) == 0 { @@ -386,7 +828,7 @@ func (s *polygonSyncStageService) handleUpdateForkChoice(tx kv.RwTx, tip *types. return err } - if err := s.stageState.Update(tx, tipBlockNum); err != nil { + if err := e.stageState.Update(tx, tipBlockNum); err != nil { return err } @@ -405,46 +847,84 @@ func (s *polygonSyncStageService) handleUpdateForkChoice(tx kv.RwTx, tip *types. return nil } -func (s *polygonSyncStageService) downloadStateSyncEvents( +func (e *polygonSyncStageExecutionEngine) CurrentHeader(ctx context.Context) (*types.Header, error) { + type response struct { + result *types.Header + err error + } + + r, err := awaitTxAction(ctx, e.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error { + r, err := e.currentHeader(ctx, tx) + responseStream <- response{result: r, err: err} + return nil + }) + if err != nil { + return nil, err + } + + return r.result, r.err +} + +func (e *polygonSyncStageExecutionEngine) currentHeader(ctx context.Context, tx kv.Tx) (*types.Header, error) { + stageBlockNum, err := stages.GetStageProgress(tx, stages.PolygonSync) + if err != nil { + return nil, err + } + + snapshotBlockNum := e.blockReader.FrozenBlocks() + if stageBlockNum < snapshotBlockNum { + return e.blockReader.HeaderByNumber(ctx, tx, snapshotBlockNum) + } + + hash := rawdb.ReadHeadHeaderHash(tx) + header := rawdb.ReadHeader(tx, hash, stageBlockNum) + if header == nil { + return nil, errors.New("header not found") + } + + return header, nil +} + +func (e *polygonSyncStageExecutionEngine) downloadStateSyncEvents( ctx context.Context, tx kv.RwTx, header *types.Header, logTicker *time.Ticker, ) error { var err error - if !s.lastStateSyncEventIdInit { - s.lastStateSyncEventId, _, err = s.blockReader.LastEventId(ctx, tx) + if !e.lastStateSyncEventIdInit { + e.lastStateSyncEventId, _, err = e.blockReader.LastEventId(ctx, tx) } if err != nil { return err } - s.lastStateSyncEventIdInit = true + e.lastStateSyncEventIdInit = true newStateSyncEventId, records, duration, err := fetchRequiredHeimdallStateSyncEventsIfNeeded( ctx, header, tx, - s.chainConfig.Bor.(*borcfg.BorConfig), - s.blockReader, - s.heimdallClient, - s.chainConfig.ChainID.String(), - s.stateReceiverABI, - s.stageState.LogPrefix(), - s.logger, - s.lastStateSyncEventId, + e.chainConfig.Bor.(*borcfg.BorConfig), + e.blockReader, + e.heimdallClient, + e.chainConfig.ChainID.String(), + e.stateReceiverABI, + e.stageState.LogPrefix(), + e.logger, + e.lastStateSyncEventId, ) if err != nil { return err } - if s.lastStateSyncEventId == newStateSyncEventId { + if e.lastStateSyncEventId == newStateSyncEventId { return nil } select { case <-logTicker.C: - s.logger.Info( - s.appendLogPrefix("downloading state sync events progress"), + e.logger.Info( + e.appendLogPrefix("downloading state sync events progress"), "blockNum", header.Number, "records", records, "duration", duration, @@ -453,163 +933,37 @@ func (s *polygonSyncStageService) downloadStateSyncEvents( // carry on } - s.lastStateSyncEventId = newStateSyncEventId + e.lastStateSyncEventId = newStateSyncEventId return nil } -func (s *polygonSyncStageService) handleSpan(ctx context.Context, tx kv.RwTx, sp *heimdall.Span) error { - return heimdall.NewTxStore(s.blockReader, tx).PutSpan(ctx, sp) -} - -func (s *polygonSyncStageService) handleCheckpoint(ctx context.Context, tx kv.RwTx, cp *heimdall.Checkpoint) error { - return heimdall.NewTxStore(s.blockReader, tx).PutCheckpoint(ctx, cp.Id, cp) -} - -func (s *polygonSyncStageService) handleMilestone(ctx context.Context, tx kv.RwTx, ms *heimdall.Milestone) error { - return heimdall.NewTxStore(s.blockReader, tx).PutMilestone(ctx, ms.Id, ms) -} - -type polygonSyncStageStorage struct { - db kv.RoDB - blockReader services.FullBlockReader - dataStream chan<- polygonSyncStageDataItem -} - -func (s *polygonSyncStageStorage) LastSpanId(ctx context.Context) (id heimdall.SpanId, ok bool, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastSpanId(ctx) - return err - }) - return -} - -func (s *polygonSyncStageStorage) GetSpan(ctx context.Context, id heimdall.SpanId) (sp *heimdall.Span, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - sp, err = heimdall.NewTxReadStore(s.blockReader, tx).GetSpan(ctx, id) - return err - }) - return -} - -func (s *polygonSyncStageStorage) PutSpan(_ context.Context, span *heimdall.Span) error { - s.dataStream <- polygonSyncStageDataItem{ - span: span, - } - - return nil -} - -func (s *polygonSyncStageStorage) LastMilestoneId(ctx context.Context) (id heimdall.MilestoneId, ok bool, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastMilestoneId(ctx) - return err - }) - return -} - -func (s *polygonSyncStageStorage) GetMilestone(ctx context.Context, id heimdall.MilestoneId) (ms *heimdall.Milestone, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - ms, err = heimdall.NewTxReadStore(s.blockReader, tx).GetMilestone(ctx, id) - return err - }) - return -} - -func (s *polygonSyncStageStorage) PutMilestone(_ context.Context, _ heimdall.MilestoneId, ms *heimdall.Milestone) error { - s.dataStream <- polygonSyncStageDataItem{ - milestone: ms, - } - - return nil -} - -func (s *polygonSyncStageStorage) LastCheckpointId(ctx context.Context) (id heimdall.CheckpointId, ok bool, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastCheckpointId(ctx) - return err - }) - return -} - -func (s *polygonSyncStageStorage) GetCheckpoint(ctx context.Context, id heimdall.CheckpointId) (cp *heimdall.Checkpoint, err error) { - err = s.db.View(ctx, func(tx kv.Tx) error { - cp, err = heimdall.NewTxReadStore(s.blockReader, tx).GetCheckpoint(ctx, id) - return err - }) - return -} - -func (s *polygonSyncStageStorage) PutCheckpoint(_ context.Context, _ heimdall.CheckpointId, cp *heimdall.Checkpoint) error { - s.dataStream <- polygonSyncStageDataItem{ - checkpoint: cp, - } - - return nil -} - -func (s *polygonSyncStageStorage) InsertBlocks(_ context.Context, blocks []*types.Block) error { - s.dataStream <- polygonSyncStageDataItem{ - insertBlocks: blocks, - } - - return nil -} - -func (s *polygonSyncStageStorage) Flush(context.Context, *types.Header) error { - return nil -} - -func (s *polygonSyncStageStorage) Run(context.Context) error { - return nil -} - -type polygonSyncStageExecutionEngine struct { - db kv.RoDB - blockReader services.FullBlockReader - dataStream chan<- polygonSyncStageDataItem -} - -func (e *polygonSyncStageExecutionEngine) InsertBlocks(_ context.Context, blocks []*types.Block) error { - e.dataStream <- polygonSyncStageDataItem{ - insertBlocks: blocks, - } - - return nil -} - -func (e *polygonSyncStageExecutionEngine) UpdateForkChoice(_ context.Context, tip *types.Header, _ *types.Header) error { - e.dataStream <- polygonSyncStageDataItem{ - updateForkChoice: tip, - } - - return nil -} - -func (e *polygonSyncStageExecutionEngine) CurrentHeader(ctx context.Context) (*types.Header, error) { - tx, err := e.db.BeginRo(ctx) - if err != nil { - return nil, err - } - - defer tx.Rollback() - - stageBlockNum, err := stages.GetStageProgress(tx, stages.PolygonSync) - if err != nil { - return nil, err +func awaitTxAction[T any]( + ctx context.Context, + txActionStream chan<- polygonSyncStageTxAction, + cb func(tx kv.RwTx, responseStream chan<- T) error, +) (T, error) { + responseStream := make(chan T) + txAction := polygonSyncStageTxAction{ + apply: func(tx kv.RwTx) error { + return cb(tx, responseStream) + }, } - snapshotBlockNum := e.blockReader.FrozenBlocks() - if stageBlockNum < snapshotBlockNum { - return e.blockReader.HeaderByNumber(ctx, tx, snapshotBlockNum) + select { + case <-ctx.Done(): + var nilValue T + return nilValue, ctx.Err() + case txActionStream <- txAction: + // no-op } - hash := rawdb.ReadHeadHeaderHash(tx) - header := rawdb.ReadHeader(tx, hash, stageBlockNum) - if header == nil { - return nil, errors.New("header not found") + select { + case <-ctx.Done(): + var nilValue T + return nilValue, ctx.Err() + case resp := <-responseStream: + return resp, nil } - - return header, nil } func newAppendLogPrefix(logPrefix string) func(msg string) string { diff --git a/polygon/heimdall/closed_range.go b/polygon/heimdall/closed_range.go index 08383f82407..a33a48d67f2 100644 --- a/polygon/heimdall/closed_range.go +++ b/polygon/heimdall/closed_range.go @@ -39,7 +39,3 @@ func ClosedRangeMap[TResult any](r ClosedRange, projection func(i uint64) (TResu return results, nil } - -func (r ClosedRange) Map(projection func(i uint64) (any, error)) ([]any, error) { - return ClosedRangeMap(r, projection) -} diff --git a/polygon/heimdall/entity_store.go b/polygon/heimdall/entity_store.go index aebe76fd242..4801786658c 100644 --- a/polygon/heimdall/entity_store.go +++ b/polygon/heimdall/entity_store.go @@ -40,8 +40,6 @@ type EntityStore[TEntity Entity] interface { GetLastEntity(ctx context.Context) (TEntity, error) GetEntity(ctx context.Context, id uint64) (TEntity, error) PutEntity(ctx context.Context, id uint64, entity TEntity) error - FindByBlockNum(ctx context.Context, blockNum uint64) (TEntity, error) - RangeFromId(ctx context.Context, startId uint64) ([]TEntity, error) RangeFromBlockNum(ctx context.Context, startBlockNum uint64) ([]TEntity, error) } @@ -205,19 +203,6 @@ func (s *mdbxEntityStore[TEntity]) PutEntity(ctx context.Context, id uint64, ent return s.blockNumToIdIndex.Put(ctx, entity.BlockNumRange(), id) } -func (s *mdbxEntityStore[TEntity]) FindByBlockNum(ctx context.Context, blockNum uint64) (TEntity, error) { - id, err := s.blockNumToIdIndex.Lookup(ctx, blockNum) - if err != nil { - return Zero[TEntity](), err - } - // not found - if id == 0 { - return Zero[TEntity](), nil - } - - return s.GetEntity(ctx, id) -} - func (s *mdbxEntityStore[TEntity]) RangeFromId(ctx context.Context, startId uint64) ([]TEntity, error) { tx, err := s.db.BeginRo(ctx) if err != nil { diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index e2e2c2d8e8b..5482880e3b6 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -53,7 +53,6 @@ import ( "github.com/ledgerwatch/erigon/polygon/bor" "github.com/ledgerwatch/erigon/polygon/bor/finality" "github.com/ledgerwatch/erigon/polygon/bor/finality/flags" - "github.com/ledgerwatch/erigon/polygon/bridge" "github.com/ledgerwatch/erigon/polygon/heimdall" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" "github.com/ledgerwatch/erigon/turbo/services" @@ -712,7 +711,6 @@ func NewPolygonSyncStages( maxPeers int, statusDataProvider *sentry.StatusDataProvider, stopNode func() error, - polygonBridge bridge.Service, ) []*stagedsync.Stage { loopBreakCheck := NewLoopBreakCheck(config, heimdallClient) return stagedsync.PolygonSyncStages( @@ -744,7 +742,6 @@ func NewPolygonSyncStages( stopNode, bor.GenesisContractStateReceiverABI(), config.LoopBlockLimit, - polygonBridge, ), stagedsync.StageSendersCfg( db,