Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Trace and handle errors in running promises #10645

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { sleep } from '@aztec/foundation/sleep';
import { type InboxAbi, RollupAbi } from '@aztec/l1-artifacts';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';
Expand Down Expand Up @@ -84,7 +85,8 @@ describe('Archiver', () => {
}) as any,
});

instrumentation = mock({ isEnabled: () => true });
const tracer = new NoopTelemetryClient().getTracer();
instrumentation = mock<ArchiverInstrumentation>({ isEnabled: () => true, tracer });
archiverStore = new MemoryArchiverStore(1000);

archiver = new Archiver(
Expand Down
21 changes: 7 additions & 14 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import {
PrivateFunctionBroadcastedEvent,
UnconstrainedFunctionBroadcastedEvent,
} from '@aztec/protocol-contracts';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import groupBy from 'lodash.groupby';
import {
Expand Down Expand Up @@ -85,7 +85,7 @@ export type ArchiveSource = L2BlockSource &
* Responsible for handling robust L1 polling so that other components do not need to
* concern themselves with it.
*/
export class Archiver implements ArchiveSource {
export class Archiver implements ArchiveSource, Traceable {
/**
* A promise in which we will be continually fetching new L2 blocks.
*/
Expand All @@ -99,6 +99,8 @@ export class Archiver implements ArchiveSource {
public l1BlockNumber: bigint | undefined;
public l1Timestamp: bigint | undefined;

public readonly tracer: Tracer;

/**
* Creates a new instance of the Archiver.
* @param publicClient - A client for interacting with the Ethereum node.
Expand All @@ -118,6 +120,7 @@ export class Archiver implements ArchiveSource {
private readonly l1constants: L1RollupConstants,
private readonly log: Logger = createLogger('archiver'),
) {
this.tracer = instrumentation.tracer;
this.store = new ArchiverStoreHelper(dataStore);

this.rollup = getContract({
Expand Down Expand Up @@ -194,24 +197,14 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.sync(false), this.log, this.config.pollingIntervalMs);
this.runningPromise.start();
}

/**
* Syncs and catches exceptions.
*/
private async safeSync() {
try {
await this.sync(false);
} catch (error) {
this.log.error('Error syncing archiver', error);
}
}

/**
* Fetches logs from L1 contracts and processes them.
*/
@trackSpan('Archiver.sync', initialRun => ({ [Attributes.INITIAL_SYNC]: initialRun }))
private async sync(initialRun: boolean) {
/**
* We keep track of three "pointers" to L1 blocks:
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/archiver/src/archiver/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import {
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type Tracer,
type UpDownCounter,
ValueType,
} from '@aztec/telemetry-client';

export class ArchiverInstrumentation {
public readonly tracer: Tracer;

private blockHeight: Gauge;
private blockSize: Gauge;
private syncDuration: Histogram;
Expand All @@ -24,6 +27,7 @@ export class ArchiverInstrumentation {
private log = createLogger('archiver:instrumentation');

private constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) {
this.tracer = telemetry.getTracer('Archiver');
const meter = telemetry.getMeter('Archiver');
this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, {
description: 'The height of the latest block processed by the archiver',
Expand Down
11 changes: 8 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import {
import { ProtocolContractAddress } from '@aztec/protocol-contracts';
import { GlobalVariableBuilder, type L1Publisher, SequencerClient } from '@aztec/sequencer-client';
import { PublicProcessorFactory } from '@aztec/simulator';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';
import { createValidatorClient } from '@aztec/validator-client';
import { createWorldStateSynchronizer } from '@aztec/world-state';
Expand All @@ -85,11 +85,12 @@ import { NodeMetrics } from './node_metrics.js';
/**
* The aztec node.
*/
export class AztecNodeService implements AztecNode {
export class AztecNodeService implements AztecNode, Traceable {
private packageVersion: string;

private metrics: NodeMetrics;

public readonly tracer: Tracer;

constructor(
protected config: AztecNodeConfig,
protected readonly p2pClient: P2P,
Expand All @@ -109,6 +110,7 @@ export class AztecNodeService implements AztecNode {
) {
this.packageVersion = getPackageInfo().version;
this.metrics = new NodeMetrics(telemetry, 'AztecNodeService');
this.tracer = telemetry.getTracer('AztecNodeService');

this.log.info(`Aztec Node started on chain 0x${l1ChainId.toString(16)}`, config.l1Contracts);
}
Expand Down Expand Up @@ -782,6 +784,9 @@ export class AztecNodeService implements AztecNode {
* Simulates the public part of a transaction with the current state.
* @param tx - The transaction to simulate.
**/
@trackSpan('AztecNodeService.simulatePublicCalls', (tx: Tx) => ({
[Attributes.TX_HASH]: tx.tryGetTxHash()?.toString(),
}))
public async simulatePublicCalls(tx: Tx): Promise<PublicSimulationOutput> {
const txHash = tx.getTxHash();
const blockNumber = (await this.blockSource.getBlockNumber()) + 1;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec.js/src/utils/anvil_test_watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AnvilTestWatcher {
const isAutoMining = await this.cheatcodes.isAutoMining();

if (isAutoMining) {
this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), 1000);
this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), this.logger, 1000);
this.filledRunningPromise.start();
this.logger.info(`Watcher started for rollup at ${this.rollup.address}`);
} else {
Expand Down
10 changes: 8 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { type BotConfig, BotRunner, botConfigMappings, getBotRunnerApiHandler }
import { type AztecNode, type PXE } from '@aztec/circuit-types';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
import { type TelemetryClient } from '@aztec/telemetry-client';
import {
createAndStartTelemetryClient,
getConfigEnvVars as getTelemetryClientConfig,
} from '@aztec/telemetry-client/start';

import { extractRelevantOptions } from '../util.js';

Expand All @@ -25,14 +30,15 @@ export async function startBot(
pxe = await addPXE(options, signalHandlers, services, userLog);
}

await addBot(options, signalHandlers, services, { pxe });
const telemetry = await createAndStartTelemetryClient(getTelemetryClientConfig());
await addBot(options, signalHandlers, services, { pxe, telemetry });
}

export function addBot(
options: any,
signalHandlers: (() => Promise<void>)[],
services: NamespacedApiHandlers,
deps: { pxe?: PXE; node?: AztecNode } = {},
deps: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient },
) {
const config = extractRelevantOptions<BotConfig>(options, botConfigMappings, 'bot');

Expand Down
6 changes: 3 additions & 3 deletions yarn-project/aztec/src/cli/cmds/start_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ export async function startNode(
}

const telemetryConfig = extractRelevantOptions<TelemetryClientConfig>(options, telemetryClientConfigMappings, 'tel');
const telemetryClient = await createAndStartTelemetryClient(telemetryConfig);
const telemetry = await createAndStartTelemetryClient(telemetryConfig);

// Create and start Aztec Node
const node = await createAztecNode(nodeConfig, telemetryClient);
const node = await createAztecNode(nodeConfig, telemetry);

// Add node and p2p to services list
services.node = [node, AztecNodeApiSchema];
Expand All @@ -110,6 +110,6 @@ export async function startNode(
// Add a txs bot if requested
if (options.bot) {
const { addBot } = await import('./start_bot.js');
await addBot(options, signalHandlers, services, { pxe, node });
await addBot(options, signalHandlers, services, { pxe, node, telemetry });
}
}
1 change: 1 addition & 0 deletions yarn-project/bot/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"@aztec/foundation": "workspace:^",
"@aztec/noir-contracts.js": "workspace:^",
"@aztec/protocol-contracts": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"source-map-support": "^0.5.21",
"tslib": "^2.4.0",
Expand Down
16 changes: 12 additions & 4 deletions yarn-project/bot/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type AztecNode, type PXE, createAztecNodeClient, createLogger } from '@aztec/aztec.js';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import { Bot } from './bot.js';
import { type BotConfig } from './config.js';
import { type BotRunnerApi } from './interface.js';

export class BotRunner implements BotRunnerApi {
export class BotRunner implements BotRunnerApi, Traceable {
private log = createLogger('bot');
private bot?: Promise<Bot>;
private pxe?: PXE;
Expand All @@ -14,13 +15,19 @@ export class BotRunner implements BotRunnerApi {
private consecutiveErrors = 0;
private healthy = true;

public constructor(private config: BotConfig, dependencies: { pxe?: PXE; node?: AztecNode }) {
public readonly tracer: Tracer;

public constructor(
private config: BotConfig,
dependencies: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient },
) {
this.tracer = dependencies.telemetry.getTracer('Bot');
this.pxe = dependencies.pxe;
if (!dependencies.node && !config.nodeUrl) {
throw new Error(`Missing node URL in config or dependencies`);
}
this.node = dependencies.node ?? createAztecNodeClient(config.nodeUrl!);
this.runningPromise = new RunningPromise(() => this.#work(), config.txIntervalSeconds * 1000);
this.runningPromise = new RunningPromise(() => this.#work(), this.log, config.txIntervalSeconds * 1000);
}

/** Initializes the bot if needed. Blocks until the bot setup is finished. */
Expand Down Expand Up @@ -126,6 +133,7 @@ export class BotRunner implements BotRunnerApi {
}
}

@trackSpan('Bot.work')
async #work() {
if (this.config.maxPendingTxs > 0) {
const pendingTxs = await this.node.getPendingTxs();
Expand All @@ -146,7 +154,7 @@ export class BotRunner implements BotRunnerApi {
}

if (!this.healthy && this.config.stopWhenUnhealthy) {
this.log.error(`Stopping bot due to errors`);
this.log.fatal(`Stopping bot due to errors`);
process.exit(1); // workaround docker not restarting the container if its unhealthy. We have to exit instead
}
}
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/bot/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
{
"path": "../protocol-contracts"
},
{
"path": "../telemetry-client"
},
{
"path": "../types"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class L2BlockStream {
startingBlock?: number;
} = {},
) {
this.runningPromise = new RunningPromise(() => this.work(), this.opts.pollIntervalMS ?? 1000);
this.runningPromise = new RunningPromise(() => this.work(), log, this.opts.pollIntervalMS ?? 1000);
}

public start() {
Expand Down
16 changes: 15 additions & 1 deletion yarn-project/foundation/src/promise/running-promise.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { type Logger, createLogger } from '../log/pino-logger.js';
import { sleep } from '../sleep/index.js';
import { RunningPromise } from './running-promise.js';

describe('RunningPromise', () => {
let runningPromise: RunningPromise;
let counter: number;
let fn: () => Promise<void>;
let logger: Logger;

beforeEach(() => {
counter = 0;
fn = async () => {
counter++;
await sleep(100);
};
runningPromise = new RunningPromise(fn, 50);
logger = createLogger('test');
runningPromise = new RunningPromise(fn, logger, 50);
});

afterEach(async () => {
Expand Down Expand Up @@ -40,5 +43,16 @@ describe('RunningPromise', () => {
await runningPromise.trigger();
expect(counter).toEqual(2);
});

it('handles errors', async () => {
const failingFn = async () => {
await fn();
throw new Error('ouch');
};
runningPromise = new RunningPromise(failingFn, logger, 50);
runningPromise.start();
await sleep(90);
expect(counter).toEqual(1);
});
});
});
13 changes: 11 additions & 2 deletions yarn-project/foundation/src/promise/running-promise.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createLogger } from '../log/pino-logger.js';
import { InterruptibleSleep } from '../sleep/index.js';
import { type PromiseWithResolvers, promiseWithResolvers } from './utils.js';

Expand All @@ -12,7 +13,11 @@ export class RunningPromise {
private interruptibleSleep = new InterruptibleSleep();
private requested: PromiseWithResolvers<void> | undefined = undefined;

constructor(private fn: () => void | Promise<void>, private pollingIntervalMS = 10000) {}
constructor(
private fn: () => void | Promise<void>,
private logger = createLogger('running-promise'),
private pollingIntervalMS = 10000,
) {}

/**
* Starts the running promise.
Expand All @@ -23,7 +28,11 @@ export class RunningPromise {
const poll = async () => {
while (this.running) {
const hasRequested = this.requested !== undefined;
await this.fn();
try {
await this.fn();
} catch (err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so no cases where we'd actually want the running promise to fail on error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem is a failure is an uncaught rejected promise, which suck.

Copy link
Collaborator Author

@spalladino spalladino Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just re-checked the docs, and by default it'll turn into an uncaught exception, which I understand would kill the process: https://nodejs.org/api/cli.html#--unhandled-rejectionsmode

this.logger.error('Error in running promise', err);
}

// If an immediate run had been requested *before* the function started running, resolve the request.
if (hasRequested) {
Expand Down
13 changes: 10 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
type L2Block,
type L2BlockId,
type L2BlockSource,
L2BlockStream,
type L2BlockStreamEvent,
type L2Tips,
type P2PApi,
Expand All @@ -16,7 +15,13 @@ import {
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { createLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';
import {
Attributes,
type TelemetryClient,
TraceableL2BlockStream,
WithTracer,
trackSpan,
} from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { type ENR } from '@chainsafe/enr';
Expand Down Expand Up @@ -221,7 +226,9 @@ export class P2PClient extends WithTracer implements P2P {

this.keepAttestationsInPoolFor = keepAttestationsInPoolFor;

this.blockStream = new L2BlockStream(l2BlockSource, this, this, createLogger('p2p:block_stream'), {
const tracer = telemetry.getTracer('P2PL2BlockStream');
const logger = createLogger('p2p:l2-block-stream');
this.blockStream = new TraceableL2BlockStream(l2BlockSource, this, this, tracer, 'P2PL2BlockStream', logger, {
batchSize: blockRequestBatchSize,
pollIntervalMS: blockCheckIntervalMS,
});
Expand Down
Loading
Loading