From 9881886a2d9b046d44da67c5b833d7cd29af1dd3 Mon Sep 17 00:00:00 2001 From: Valentin Staykov <79150443+V-Staykov@users.noreply.github.com> Date: Fri, 4 Oct 2024 14:10:47 +0300 Subject: [PATCH 1/9] refactor: remove unused flag (#1273) * refactor: remove unused flag * fix: kurtosis tests --- README.md | 4 ---- cmd/cdk-erigon/main.go | 10 ++++++++-- hermezconfig-bali.yaml.example | 1 - hermezconfig-cardona.yaml.example | 1 - hermezconfig-dev.yaml.example | 1 - hermezconfig-estest-syncer.yaml.example | 1 - hermezconfig-mainnet-shadowfork.yaml.example | 1 - hermezconfig-mainnet.yaml.example | 1 - turbo/cli/flags_zkevm.go | 5 ++--- xlayerconfig-mainnet.yaml.example | 1 - xlayerconfig-testnet.yaml.example | 1 - zk/tests/nightly-l1-recovery/network5-config.yaml | 1 - zk/tests/nightly-l1-recovery/network5-sync-config.yaml | 1 - zk/tests/nightly-l1-recovery/network8-config.yaml | 1 - zk/tests/nightly-l1-recovery/network8-sync-config.yaml | 1 - zk/tests/unwinds/config/dynamic-integration8.yaml | 1 - 16 files changed, 10 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 55b567116ab..1e593d467ed 100644 --- a/README.md +++ b/README.md @@ -132,9 +132,6 @@ Initial SMT build performance can be increased if machine has enough RAM: ## Configuration Files Config files are the easiest way to configure cdk-erigon, there are examples in the repository for each network e.g. `hermezconfig-mainnet.yaml.example`. - -Depending on the RPC provider you are using, you may wish to alter `zkevm.rpc-ratelimit`. - *** ## Running CDK-Erigon @@ -193,7 +190,6 @@ For a full explanation of the config options, see below: - `zkevm.address-zkevm`: The address for the zkevm contract - `zkevm.address-rollup`: The address for the rollup contract - `zkevm.address-ger-manager`: The address for the GER manager contract -- `zkevm.rpc-ratelimit`: Rate limit for RPC calls. - `zkevm.data-stream-port`: Port for the data stream. This needs to be set to enable the datastream server - `zkevm.data-stream-host`: The host for the data stream i.e. `localhost`. This must be set to enable the datastream server - `zkevm.datastream-version:` Version of the data stream protocol. 
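Removing `zkevm.rpc-ratelimit` from the README and the example configs works because the config-file loader changed below treats removed keys as deprecated: when `ctx.Set` fails for a key, it is looked up in `erigoncli.DeprecatedFlags`, and an empty replacement now means "remove this flag" while a non-empty one names the successor flag. A minimal standalone Go sketch of that lookup pattern follows (illustrative only, not the project code; the map contents mirror the `DeprecatedFlags` entries added later in this patch):

package main

import "fmt"

// deprecatedFlags maps an old flag name to its replacement; an empty string
// means the flag was removed outright (as zkevm.rpc-ratelimit is in this patch).
var deprecatedFlags = map[string]string{
	"zkevm.gasless":       "zkevm.allow-free-transactions",
	"zkevm.rpc-ratelimit": "",
}

// checkDeprecated returns a user-facing error for a deprecated key, or nil if
// the key is still supported.
func checkDeprecated(key string) error {
	replacement, found := deprecatedFlags[key]
	if !found {
		return nil
	}
	if replacement == "" {
		return fmt.Errorf("flag %s is deprecated, remove it", key)
	}
	return fmt.Errorf("flag %s is deprecated, use %s instead", key, replacement)
}

func main() {
	for _, key := range []string{"zkevm.rpc-ratelimit", "zkevm.gasless", "zkevm.l1-rpc-url"} {
		if err := checkDeprecated(key); err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("flag %s is still supported\n", key)
	}
}

Running the sketch prints a removal hint for `zkevm.rpc-ratelimit`, a replacement hint for `zkevm.gasless`, and accepts any other key, which is the behaviour the main.go hunk below implements for config-file keys.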
diff --git a/cmd/cdk-erigon/main.go b/cmd/cdk-erigon/main.go index 6598464dea1..d8422b8faef 100644 --- a/cmd/cdk-erigon/main.go +++ b/cmd/cdk-erigon/main.go @@ -111,14 +111,20 @@ func setFlagsFromConfigFile(ctx *cli.Context, filePath string) error { } if err := ctx.Set(key, strings.Join(s, ",")); err != nil { if deprecatedFlag, found := erigoncli.DeprecatedFlags[key]; found { - return fmt.Errorf("failed setting %s flag Flag is deprecated, use %s instead", key, deprecatedFlag) + if deprecatedFlag == "" { + return fmt.Errorf("failed setting %s flag it is deprecated, remove it", key) + } + return fmt.Errorf("failed setting %s flag it is deprecated, use %s instead", key, deprecatedFlag) } return fmt.Errorf("failed setting %s flag with values=%s error=%s", key, s, err) } } else { if err := ctx.Set(key, fmt.Sprintf("%v", value)); err != nil { if deprecatedFlag, found := erigoncli.DeprecatedFlags[key]; found { - return fmt.Errorf("failed setting %s flag Flag is deprecated, use %s instead", key, deprecatedFlag) + if deprecatedFlag == "" { + return fmt.Errorf("failed setting %s flag it is deprecated, remove it", key) + } + return fmt.Errorf("failed setting %s flag it is deprecated, use %s instead", key, deprecatedFlag) } return fmt.Errorf("failed setting %s flag with value=%v error=%s", key, value, err) } diff --git a/hermezconfig-bali.yaml.example b/hermezconfig-bali.yaml.example index 84ecf06d856..2324247b7e4 100644 --- a/hermezconfig-bali.yaml.example +++ b/hermezconfig-bali.yaml.example @@ -19,7 +19,6 @@ zkevm.gas-price-factor: 0.12 zkevm.l1-rollup-id: 1 zkevm.l1-first-block: 4794475 -zkevm.rpc-ratelimit: 250 txpool.disable: true torrent.port: 42070 zkevm.datastream-version: 2 diff --git a/hermezconfig-cardona.yaml.example b/hermezconfig-cardona.yaml.example index 4840fe005b1..b25311f2fbb 100644 --- a/hermezconfig-cardona.yaml.example +++ b/hermezconfig-cardona.yaml.example @@ -21,7 +21,6 @@ zkevm.l1-rollup-id: 1 zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 4789190 -zkevm.rpc-ratelimit: 250 txpool.disable: true torrent.port: 42070 zkevm.datastream-version: 2 diff --git a/hermezconfig-dev.yaml.example b/hermezconfig-dev.yaml.example index d41f711ba82..76b02a7608b 100644 --- a/hermezconfig-dev.yaml.example +++ b/hermezconfig-dev.yaml.example @@ -16,7 +16,6 @@ zkevm.address-ger-manager: "0x76216E45Bdd20022eEcC07999e50228d7829534B" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 5192000 -zkevm.rpc-ratelimit: 250 zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 zkevm.data-stream-host: localhost diff --git a/hermezconfig-estest-syncer.yaml.example b/hermezconfig-estest-syncer.yaml.example index eda029d1d12..1458b566a94 100644 --- a/hermezconfig-estest-syncer.yaml.example +++ b/hermezconfig-estest-syncer.yaml.example @@ -16,7 +16,6 @@ zkevm.address-ger-manager: "0x0FE6A2FcF455b9B8004fd625909857933d3c7494" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 5192000 -zkevm.rpc-ratelimit: 250 zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 zkevm.data-stream-host: "127.0.0.1" diff --git a/hermezconfig-mainnet-shadowfork.yaml.example b/hermezconfig-mainnet-shadowfork.yaml.example index f50e69102ec..285b570b7a4 100644 --- a/hermezconfig-mainnet-shadowfork.yaml.example +++ b/hermezconfig-mainnet-shadowfork.yaml.example @@ -16,7 +16,6 @@ zkevm.address-ger-manager: "0x76216E45Bdd20022eEcC07999e50228d7829534B" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 5192000 
-zkevm.rpc-ratelimit: 250 zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 zkevm.data-stream-host: localhost diff --git a/hermezconfig-mainnet.yaml.example b/hermezconfig-mainnet.yaml.example index f5ec1540a45..8cbf02145c9 100644 --- a/hermezconfig-mainnet.yaml.example +++ b/hermezconfig-mainnet.yaml.example @@ -21,7 +21,6 @@ zkevm.l1-rollup-id: 1 zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 16896700 -zkevm.rpc-ratelimit: 250 zkevm.datastream-version: 2 # debug.timers: true # Uncomment to enable timers diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 1014e76abc5..4d979d04d7e 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -17,7 +17,8 @@ import ( ) var DeprecatedFlags = map[string]string{ - "zkevm.gasless": "zkevm.allow-free-transactions", + "zkevm.gasless": "zkevm.allow-free-transactions", + "zkevm.rpc-ratelimit": "", } func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { @@ -135,7 +136,6 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { L1FirstBlock: ctx.Uint64(utils.L1FirstBlockFlag.Name), L1FinalizedBlockRequirement: ctx.Uint64(utils.L1FinalizedBlockRequirementFlag.Name), L1ContractAddressCheck: ctx.Bool(utils.L1ContractAddressCheckFlag.Name), - RpcRateLimits: ctx.Int(utils.RpcRateLimitsFlag.Name), RpcGetBatchWitnessConcurrencyLimit: ctx.Int(utils.RpcGetBatchWitnessConcurrencyLimitFlag.Name), DatastreamVersion: ctx.Int(utils.DatastreamVersionFlag.Name), RebuildTreeAfter: ctx.Uint64(utils.RebuildTreeAfterFlag.Name), @@ -224,7 +224,6 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { checkFlag(utils.L1RpcUrlFlag.Name, cfg.L1RpcUrl) checkFlag(utils.L1MaticContractAddressFlag.Name, cfg.L1MaticContractAddress.Hex()) checkFlag(utils.L1FirstBlockFlag.Name, cfg.L1FirstBlock) - checkFlag(utils.RpcRateLimitsFlag.Name, cfg.RpcRateLimits) checkFlag(utils.RpcGetBatchWitnessConcurrencyLimitFlag.Name, cfg.RpcGetBatchWitnessConcurrencyLimit) checkFlag(utils.RebuildTreeAfterFlag.Name, cfg.RebuildTreeAfter) checkFlag(utils.L1BlockRangeFlag.Name, cfg.L1BlockRange) diff --git a/xlayerconfig-mainnet.yaml.example b/xlayerconfig-mainnet.yaml.example index 9d92556b800..6bb2c269066 100644 --- a/xlayerconfig-mainnet.yaml.example +++ b/xlayerconfig-mainnet.yaml.example @@ -17,7 +17,6 @@ zkevm.l1-rollup-id: 3 zkevm.l1-first-block: 19218658 zkevm.l1-block-range: 2000 zkevm.l1-query-delay: 1000 -zkevm.rpc-ratelimit: 250 zkevm.datastream-version: 3 externalcl: true diff --git a/xlayerconfig-testnet.yaml.example b/xlayerconfig-testnet.yaml.example index cb9a07a510e..706c8009901 100644 --- a/xlayerconfig-testnet.yaml.example +++ b/xlayerconfig-testnet.yaml.example @@ -17,7 +17,6 @@ zkevm.l1-rollup-id: 1 zkevm.l1-first-block: 4648290 zkevm.l1-block-range: 2000 zkevm.l1-query-delay: 1000 -zkevm.rpc-ratelimit: 250 zkevm.datastream-version: 3 externalcl: true diff --git a/zk/tests/nightly-l1-recovery/network5-config.yaml b/zk/tests/nightly-l1-recovery/network5-config.yaml index 82cf74f727e..b274cc3b290 100644 --- a/zk/tests/nightly-l1-recovery/network5-config.yaml +++ b/zk/tests/nightly-l1-recovery/network5-config.yaml @@ -18,7 +18,6 @@ zkevm.l1-matic-contract-address: "0x0f25eE4CA85Db4362a9749782872c558873566e4" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 6032365 -zkevm.rpc-ratelimit: 250 zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 zkevm.data-stream-host: "127.0.0.1" diff --git 
a/zk/tests/nightly-l1-recovery/network5-sync-config.yaml b/zk/tests/nightly-l1-recovery/network5-sync-config.yaml index 648303c2b50..5a09921c6af 100644 --- a/zk/tests/nightly-l1-recovery/network5-sync-config.yaml +++ b/zk/tests/nightly-l1-recovery/network5-sync-config.yaml @@ -17,7 +17,6 @@ zkevm.l1-matic-contract-address: "0x0f25eE4CA85Db4362a9749782872c558873566e4" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 6032365 -zkevm.rpc-ratelimit: 250 zkevm.executor-strict: false # zkevm.executor-urls: "zkevm2-stateless-executor:50071" zkevm.witness-full: false diff --git a/zk/tests/nightly-l1-recovery/network8-config.yaml b/zk/tests/nightly-l1-recovery/network8-config.yaml index b38c7a3b45f..4c414cdc9c0 100644 --- a/zk/tests/nightly-l1-recovery/network8-config.yaml +++ b/zk/tests/nightly-l1-recovery/network8-config.yaml @@ -17,7 +17,6 @@ zkevm.l1-matic-contract-address: "0xdC66C280f5E8bBbd2F2d92FaD1489863c8F55915" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 6411787 -zkevm.rpc-ratelimit: 250 zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 zkevm.data-stream-host: "127.0.0.1" diff --git a/zk/tests/nightly-l1-recovery/network8-sync-config.yaml b/zk/tests/nightly-l1-recovery/network8-sync-config.yaml index 98a80e3747d..f51c251ea81 100644 --- a/zk/tests/nightly-l1-recovery/network8-sync-config.yaml +++ b/zk/tests/nightly-l1-recovery/network8-sync-config.yaml @@ -17,7 +17,6 @@ zkevm.l1-matic-contract-address: "0xdC66C280f5E8bBbd2F2d92FaD1489863c8F55915" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 6411787 -zkevm.rpc-ratelimit: 250 #zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 #zkevm.data-stream-host: "127.0.0.1" diff --git a/zk/tests/unwinds/config/dynamic-integration8.yaml b/zk/tests/unwinds/config/dynamic-integration8.yaml index afba2cd46a8..2590341925d 100644 --- a/zk/tests/unwinds/config/dynamic-integration8.yaml +++ b/zk/tests/unwinds/config/dynamic-integration8.yaml @@ -17,7 +17,6 @@ zkevm.l1-matic-contract-address: "0xdC66C280f5E8bBbd2F2d92FaD1489863c8F55915" zkevm.l1-block-range: 20000 zkevm.l1-query-delay: 6000 zkevm.l1-first-block: 6411787 -zkevm.rpc-ratelimit: 250 # zkevm.data-stream-port: 6900 zkevm.datastream-version: 2 # zkevm.data-stream-host: "127.0.0.1" From 58ab71c69d46d3778a92efeab967b24ef45a0e7c Mon Sep 17 00:00:00 2001 From: Kamen Stoykov <24619432+kstoykov@users.noreply.github.com> Date: Fri, 4 Oct 2024 17:06:59 +0300 Subject: [PATCH 2/9] feat(sequencer): update handling of errors (#1277) --- cmd/utils/flags.go | 7 +- eth/ethconfig/config_zkevm.go | 1 + eth/stagedsync/stage.go | 1 + eth/stagedsync/sync.go | 4 ++ turbo/cli/default_flags.go | 1 + turbo/cli/flags_zkevm.go | 1 + .../legacy_executor_verifier.go | 67 ++++++++++++++----- zk/stages/stage_sequence_execute.go | 15 ++--- zk/stages/stage_sequence_execute_batch.go | 41 ++++++++---- .../stage_sequence_execute_data_stream.go | 11 ++- zk/stages/stages.go | 8 ++- 11 files changed, 111 insertions(+), 46 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 021553ec9fa..7a9f2818e0c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -493,9 +493,14 @@ var ( } SequencerBatchVerificationTimeout = cli.StringFlag{ Name: "zkevm.sequencer-batch-verification-timeout", - Usage: "This is a maximum time that a batch verification could take. Including retries. This could be interpreted as maximum that that the sequencer can run without executor. Setting it to 0s will mean infinite timeout. 
Defaults to 30min", + Usage: "This is a maximum time that a batch verification could take in terms of executors' errors. Including retries. This could be interpreted as `maximum that that the sequencer can run without executor`. Setting it to 0s will mean infinite timeout. Defaults to 30min", Value: "30m", } + SequencerBatchVerificationRetries = cli.StringFlag{ + Name: "zkevm.sequencer-batch-verification-retries", + Usage: "Number of attempts that a batch will re-run in case of an internal (not executors') error. This could be interpreted as `maximum attempts to send a batch for verification`. Setting it to -1 will mean unlimited attempts. Defaults to 3", + Value: "3", + } SequencerTimeoutOnEmptyTxPool = cli.StringFlag{ Name: "zkevm.sequencer-timeout-on-empty-tx-pool", Usage: "Timeout before requesting txs from the txpool if none were found before. Defaults to 250ms", diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 32de04302b2..4b7b1e155dd 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -37,6 +37,7 @@ type Zk struct { SequencerBlockSealTime time.Duration SequencerBatchSealTime time.Duration SequencerBatchVerificationTimeout time.Duration + SequencerBatchVerificationRetries int SequencerTimeoutOnEmptyTxPool time.Duration SequencerHaltOnBatchNumber uint64 SequencerResequence bool diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index 0a04b9db012..0ce2c05615c 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -75,6 +75,7 @@ func (s *StageState) IntermediateHashesAt(db kv.Getter) (uint64, error) { type Unwinder interface { // UnwindTo begins staged sync unwind to the specified block. UnwindTo(unwindPoint uint64, badBlock libcommon.Hash) + IsUnwindSet() bool } // UnwindState contains the information about unwind. 
diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index caaf1e907ca..4fb793a393b 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -113,6 +113,10 @@ func (s *Sync) UnwindTo(unwindPoint uint64, badBlock libcommon.Hash) { s.badBlock = badBlock } +func (s *Sync) IsUnwindSet() bool { + return s.unwindPoint != nil +} + func (s *Sync) IsDone() bool { return s.currentStage >= uint(len(s.stages)) && s.unwindPoint == nil } diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 6f8fabc694b..a517b260c30 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -200,6 +200,7 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerBlockSealTime, &utils.SequencerBatchSealTime, &utils.SequencerBatchVerificationTimeout, + &utils.SequencerBatchVerificationRetries, &utils.SequencerTimeoutOnEmptyTxPool, &utils.SequencerHaltOnBatchNumber, &utils.SequencerResequence, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 4d979d04d7e..b9e750097b1 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -144,6 +144,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerBlockSealTime: sequencerBlockSealTime, SequencerBatchSealTime: sequencerBatchSealTime, SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout, + SequencerBatchVerificationRetries: ctx.Int(utils.SequencerBatchVerificationRetries.Name), SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool, SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index 9b081b0c01e..7084d33aefe 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -35,13 +35,14 @@ type VerifierRequest struct { Counters map[string]int creationTime time.Time timeout time.Duration + retries int } func NewVerifierRequest(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int) *VerifierRequest { - return NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, 0) + return NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, 0, -1) } -func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration) *VerifierRequest { +func NewVerifierRequestWithLimits(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration, retries int) *VerifierRequest { return &VerifierRequest{ BatchNumber: batchNumber, BlockNumbers: blockNumbers, @@ -50,6 +51,7 @@ func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []ui Counters: counters, creationTime: time.Now(), timeout: timeout, + retries: retries, } } @@ -61,6 +63,17 @@ func (vr *VerifierRequest) IsOverdue() bool { return time.Since(vr.creationTime) > vr.timeout } +func (vr *VerifierRequest) IncrementAndValidateRetries() bool { + if vr.retries == -1 { + return true + } + + if vr.retries > 0 { + vr.retries-- + } + return vr.retries > 0 +} + func (vr *VerifierRequest) GetFirstBlockNumber() uint64 { return vr.BlockNumbers[0] } @@ -78,17 +91,27 @@ type VerifierResponse struct { } type VerifierBundle struct { - Request *VerifierRequest - Response *VerifierResponse 
+ Request *VerifierRequest + Response *VerifierResponse + readyForSendingRequest bool } -func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse) *VerifierBundle { +func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse, readyForSendingRequest bool) *VerifierBundle { return &VerifierBundle{ - Request: request, - Response: response, + Request: request, + Response: response, + readyForSendingRequest: readyForSendingRequest, } } +func (vb *VerifierBundle) markAsreadyForSendingRequest() { + vb.readyForSendingRequest = true +} + +func (vb *VerifierBundle) isInternalError() bool { + return !vb.readyForSendingRequest +} + type WitnessGenerator interface { GetWitnessByBlockRange(tx kv.Tx, ctx context.Context, startBlock, endBlock uint64, debug, witnessFull bool) ([]byte, error) } @@ -138,10 +161,11 @@ func (v *LegacyExecutorVerifier) StartAsyncVerification( blockNumbers []uint64, useRemoteExecutor bool, requestTimeout time.Duration, + retries int, ) { var promise *Promise[*VerifierBundle] - request := NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout) + request := NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout, retries) if useRemoteExecutor { promise = v.VerifyAsync(request) } else { @@ -200,7 +224,7 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest) *Promise[ // eager promise will do the work as soon as called in a goroutine, then we can retrieve the result later // ProcessResultsSequentiallyUnsafe relies on the fact that this function returns ALWAYS non-verifierBundle and error. The only exception is the case when verifications has been canceled. Only then the verifierBundle can be nil return NewPromise[*VerifierBundle](func() (*VerifierBundle, error) { - verifierBundle := NewVerifierBundle(request, nil) + verifierBundle := NewVerifierBundle(request, nil, false) blockNumbers := verifierBundle.Request.BlockNumbers e := v.GetNextOnlineAvailableExecutor() @@ -274,6 +298,8 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest) *Promise[ return verifierBundle, err } + verifierBundle.markAsreadyForSendingRequest() + ok, executorResponse, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) if generalErr != nil { return verifierBundle, generalErr @@ -308,7 +334,7 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest) ExecutorResponse: nil, Error: nil, } - return NewVerifierBundle(request, response), nil + return NewVerifierBundle(request, response, true), nil }) promise.Wait() @@ -322,11 +348,12 @@ func (v *LegacyExecutorVerifier) HasPendingVerifications() bool { return len(v.promises) > 0 } -func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) { +func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, *VerifierBundle) { v.mtxPromises.Lock() defer v.mtxPromises.Unlock() var verifierResponse []*VerifierBundle + var verifierBundleForUnwind *VerifierBundle // not a stop signal, so we can start to process our promises now for idx, promise := range v.promises { @@ -346,15 +373,23 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([ log.Error("error on our end while preparing the verification request, re-queueing the task", "err", err) - if verifierBundle.Request.IsOverdue() { - // signal an error, the caller can check on this and stop the process if 
needs be - return nil, fmt.Errorf("error: batch %d couldn't be processed in 30 minutes", verifierBundle.Request.BatchNumber) + if verifierBundle.isInternalError() { + canRetry := verifierBundle.Request.IncrementAndValidateRetries() + if !canRetry { + verifierBundleForUnwind = verifierBundle + break + } + } else { + if verifierBundle.Request.IsOverdue() { + verifierBundleForUnwind = verifierBundle + break + } } // re-queue the task - it should be safe to replace the index of the slice here as we only add to it v.promises[idx] = promise.CloneAndRerun() - // break now as we know we can't proceed here until this promise is attempted again + // we have a problematic bundle so we cannot process the next one, because that would break the sequentiality break } @@ -365,7 +400,7 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([ // remove processed promises from the list v.promises = v.promises[len(verifierResponse):] - return verifierResponse, nil + return verifierResponse, verifierBundleForUnwind } func (v *LegacyExecutorVerifier) Wait() { diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index ad533588a0d..075d67c3ff0 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -19,8 +19,7 @@ import ( "github.com/ledgerwatch/erigon/zk/utils" ) -// we must perform execution and datastream alignment only during first run of this stage -var shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = true +var shouldCheckForExecutionAndDataStreamAlighment = true func SpawnSequencingStage( s *stagedsync.StageState, @@ -132,7 +131,7 @@ func sequencingBatchStep( return sdb.tx.Commit() } - if shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart { + if shouldCheckForExecutionAndDataStreamAlighment { // handle cases where the last batch wasn't committed to the data stream. 
// this could occur because we're migrating from an RPC node to a sequencer // or because the sequencer was restarted and not all processes completed (like waiting from remote executor) @@ -142,20 +141,20 @@ func sequencingBatchStep( if !batchState.isAnyRecovery() { isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u) if err != nil { - // do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error + // do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error return err } if isUnwinding { err = sdb.tx.Commit() if err != nil { - // do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error + // do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error return err } - shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false + shouldCheckForExecutionAndDataStreamAlighment = false return nil } } - shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false + shouldCheckForExecutionAndDataStreamAlighment = false } tryHaltSequencer(batchContext, batchState.batchNumber) @@ -508,7 +507,7 @@ func sequencingBatchStep( if err != nil { return err } - cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout) + cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout, batchContext.cfg.zk.SequencerBatchVerificationRetries) // check for new responses from the verifier needsUnwind, err := updateStreamAndCheckRollback(batchContext, batchState, streamWriter, u) diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 80e2c7351b5..011fd186be4 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -9,6 +9,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk/l1_data" + verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/log/v3" ) @@ -91,7 +92,7 @@ func updateStreamAndCheckRollback( streamWriter *SequencerBatchStreamWriter, u stagedsync.Unwinder, ) (bool, error) { - checkedVerifierBundles, err := streamWriter.CommitNewUpdates() + checkedVerifierBundles, verifierBundleForUnwind, err := streamWriter.CommitNewUpdates() if err != nil { return false, err } @@ -122,21 +123,35 @@ func updateStreamAndCheckRollback( return false, err } - unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1 + err = markForUnwind(batchContext, streamWriter, u, verifierBundle) + return err == nil, err + } - // for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block - // causing the unwind. 
- unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber()) - if unwindHeader == nil { - return false, fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber()) - } + if verifierBundleForUnwind != nil { + err = markForUnwind(batchContext, streamWriter, u, verifierBundleForUnwind) + return err == nil, err + } - log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root) + return false, nil +} - u.UnwindTo(unwindTo, unwindHeader.Hash()) - streamWriter.legacyVerifier.CancelAllRequests() - return true, nil +func markForUnwind( + batchContext *BatchContext, + streamWriter *SequencerBatchStreamWriter, + u stagedsync.Unwinder, + verifierBundle *verifier.VerifierBundle, +) error { + unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1 + + // for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block + // causing the unwind. + unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber()) + if unwindHeader == nil { + return fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber()) } - return false, nil + log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root) + + u.UnwindTo(unwindTo, unwindHeader.Hash()) + return nil } diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 9631b317d98..41b52d1adf6 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -37,13 +37,10 @@ func newSequencerBatchStreamWriter(batchContext *BatchContext, batchState *Batch } } -func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, error) { - verifierBundles, err := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix) - if err != nil { - return nil, err - } - - return sbc.writeBlockDetailsToDatastream(verifierBundles) +func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, *verifier.VerifierBundle, error) { + verifierBundles, verifierBundleForUnwind := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix) + checkedVerifierBundles, err := sbc.writeBlockDetailsToDatastream(verifierBundles) + return checkedVerifierBundles, verifierBundleForUnwind, err } func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBundles []*verifier.VerifierBundle) ([]*verifier.VerifierBundle, error) { diff --git a/zk/stages/stages.go b/zk/stages/stages.go index ff24e6a01fd..4fc497f0f09 100644 --- a/zk/stages/stages.go +++ b/zk/stages/stages.go @@ -104,7 +104,13 @@ func SequencerZkStages( ID: stages2.Execution, Description: "Sequence transactions", Forward: func(firstCycle bool, badBlockUnwind bool, s *stages.StageState, u stages.Unwinder, tx kv.RwTx, quiet bool) error { - return SpawnSequencingStage(s, u, ctx, exec, history, quiet) + sequencerErr := SpawnSequencingStage(s, u, ctx, exec, history, quiet) + if sequencerErr != nil || u.IsUnwindSet() { + exec.legacyVerifier.CancelAllRequests() + // on the begining of next iteration the EXECUTION will be aligned to DS + shouldCheckForExecutionAndDataStreamAlighment = true + } + return 
sequencerErr }, Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { return UnwindSequenceExecutionStage(u, s, tx, ctx, exec, firstCycle) From 2f6f9dd8f4fad95802343b9acdee6442fb827ff3 Mon Sep 17 00:00:00 2001 From: Max Revitt Date: Mon, 7 Oct 2024 09:01:24 +0100 Subject: [PATCH 3/9] docs(readme): flag, zkevm.l1-first-block explanation (#1264) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1e593d467ed..ce3224d8b15 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ For a full explanation of the config options, see below: - `zkevm.l2-datastreamer-url`: URL for the L2 data streamer. - `zkevm.l1-chain-id`: Chain ID for the L1 network. - `zkevm.l1-rpc-url`: L1 Ethereum RPC URL. +- `zkevm.l1-first-block`: The first block on L1 from which we begin syncing (where the rollup begins on the L1). NB: for AggLayer networks this must be the L1 block where the GER Manager contract was deployed. - `zkevm.address-sequencer`: The contract address for the sequencer - `zkevm.address-zkevm`: The address for the zkevm contract - `zkevm.address-rollup`: The address for the rollup contract From 7177a0b8b918b64b284a7ed66969af872c12ac63 Mon Sep 17 00:00:00 2001 From: Valentin Staykov <79150443+V-Staykov@users.noreply.github.com> Date: Mon, 7 Oct 2024 11:43:27 +0300 Subject: [PATCH 4/9] refactor: extract stage batches logic into batches processor (#1251) * refactor: extract stage batches logic into batches processor * refactor: rename batches_processor to stage_batches_processor * fix(stage_batches): set new db tx upon progress save * refactor: create datastream routine runner * fix: bug with entry channe closing and hash check * fix: typo --- zk/datastream/client/stream_client.go | 40 +- .../test_datastream_compare.go | 2 +- zk/erigon_db/db.go | 4 + zk/hermez_db/db.go | 5 + zk/stages/stage_batches.go | 507 +++--------------- zk/stages/stage_batches_datastream.go | 108 ++++ zk/stages/stage_batches_processor.go | 485 +++++++++++++++++ zk/stages/stage_batches_test.go | 2 +- zk/stages/stages.go | 2 +- zk/stages/test_utils.go | 4 +- 10 files changed, 706 insertions(+), 453 deletions(-) create mode 100644 zk/stages/stage_batches_datastream.go create mode 100644 zk/stages/stage_batches_processor.go diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 461b3d9371c..ba85dd8c8ff 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -27,6 +27,7 @@ type EntityDefinition struct { const ( versionProto = 2 // converted to proto versionAddedBlockEnd = 3 // Added block end + entryChannelSize = 100000 ) var ( @@ -44,9 +45,10 @@ type StreamClient struct { checkTimeout time.Duration // time to wait for data before reporting an error // atomic - lastWrittenTime atomic.Int64 - streaming atomic.Bool - progress atomic.Uint64 + lastWrittenTime atomic.Int64 + streaming atomic.Bool + progress atomic.Uint64 + stopReadingToChannel atomic.Bool // Channels entryChan chan interface{} @@ -88,8 +90,8 @@ func (c *StreamClient) IsVersion3() bool { return c.version >= versionAddedBlockEnd } -func (c *StreamClient) GetEntryChan() chan interface{} { - return c.entryChan +func (c *StreamClient) GetEntryChan() *chan interface{} { + return &c.entryChan } // GetL2BlockByNumber queries the data stream by sending the L2 block start bookmark for the certain block number @@ -227,7 +229,7 @@ func (c *StreamClient) Stop() { c.conn.Close() c.conn = nil - close(c.entryChan) + 
c.clearEntryCHannel() } // Command header: Get status @@ -323,12 +325,29 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu return nil } +func (c *StreamClient) clearEntryCHannel() { + select { + case <-c.entryChan: + close(c.entryChan) + for range c.entryChan { + } + default: + } +} + +// close old entry chan and read all elements before opening a new one +func (c *StreamClient) renewEntryChannel() { + c.clearEntryCHannel() + c.entryChan = make(chan interface{}, entryChannelSize) +} + func (c *StreamClient) EnsureConnected() (bool, error) { if c.conn == nil { if err := c.tryReConnect(); err != nil { return false, fmt.Errorf("failed to reconnect the datastream client: %w", err) } - c.entryChan = make(chan interface{}, 100000) + + c.renewEntryChannel() } return true, nil @@ -368,9 +387,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() error { c.conn = nil } - // reset the channels as there could be data ahead of the bookmark we want to track here. - // c.resetChannels() - return err2 } @@ -474,6 +490,10 @@ func (c *StreamClient) tryReConnect() error { return err } +func (c *StreamClient) StopReadingToChannel() { + c.stopReadingToChannel.Store(true) +} + type FileEntryIterator interface { NextFileEntry() (*types.FileEntry, error) } diff --git a/zk/datastream/test/data_stream_compare/test_datastream_compare.go b/zk/datastream/test/data_stream_compare/test_datastream_compare.go index d5093a482c9..9cc63c3d348 100644 --- a/zk/datastream/test/data_stream_compare/test_datastream_compare.go +++ b/zk/datastream/test/data_stream_compare/test_datastream_compare.go @@ -81,7 +81,7 @@ func readFromClient(client *client.StreamClient, total int) ([]interface{}, erro LOOP: for { - entry := <-client.GetEntryChan() + entry := <-*client.GetEntryChan() switch entry.(type) { case types.FullL2Block: diff --git a/zk/erigon_db/db.go b/zk/erigon_db/db.go index a944408e22d..b3df0e88709 100644 --- a/zk/erigon_db/db.go +++ b/zk/erigon_db/db.go @@ -29,6 +29,10 @@ func NewErigonDb(tx kv.RwTx) *ErigonDb { } } +func (db *ErigonDb) SetNewTx(tx kv.RwTx) { + db.tx = tx +} + func (db ErigonDb) WriteHeader( blockNo *big.Int, blockHash common.Hash, diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 429ba5e72c5..bbdee1a8cef 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -112,6 +112,11 @@ func NewHermezDb(tx kv.RwTx) *HermezDb { return db } +func (db *HermezDb) SetNewTx(tx kv.RwTx) { + db.tx = tx + db.HermezDbReader.tx = tx +} + func CreateHermezBuckets(tx kv.RwTx) error { for _, t := range HermezDbTables { if err := tx.CreateBucket(t); err != nil { diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 2409178db41..f7cbffad6b1 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -20,19 +20,18 @@ import ( "github.com/ledgerwatch/erigon/zk/erigon_db" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/erigon/zk/sequencer" - txtype "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/zk/datastream/client" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" ) const ( - HIGHEST_KNOWN_FORK = 12 - STAGE_PROGRESS_SAVE = 3000000 + HIGHEST_KNOWN_FORK = 12 + STAGE_PROGRESS_SAVE = 3000000 + NEW_BLOCKS_ON_DS_LIMIT = 10000 ) var ( @@ -46,40 +45,23 @@ type ErigonDb interface { } type HermezDb interface { - WriteForkId(batchNumber uint64, forkId uint64) error - 
WriteForkIdBlockOnce(forkId, blockNum uint64) error - WriteBlockBatch(l2BlockNumber uint64, batchNumber uint64) error - WriteEffectiveGasPricePercentage(txHash common.Hash, effectiveGasPricePercentage uint8) error - DeleteEffectiveGasPricePercentages(txHashes *[]common.Hash) error - - WriteStateRoot(l2BlockNumber uint64, rpcRoot common.Hash) error - DeleteForkIds(fromBatchNum, toBatchNum uint64) error DeleteBlockBatches(fromBlockNum, toBlockNum uint64) error - CheckGlobalExitRootWritten(ger common.Hash) (bool, error) - WriteBlockGlobalExitRoot(l2BlockNo uint64, ger common.Hash) error - WriteGlobalExitRoot(ger common.Hash) error DeleteBlockGlobalExitRoots(fromBlockNum, toBlockNum uint64) error DeleteGlobalExitRoots(l1BlockHashes *[]common.Hash) error - WriteReusedL1InfoTreeIndex(l2BlockNo uint64) error DeleteReusedL1InfoTreeIndexes(fromBlockNum, toBlockNum uint64) error - WriteBlockL1BlockHash(l2BlockNo uint64, l1BlockHash common.Hash) error DeleteBlockL1BlockHashes(fromBlockNum, toBlockNum uint64) error - WriteBatchGlobalExitRoot(batchNumber uint64, ger *types.GerUpdate) error - WriteIntermediateTxStateRoot(l2BlockNumber uint64, txHash common.Hash, rpcRoot common.Hash) error WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error WriteBlockL1InfoTreeIndexProgress(blockNumber uint64, l1Index uint64) error - WriteLatestUsedGer(blockNo uint64, ger common.Hash) error } type DatastreamClient interface { ReadAllEntriesToChannel() error - GetEntryChan() chan interface{} + GetEntryChan() *chan interface{} GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error) GetLatestL2Block() (*types.FullL2Block, error) - GetLastWrittenTimeAtomic() *atomic.Int64 GetStreamingAtomic() *atomic.Bool GetProgressAtomic() *atomic.Uint64 EnsureConnected() (bool, error) @@ -87,6 +69,12 @@ type DatastreamClient interface { Stop() } +type DatastreamReadRunner interface { + StartRead() + StopRead() + RestartReadFromBlock(fromBlock uint64) +} + type dsClientCreatorHandler func(context.Context, *ethconfig.Zk, uint64) (DatastreamClient, error) type BatchesCfg struct { @@ -129,7 +117,6 @@ func SpawnStageBatches( ctx context.Context, tx kv.RwTx, cfg BatchesCfg, - quiet bool, ) error { logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Starting batches stage", logPrefix)) @@ -170,11 +157,6 @@ func SpawnStageBatches( return fmt.Errorf("get batch no by l2 block error: %v", err) } - highestVerifiedBatch, err := stages.GetStageProgress(tx, stages.L1VerificationsBatchNo) - if err != nil { - return errors.New("could not retrieve l1 verifications batch no progress") - } - startSyncTime := time.Now() latestForkId, err := stages.GetStageProgress(tx, stages.ForkId) @@ -198,86 +180,52 @@ func SpawnStageBatches( stageProgressBlockNo = highestDSL2Block.L2BlockNumber } - log.Debug(fmt.Sprintf("[%s] Highest block in datastream", logPrefix), "block", highestDSL2Block.L2BlockNumber) - log.Debug(fmt.Sprintf("[%s] Highest block in db", logPrefix), "block", stageProgressBlockNo) + log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo) dsClientProgress := cfg.dsClient.GetProgressAtomic() dsClientProgress.Store(stageProgressBlockNo) - // start routine to download blocks and push them in a channel - if !cfg.dsClient.GetStreamingAtomic().Load() { - log.Info(fmt.Sprintf("[%s] Starting stream", logPrefix), "startBlock", stageProgressBlockNo) - // this will download all blocks from datastream and push them in a channel - // 
if no error, break, else continue trying to get them - // Create bookmark - - connected := false - for i := 0; i < 5; i++ { - connected, err = cfg.dsClient.EnsureConnected() - if err != nil { - log.Error(fmt.Sprintf("[%s] Error connecting to datastream", logPrefix), "error", err) - continue - } - if connected { - break - } - } - - go func() { - log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine", logPrefix)) - defer log.Info(fmt.Sprintf("[%s] Finished downloading L2Blocks routine", logPrefix)) - - if connected { - if err := cfg.dsClient.ReadAllEntriesToChannel(); err != nil { - log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", logPrefix), "error", err) - } - } - }() - } // start a routine to print blocks written progress progressChan, stopProgressPrinter := zk.ProgressPrinterWithoutTotal(fmt.Sprintf("[%s] Downloaded blocks from datastream progress", logPrefix)) defer stopProgressPrinter() - lastBlockHeight := stageProgressBlockNo - highestSeenBatchNo := stageProgressBatchNo - endLoop := false - blocksWritten := uint64(0) - highestHashableL2BlockNo := uint64(0) - _, highestL1InfoTreeIndex, err := hermezDb.GetLatestBlockL1InfoTreeIndexProgress() if err != nil { return fmt.Errorf("failed to get highest used l1 info index, %w", err) } - lastForkId, err := stages.GetStageProgress(tx, stages.ForkId) - if err != nil { - return fmt.Errorf("failed to get last fork id, %w", err) - } - stageExecProgress, err := stages.GetStageProgress(tx, stages.Execution) if err != nil { return fmt.Errorf("failed to get stage exec progress, %w", err) } // just exit the stage early if there is more execution work to do - if stageExecProgress < lastBlockHeight { + if stageExecProgress < stageProgressBlockNo { log.Info(fmt.Sprintf("[%s] Execution behind, skipping stage", logPrefix)) return nil } - lastHash := emptyHash - lastBlockRoot := emptyHash - atLeastOneBlockWritten := false - startTime := time.Now() - log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream.", logPrefix)) + unwindFn := func(unwindBlock uint64) error { + return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u) + } + + batchProcessor, err := NewBatchesProcessor(ctx, logPrefix, tx, hermezDb, eriDb, cfg.zkCfg.SyncLimit, cfg.zkCfg.DebugLimit, cfg.zkCfg.DebugStepAfter, cfg.zkCfg.DebugStep, stageProgressBlockNo, stageProgressBatchNo, dsQueryClient, progressChan, unwindFn) + if err != nil { + return err + } + + // start routine to download blocks and push them in a channel + dsClientRunner := NewDatastreamClientRunner(cfg.dsClient, logPrefix) + dsClientRunner.StartRead() + defer dsClientRunner.StopRead() entryChan := cfg.dsClient.GetEntryChan() - lastWrittenTimeAtomic := cfg.dsClient.GetLastWrittenTimeAtomic() - streamingAtomic := cfg.dsClient.GetStreamingAtomic() - prevAmountBlocksWritten := blocksWritten -LOOP: + prevAmountBlocksWritten, restartDatastreamBlock := uint64(0), uint64(0) + endLoop := false + for { // get batch start and use to update forkid // get block // if block already handled, continue // if download routine finished, should continue to read from channel until it's empty // if both download routine stopped and channel empty - stop loop select { - case entry := <-entryChan: - switch entry := entry.(type) { - case *types.BatchStart: - // check if the batch is invalid so that we can replicate this over in the stream - // when we re-populate it - if entry.BatchType == types.BatchTypeInvalid { - if err = hermezDb.WriteInvalidBatch(entry.Number); err != nil { - return err - } - // we 
need to write the fork here as well because the batch will never get processed as it is invalid - // but, we need it re-populate our own stream - if err = hermezDb.WriteForkId(entry.Number, entry.ForkId); err != nil { - return err - } - } - case *types.BatchEnd: - if entry.StateRoot != lastBlockRoot { - log.Debug(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot)) - } - // keep a record of the last block processed when we receive the batch end - if err = hermezDb.WriteBatchEnd(lastBlockHeight); err != nil { - return err - } - case *types.FullL2Block: - log.Debug(fmt.Sprintf("[%s] Retrieved %d (%s) block from stream", logPrefix, entry.L2BlockNumber, entry.L2Blockhash.String())) - if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit { - // stop the node going into a crazy loop - time.Sleep(2 * time.Second) - break LOOP - } - - // handle batch boundary changes - we do this here instead of reading the batch start channel because - // channels can be read in random orders which then creates problems in detecting fork changes during - // execution - if entry.BatchNumber > highestSeenBatchNo && lastForkId < entry.ForkId { - if entry.ForkId > HIGHEST_KNOWN_FORK { - message := fmt.Sprintf("unsupported fork id %v received from the data stream", entry.ForkId) - panic(message) - } - err = stages.SaveStageProgress(tx, stages.ForkId, entry.ForkId) - if err != nil { - return fmt.Errorf("save stage progress error: %v", err) - } - lastForkId = entry.ForkId - err = hermezDb.WriteForkId(entry.BatchNumber, entry.ForkId) - if err != nil { - return fmt.Errorf("write fork id error: %v", err) - } - // NOTE (RPC): avoided use of 'writeForkIdBlockOnce' by reading instead batch by forkId, and then lowest block number in batch - } - - // ignore genesis or a repeat of the last block - if entry.L2BlockNumber == 0 { - continue - } - // skip but warn on already processed blocks - if entry.L2BlockNumber <= stageProgressBlockNo { - if entry.L2BlockNumber < stageProgressBlockNo { - // only warn if the block is very old, we expect the very latest block to be requested - // when the stage is fired up for the first time - log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", logPrefix, entry.L2BlockNumber)) - } - - dbBatchNum, err := hermezDb.GetBatchNoByL2Block(entry.L2BlockNumber) - if err != nil { - return err - } - - if entry.BatchNumber > dbBatchNum { - // if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks - log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. 
Triggering unwind...", logPrefix), - "block", entry.L2BlockNumber, "ds batch", entry.BatchNumber, "db batch", dbBatchNum) - if err := rollback(logPrefix, eriDb, hermezDb, dsQueryClient, entry.L2BlockNumber, tx, u); err != nil { - return err - } - cfg.dsClient.Stop() - return nil - } - continue - } - - var dbParentBlockHash common.Hash - if entry.L2BlockNumber > 0 { - dbParentBlockHash, err = eriDb.ReadCanonicalHash(entry.L2BlockNumber - 1) - if err != nil { - return fmt.Errorf("failed to retrieve parent block hash for datastream block %d: %w", - entry.L2BlockNumber, err) - } - } - - dsParentBlockHash := lastHash - if dsParentBlockHash == emptyHash { - parentBlockDS, _, err := dsQueryClient.GetL2BlockByNumber(entry.L2BlockNumber - 1) - if err != nil { - return err - } - - if parentBlockDS != nil { - dsParentBlockHash = parentBlockDS.L2Blockhash - } - } - - if dbParentBlockHash != dsParentBlockHash { - // unwind/rollback blocks until the latest common ancestor block - log.Warn(fmt.Sprintf("[%s] Parent block hashes mismatch on block %d. Triggering unwind...", logPrefix, entry.L2BlockNumber), - "db parent block hash", dbParentBlockHash, "ds parent block hash", dsParentBlockHash) - if err := rollback(logPrefix, eriDb, hermezDb, dsQueryClient, entry.L2BlockNumber, tx, u); err != nil { - return err - } - cfg.dsClient.Stop() - return nil - } - - // skip if we already have this block - if entry.L2BlockNumber < lastBlockHeight+1 { - log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", logPrefix, entry.L2BlockNumber)) - continue - } - - // check for sequential block numbers - if entry.L2BlockNumber > lastBlockHeight+1 { - log.Warn(fmt.Sprintf("[%s] Stream skipped ahead, unwinding to block %d", logPrefix, entry.L2BlockNumber)) - badBlock, err := eriDb.ReadCanonicalHash(entry.L2BlockNumber) - if err != nil { - return fmt.Errorf("failed to get bad block: %v", err) - } - u.UnwindTo(entry.L2BlockNumber, badBlock) - return nil - } - - // batch boundary - record the highest hashable block number (last block in last full batch) - if entry.BatchNumber > highestSeenBatchNo { - highestHashableL2BlockNo = entry.L2BlockNumber - 1 - } - highestSeenBatchNo = entry.BatchNumber - - /////// DEBUG BISECTION /////// - // exit stage when debug bisection flags set and we're at the limit block - if cfg.zkCfg.DebugLimit > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugLimit { - fmt.Printf("[%s] Debug limit reached, stopping stage\n", logPrefix) - endLoop = true - } - - // if we're above StepAfter, and we're at a step, move the stages on - if cfg.zkCfg.DebugStep > 0 && cfg.zkCfg.DebugStepAfter > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugStepAfter { - if entry.L2BlockNumber%cfg.zkCfg.DebugStep == 0 { - fmt.Printf("[%s] Debug step reached, stopping stage\n", logPrefix) - endLoop = true - } - } - /////// END DEBUG BISECTION /////// - - // store our finalized state if this batch matches the highest verified batch number on the L1 - if entry.BatchNumber == highestVerifiedBatch { - rawdb.WriteForkchoiceFinalized(tx, entry.L2Blockhash) - } - - if lastHash != emptyHash { - entry.ParentHash = lastHash - } else { - // first block in the loop so read the parent hash - previousHash, err := eriDb.ReadCanonicalHash(entry.L2BlockNumber - 1) - if err != nil { - return fmt.Errorf("failed to get genesis header: %v", err) - } - entry.ParentHash = previousHash - } - - if err := writeL2Block(eriDb, hermezDb, entry, highestL1InfoTreeIndex); err != nil { - return fmt.Errorf("writeL2Block error: %v", err) - } - 
dsClientProgress.Store(entry.L2BlockNumber) - - // make sure to capture the l1 info tree index changes so we can store progress - if uint64(entry.L1InfoTreeIndex) > highestL1InfoTreeIndex { - highestL1InfoTreeIndex = uint64(entry.L1InfoTreeIndex) - } - - lastHash = entry.L2Blockhash - lastBlockRoot = entry.StateRoot - - atLeastOneBlockWritten = true - lastBlockHeight = entry.L2BlockNumber - blocksWritten++ - progressChan <- blocksWritten - - if endLoop && cfg.zkCfg.DebugLimit > 0 { - break LOOP - } - case *types.GerUpdate: - if entry.GlobalExitRoot == emptyHash { - log.Warn(fmt.Sprintf("[%s] Skipping GER update with empty root", logPrefix)) - break - } + case entry := <-*entryChan: + if restartDatastreamBlock, endLoop, err = batchProcessor.ProcessEntry(entry); err != nil { + return err + } + dsClientProgress.Store(batchProcessor.LastBlockHeight()) - // NB: we won't get these post Etrog (fork id 7) - if err := hermezDb.WriteBatchGlobalExitRoot(entry.BatchNumber, entry); err != nil { - return fmt.Errorf("write batch global exit root error: %v", err) - } + if restartDatastreamBlock > 0 { + dsClientRunner.RestartReadFromBlock(restartDatastreamBlock) } case <-ctx.Done(): log.Warn(fmt.Sprintf("[%s] Context done", logPrefix)) endLoop = true default: - if atLeastOneBlockWritten { - // first check to see if anything has come in from the stream yet, if it has then wait a little longer - // because there could be more. - // if no blocks available should and time since last block written is > 500ms - // consider that we are at the tip and blocks come in the datastream as they are produced - // stop the current iteration of the stage - lastWrittenTs := lastWrittenTimeAtomic.Load() - timePassedAfterlastBlock := time.Since(time.Unix(0, lastWrittenTs)) - if timePassedAfterlastBlock > cfg.zkCfg.DatastreamNewBlockTimeout { - log.Info(fmt.Sprintf("[%s] No new blocks in %d miliseconds. Ending the stage.", logPrefix, timePassedAfterlastBlock.Milliseconds()), "lastBlockHeight", lastBlockHeight) - endLoop = true - } + time.Sleep(10 * time.Millisecond) + } + + // if ds end reached check again for new blocks in the stream + // if there are too many new blocks get them as well before ending stage + if batchProcessor.LastBlockHeight() >= highestDSL2Block.L2BlockNumber { + newLatestDSL2Block, err := dsQueryClient.GetLatestL2Block() + if err != nil { + return fmt.Errorf("failed to retrieve the latest datastream l2 block: %w", err) + } + if newLatestDSL2Block.L2BlockNumber > highestDSL2Block.L2BlockNumber+NEW_BLOCKS_ON_DS_LIMIT { + highestDSL2Block = newLatestDSL2Block } else { - timePassedAfterlastBlock := time.Since(startTime) - if timePassedAfterlastBlock.Seconds() > 10 { - // if the connection ropped, continue with next stages while it tries to reconnect - // otherwise it will get stuck in "waiting for at least one block to be written" loop - // if !streamingAtomic.Load() { - // endLoop = true - // break - // } - - if !streamingAtomic.Load() { - log.Info(fmt.Sprintf("[%s] Datastream disconnected. 
Ending the stage.", logPrefix)) - break LOOP - } - - log.Info(fmt.Sprintf("[%s] Waiting for at least one new block.", logPrefix)) - startTime = time.Now() - } + endLoop = true } - time.Sleep(10 * time.Millisecond) } - if blocksWritten != prevAmountBlocksWritten && blocksWritten%STAGE_PROGRESS_SAVE == 0 { - if err = saveStageProgress(tx, logPrefix, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId); err != nil { + if endLoop { + log.Info(fmt.Sprintf("[%s] Total blocks written: %d", logPrefix, batchProcessor.TotalBlocksWritten())) + break + } + + // this can be after the loop break because we save progress at the end of stage anyways. no need to do it twice + // commit progress from time to time + if batchProcessor.TotalBlocksWritten() != prevAmountBlocksWritten && batchProcessor.TotalBlocksWritten()%STAGE_PROGRESS_SAVE == 0 { + if err = saveStageProgress(tx, logPrefix, batchProcessor.HighestHashableL2BlockNo(), batchProcessor.HighestSeenBatchNumber(), batchProcessor.LastBlockHeight(), batchProcessor.LastForkId()); err != nil { return err } - if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(lastBlockHeight, highestL1InfoTreeIndex); err != nil { + if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(batchProcessor.LastBlockHeight(), highestL1InfoTreeIndex); err != nil { return err } @@ -536,36 +283,33 @@ LOOP: return fmt.Errorf("failed to commit tx, %w", err) } - tx, err = cfg.db.BeginRw(ctx) - if err != nil { + if tx, err = cfg.db.BeginRw(ctx); err != nil { return fmt.Errorf("failed to open tx, %w", err) } - hermezDb = hermez_db.NewHermezDb(tx) - eriDb = erigon_db.NewErigonDb(tx) + hermezDb.SetNewTx(tx) + eriDb.SetNewTx(tx) + batchProcessor.SetNewTx(tx) } - prevAmountBlocksWritten = blocksWritten + prevAmountBlocksWritten = batchProcessor.TotalBlocksWritten() } - if endLoop { - log.Info(fmt.Sprintf("[%s] Total blocks read: %d", logPrefix, blocksWritten)) - break - } } - if lastBlockHeight == stageProgressBlockNo { + // no new progress, nothing to save + if batchProcessor.LastBlockHeight() == stageProgressBlockNo { return nil } - if err = saveStageProgress(tx, logPrefix, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId); err != nil { + if err = saveStageProgress(tx, logPrefix, batchProcessor.HighestHashableL2BlockNo(), batchProcessor.HighestSeenBatchNumber(), batchProcessor.LastBlockHeight(), batchProcessor.LastForkId()); err != nil { return err } - if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(lastBlockHeight, highestL1InfoTreeIndex); err != nil { + if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(batchProcessor.LastBlockHeight(), highestL1InfoTreeIndex); err != nil { return err } // stop printing blocks written progress routine elapsed := time.Since(startSyncTime) - log.Info(fmt.Sprintf("[%s] Finished writing blocks", logPrefix), "blocksWritten", blocksWritten, "elapsed", elapsed) + log.Info(fmt.Sprintf("[%s] Finished writing blocks", logPrefix), "blocksWritten", batchProcessor.TotalBlocksWritten(), "elapsed", elapsed) if freshTx { if err := tx.Commit(); err != nil { @@ -845,119 +589,6 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx return nil } -// writeL2Block writes L2Block to ErigonDb and HermezDb -// writes header, body, forkId and blockBatch -func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, highestL1InfoTreeIndex uint64) error { - bn := new(big.Int).SetUint64(l2Block.L2BlockNumber) - txs := make([]ethTypes.Transaction, 0, len(l2Block.L2Txs)) - for _, 
transaction := range l2Block.L2Txs { - ltx, _, err := txtype.DecodeTx(transaction.Encoded, transaction.EffectiveGasPricePercentage, l2Block.ForkId) - if err != nil { - return fmt.Errorf("decode tx error: %v", err) - } - txs = append(txs, ltx) - - if err := hermezDb.WriteEffectiveGasPricePercentage(ltx.Hash(), transaction.EffectiveGasPricePercentage); err != nil { - return fmt.Errorf("write effective gas price percentage error: %v", err) - } - - if err := hermezDb.WriteStateRoot(l2Block.L2BlockNumber, transaction.IntermediateStateRoot); err != nil { - return fmt.Errorf("write rpc root error: %v", err) - } - - if err := hermezDb.WriteIntermediateTxStateRoot(l2Block.L2BlockNumber, ltx.Hash(), transaction.IntermediateStateRoot); err != nil { - return fmt.Errorf("write rpc root error: %v", err) - } - } - txCollection := ethTypes.Transactions(txs) - txHash := ethTypes.DeriveSha(txCollection) - - gasLimit := utils.GetBlockGasLimitForFork(l2Block.ForkId) - - _, err := eriDb.WriteHeader(bn, l2Block.L2Blockhash, l2Block.StateRoot, txHash, l2Block.ParentHash, l2Block.Coinbase, uint64(l2Block.Timestamp), gasLimit) - if err != nil { - return fmt.Errorf("write header error: %v", err) - } - - didStoreGer := false - l1InfoTreeIndexReused := false - - if l2Block.GlobalExitRoot != emptyHash { - gerWritten, err := hermezDb.CheckGlobalExitRootWritten(l2Block.GlobalExitRoot) - if err != nil { - return fmt.Errorf("get global exit root error: %v", err) - } - - if !gerWritten { - if err := hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write block global exit root error: %v", err) - } - - if err := hermezDb.WriteGlobalExitRoot(l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write global exit root error: %v", err) - } - didStoreGer = true - } - } - - if l2Block.L1BlockHash != emptyHash { - if err := hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { - return fmt.Errorf("write block global exit root error: %v", err) - } - } - - if l2Block.L1InfoTreeIndex != 0 { - if err := hermezDb.WriteBlockL1InfoTreeIndex(l2Block.L2BlockNumber, uint64(l2Block.L1InfoTreeIndex)); err != nil { - return err - } - - // if the info tree index of this block is lower than the highest we've seen - // we need to write the GER and l1 block hash regardless of the logic above. 
- // this can only happen in post etrog blocks, and we need the GER/L1 block hash - // for the stream and also for the block info root to be correct - if uint64(l2Block.L1InfoTreeIndex) <= highestL1InfoTreeIndex { - l1InfoTreeIndexReused = true - if err := hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write block global exit root error: %w", err) - } - if err := hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { - return fmt.Errorf("write block global exit root error: %w", err) - } - if err := hermezDb.WriteReusedL1InfoTreeIndex(l2Block.L2BlockNumber); err != nil { - return fmt.Errorf("write reused l1 info tree index error: %w", err) - } - } - } - - // if we haven't reused the l1 info tree index, and we have also written the GER - // then we need to write the latest used GER for this batch to the table - // we always want the last written GER in this table as it's at the batch level, so it can and should - // be overwritten - if !l1InfoTreeIndexReused && didStoreGer { - if err := hermezDb.WriteLatestUsedGer(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write latest used ger error: %w", err) - } - } - - if err := eriDb.WriteBody(bn, l2Block.L2Blockhash, txs); err != nil { - return fmt.Errorf("write body error: %v", err) - } - - if err := hermezDb.WriteForkId(l2Block.BatchNumber, l2Block.ForkId); err != nil { - return fmt.Errorf("write block batch error: %v", err) - } - - if err := hermezDb.WriteForkIdBlockOnce(l2Block.ForkId, l2Block.L2BlockNumber); err != nil { - return fmt.Errorf("write fork id block error: %v", err) - } - - if err := hermezDb.WriteBlockBatch(l2Block.L2BlockNumber, l2Block.BatchNumber); err != nil { - return fmt.Errorf("write block batch error: %v", err) - } - - return nil -} - // rollback performs the unwinding of blocks: // 1. queries the latest common ancestor for datastream and db, // 2. 
resolves the unwind block (as the latest block in the previous batch, comparing to the found ancestor block) diff --git a/zk/stages/stage_batches_datastream.go b/zk/stages/stage_batches_datastream.go new file mode 100644 index 00000000000..fefd7c17188 --- /dev/null +++ b/zk/stages/stage_batches_datastream.go @@ -0,0 +1,108 @@ +package stages + +import ( + "fmt" + "math/rand" + "sync/atomic" + "time" + + "github.com/ledgerwatch/log/v3" +) + +type DatastreamClientRunner struct { + dsClient DatastreamClient + logPrefix string + stopRunner atomic.Bool + isReading atomic.Bool +} + +func NewDatastreamClientRunner(dsClient DatastreamClient, logPrefix string) *DatastreamClientRunner { + return &DatastreamClientRunner{ + dsClient: dsClient, + logPrefix: logPrefix, + } +} + +func (r *DatastreamClientRunner) StartRead() error { + if r.isReading.Load() { + return fmt.Errorf("tried starting datastream client runner thread while another is running") + } + + go func() { + routineId := rand.Intn(1000000) + + log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine ID: %d", r.logPrefix, routineId)) + defer log.Info(fmt.Sprintf("[%s] Ended downloading L2Blocks routine ID: %d", r.logPrefix, routineId)) + + r.isReading.Store(true) + defer r.isReading.Store(false) + + for { + if r.stopRunner.Load() { + log.Info(fmt.Sprintf("[%s] Downloading L2Blocks routine stopped intentionally", r.logPrefix)) + break + } + + // start routine to download blocks and push them in a channel + if !r.dsClient.GetStreamingAtomic().Load() { + log.Info(fmt.Sprintf("[%s] Starting stream", r.logPrefix)) + // this will download all blocks from datastream and push them in a channel + // if no error, break, else continue trying to get them + // Create bookmark + + if err := r.connectDatastream(); err != nil { + log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err) + } + + if err := r.dsClient.ReadAllEntriesToChannel(); err != nil { + log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", r.logPrefix), "error", err) + } + } + } + }() + + return nil +} + +func (r *DatastreamClientRunner) StopRead() { + r.stopRunner.Store(true) +} + +func (r *DatastreamClientRunner) RestartReadFromBlock(fromBlock uint64) error { + r.StopRead() + + //wait for the old routine to be finished before continuing + counter := 0 + for { + if !r.isReading.Load() { + break + } + counter++ + if counter > 100 { + return fmt.Errorf("failed to stop reader routine correctly") + } + time.Sleep(100 * time.Millisecond) + } + + // set new block + r.dsClient.GetProgressAtomic().Store(fromBlock) + + log.Info(fmt.Sprintf("[%s] Restarting datastream from block %d", r.logPrefix, fromBlock)) + + return r.StartRead() +} + +func (r *DatastreamClientRunner) connectDatastream() (err error) { + var connected bool + for i := 0; i < 5; i++ { + if connected, err = r.dsClient.EnsureConnected(); err != nil { + log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err) + continue + } + if connected { + return nil + } + } + + return fmt.Errorf("failed to connect to datastream") +} diff --git a/zk/stages/stage_batches_processor.go b/zk/stages/stage_batches_processor.go new file mode 100644 index 00000000000..9f23270aa78 --- /dev/null +++ b/zk/stages/stage_batches_processor.go @@ -0,0 +1,485 @@ +package stages + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync/atomic" + "time" + + "github.com/gateway-fm/cdk-erigon-lib/common" + "github.com/gateway-fm/cdk-erigon-lib/kv" + 
"github.com/ledgerwatch/erigon/core/rawdb" + ethTypes "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/zk/datastream/types" + txtype "github.com/ledgerwatch/erigon/zk/tx" + "github.com/ledgerwatch/erigon/zk/utils" + "github.com/ledgerwatch/log/v3" +) + +type ProcessorErigonDb interface { + WriteHeader(batchNo *big.Int, blockHash common.Hash, stateRoot, txHash, parentHash common.Hash, coinbase common.Address, ts, gasLimit uint64) (*ethTypes.Header, error) + WriteBody(batchNo *big.Int, headerHash common.Hash, txs []ethTypes.Transaction) error + ReadCanonicalHash(L2BlockNumber uint64) (common.Hash, error) +} + +type ProcessorHermezDb interface { + WriteForkId(batchNumber uint64, forkId uint64) error + WriteForkIdBlockOnce(forkId, blockNum uint64) error + WriteBlockBatch(l2BlockNumber uint64, batchNumber uint64) error + WriteEffectiveGasPricePercentage(txHash common.Hash, effectiveGasPricePercentage uint8) error + + WriteStateRoot(l2BlockNumber uint64, rpcRoot common.Hash) error + + CheckGlobalExitRootWritten(ger common.Hash) (bool, error) + WriteBlockGlobalExitRoot(l2BlockNo uint64, ger common.Hash) error + WriteGlobalExitRoot(ger common.Hash) error + + WriteReusedL1InfoTreeIndex(l2BlockNo uint64) error + WriteBlockL1BlockHash(l2BlockNo uint64, l1BlockHash common.Hash) error + WriteBatchGlobalExitRoot(batchNumber uint64, ger *types.GerUpdate) error + WriteIntermediateTxStateRoot(l2BlockNumber uint64, txHash common.Hash, rpcRoot common.Hash) error + WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error + WriteLatestUsedGer(blockNo uint64, ger common.Hash) error + WriteInvalidBatch(batchNumber uint64) error + WriteBatchEnd(lastBlockHeight uint64) error + GetBatchNoByL2Block(l2BlockNumber uint64) (uint64, error) +} + +type DsQueryClient interface { + GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error) + GetProgressAtomic() *atomic.Uint64 +} + +type BatchesProcessor struct { + ctx context.Context + logPrefix string + tx kv.RwTx + hermezDb ProcessorHermezDb + eriDb ProcessorErigonDb + syncBlockLimit, + debugBlockLimit, + debugStepAfter, + debugStep, + stageProgressBlockNo, + lastForkId, + highestHashableL2BlockNo, + lastBlockHeight, + highestSeenBatchNo, + blocksWritten, + highestVerifiedBatch uint64 + highestL1InfoTreeIndex uint32 + lastBlockRoot, + lastBlockHash common.Hash + dsQueryClient DsQueryClient + progressChan chan uint64 + unwindFn func(uint64) error +} + +func NewBatchesProcessor( + ctx context.Context, + logPrefix string, + tx kv.RwTx, + hermezDb ProcessorHermezDb, + eriDb ProcessorErigonDb, + syncBlockLimit, debugBlockLimit, debugStepAfter, debugStep, stageProgressBlockNo, stageProgressBatchNo uint64, + dsQueryClient DsQueryClient, + progressChan chan uint64, + unwindFn func(uint64) error, +) (*BatchesProcessor, error) { + highestVerifiedBatch, err := stages.GetStageProgress(tx, stages.L1VerificationsBatchNo) + if err != nil { + return nil, errors.New("could not retrieve l1 verifications batch no progress") + } + + lastForkId, err := stages.GetStageProgress(tx, stages.ForkId) + if err != nil { + return nil, fmt.Errorf("failed to get last fork id, %w", err) + } + + return &BatchesProcessor{ + ctx: ctx, + logPrefix: logPrefix, + tx: tx, + hermezDb: hermezDb, + eriDb: eriDb, + syncBlockLimit: syncBlockLimit, + debugBlockLimit: debugBlockLimit, + debugStep: debugStep, + debugStepAfter: debugStepAfter, + stageProgressBlockNo: stageProgressBlockNo, + lastBlockHeight: 
stageProgressBlockNo, + highestSeenBatchNo: stageProgressBatchNo, + highestVerifiedBatch: highestVerifiedBatch, + dsQueryClient: dsQueryClient, + progressChan: progressChan, + lastBlockHash: emptyHash, + lastBlockRoot: emptyHash, + lastForkId: lastForkId, + unwindFn: unwindFn, + }, nil +} + +func (p *BatchesProcessor) ProcessEntry(entry interface{}) (rollbackBlock uint64, endLoop bool, err error) { + switch entry := entry.(type) { + case *types.BatchStart: + return 0, false, p.processBatchStartEntry(entry) + case *types.BatchEnd: + return 0, false, p.processBatchEndEntry(entry) + case *types.FullL2Block: + return p.processFullBlock(entry) + case *types.GerUpdate: + return 0, false, p.processGerUpdate(entry) + default: + return 0, false, fmt.Errorf("unknown entry type: %T", entry) + } +} + +func (p *BatchesProcessor) processGerUpdate(gerUpdate *types.GerUpdate) error { + if gerUpdate.GlobalExitRoot == emptyHash { + log.Warn(fmt.Sprintf("[%s] Skipping GER update with empty root", p.logPrefix)) + return nil + } + + // NB: we won't get these post Etrog (fork id 7) + if err := p.hermezDb.WriteBatchGlobalExitRoot(gerUpdate.BatchNumber, gerUpdate); err != nil { + return fmt.Errorf("write batch global exit root error: %v", err) + } + + return nil +} + +func (p *BatchesProcessor) processBatchEndEntry(batchEnd *types.BatchEnd) (err error) { + if batchEnd.StateRoot != p.lastBlockRoot { + log.Debug(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", p.logPrefix, batchEnd.StateRoot, p.lastBlockRoot)) + } + // keep a record of the last block processed when we receive the batch end + if err = p.hermezDb.WriteBatchEnd(p.lastBlockHeight); err != nil { + return err + } + return nil +} + +func (p *BatchesProcessor) processBatchStartEntry(batchStart *types.BatchStart) (err error) { + // check if the batch is invalid so that we can replicate this over in the stream + // when we re-populate it + if batchStart.BatchType == types.BatchTypeInvalid { + if err = p.hermezDb.WriteInvalidBatch(batchStart.Number); err != nil { + return err + } + // we need to write the fork here as well because the batch will never get processed as it is invalid + // but, we need it re-populate our own stream + if err = p.hermezDb.WriteForkId(batchStart.Number, batchStart.ForkId); err != nil { + return err + } + } + + return nil +} + +func (p *BatchesProcessor) processFullBlock(blockEntry *types.FullL2Block) (restartStreamFromBlock uint64, endLoop bool, err error) { + log.Debug(fmt.Sprintf("[%s] Retrieved %d (%s) block from stream", p.logPrefix, blockEntry.L2BlockNumber, blockEntry.L2Blockhash.String())) + if p.syncBlockLimit > 0 && blockEntry.L2BlockNumber >= p.syncBlockLimit { + // stop the node going into a crazy loop + time.Sleep(2 * time.Second) + return 0, true, nil + } + + // handle batch boundary changes - we do this here instead of reading the batch start channel because + // channels can be read in random orders which then creates problems in detecting fork changes during + // execution + if blockEntry.BatchNumber > p.highestSeenBatchNo && p.lastForkId < blockEntry.ForkId { + if blockEntry.ForkId > HIGHEST_KNOWN_FORK { + message := fmt.Sprintf("unsupported fork id %v received from the data stream", blockEntry.ForkId) + panic(message) + } + if err = stages.SaveStageProgress(p.tx, stages.ForkId, blockEntry.ForkId); err != nil { + return 0, false, fmt.Errorf("save stage progress error: %v", err) + } + p.lastForkId = blockEntry.ForkId + if err = p.hermezDb.WriteForkId(blockEntry.BatchNumber, 
blockEntry.ForkId); err != nil { + return 0, false, fmt.Errorf("write fork id error: %v", err) + } + // NOTE (RPC): avoided use of 'writeForkIdBlockOnce' by reading instead batch by forkId, and then lowest block number in batch + } + + // ignore genesis or a repeat of the last block + if blockEntry.L2BlockNumber == 0 { + return 0, false, nil + } + // skip but warn on already processed blocks + if blockEntry.L2BlockNumber <= p.stageProgressBlockNo { + if blockEntry.L2BlockNumber < p.stageProgressBlockNo { + // only warn if the block is very old, we expect the very latest block to be requested + // when the stage is fired up for the first time + log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", p.logPrefix, blockEntry.L2BlockNumber)) + } + + dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber) + if err != nil { + return 0, false, err + } + + if blockEntry.BatchNumber > dbBatchNum { + // if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks + log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. Triggering unwind...", p.logPrefix), + "block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum) + if err := p.unwindFn(blockEntry.L2BlockNumber); err != nil { + return blockEntry.L2BlockNumber, false, err + } + } + return 0, false, nil + } + + var dbParentBlockHash common.Hash + if blockEntry.L2BlockNumber > 1 { + dbParentBlockHash, err = p.eriDb.ReadCanonicalHash(p.lastBlockHeight) + if err != nil { + return 0, false, fmt.Errorf("failed to retrieve parent block hash for datastream block %d: %w", + blockEntry.L2BlockNumber, err) + } + } + + dsParentBlockHash := p.lastBlockHash + dsBlockNumber := p.lastBlockHeight + if dsParentBlockHash == emptyHash { + parentBlockDS, _, err := p.dsQueryClient.GetL2BlockByNumber(blockEntry.L2BlockNumber - 1) + if err != nil { + return 0, false, err + } + + if parentBlockDS != nil { + dsParentBlockHash = parentBlockDS.L2Blockhash + if parentBlockDS.L2BlockNumber > 0 { + dsBlockNumber = parentBlockDS.L2BlockNumber + } + } + } + + if blockEntry.L2BlockNumber > 1 && dbParentBlockHash != dsParentBlockHash { + // unwind/rollback blocks until the latest common ancestor block + log.Warn(fmt.Sprintf("[%s] Parent block hashes mismatch on block %d. 
Triggering unwind...", p.logPrefix, blockEntry.L2BlockNumber), + "db parent block hash", dbParentBlockHash, + "ds parent block number", dsBlockNumber, + "ds parent block hash", dsParentBlockHash, + "ds parent block number", blockEntry.L2BlockNumber-1, + ) + //parent blockhash is wrong, so unwind to it, then restat stream from it to get the correct one + p.unwindFn(blockEntry.L2BlockNumber - 1) + return blockEntry.L2BlockNumber - 1, false, nil + } + + // skip if we already have this block + if blockEntry.L2BlockNumber < p.lastBlockHeight+1 { + log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed unwinding...", p.logPrefix, blockEntry.L2BlockNumber)) + p.unwindFn(blockEntry.L2BlockNumber) + } + + // check for sequential block numbers + if blockEntry.L2BlockNumber > p.lastBlockHeight+1 { + log.Warn(fmt.Sprintf("[%s] Stream skipped ahead, restarting datastream to block %d", p.logPrefix, blockEntry.L2BlockNumber)) + return p.lastBlockHeight + 1, false, nil + } + + // batch boundary - record the highest hashable block number (last block in last full batch) + if blockEntry.BatchNumber > p.highestSeenBatchNo { + p.highestHashableL2BlockNo = blockEntry.L2BlockNumber - 1 + } + p.highestSeenBatchNo = blockEntry.BatchNumber + + /////// DEBUG BISECTION /////// + // exit stage when debug bisection flags set and we're at the limit block + if p.debugBlockLimit > 0 && blockEntry.L2BlockNumber > p.debugBlockLimit { + log.Info(fmt.Sprintf("[%s] Debug limit reached, stopping stage\n", p.logPrefix)) + endLoop = true + } + + // if we're above StepAfter, and we're at a step, move the stages on + if p.debugStep > 0 && p.debugStepAfter > 0 && blockEntry.L2BlockNumber > p.debugStepAfter { + if blockEntry.L2BlockNumber%p.debugStep == 0 { + log.Info(fmt.Sprintf("[%s] Debug step reached, stopping stage\n", p.logPrefix)) + endLoop = true + } + } + /////// END DEBUG BISECTION /////// + + // store our finalized state if this batch matches the highest verified batch number on the L1 + if blockEntry.BatchNumber == p.highestVerifiedBatch { + rawdb.WriteForkchoiceFinalized(p.tx, blockEntry.L2Blockhash) + } + + if p.lastBlockHash != emptyHash { + blockEntry.ParentHash = p.lastBlockHash + } else { + // first block in the loop so read the parent hash + previousHash, err := p.eriDb.ReadCanonicalHash(blockEntry.L2BlockNumber - 1) + if err != nil { + return 0, false, fmt.Errorf("failed to get genesis header: %v", err) + } + blockEntry.ParentHash = previousHash + } + + if err := p.writeL2Block(blockEntry); err != nil { + return 0, false, fmt.Errorf("writeL2Block error: %v", err) + } + + p.dsQueryClient.GetProgressAtomic().Store(blockEntry.L2BlockNumber) + + // make sure to capture the l1 info tree index changes so we can store progress + if blockEntry.L1InfoTreeIndex > p.highestL1InfoTreeIndex { + p.highestL1InfoTreeIndex = blockEntry.L1InfoTreeIndex + } + + p.lastBlockHash = blockEntry.L2Blockhash + p.lastBlockRoot = blockEntry.StateRoot + + p.lastBlockHeight = blockEntry.L2BlockNumber + p.blocksWritten++ + p.progressChan <- p.blocksWritten + + if p.debugBlockLimit == 0 { + endLoop = false + } + return 0, endLoop, nil +} + +// writeL2Block writes L2Block to ErigonDb and HermezDb +// writes header, body, forkId and blockBatch +func (p *BatchesProcessor) writeL2Block(l2Block *types.FullL2Block) error { + bn := new(big.Int).SetUint64(l2Block.L2BlockNumber) + txs := make([]ethTypes.Transaction, 0, len(l2Block.L2Txs)) + for _, transaction := range l2Block.L2Txs { + ltx, _, err := txtype.DecodeTx(transaction.Encoded, 
transaction.EffectiveGasPricePercentage, l2Block.ForkId) + if err != nil { + return fmt.Errorf("decode tx error: %v", err) + } + txs = append(txs, ltx) + + if err := p.hermezDb.WriteEffectiveGasPricePercentage(ltx.Hash(), transaction.EffectiveGasPricePercentage); err != nil { + return fmt.Errorf("write effective gas price percentage error: %v", err) + } + + if err := p.hermezDb.WriteStateRoot(l2Block.L2BlockNumber, transaction.IntermediateStateRoot); err != nil { + return fmt.Errorf("write rpc root error: %v", err) + } + + if err := p.hermezDb.WriteIntermediateTxStateRoot(l2Block.L2BlockNumber, ltx.Hash(), transaction.IntermediateStateRoot); err != nil { + return fmt.Errorf("write rpc root error: %v", err) + } + } + txCollection := ethTypes.Transactions(txs) + txHash := ethTypes.DeriveSha(txCollection) + + gasLimit := utils.GetBlockGasLimitForFork(l2Block.ForkId) + + if _, err := p.eriDb.WriteHeader(bn, l2Block.L2Blockhash, l2Block.StateRoot, txHash, l2Block.ParentHash, l2Block.Coinbase, uint64(l2Block.Timestamp), gasLimit); err != nil { + return fmt.Errorf("write header error: %v", err) + } + + didStoreGer := false + l1InfoTreeIndexReused := false + + if l2Block.GlobalExitRoot != emptyHash { + gerWritten, err := p.hermezDb.CheckGlobalExitRootWritten(l2Block.GlobalExitRoot) + if err != nil { + return fmt.Errorf("get global exit root error: %v", err) + } + + if !gerWritten { + if err := p.hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write block global exit root error: %v", err) + } + + if err := p.hermezDb.WriteGlobalExitRoot(l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write global exit root error: %v", err) + } + didStoreGer = true + } + } + + if l2Block.L1BlockHash != emptyHash { + if err := p.hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { + return fmt.Errorf("write block global exit root error: %v", err) + } + } + + if l2Block.L1InfoTreeIndex != 0 { + if err := p.hermezDb.WriteBlockL1InfoTreeIndex(l2Block.L2BlockNumber, uint64(l2Block.L1InfoTreeIndex)); err != nil { + return err + } + + // if the info tree index of this block is lower than the highest we've seen + // we need to write the GER and l1 block hash regardless of the logic above. 
+ // this can only happen in post etrog blocks, and we need the GER/L1 block hash + // for the stream and also for the block info root to be correct + if l2Block.L1InfoTreeIndex <= p.highestL1InfoTreeIndex { + l1InfoTreeIndexReused = true + if err := p.hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write block global exit root error: %w", err) + } + if err := p.hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { + return fmt.Errorf("write block global exit root error: %w", err) + } + if err := p.hermezDb.WriteReusedL1InfoTreeIndex(l2Block.L2BlockNumber); err != nil { + return fmt.Errorf("write reused l1 info tree index error: %w", err) + } + } + } + + // if we haven't reused the l1 info tree index, and we have also written the GER + // then we need to write the latest used GER for this batch to the table + // we always want the last written GER in this table as it's at the batch level, so it can and should + // be overwritten + if !l1InfoTreeIndexReused && didStoreGer { + if err := p.hermezDb.WriteLatestUsedGer(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write latest used ger error: %w", err) + } + } + + if err := p.eriDb.WriteBody(bn, l2Block.L2Blockhash, txs); err != nil { + return fmt.Errorf("write body error: %v", err) + } + + if err := p.hermezDb.WriteForkId(l2Block.BatchNumber, l2Block.ForkId); err != nil { + return fmt.Errorf("write block batch error: %v", err) + } + + if err := p.hermezDb.WriteForkIdBlockOnce(l2Block.ForkId, l2Block.L2BlockNumber); err != nil { + return fmt.Errorf("write fork id block error: %v", err) + } + + if err := p.hermezDb.WriteBlockBatch(l2Block.L2BlockNumber, l2Block.BatchNumber); err != nil { + return fmt.Errorf("write block batch error: %v", err) + } + + return nil +} + +func (p *BatchesProcessor) AtLeastOneBlockWritten() bool { + return p.lastBlockHeight > 0 +} + +func (p *BatchesProcessor) LastBlockHeight() uint64 { + return p.lastBlockHeight +} + +func (p *BatchesProcessor) HighestSeenBatchNumber() uint64 { + return p.highestSeenBatchNo +} +func (p *BatchesProcessor) LastForkId() uint64 { + return p.lastForkId +} + +func (p *BatchesProcessor) TotalBlocksWritten() uint64 { + return p.blocksWritten +} + +func (p *BatchesProcessor) HighestHashableL2BlockNo() uint64 { + return p.highestHashableL2BlockNo +} + +func (p *BatchesProcessor) SetNewTx(tx kv.RwTx) { + p.tx = tx +} diff --git a/zk/stages/stage_batches_test.go b/zk/stages/stage_batches_test.go index 1bc39ca895f..e81fcc051c4 100644 --- a/zk/stages/stage_batches_test.go +++ b/zk/stages/stage_batches_test.go @@ -75,7 +75,7 @@ func TestUnwindBatches(t *testing.T) { ///////// // ACT // ///////// - err = SpawnStageBatches(s, u, ctx, tx, cfg, true) + err = SpawnStageBatches(s, u, ctx, tx, cfg) require.NoError(t, err) tx.Commit() diff --git a/zk/stages/stages.go b/zk/stages/stages.go index 4fc497f0f09..b22c7e7f1b4 100644 --- a/zk/stages/stages.go +++ b/zk/stages/stages.go @@ -292,7 +292,7 @@ func DefaultZkStages( if badBlockUnwind { return nil } - return SpawnStageBatches(s, u, ctx, tx, batchesCfg, test) + return SpawnStageBatches(s, u, ctx, tx, batchesCfg) }, Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { return UnwindBatchesStage(u, tx, batchesCfg, ctx) diff --git a/zk/stages/test_utils.go b/zk/stages/test_utils.go index df250ebf717..af9ec190587 100644 --- a/zk/stages/test_utils.go +++ b/zk/stages/test_utils.go @@ 
-46,8 +46,8 @@ func (c *TestDatastreamClient) ReadAllEntriesToChannel() error { return nil } -func (c *TestDatastreamClient) GetEntryChan() chan interface{} { - return c.entriesChan +func (c *TestDatastreamClient) GetEntryChan() *chan interface{} { + return &c.entriesChan } func (c *TestDatastreamClient) GetErrChan() chan error { From 8169bb6a0581f127da97e83ef99608c6605f28e3 Mon Sep 17 00:00:00 2001 From: Scott Fairclough <70711990+hexoscott@users.noreply.github.com> Date: Mon, 7 Oct 2024 12:52:43 +0100 Subject: [PATCH 5/9] add correlation to send/receive requests to executor (#1283) --- zk/legacy_executor_verifier/executor.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/zk/legacy_executor_verifier/executor.go b/zk/legacy_executor_verifier/executor.go index b4f2aa21f37..be5fa50c6fc 100644 --- a/zk/legacy_executor_verifier/executor.go +++ b/zk/legacy_executor_verifier/executor.go @@ -19,6 +19,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "github.com/google/uuid" ) var ( @@ -162,9 +163,18 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() + correlation := uuid.New().String() witnessSize := humanize.Bytes(uint64(len(p.Witness))) dataStreamSize := humanize.Bytes(uint64(len(p.DataStream))) - log.Info("Sending request to grpc server", "grpcUrl", e.grpcUrl, "ourRoot", request.StateRoot, "oldRoot", oldStateRoot, "batch", request.BatchNumber, "witness-size", witnessSize, "data-stream-size", dataStreamSize) + log.Info("Sending request to grpc server", + "grpcUrl", e.grpcUrl, + "ourRoot", request.StateRoot, + "oldRoot", oldStateRoot, + "batch", request.BatchNumber, + "witness-size", witnessSize, + "data-stream-size", dataStreamSize, + "blocks-count", len(request.BlockNumbers), + "correlation", correlation) size := 1024 * 1024 * 256 // 256mb maximum size - hack for now until trimmed witness is proved off @@ -239,7 +249,8 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com "exec-root", common.BytesToHash(resp.NewStateRoot), "our-root", request.StateRoot, "exec-old-root", common.BytesToHash(resp.OldStateRoot), - "our-old-root", oldStateRoot) + "our-old-root", oldStateRoot, + "correlation", correlation) for addr, all := range resp.ReadWriteAddresses { log.Debug("executor result", From 6fabed45652b315754751c7d494ad7b35f2a9eb6 Mon Sep 17 00:00:00 2001 From: Moretti Georgiev Date: Mon, 7 Oct 2024 17:57:57 +0300 Subject: [PATCH 6/9] Add l1 contract address retrieval (#1284) * refactor: retrieve contract addresses from l1 Signed-off-by: Moreti Georgiev * refactor: add zkevm.l1-contract-address-retrieve flag to hermezconfig-dev Signed-off-by: Moreti Georgiev --------- Signed-off-by: Moreti Georgiev --- cmd/utils/flags.go | 5 +++ eth/backend.go | 68 +++++++++++++++++++++++++++++------ eth/ethconfig/config_zkevm.go | 1 + hermezconfig-dev.yaml.example | 1 + turbo/cli/default_flags.go | 1 + turbo/cli/flags_zkevm.go | 5 ++- 6 files changed, 67 insertions(+), 14 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7a9f2818e0c..92fa2f1fc71 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -466,6 +466,11 @@ var ( Usage: "Check the contract address on the L1", Value: true, } + L1ContractAddressRetrieveFlag = cli.BoolFlag{ + Name: "zkevm.l1-contract-address-retrieve", + Usage: "Retrieve the contracts addresses from the L1", + 
Value: true, + } RebuildTreeAfterFlag = cli.Uint64Flag{ Name: "zkevm.rebuild-tree-after", Usage: "Rebuild the state tree after this many blocks behind", diff --git a/eth/backend.go b/eth/backend.go index 6f3b2bc3489..bb341749b39 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -848,17 +848,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Rollup ID", "rollupId", cfg.L1RollupId) - // check contract addresses in config against L1 - if cfg.Zk.L1ContractAddressCheck { - success, err := l1ContractAddressCheck(ctx, cfg.Zk, backend.l1Syncer) - if !success || err != nil { - //log.Warn("Contract address check failed", "success", success, "err", err) - panic("Contract address check failed") - } - log.Info("Contract address check passed") - } else { - log.Info("Contract address check skipped") - } + // Check if L1 contracts addresses should be retrieved from the L1 chain + l1ContractAddressProcess(ctx, cfg.Zk, backend.l1Syncer) l1InfoTreeSyncer := syncer.NewL1Syncer( ctx, @@ -1554,6 +1545,57 @@ func checkPortIsFree(addr string) (free bool) { return false } +func l1ContractAddressProcess(ctx context.Context, cfg *ethconfig.Zk, l1BlockSyncer *syncer.L1Syncer) { + if cfg.L1ContractAddressRetrieve { + l1ContractAddress(ctx, cfg, l1BlockSyncer) + return + } + l1ContractAddressValidate(ctx, cfg, l1BlockSyncer) +} + +func l1ContractAddress(ctx context.Context, cfg *ethconfig.Zk, l1BlockSyncer *syncer.L1Syncer) { + if err := l1ContractAdressFromZKevm(ctx, cfg, l1BlockSyncer); err != nil { + panic("Failed to retrieve contract addresses from L1") + } + log.Info("Contract addresses retrieved from L1") +} + +func l1ContractAdressFromZKevm(ctx context.Context, cfg *ethconfig.Zk, l1BlockSyncer *syncer.L1Syncer) error { + l1AddrRollup, err := l1BlockSyncer.CallRollupManager(ctx, &cfg.AddressZkevm) + if err != nil { + return err + } + cfg.AddressRollup = l1AddrRollup + + l1AddrGerManager, err := l1BlockSyncer.CallGlobalExitRootManager(ctx, &cfg.AddressZkevm) + if err != nil { + return err + } + cfg.AddressGerManager = l1AddrGerManager + + l1AddrSequencer, err := l1BlockSyncer.CallTrustedSequencer(ctx, &cfg.AddressZkevm) + if err != nil { + return err + } + cfg.AddressSequencer = l1AddrSequencer + + return nil +} + +func l1ContractAddressValidate(ctx context.Context, cfg *ethconfig.Zk, l1BlockSyncer *syncer.L1Syncer) { + if !cfg.L1ContractAddressCheck { + log.Info("Contract address check skipped") + return + } + + success, err := l1ContractAddressCheck(ctx, cfg, l1BlockSyncer) + if !success || err != nil { + panic("Contract address check failed") + } + + log.Info("Contract address check passed") +} + func l1ContractAddressCheck(ctx context.Context, cfg *ethconfig.Zk, l1BlockSyncer *syncer.L1Syncer) (bool, error) { l1AddrRollup, err := l1BlockSyncer.CallRollupManager(ctx, &cfg.AddressZkevm) if err != nil { @@ -1564,6 +1606,7 @@ func l1ContractAddressCheck(ctx context.Context, cfg *ethconfig.Zk, l1BlockSynce log.Warn("L1 contract address check failed (AddressRollup)", "expected", cfg.AddressRollup, "actual", l1AddrRollup) return false, nil } + log.Warn("🚨 zkevm.address-rollup configuration parameter is deprecated and it will be removed in upcoming releases") if cfg.AddressAdmin != (libcommon.Address{}) { log.Warn("🚨 zkevm.address-admin configuration parameter is deprecated and it will be removed in upcoming releases") @@ -1577,6 +1620,7 @@ func l1ContractAddressCheck(ctx context.Context, cfg *ethconfig.Zk, l1BlockSynce log.Warn("L1 contract address check failed 
(AddressGerManager)", "expected", cfg.AddressGerManager, "actual", l1AddrGerManager) return false, nil } + log.Warn("🚨 zkevm.address-ger-manager configuration parameter is deprecated and it will be removed in upcoming releases") l1AddrSequencer, err := l1BlockSyncer.CallTrustedSequencer(ctx, &cfg.AddressZkevm) if err != nil { @@ -1586,5 +1630,7 @@ func l1ContractAddressCheck(ctx context.Context, cfg *ethconfig.Zk, l1BlockSynce log.Warn("L1 contract address check failed (AddressSequencer)", "expected", cfg.AddressSequencer, "actual", l1AddrSequencer) return false, nil } + log.Warn("🚨 zkevm.address-sequencer configuration parameter is deprecated and it will be removed in upcoming releases") + return true, nil } diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 4b7b1e155dd..7bf61a783f4 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -22,6 +22,7 @@ type Zk struct { AddressZkevm common.Address AddressGerManager common.Address L1ContractAddressCheck bool + L1ContractAddressRetrieve bool L1RollupId uint64 L1BlockRange uint64 L1QueryDelay uint64 diff --git a/hermezconfig-dev.yaml.example b/hermezconfig-dev.yaml.example index 76b02a7608b..41a3775de1d 100644 --- a/hermezconfig-dev.yaml.example +++ b/hermezconfig-dev.yaml.example @@ -23,6 +23,7 @@ zkevm.executor-strict: true zkevm.executor-urls: 51.210.116.237:50071 zkevm.l1-contract-address-check: false +zkevm.l1-contract-address-retrieve: false externalcl: true http.api: [eth, debug, net, trace, web3, erigon, txpool, zkevm] diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index a517b260c30..2eb99dadd7e 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -191,6 +191,7 @@ var DefaultFlags = []cli.Flag{ &utils.L1FirstBlockFlag, &utils.L1FinalizedBlockRequirementFlag, &utils.L1ContractAddressCheckFlag, + &utils.L1ContractAddressRetrieveFlag, &utils.RpcRateLimitsFlag, &utils.RpcGetBatchWitnessConcurrencyLimitFlag, &utils.DatastreamVersionFlag, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index b9e750097b1..394eb155ccf 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -136,6 +136,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { L1FirstBlock: ctx.Uint64(utils.L1FirstBlockFlag.Name), L1FinalizedBlockRequirement: ctx.Uint64(utils.L1FinalizedBlockRequirementFlag.Name), L1ContractAddressCheck: ctx.Bool(utils.L1ContractAddressCheckFlag.Name), + L1ContractAddressRetrieve: ctx.Bool(utils.L1ContractAddressRetrieveFlag.Name), RpcGetBatchWitnessConcurrencyLimit: ctx.Int(utils.RpcGetBatchWitnessConcurrencyLimitFlag.Name), DatastreamVersion: ctx.Int(utils.DatastreamVersionFlag.Name), RebuildTreeAfter: ctx.Uint64(utils.RebuildTreeAfterFlag.Name), @@ -216,10 +217,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { } } - checkFlag(utils.AddressSequencerFlag.Name, cfg.AddressSequencer) - checkFlag(utils.AddressRollupFlag.Name, cfg.AddressRollup) checkFlag(utils.AddressZkevmFlag.Name, cfg.AddressZkevm) - checkFlag(utils.AddressGerManagerFlag.Name, cfg.AddressGerManager) checkFlag(utils.L1ChainIdFlag.Name, cfg.L1ChainId) checkFlag(utils.L1RpcUrlFlag.Name, cfg.L1RpcUrl) @@ -231,4 +229,5 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { checkFlag(utils.L1QueryDelayFlag.Name, cfg.L1QueryDelay) checkFlag(utils.TxPoolRejectSmartContractDeployments.Name, cfg.TxPoolRejectSmartContractDeployments) checkFlag(utils.L1ContractAddressCheckFlag.Name, 
cfg.L1ContractAddressCheck) + checkFlag(utils.L1ContractAddressRetrieveFlag.Name, cfg.L1ContractAddressCheck) } From 13f7a82d53fce774aead12c07c7d57c3a7afa310 Mon Sep 17 00:00:00 2001 From: Moretti Georgiev Date: Mon, 7 Oct 2024 17:58:50 +0300 Subject: [PATCH 7/9] feat: add warning for unknown flag (#1282) * refactor: split setFlagsFromConfigFile into smaller functions Signed-off-by: Moreti Georgiev * feat: log warning when flag is unknown Signed-off-by: Moreti Georgiev * refactor: make failed setting flag error message more descriptive Signed-off-by: Moreti Georgiev --------- Signed-off-by: Moreti Georgiev --- cmd/cdk-erigon/main.go | 148 +++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 51 deletions(-) diff --git a/cmd/cdk-erigon/main.go b/cmd/cdk-erigon/main.go index d8422b8faef..c4cc1375977 100644 --- a/cmd/cdk-erigon/main.go +++ b/cmd/cdk-erigon/main.go @@ -75,62 +75,108 @@ func runErigon(cliCtx *cli.Context) error { } func setFlagsFromConfigFile(ctx *cli.Context, filePath string) error { - fileExtension := filepath.Ext(filePath) - - fileConfig := make(map[string]interface{}) + cfg, err := fileConfig(filePath) + if err != nil { + return err + } - if fileExtension == ".yaml" { - yamlFile, err := os.ReadFile(filePath) - if err != nil { - return err - } - err = yaml.Unmarshal(yamlFile, fileConfig) - if err != nil { - return err - } - } else if fileExtension == ".toml" { - tomlFile, err := os.ReadFile(filePath) - if err != nil { - return err + for key, value := range cfg { + if ctx.IsSet(key) { + continue } - err = toml.Unmarshal(tomlFile, &fileConfig) - if err != nil { + + if err := setFlag(ctx, key, value); err != nil { return err } - } else { - return errors.New("config files only accepted are .yaml and .toml") - } - // sets global flags to value in yaml/toml file - for key, value := range fileConfig { - if !ctx.IsSet(key) { - if reflect.ValueOf(value).Kind() == reflect.Slice { - sliceInterface := value.([]interface{}) - s := make([]string, len(sliceInterface)) - for i, v := range sliceInterface { - s[i] = fmt.Sprintf("%v", v) - } - if err := ctx.Set(key, strings.Join(s, ",")); err != nil { - if deprecatedFlag, found := erigoncli.DeprecatedFlags[key]; found { - if deprecatedFlag == "" { - return fmt.Errorf("failed setting %s flag it is deprecated, remove it", key) - } - return fmt.Errorf("failed setting %s flag it is deprecated, use %s instead", key, deprecatedFlag) - } - return fmt.Errorf("failed setting %s flag with values=%s error=%s", key, s, err) - } - } else { - if err := ctx.Set(key, fmt.Sprintf("%v", value)); err != nil { - if deprecatedFlag, found := erigoncli.DeprecatedFlags[key]; found { - if deprecatedFlag == "" { - return fmt.Errorf("failed setting %s flag it is deprecated, remove it", key) - } - return fmt.Errorf("failed setting %s flag it is deprecated, use %s instead", key, deprecatedFlag) - } - return fmt.Errorf("failed setting %s flag with value=%v error=%s", key, value, err) - } - } - } } return nil } + +func fileConfig(filePath string) (map[string]interface{}, error) { + fileExtension := filepath.Ext(filePath) + switch fileExtension { + case ".yaml": + return yamlConfig(filePath) + case ".toml": + return tomlConfig(filePath) + default: + return nil, errors.New("config files only accepted are .yaml and .toml") + } +} + +func yamlConfig(filePath string) (map[string]interface{}, error) { + cfg := make(map[string]interface{}) + yamlFile, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + err = yaml.Unmarshal(yamlFile, &cfg) 
+ if err != nil { + return nil, err + } + return cfg, nil +} + +func tomlConfig(filePath string) (map[string]interface{}, error) { + cfg := make(map[string]interface{}) + + tomlFile, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + err = toml.Unmarshal(tomlFile, &cfg) + if err != nil { + return nil, err + } + + return cfg, nil +} + +func setFlag(ctx *cli.Context, key string, value interface{}) error { + isSlice := reflect.ValueOf(value).Kind() == reflect.Slice + if isSlice { + return setMultiValueFlag(ctx, key, value) + } + return setSingleValueFlag(ctx, key, value) +} + +func setMultiValueFlag(ctx *cli.Context, key string, value interface{}) error { + sliceInterface := value.([]interface{}) + slice := make([]string, len(sliceInterface)) + for i, v := range sliceInterface { + slice[i] = fmt.Sprintf("%v", v) + } + + return setFlagInContext(ctx, key, strings.Join(slice, ",")) +} + +func setSingleValueFlag(ctx *cli.Context, key string, value interface{}) error { + return setFlagInContext(ctx, key, fmt.Sprintf("%v", value)) +} + +func setFlagInContext(ctx *cli.Context, key, value string) error { + if err := ctx.Set(key, value); err != nil { + return handleFlagError(key, value, err) + } + return nil +} + +func handleFlagError(key, value string, err error) error { + if deprecatedFlag, found := erigoncli.DeprecatedFlags[key]; found { + if deprecatedFlag == "" { + return fmt.Errorf("failed setting %s flag: it is deprecated, remove it", key) + } + return fmt.Errorf("failed setting %s flag: it is deprecated, use %s instead", key, deprecatedFlag) + } + + errUnknownFlag := fmt.Errorf("no such flag -%s", key) + if err.Error() == errUnknownFlag.Error() { + log.Warn("🚨 failed setting flag: unknown flag provided", "key", key, "value", value) + return nil + } + + return fmt.Errorf("failed setting %s flag with value=%s, error=%s", key, value, err) +} From d98050d71106fdbaa89ac4e5fd760d23ab63f09b Mon Sep 17 00:00:00 2001 From: Thiago Coimbra Lemos Date: Mon, 7 Oct 2024 12:01:50 -0300 Subject: [PATCH 8/9] [Tool] Document with all RPC endpoints (#1256) * adds a tool to generate a markdown document with all implemented rpc endpoints * add gha workflow to check RPC endpoints doc --- .github/workflows/doc-rpc.yml | 26 +++++ docs/endpoints/Makefile | 13 +++ docs/endpoints/README.md | 19 ++++ docs/endpoints/endpoints.md | 200 ++++++++++++++++++++++++++++++++++ docs/endpoints/main.go | 106 ++++++++++++++++++ docs/endpoints/template.md | 16 +++ 6 files changed, 380 insertions(+) create mode 100644 .github/workflows/doc-rpc.yml create mode 100644 docs/endpoints/Makefile create mode 100644 docs/endpoints/README.md create mode 100644 docs/endpoints/endpoints.md create mode 100644 docs/endpoints/main.go create mode 100644 docs/endpoints/template.md diff --git a/.github/workflows/doc-rpc.yml b/.github/workflows/doc-rpc.yml new file mode 100644 index 00000000000..5418dc825e6 --- /dev/null +++ b/.github/workflows/doc-rpc.yml @@ -0,0 +1,26 @@ +name: RPC endpoint doc +on: + push: + branches: + - zkevm + pull_request: + branches: + - zkevm + types: + - opened + - reopened + - synchronize + - ready_for_review + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version: '1.19' + - name: Check RPC endpoints doc + run: | + cd ./docs/endpoints + make check-doc \ No newline at end of file diff --git a/docs/endpoints/Makefile b/docs/endpoints/Makefile new file mode 100644 index 00000000000..d9af1597ee3 --- /dev/null +++ 
b/docs/endpoints/Makefile @@ -0,0 +1,13 @@ +DOC_NAME:="endpoints.md" + +.PHONY: gen-doc +gen-doc: + go run main.go $(DOC_NAME) + +.PHONY: check-doc +check-doc: + go run main.go tmp$(DOC_NAME) + cmp -s ./$(DOC_NAME) ./tmp$(DOC_NAME); \ + RETVAL=$$?; \ + rm ./tmp$(DOC_NAME); \ + exit $$RETVAL \ No newline at end of file diff --git a/docs/endpoints/README.md b/docs/endpoints/README.md new file mode 100644 index 00000000000..e68b2f6a3bc --- /dev/null +++ b/docs/endpoints/README.md @@ -0,0 +1,19 @@ +# JSON RPC Endpoints + +This tool is used to generate the list of supported endpoints provided by the JSON-RPC server as a markdown document. + +It uses reflection to go through all API interfaces and then merge the information with the content of the [template.md](./template.md) as the base to generate the [endpoints.md](./endpoints.md) file. + +To generate the file ensure you have `make` and `go` installed, then run available on this directory: + +```bash +make gen-doc +``` + +The [endpoints.md](./endpoints.md) must always be generated when a new endpoint is added, removed or changed so we can have this file pushed to the repo for further use. + +There is also a command to check if the current file is compatible with the current code. This command is meant to be used in the CI do ensure the doc is updated. + +```bash +make check-doc +``` \ No newline at end of file diff --git a/docs/endpoints/endpoints.md b/docs/endpoints/endpoints.md new file mode 100644 index 00000000000..b6733ffa0d1 --- /dev/null +++ b/docs/endpoints/endpoints.md @@ -0,0 +1,200 @@ + + +# RPC Endpoints + +> DO NOT EDIT THIS FILE. +This document was auto generated by [./main.go](./main.go) based on the interfaces the JSON-RPC server use to expose the public endpoints +For more details read the [./README.md](./README.md) + +Here you will find the list of all supported JSON RPC endpoints. +If the endpoint is not in the list below, it means this specific endpoint is not supported yet, feel free to open an issue requesting it to be added and please explain the reason why you need it. 
+ +## admin + +- admin_nodeInfo +- admin_peers + +## bor + +- bor_getAuthor +- bor_getCurrentProposer +- bor_getCurrentValidators +- bor_getRootHash +- bor_getSigners +- bor_getSignersAtHash +- bor_getSnapshot +- bor_getSnapshotAtHash + +## debug + +- debug_accountAt +- debug_accountRange +- debug_getModifiedAccountsByHash +- debug_getModifiedAccountsByNumber +- debug_storageRangeAt +- debug_traceBlockByHash +- debug_traceBlockByNumber +- debug_traceCall +- debug_traceTransaction +- debug_traceTransactionCounters + +## engine + +- engine_exchangeTransitionConfigurationV1 +- engine_forkchoiceUpdatedV1 +- engine_forkchoiceUpdatedV2 +- engine_getPayloadBodiesByHashV1 +- engine_getPayloadBodiesByRangeV1 +- engine_getPayloadV1 +- engine_getPayloadV2 +- engine_newPayloadV1 +- engine_newPayloadV2 + +## erigon + +- erigon_blockNumber +- erigon_cumulativeChainTraffic +- erigon_forks +- erigon_getBalanceChangesInBlock +- erigon_getBlockByTimestamp +- erigon_getBlockReceiptsByBlockHash +- erigon_getHeaderByHash +- erigon_getHeaderByNumber +- erigon_getLatestLogs +- erigon_getLogs +- erigon_getLogsByHash +- erigon_nodeInfo + +## eth + +- eth_accounts +- eth_blockNumber +- eth_call +- eth_chainId +- eth_coinbase +- eth_createAccessList +- eth_estimateGas +- eth_gasPrice +- eth_getBalance +- eth_getBlockByHash +- eth_getBlockByNumber +- eth_getBlockReceipts +- eth_getBlockTransactionCountByHash +- eth_getBlockTransactionCountByNumber +- eth_getCode +- eth_getFilterChanges +- eth_getFilterLogs +- eth_getLogs +- eth_getProof +- eth_getRawTransactionByBlockHashAndIndex +- eth_getRawTransactionByBlockNumberAndIndex +- eth_getRawTransactionByHash +- eth_getStorageAt +- eth_getTransactionByBlockHashAndIndex +- eth_getTransactionByBlockNumberAndIndex +- eth_getTransactionByHash +- eth_getTransactionCount +- eth_getTransactionReceipt +- eth_getUncleByBlockHashAndIndex +- eth_getUncleByBlockNumberAndIndex +- eth_getUncleCountByBlockHash +- eth_getUncleCountByBlockNumber +- eth_getWork +- eth_hashrate +- eth_mining +- eth_newBlockFilter +- eth_newFilter +- eth_newPendingTransactionFilter +- eth_protocolVersion +- eth_sendRawTransaction +- eth_sendTransaction +- eth_sign +- eth_signTransaction +- eth_submitHashrate +- eth_submitWork +- eth_syncing +- eth_uninstallFilter + +## graphql + +- graphql_getBlockDetails +- graphql_getChainID + +## net + +- net_listening +- net_peerCount +- net_version + +## otterscan + +- otterscan_getApiLevel +- otterscan_getBlockDetails +- otterscan_getBlockDetailsByHash +- otterscan_getBlockTransactions +- otterscan_getContractCreator +- otterscan_getInternalOperations +- otterscan_getTransactionBySenderAndNonce +- otterscan_getTransactionError +- otterscan_hasCode +- otterscan_searchTransactionsAfter +- otterscan_searchTransactionsBefore +- otterscan_traceTransaction + +## parity + +- parity_listStorageKeys + +## trace + +- trace_block +- trace_call +- trace_callMany +- trace_filter +- trace_get +- trace_rawTransaction +- trace_replayBlockTransactions +- trace_replayTransaction +- trace_transaction + +## txpool + +- txpool_content +- txpool_limbo + +## web3 + +- web3_clientVersion +- web3_sha3 + +## zkevm + +- zkevm_batchNumber +- zkevm_batchNumberByBlockNumber +- zkevm_consolidatedBlockNumber +- zkevm_estimateCounters +- zkevm_getBatchByNumber +- zkevm_getBatchCountersByNumber +- zkevm_getBatchWitness +- zkevm_getBlockRangeWitness +- zkevm_getExitRootTable +- zkevm_getExitRootsByGER +- zkevm_getForkById +- zkevm_getForkId +- zkevm_getForkIdByBatchNumber +- zkevm_getForks +- 
zkevm_getFullBlockByHash +- zkevm_getFullBlockByNumber +- zkevm_getL2BlockInfoTree +- zkevm_getLatestGlobalExitRoot +- zkevm_getProverInput +- zkevm_getVersionHistory +- zkevm_getWitness +- zkevm_isBlockConsolidated +- zkevm_isBlockVirtualized +- zkevm_verifiedBatchNumber +- zkevm_virtualBatchNumber diff --git a/docs/endpoints/main.go b/docs/endpoints/main.go new file mode 100644 index 00000000000..4acdd236d6a --- /dev/null +++ b/docs/endpoints/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "fmt" + "os" + "reflect" + "sort" + "text/template" + "unicode" + "unicode/utf8" + + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/commands" +) + +const disclaimer = `DO NOT EDIT THIS FILE. +This document was auto generated by [./main.go](./main.go) based on the interfaces the JSON-RPC server use to expose the public endpoints +For more details read the [./README.md](./README.md)` + +type Document struct { + Disclaimer string + EndpointGroups []keyValue +} + +type keyValue struct { + Key any + Value any +} + +func main() { + apiInterfaces := []keyValue{ + {"admin", (*commands.AdminAPI)(nil)}, + {"bor", (*commands.BorAPI)(nil)}, + {"debug", (*commands.PrivateDebugAPI)(nil)}, + {"engine", (*commands.EngineAPI)(nil)}, + {"erigon", (*commands.ErigonAPI)(nil)}, + {"eth", (*commands.EthAPI)(nil)}, + {"graphql", (*commands.GraphQLAPI)(nil)}, + {"net", (*commands.NetAPI)(nil)}, + {"otterscan", (*commands.OtterscanAPI)(nil)}, + {"parity", (*commands.ParityAPI)(nil)}, + {"trace", (*commands.TraceAPI)(nil)}, + {"txpool", (*commands.TxPoolAPI)(nil)}, + {"web3", (*commands.Web3API)(nil)}, + {"zkevm", (*commands.ZkEvmAPI)(nil)}, + } + + endpointGroups := []keyValue{} + for _, apiInterface := range apiInterfaces { + apiPrefix := apiInterface.Key + apiInterfaceType := apiInterface.Value + + apiEndpoints := []string{} + interfaceType := reflect.TypeOf(apiInterfaceType).Elem() + for methodIndex := 0; methodIndex < interfaceType.NumMethod(); methodIndex++ { + methodName := interfaceType.Method(methodIndex).Name + methodName = firstToLower(methodName) + endpointName := fmt.Sprintf("%s_%s", apiPrefix, methodName) + apiEndpoints = append(apiEndpoints, endpointName) + } + + sort.Slice(apiEndpoints, func(i, j int) bool { + return apiEndpoints[i] < apiEndpoints[j] + }) + endpointGroup := keyValue{Key: apiPrefix, Value: apiEndpoints} + + endpointGroups = append(endpointGroups, endpointGroup) + } + + fileName := "endpoints.md" + fmt.Println(os.Args) + if len(os.Args) > 1 { + fileName = os.Args[1] + } + + f, err := createOrOpen(fileName) + checkErr(err) + + t := template.Must(template.New("template.md").ParseFiles("template.md")) + err = t.Execute(f, Document{disclaimer, endpointGroups}) + checkErr(err) +} + +func checkErr(err error) { + if err != nil { + panic(err) + } +} + +func createOrOpen(name string) (*os.File, error) { + if _, err := os.Stat(name); err == nil { + os.Remove(name) + } + return os.Create(name) +} + +func firstToLower(s string) string { + r, size := utf8.DecodeRuneInString(s) + if r == utf8.RuneError && size <= 1 { + return s + } + lc := unicode.ToLower(r) + if r == lc { + return s + } + return string(lc) + s[size:] +} diff --git a/docs/endpoints/template.md b/docs/endpoints/template.md new file mode 100644 index 00000000000..9d371c17266 --- /dev/null +++ b/docs/endpoints/template.md @@ -0,0 +1,16 @@ + + +# RPC Endpoints + +> {{ .Disclaimer }} + +Here you will find the list of all supported JSON RPC endpoints. 
+If the endpoint is not in the list below, it means this specific endpoint is not supported yet, feel free to open an issue requesting it to be added and please explain the reason why you need it. +{{ range .EndpointGroups }} +## {{ .Key }} +{{ range .Value }} +- {{ . }} +{{- end }} +{{ end }} \ No newline at end of file From 9ded83eae573572ef8a643d23f0283c82b580621 Mon Sep 17 00:00:00 2001 From: Jerry Date: Mon, 7 Oct 2024 08:02:39 -0700 Subject: [PATCH 9/9] Bump up kurtosis version to v0.2.12 (#1263) --- .github/workflows/ci_zkevm.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci_zkevm.yml b/.github/workflows/ci_zkevm.yml index c152e14473c..c484dfa1758 100644 --- a/.github/workflows/ci_zkevm.yml +++ b/.github/workflows/ci_zkevm.yml @@ -78,7 +78,7 @@ jobs: uses: actions/checkout@v4 with: repository: 0xPolygon/kurtosis-cdk - ref: v0.2.7 + ref: v0.2.12 path: kurtosis-cdk - name: Install Kurtosis CDK tools @@ -116,7 +116,7 @@ jobs: - name: Monitor verified batches working-directory: ./kurtosis-cdk shell: bash - run: timeout 900s .github/scripts/monitor-verified-batches.sh --rpc-url $(kurtosis port print cdk-v1 cdk-erigon-node-001 http-rpc) --target 20 --timeout 900 + run: timeout 900s .github/scripts/monitor-verified-batches.sh --rpc-url $(kurtosis port print cdk-v1 cdk-erigon-node-001 rpc) --target 20 --timeout 900 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 @@ -126,7 +126,7 @@ jobs: kurtosis files download cdk-v1 bridge-config-artifact echo "BRIDGE_ADDRESS=$(/usr/local/bin/yq '.NetworkConfig.PolygonBridgeAddress' bridge-config-artifact/bridge-config.toml)" >> $GITHUB_ENV echo "ETH_RPC_URL=$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" >> $GITHUB_ENV - echo "L2_RPC_URL=$(kurtosis port print cdk-v1 cdk-erigon-node-001 http-rpc)" >> $GITHUB_ENV + echo "L2_RPC_URL=$(kurtosis port print cdk-v1 cdk-erigon-node-001 rpc)" >> $GITHUB_ENV echo "BRIDGE_API_URL=$(kurtosis port print cdk-v1 zkevm-bridge-service-001 rpc)" >> $GITHUB_ENV - name: Fund claim tx manager @@ -150,7 +150,7 @@ jobs: TestL1AddrPrivate="0x12d7de8621a77640c9241b2595ba78ce443d05e94090365ab3bb5e19df82c625" TestL2AddrPrivate="0x12d7de8621a77640c9241b2595ba78ce443d05e94090365ab3bb5e19df82c625" [ConnectionConfig] - L1NodeURL="${ETH_RPC_URL}" + L1NodeURL="http://${ETH_RPC_URL}" L2NodeURL="${L2_RPC_URL}" BridgeURL="${BRIDGE_API_URL}" L1BridgeAddr="${BRIDGE_ADDRESS}"