Skip to content

Commit

Permalink
fix: prover node retries gathering needed txs (#11089)
Browse files Browse the repository at this point in the history
This PR introduces retries when the prover node gathers the necessary
txs for an epoch proof. This is necessary because on a fresh boot, the
prover might not be connected to any peers so it will fail to acquire
the necessary data.

Fix for #11063 

TODO:
- [x] expose new config as env vars
  • Loading branch information
alexghr authored Jan 8, 2025
1 parent 4ed1530 commit 6f07132
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 26 deletions.
6 changes: 6 additions & 0 deletions spartan/aztec-network/templates/prover-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ spec:
value: "{{ .Values.proverNode.proverBroker.dataDirectory }}"
- name: PROVER_PUBLISHER_PRIVATE_KEY
value: "{{ .Values.proverNode.proverPublisherPrivateKey }}"
- name: PROVER_NODE_TX_GATHERING_TIMEOUT_MS
value: "{{ .Values.proverNode.txGathering.timeoutMs }}"
- name: PROVER_NODE_TX_GATHERING_INTERVAL_MS
value: "{{ .Values.proverNode.txGathering.intervalMs }}"
- name: PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS
value: "{{ .Values.proverNode.txGathering.maxParallelRequests }}"
- name: OTEL_RESOURCE_ATTRIBUTES
value: service.name={{ .Release.Name }},service.namespace={{ .Release.Namespace }},service.version={{ .Chart.AppVersion }},environment={{ .Values.environment | default "production" }}
- name: L1_CHAIN_ID
Expand Down
4 changes: 4 additions & 0 deletions spartan/aztec-network/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ proverNode:
archiverViemPollingInterval: 1000
pollInterval: 1000
viemPollingInterval: 1000
txGathering:
timeoutMs: 60000
intervalMs: 1000
maxParallelRequests: 100

pxe:
logLevel: "debug; info: aztec:simulator, json-rpc"
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ export class FullProverTest {
quoteProviderBondAmount: 1000n,
proverMinimumEscrowAmount: 3000n,
proverTargetEscrowAmount: 6000n,
txGatheringTimeoutMs: 60000,
txGatheringIntervalMs: 1000,
txGatheringMaxParallelRequests: 100,
};
this.proverNode = await createProverNode(proverConfig, {
aztecNodeTxProvider: this.aztecNode,
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/src/fixtures/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,9 @@ export async function createAndSyncProverNode(
quoteProviderBondAmount: 1000n,
proverMinimumEscrowAmount: 1000n,
proverTargetEscrowAmount: 2000n,
txGatheringTimeoutMs: 60000,
txGatheringIntervalMs: 1000,
txGatheringMaxParallelRequests: 100,
};

// Use testing l1 publisher
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ export type EnvVar =
| 'PROVER_NODE_POLLING_INTERVAL_MS'
| 'PROVER_NODE_MAX_PENDING_JOBS'
| 'PROVER_NODE_MAX_PARALLEL_BLOCKS_PER_EPOCH'
| 'PROVER_NODE_TX_GATHERING_TIMEOUT_MS'
| 'PROVER_NODE_TX_GATHERING_INTERVAL_MS'
| 'PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS'
| 'PROVER_PUBLISH_RETRY_INTERVAL_MS'
| 'PROVER_PUBLISHER_PRIVATE_KEY'
| 'PROVER_REAL_PROOFS'
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/foundation/src/retry/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { TimeoutError } from '../error/index.js';
import { createLogger } from '../log/index.js';
import { sleep } from '../sleep/index.js';
import { Timer } from '../timer/index.js';
Expand Down Expand Up @@ -93,7 +94,7 @@ export async function retryUntil<T>(fn: () => Promise<T | undefined>, name = '',
await sleep(interval * 1000);

if (timeout && timer.s() > timeout) {
throw new Error(name ? `Timeout awaiting ${name}` : 'Timeout');
throw new TimeoutError(name ? `Timeout awaiting ${name}` : 'Timeout');
}
}
}
18 changes: 18 additions & 0 deletions yarn-project/prover-node/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type SpecificProverNodeConfig = {
proverNodeMaxPendingJobs: number;
proverNodePollingIntervalMs: number;
proverNodeMaxParallelBlocksPerEpoch: number;
txGatheringTimeoutMs: number;
txGatheringIntervalMs: number;
txGatheringMaxParallelRequests: number;
};

export type QuoteProviderConfig = {
Expand All @@ -77,6 +80,21 @@ const specificProverNodeConfigMappings: ConfigMappingsType<SpecificProverNodeCon
description: 'The Maximum number of blocks to process in parallel while proving an epoch',
...numberConfigHelper(32),
},
txGatheringTimeoutMs: {
env: 'PROVER_NODE_TX_GATHERING_TIMEOUT_MS',
description: 'The maximum amount of time to wait for tx data to be available',
...numberConfigHelper(60_000),
},
txGatheringIntervalMs: {
env: 'PROVER_NODE_TX_GATHERING_INTERVAL_MS',
description: 'How often to check that tx data is available',
...numberConfigHelper(1_000),
},
txGatheringMaxParallelRequests: {
env: 'PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS',
description: 'How many txs to load up a time',
...numberConfigHelper(100),
},
};

const quoteProviderConfigMappings: ConfigMappingsType<QuoteProviderConfig> = {
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/prover-node/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export async function createProverNode(
maxPendingJobs: config.proverNodeMaxPendingJobs,
pollingIntervalMs: config.proverNodePollingIntervalMs,
maxParallelBlocksPerEpoch: config.proverNodeMaxParallelBlocksPerEpoch,
txGatheringMaxParallelRequests: config.txGatheringMaxParallelRequests,
txGatheringIntervalMs: config.txGatheringIntervalMs,
txGatheringTimeoutMs: config.txGatheringTimeoutMs,
};

const claimsMonitor = new ClaimsMonitor(publisher, telemetry, proverNodeConfig);
Expand Down
59 changes: 45 additions & 14 deletions yarn-project/prover-node/src/prover-node.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import {
type Body,
type EpochProofClaim,
EpochProofQuote,
EpochProofQuotePayload,
type EpochProverManager,
type L1ToL2MessageSource,
type L2Block,
L2Block,
type L2BlockSource,
type MerkleTreeWriteOperations,
P2PClientType,
type ProverCoordination,
type Tx,
type TxEffect,
TxHash,
type TxHash,
WorldStateRunningState,
type WorldStateSynchronizer,
} from '@aztec/circuit-types';
import { type ContractDataSource, EthAddress, Fr } from '@aztec/circuits.js';
import { type ContractDataSource, EthAddress } from '@aztec/circuits.js';
import { type EpochCache } from '@aztec/epoch-cache';
import { times } from '@aztec/foundation/collection';
import { Signature } from '@aztec/foundation/eth-signature';
Expand Down Expand Up @@ -66,7 +64,7 @@ describe('prover-node', () => {
let claim: MockProxy<EpochProofClaim>;

// Blocks returned by the archiver
let blocks: MockProxy<L2Block>[];
let blocks: L2Block[];

// Address of the publisher
let address: EthAddress;
Expand Down Expand Up @@ -120,7 +118,14 @@ describe('prover-node', () => {
bondManager = mock<BondManager>();

telemetryClient = new NoopTelemetryClient();
config = { maxPendingJobs: 3, pollingIntervalMs: 10, maxParallelBlocksPerEpoch: 32 };
config = {
maxPendingJobs: 3,
pollingIntervalMs: 10,
maxParallelBlocksPerEpoch: 32,
txGatheringMaxParallelRequests: 10,
txGatheringIntervalMs: 100,
txGatheringTimeoutMs: 1000,
};

// World state returns a new mock db every time it is asked to fork
worldState.fork.mockImplementation(() => Promise.resolve(mock<MerkleTreeWriteOperations>()));
Expand All @@ -141,13 +146,7 @@ describe('prover-node', () => {
quoteSigner.sign.mockImplementation(payload => Promise.resolve(new EpochProofQuote(payload, Signature.empty())));

// We create 3 fake blocks with 1 tx effect each
blocks = times(3, i =>
mock<L2Block>({
number: i + 20,
hash: () => new Fr(i),
body: mock<Body>({ txEffects: [mock<TxEffect>({ txHash: TxHash.random() } as TxEffect)] }),
}),
);
blocks = times(3, i => L2Block.random(i + 20, 1));

// Archiver returns a bunch of fake blocks
l2BlockSource.getBlocksForEpoch.mockResolvedValue(blocks);
Expand Down Expand Up @@ -296,6 +295,38 @@ describe('prover-node', () => {
expect(jobs.length).toEqual(1);
});

// Exercises the new retry path: when txs are not yet available (e.g. a fresh
// boot with no peers), the prover node must keep polling for them instead of
// failing the epoch proof on the first miss.
it('retries acquiring txs if they are not immediately available', async () => {
// Current epoch is 11, so epoch 10 is complete and eligible for proving.
l2BlockSource.getL2EpochNumber.mockResolvedValue(11n);
publisher.getProofClaim.mockResolvedValue(claim);
// Save the working stub so availability can be restored later, then make
// every tx lookup miss to simulate missing tx data.
const mockGetTxByHash = mockCoordination.getTxByHash.getMockImplementation();
mockCoordination.getTxByHash.mockResolvedValue(undefined);

await proverNode.start();
await sleep(100);

// initially no job will be started because the txs aren't available
expect(jobs).toHaveLength(0);
// ...but the node must have attempted to fetch them at least once.
expect(mockCoordination.getTxByHash).toHaveBeenCalled();

// Restore the original implementation so txs become available, and give the
// retry loop (txGatheringIntervalMs = 100 in this config) time to re-poll.
mockCoordination.getTxByHash.mockImplementation(mockGetTxByHash);
await sleep(100);

// now it should have all the txs necessary to start proving
expect(jobs[0].epochNumber).toEqual(10n);
expect(jobs.length).toEqual(1);
});

// Complements the retry test: if txs never become available, the gathering
// retries must eventually time out and no proving job may be started.
it('does not start proving if txs are not all available', async () => {
// Current epoch is 11, so epoch 10 is complete and eligible for proving.
l2BlockSource.getL2EpochNumber.mockResolvedValue(11n);
publisher.getProofClaim.mockResolvedValue(claim);

// Every tx lookup misses, permanently.
mockCoordination.getTxByHash.mockResolvedValue(undefined);

await proverNode.start();
// Sleep past txGatheringTimeoutMs (1000 in this config) so the retry loop
// gives up; no job should ever have been created.
await sleep(2000);
expect(jobs).toHaveLength(0);
});

it('does not start proving if there is a claim for proven epoch during initial sync', async () => {
l2BlockSource.getProvenL2EpochNumber.mockResolvedValue(10);
publisher.getProofClaim.mockResolvedValue(claim);
Expand Down
79 changes: 68 additions & 11 deletions yarn-project/prover-node/src/prover-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import {
type ProverNodeApi,
type Service,
type Tx,
type TxHash,
type WorldStateSynchronizer,
tryStop,
} from '@aztec/circuit-types';
import { type ContractDataSource } from '@aztec/circuits.js';
import { asyncPool } from '@aztec/foundation/async-pool';
import { compact } from '@aztec/foundation/collection';
import { TimeoutError } from '@aztec/foundation/error';
import { createLogger } from '@aztec/foundation/log';
import { retryUntil } from '@aztec/foundation/retry';
import { DateProvider } from '@aztec/foundation/timer';
import { type Maybe } from '@aztec/foundation/types';
import { type P2P } from '@aztec/p2p';
Expand All @@ -36,6 +40,9 @@ export type ProverNodeOptions = {
pollingIntervalMs: number;
maxPendingJobs: number;
maxParallelBlocksPerEpoch: number;
txGatheringTimeoutMs: number;
txGatheringIntervalMs: number;
txGatheringMaxParallelRequests: number;
};

/**
Expand Down Expand Up @@ -76,6 +83,9 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr
pollingIntervalMs: 1_000,
maxPendingJobs: 100,
maxParallelBlocksPerEpoch: 32,
txGatheringTimeoutMs: 60_000,
txGatheringIntervalMs: 1_000,
txGatheringMaxParallelRequests: 100,
...compact(options),
};

Expand Down Expand Up @@ -302,21 +312,68 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr
}

// Gathers every tx referenced by the epoch's blocks from the coordination
// layer, retrying until all are found or the configured timeout elapses.
//
// NOTE(review): this span is a diff rendering that fused removed and added
// lines together; the stale pre-change hunks are marked below and are not
// part of the final method — confirm against the applied commit.
private async gatherTxs(epochNumber: bigint, blocks: L2Block[]) {
// NOTE(review): stale pre-change code (one-shot Promise.all gather) fused
// in by the diff view — superseded by the retry loop below.
const txs = await Promise.all(
blocks.flatMap(block =>
block.body.txEffects
.map(tx => tx.txHash)
.map(txHash => this.coordination.getTxByHash(txHash).then(tx => [block.number, txHash, tx] as const)),
),
// Work list of tx hashes still to fetch; refilled each iteration with misses.
let txsToFind: TxHash[] = [];
// Maps tx hash -> block number, used only to build the timeout error message.
const txHashToBlock = new Map<string, number>();
// Accumulates successfully fetched txs across iterations, keyed by hash.
const results = new Map<string, Tx>();

for (const block of blocks) {
for (const tx of block.body.txEffects) {
txsToFind.push(tx.txHash);
txHashToBlock.set(tx.txHash.toString(), block.number);
}
}

const totalTxsRequired = txsToFind.length;
this.log.info(
`Gathering a total of ${totalTxsRequired} txs for epoch=${epochNumber} made up of ${blocks.length} blocks`,
{ epochNumber },
);

// NOTE(review): stale pre-change code (immediate not-found failure) fused
// in by the diff view — superseded by the TimeoutError handling below.
const notFound = txs.filter(([_blockNum, _txHash, tx]) => !tx);
if (notFound.length) {
const notFoundList = notFound.map(([blockNum, txHash]) => `${txHash.toString()} (block ${blockNum})`).join(', ');
throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`);
let iteration = 0;
try {
await retryUntil(
async () => {
// Take a snapshot of the outstanding hashes; misses are re-queued so
// the next iteration only re-requests what is still unresolved.
const batch = [...txsToFind];
txsToFind = [];
// Fetch with bounded concurrency (txGatheringMaxParallelRequests).
const batchResults = await asyncPool(this.options.txGatheringMaxParallelRequests, batch, async txHash => {
const tx = await this.coordination.getTxByHash(txHash);
return [txHash, tx] as const;
});
let found = 0;
for (const [txHash, maybeTx] of batchResults) {
if (maybeTx) {
found++;
results.set(txHash.toString(), maybeTx);
} else {
txsToFind.push(txHash);
}
}

this.log.verbose(
`Gathered ${found}/${batch.length} txs in iteration ${iteration} for epoch ${epochNumber}. In total ${results.size}/${totalTxsRequired} have been retrieved.`,
{ epochNumber },
);
iteration++;

// stop when we found all transactions
return txsToFind.length === 0;
},
'Gather txs',
// retryUntil takes seconds; config values are in milliseconds.
this.options.txGatheringTimeoutMs / 1_000,
this.options.txGatheringIntervalMs / 1_000,
);
} catch (err) {
// Translate the generic retry timeout into a descriptive error listing
// exactly which txs (and their blocks) were never found.
if (err && err instanceof TimeoutError) {
const notFoundList = txsToFind
.map(txHash => `${txHash.toString()} (block ${txHashToBlock.get(txHash.toString())})`)
.join(', ');
throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`);
} else {
throw err;
}
}

// NOTE(review): stale pre-change return fused in by the diff view.
return txs.map(([_blockNumber, _txHash, tx]) => tx!);
return Array.from(results.values());
}

/** Extracted for testing purposes. */
Expand Down

0 comments on commit 6f07132

Please sign in to comment.