From 6f071329715aca47bea402c341d9028e8ca6ae78 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 8 Jan 2025 11:12:18 +0000 Subject: [PATCH] fix: prover node retries gathering needed txs (#11089) This PR introduces retries when the prover node gathers the necessary txs for an epoch proof. This is necessary because on a fresh boot, the prover might not be connected to any peers so it will fail to acquire the necessary data. Fix for #11063 TODO: - [x] expose new config as env vars --- .../aztec-network/templates/prover-node.yaml | 6 ++ spartan/aztec-network/values.yaml | 4 + .../src/e2e_prover/e2e_prover_test.ts | 3 + yarn-project/end-to-end/src/fixtures/utils.ts | 3 + yarn-project/foundation/src/config/env_var.ts | 3 + yarn-project/foundation/src/retry/index.ts | 3 +- yarn-project/prover-node/src/config.ts | 18 +++++ yarn-project/prover-node/src/factory.ts | 3 + .../prover-node/src/prover-node.test.ts | 59 ++++++++++---- yarn-project/prover-node/src/prover-node.ts | 79 ++++++++++++++++--- 10 files changed, 155 insertions(+), 26 deletions(-) diff --git a/spartan/aztec-network/templates/prover-node.yaml b/spartan/aztec-network/templates/prover-node.yaml index f28ea686b17..0ee0241a1d3 100644 --- a/spartan/aztec-network/templates/prover-node.yaml +++ b/spartan/aztec-network/templates/prover-node.yaml @@ -145,6 +145,12 @@ spec: value: "{{ .Values.proverNode.proverBroker.dataDirectory }}" - name: PROVER_PUBLISHER_PRIVATE_KEY value: "{{ .Values.proverNode.proverPublisherPrivateKey }}" + - name: PROVER_NODE_TX_GATHERING_TIMEOUT_MS + value: "{{ .Values.proverNode.txGathering.timeoutMs }}" + - name: PROVER_NODE_TX_GATHERING_INTERVAL_MS + value: "{{ .Values.proverNode.txGathering.intervalMs }}" + - name: PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS + value: "{{ .Values.proverNode.txGathering.maxParallelRequests }}" - name: OTEL_RESOURCE_ATTRIBUTES value: service.name={{ .Release.Name }},service.namespace={{ .Release.Namespace }},service.version={{ .Chart.AppVersion }},environment={{ .Values.environment | default "production" }} - name: L1_CHAIN_ID diff --git a/spartan/aztec-network/values.yaml b/spartan/aztec-network/values.yaml index efdac924caa..ea363b9c88d 100644 --- a/spartan/aztec-network/values.yaml +++ b/spartan/aztec-network/values.yaml @@ -151,6 +151,10 @@ proverNode: archiverViemPollingInterval: 1000 pollInterval: 1000 viemPollingInterval: 1000 + txGathering: + timeoutMs: 60000 + intervalMs: 1000 + maxParallelRequests: 100 pxe: logLevel: "debug; info: aztec:simulator, json-rpc" diff --git a/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts b/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts index aac3af2db0a..dd212a72379 100644 --- a/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts +++ b/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts @@ -275,6 +275,9 @@ export class FullProverTest { quoteProviderBondAmount: 1000n, proverMinimumEscrowAmount: 3000n, proverTargetEscrowAmount: 6000n, + txGatheringTimeoutMs: 60000, + txGatheringIntervalMs: 1000, + txGatheringMaxParallelRequests: 100, }; this.proverNode = await createProverNode(proverConfig, { aztecNodeTxProvider: this.aztecNode, diff --git a/yarn-project/end-to-end/src/fixtures/utils.ts b/yarn-project/end-to-end/src/fixtures/utils.ts index a0713351efe..85ae078728f 100644 --- a/yarn-project/end-to-end/src/fixtures/utils.ts +++ b/yarn-project/end-to-end/src/fixtures/utils.ts @@ -745,6 +745,9 @@ export async function createAndSyncProverNode( quoteProviderBondAmount: 1000n, proverMinimumEscrowAmount: 1000n, proverTargetEscrowAmount: 2000n, + txGatheringTimeoutMs: 60000, + txGatheringIntervalMs: 1000, + txGatheringMaxParallelRequests: 100, }; // Use testing l1 publisher diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index d45d8d36f63..8689a5512d5 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -122,6 +122,9 @@ export type EnvVar = | 'PROVER_NODE_POLLING_INTERVAL_MS' | 'PROVER_NODE_MAX_PENDING_JOBS' | 'PROVER_NODE_MAX_PARALLEL_BLOCKS_PER_EPOCH' + | 'PROVER_NODE_TX_GATHERING_TIMEOUT_MS' + | 'PROVER_NODE_TX_GATHERING_INTERVAL_MS' + | 'PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS' | 'PROVER_PUBLISH_RETRY_INTERVAL_MS' | 'PROVER_PUBLISHER_PRIVATE_KEY' | 'PROVER_REAL_PROOFS' diff --git a/yarn-project/foundation/src/retry/index.ts b/yarn-project/foundation/src/retry/index.ts index ce05747f7da..1fc503743cb 100644 --- a/yarn-project/foundation/src/retry/index.ts +++ b/yarn-project/foundation/src/retry/index.ts @@ -1,3 +1,4 @@ +import { TimeoutError } from '../error/index.js'; import { createLogger } from '../log/index.js'; import { sleep } from '../sleep/index.js'; import { Timer } from '../timer/index.js'; @@ -93,7 +94,7 @@ export async function retryUntil(fn: () => Promise, name = '', await sleep(interval * 1000); if (timeout && timer.s() > timeout) { - throw new Error(name ? `Timeout awaiting ${name}` : 'Timeout'); + throw new TimeoutError(name ? `Timeout awaiting ${name}` : 'Timeout'); } } } diff --git a/yarn-project/prover-node/src/config.ts b/yarn-project/prover-node/src/config.ts index 7bdde17af5c..ffd598c8a39 100644 --- a/yarn-project/prover-node/src/config.ts +++ b/yarn-project/prover-node/src/config.ts @@ -53,6 +53,9 @@ type SpecificProverNodeConfig = { proverNodeMaxPendingJobs: number; proverNodePollingIntervalMs: number; proverNodeMaxParallelBlocksPerEpoch: number; + txGatheringTimeoutMs: number; + txGatheringIntervalMs: number; + txGatheringMaxParallelRequests: number; }; export type QuoteProviderConfig = { @@ -77,6 +80,21 @@ const specificProverNodeConfigMappings: ConfigMappingsType = { diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index f7f777626fb..84bf1b39ad4 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -71,6 +71,9 @@ export async function createProverNode( maxPendingJobs: config.proverNodeMaxPendingJobs, pollingIntervalMs: config.proverNodePollingIntervalMs, maxParallelBlocksPerEpoch: config.proverNodeMaxParallelBlocksPerEpoch, + txGatheringMaxParallelRequests: config.txGatheringMaxParallelRequests, + txGatheringIntervalMs: config.txGatheringIntervalMs, + txGatheringTimeoutMs: config.txGatheringTimeoutMs, }; const claimsMonitor = new ClaimsMonitor(publisher, telemetry, proverNodeConfig); diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 56581e5e23d..0e5068ecded 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -1,22 +1,20 @@ import { - type Body, type EpochProofClaim, EpochProofQuote, EpochProofQuotePayload, type EpochProverManager, type L1ToL2MessageSource, - type L2Block, + L2Block, type L2BlockSource, type MerkleTreeWriteOperations, P2PClientType, type ProverCoordination, type Tx, - type TxEffect, - TxHash, + type TxHash, WorldStateRunningState, type WorldStateSynchronizer, } from '@aztec/circuit-types'; -import { type ContractDataSource, EthAddress, Fr } from '@aztec/circuits.js'; +import { type ContractDataSource, EthAddress } from '@aztec/circuits.js'; import { type EpochCache } from '@aztec/epoch-cache'; import { times } from '@aztec/foundation/collection'; import { Signature } from '@aztec/foundation/eth-signature'; @@ -66,7 +64,7 @@ describe('prover-node', () => { let claim: MockProxy; // Blocks returned by the archiver - let blocks: MockProxy[]; + let blocks: L2Block[]; // Address of the publisher let address: EthAddress; @@ -120,7 +118,14 @@ describe('prover-node', () => { bondManager = mock(); telemetryClient = new NoopTelemetryClient(); - config = { maxPendingJobs: 3, pollingIntervalMs: 10, maxParallelBlocksPerEpoch: 32 }; + config = { + maxPendingJobs: 3, + pollingIntervalMs: 10, + maxParallelBlocksPerEpoch: 32, + txGatheringMaxParallelRequests: 10, + txGatheringIntervalMs: 100, + txGatheringTimeoutMs: 1000, + }; // World state returns a new mock db every time it is asked to fork worldState.fork.mockImplementation(() => Promise.resolve(mock())); @@ -141,13 +146,7 @@ describe('prover-node', () => { quoteSigner.sign.mockImplementation(payload => Promise.resolve(new EpochProofQuote(payload, Signature.empty()))); // We create 3 fake blocks with 1 tx effect each - blocks = times(3, i => - mock({ - number: i + 20, - hash: () => new Fr(i), - body: mock({ txEffects: [mock({ txHash: TxHash.random() } as TxEffect)] }), - }), - ); + blocks = times(3, i => L2Block.random(i + 20, 1)); // Archiver returns a bunch of fake blocks l2BlockSource.getBlocksForEpoch.mockResolvedValue(blocks); @@ -296,6 +295,38 @@ describe('prover-node', () => { expect(jobs.length).toEqual(1); }); + it('retries acquiring txs if they are not immediately available', async () => { + l2BlockSource.getL2EpochNumber.mockResolvedValue(11n); + publisher.getProofClaim.mockResolvedValue(claim); + const mockGetTxByHash = mockCoordination.getTxByHash.getMockImplementation(); + mockCoordination.getTxByHash.mockResolvedValue(undefined); + + await proverNode.start(); + await sleep(100); + + // initially no job will be started because the txs aren't available + expect(jobs).toHaveLength(0); + expect(mockCoordination.getTxByHash).toHaveBeenCalled(); + + mockCoordination.getTxByHash.mockImplementation(mockGetTxByHash); + await sleep(100); + + // now it should have all the txs necessary to start proving + expect(jobs[0].epochNumber).toEqual(10n); + expect(jobs.length).toEqual(1); + }); + + it('does not start proving if txs are not all available', async () => { + l2BlockSource.getL2EpochNumber.mockResolvedValue(11n); + publisher.getProofClaim.mockResolvedValue(claim); + + mockCoordination.getTxByHash.mockResolvedValue(undefined); + + await proverNode.start(); + await sleep(2000); + expect(jobs).toHaveLength(0); + }); + it('does not start proving if there is a claim for proven epoch during initial sync', async () => { l2BlockSource.getProvenL2EpochNumber.mockResolvedValue(10); publisher.getProofClaim.mockResolvedValue(claim); diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 10a65b3594f..4f341ae58d7 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -11,12 +11,16 @@ import { type ProverNodeApi, type Service, type Tx, + type TxHash, type WorldStateSynchronizer, tryStop, } from '@aztec/circuit-types'; import { type ContractDataSource } from '@aztec/circuits.js'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { compact } from '@aztec/foundation/collection'; +import { TimeoutError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; +import { retryUntil } from '@aztec/foundation/retry'; import { DateProvider } from '@aztec/foundation/timer'; import { type Maybe } from '@aztec/foundation/types'; import { type P2P } from '@aztec/p2p'; @@ -36,6 +40,9 @@ export type ProverNodeOptions = { pollingIntervalMs: number; maxPendingJobs: number; maxParallelBlocksPerEpoch: number; + txGatheringTimeoutMs: number; + txGatheringIntervalMs: number; + txGatheringMaxParallelRequests: number; }; /** @@ -76,6 +83,9 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr pollingIntervalMs: 1_000, maxPendingJobs: 100, maxParallelBlocksPerEpoch: 32, + txGatheringTimeoutMs: 60_000, + txGatheringIntervalMs: 1_000, + txGatheringMaxParallelRequests: 100, ...compact(options), }; @@ -302,21 +312,68 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr } private async gatherTxs(epochNumber: bigint, blocks: L2Block[]) { - const txs = await Promise.all( - blocks.flatMap(block => - block.body.txEffects - .map(tx => tx.txHash) - .map(txHash => this.coordination.getTxByHash(txHash).then(tx => [block.number, txHash, tx] as const)), - ), + let txsToFind: TxHash[] = []; + const txHashToBlock = new Map(); + const results = new Map(); + + for (const block of blocks) { + for (const tx of block.body.txEffects) { + txsToFind.push(tx.txHash); + txHashToBlock.set(tx.txHash.toString(), block.number); + } + } + + const totalTxsRequired = txsToFind.length; + this.log.info( + `Gathering a total of ${totalTxsRequired} txs for epoch=${epochNumber} made up of ${blocks.length} blocks`, + { epochNumber }, ); - const notFound = txs.filter(([_blockNum, _txHash, tx]) => !tx); - if (notFound.length) { - const notFoundList = notFound.map(([blockNum, txHash]) => `${txHash.toString()} (block ${blockNum})`).join(', '); - throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`); + let iteration = 0; + try { + await retryUntil( + async () => { + const batch = [...txsToFind]; + txsToFind = []; + const batchResults = await asyncPool(this.options.txGatheringMaxParallelRequests, batch, async txHash => { + const tx = await this.coordination.getTxByHash(txHash); + return [txHash, tx] as const; + }); + let found = 0; + for (const [txHash, maybeTx] of batchResults) { + if (maybeTx) { + found++; + results.set(txHash.toString(), maybeTx); + } else { + txsToFind.push(txHash); + } + } + + this.log.verbose( + `Gathered ${found}/${batch.length} txs in iteration ${iteration} for epoch ${epochNumber}. In total ${results.size}/${totalTxsRequired} have been retrieved.`, + { epochNumber }, + ); + iteration++; + + // stop when we found all transactions + return txsToFind.length === 0; + }, + 'Gather txs', + this.options.txGatheringTimeoutMs / 1_000, + this.options.txGatheringIntervalMs / 1_000, + ); + } catch (err) { + if (err && err instanceof TimeoutError) { + const notFoundList = txsToFind + .map(txHash => `${txHash.toString()} (block ${txHashToBlock.get(txHash.toString())})`) + .join(', '); + throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`); + } else { + throw err; + } } - return txs.map(([_blockNumber, _txHash, tx]) => tx!); + return Array.from(results.values()); } /** Extracted for testing purposes. */