Skip to content

Commit 8b3fef2

Browse files
dadepo and g11tech authored
Added retry mechanism in executionEngine for executePayload (#3854)
* Add retry and metrics to the execution engine * revert retry counter change * cleanup separate exec metrics * fix the test * retry fixes * fix the rpc fn Co-authored-by: harkamal <gajinder@g11.in>
1 parent c58690e commit 8b3fef2

File tree

11 files changed

+472
-31
lines changed

11 files changed

+472
-31
lines changed

packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts

+42-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Note: isomorphic-fetch is not well maintained and does not support abort signals
33
import fetch from "cross-fetch";
44

5-
import {ErrorAborted, TimeoutError} from "@lodestar/utils";
5+
import {ErrorAborted, TimeoutError, retry} from "@lodestar/utils";
66
import {IGauge, IHistogram} from "../../metrics/interface.js";
77
import {IJson, IRpcPayload} from "../interface.js";
88
import {encodeJwtToken} from "./jwt.js";
@@ -29,6 +29,10 @@ export type ReqOpts = {
2929
timeout?: number;
3030
// To label request metrics
3131
routeId?: string;
32+
// retry opts
33+
retryAttempts?: number;
34+
retryDelay?: number;
35+
shouldRetry?: (lastError: Error) => boolean;
3236
};
3337

3438
export type JsonRpcHttpClientMetrics = {
@@ -37,10 +41,12 @@ export type JsonRpcHttpClientMetrics = {
3741
requestUsedFallbackUrl: IGauge;
3842
activeRequests: IGauge;
3943
configUrlsCount: IGauge;
44+
retryCount: IGauge;
4045
};
4146

4247
export interface IJsonRpcHttpClient {
4348
fetch<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
49+
fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
4450
fetchBatch<R>(rpcPayloadArr: IRpcPayload[], opts?: ReqOpts): Promise<R[]>;
4551
}
4652

@@ -64,11 +70,20 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
6470
/** If returns true, do not fallback to other urls and throw early */
6571
shouldNotFallback?: (error: Error) => boolean;
6672
/**
67-
* If provided, the requests to the RPC server will be bundled with a HS256 encoded
68-
* token using this secret. Otherwise the requests to the RPC server will be unauthorized
73+
* Optional: If provided, use this jwt secret to HS256 encode and add a jwt token in the
74+
* request header which can be authenticated by the RPC server to provide access.
75+
* A fresh token is generated on each request as EL spec mandates the ELs to check
76+
* the token freshness +-5 seconds (via `iat` property of the token claim)
77+
*
78+
* Otherwise the requests to the RPC server will be unauthorized
6979
* and it might deny responses to the RPC requests.
7080
*/
7181
jwtSecret?: Uint8Array;
82+
/** Retry attempts */
83+
retryAttempts?: number;
84+
/** Retry delay, only relevant with retry attempts */
85+
retryDelay?: number;
86+
/** Metrics for retry, could be expanded later */
7287
metrics?: JsonRpcHttpClientMetrics | null;
7388
}
7489
) {
@@ -85,13 +100,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
85100
this.jwtSecret = opts?.jwtSecret;
86101
this.metrics = opts?.metrics ?? null;
87102

88-
// Set config metric gauges once
89-
90-
const metrics = this.metrics;
91-
if (metrics) {
92-
metrics.configUrlsCount.set(urls.length);
93-
metrics.activeRequests.addCollect(() => metrics.activeRequests.set(this.activeRequests));
94-
}
103+
this.metrics?.configUrlsCount.set(urls.length);
104+
this.metrics?.activeRequests.addCollect(() => this.metrics?.activeRequests.set(this.activeRequests));
95105
}
96106

97107
/**
@@ -102,6 +112,28 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
102112
return parseRpcResponse(res, payload);
103113
}
104114

115+
/**
 * Perform RPC request with retry.
 *
 * Every attempt goes through `fetchJson` with a newly incremented request id.
 * Retry settings are resolved per request first (`opts`), then from the
 * client-level options (`this.opts`), and finally fall back to `retries: 1`
 * and a delay of `0`.
 *
 * @param payload - RPC method and params; `jsonrpc` and `id` are filled in here
 * @param opts - per-request options; `retryAttempts`, `retryDelay` and
 *   `shouldRetry` are forwarded to the `retry` helper
 * @returns whatever `parseRpcResponse` extracts from the final RPC response
 */
async fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R> {
  const routeId = opts?.routeId ?? "unknown";
  const res = await retry<IRpcResponse<R>>(
    async (attempt) => {
      // Only count actual retries, not the initial attempt
      // NOTE(review): assumes `retry` numbers attempts starting at 1 — confirm against @lodestar/utils
      if (attempt > 1) {
        this.opts?.metrics?.retryCount.inc({routeId});
      }
      return await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
    },
    {
      retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
      // Fall back to the client-level retry *delay*; previously this read
      // `retryAttempts`, so the configured delay was silently ignored
      retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
      shouldRetry: opts?.shouldRetry,
    }
  );
  return parseRpcResponse(res, payload);
}
136+
105137
/**
106138
* Perform RPC batched request
107139
* Type-wise assumes all requests results have the same type

packages/beacon-node/src/execution/engine/http.ts

+36-14
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,15 @@ import {
2525
} from "./interface.js";
2626
import {PayloadIdCache} from "./payloadIdCache.js";
2727

28+
export type ExecutionEngineModules = {
29+
signal: AbortSignal;
30+
metrics?: IMetrics | null;
31+
};
32+
2833
export type ExecutionEngineHttpOpts = {
2934
urls: string[];
35+
retryAttempts: number;
36+
retryDelay: number;
3037
timeout?: number;
3138
/**
3239
* 256 bit jwt secret in hex format without the leading 0x. If provided, the execution engine
@@ -44,6 +51,8 @@ export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
4451
* port/url, one can override this and skip providing a jwt secret.
4552
*/
4653
urls: ["http://localhost:8551"],
54+
retryAttempts: 3,
55+
retryDelay: 2000,
4756
timeout: 12000,
4857
};
4958

@@ -65,12 +74,12 @@ export class ExecutionEngineHttp implements IExecutionEngine {
6574
readonly payloadIdCache = new PayloadIdCache();
6675
private readonly rpc: IJsonRpcHttpClient;
6776

68-
constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, metrics?: IMetrics | null) {
77+
constructor(opts: ExecutionEngineHttpOpts, {metrics, signal}: ExecutionEngineModules) {
6978
this.rpc = new JsonRpcHttpClient(opts.urls, {
79+
...opts,
7080
signal,
71-
timeout: opts.timeout,
72-
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
7381
metrics: metrics?.executionEnginerHttpClient,
82+
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
7483
});
7584
}
7685

@@ -103,12 +112,15 @@ export class ExecutionEngineHttp implements IExecutionEngine {
103112
const method = "engine_newPayloadV1";
104113
const serializedExecutionPayload = serializeExecutionPayload(executionPayload);
105114
const {status, latestValidHash, validationError} = await this.rpc
106-
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
107-
{method, params: [serializedExecutionPayload]},
115+
.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
116+
{
117+
method,
118+
params: [serializedExecutionPayload],
119+
},
108120
notifyNewPayloadOpts
109121
)
110122
// If there are errors by EL like connection refused, internal error, they need to be
111-
// treated seperate from being INVALID. For now, just pass the error upstream.
123+
// treated separate from being INVALID. For now, just pass the error upstream.
112124
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
113125
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
114126
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
@@ -201,14 +213,19 @@ export class ExecutionEngineHttp implements IExecutionEngine {
201213
}
202214
: undefined;
203215

204-
// TODO: propogate latestValidHash to the forkchoice, for now ignore it as we
205-
// currently do not propogate the validation status up the forkchoice
216+
// If we are just fcUing and not asking execution for payload, retry is not required
217+
// and we can move on, as the next fcU will be issued soon on the new slot
218+
const fcUReqOpts =
219+
payloadAttributes !== undefined ? forkchoiceUpdatedV1Opts : {...forkchoiceUpdatedV1Opts, retryAttempts: 1};
206220
const {
207221
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
208222
payloadId,
209-
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
210-
{method, params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes]},
211-
forkchoiceUpdatedV1Opts
223+
} = await this.rpc.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
224+
{
225+
method,
226+
params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes],
227+
},
228+
fcUReqOpts
212229
);
213230

214231
switch (status) {
@@ -253,11 +270,16 @@ export class ExecutionEngineHttp implements IExecutionEngine {
253270
*/
254271
async getPayload(payloadId: PayloadId): Promise<bellatrix.ExecutionPayload> {
255272
const method = "engine_getPayloadV1";
256-
const executionPayloadRpc = await this.rpc.fetch<
273+
const executionPayloadRpc = await this.rpc.fetchWithRetries<
257274
EngineApiRpcReturnTypes[typeof method],
258275
EngineApiRpcParamTypes[typeof method]
259-
>({method, params: [payloadId]}, getPayloadOpts);
260-
276+
>(
277+
{
278+
method,
279+
params: [payloadId],
280+
},
281+
getPayloadOpts
282+
);
261283
return parseExecutionPayload(executionPayloadRpc);
262284
}
263285

packages/beacon-node/src/execution/engine/index.ts

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import {IExecutionEngine} from "./interface.js";
22
import {ExecutionEngineDisabled} from "./disabled.js";
3-
import {ExecutionEngineHttp, ExecutionEngineHttpOpts, defaultExecutionEngineHttpOpts} from "./http.js";
3+
import {
4+
ExecutionEngineHttp,
5+
ExecutionEngineModules,
6+
ExecutionEngineHttpOpts,
7+
defaultExecutionEngineHttpOpts,
8+
} from "./http.js";
49
import {ExecutionEngineMock, ExecutionEngineMockOpts} from "./mock.js";
510

611
export {
@@ -15,17 +20,19 @@ export type ExecutionEngineOpts =
1520
| ({mode?: "http"} & ExecutionEngineHttpOpts)
1621
| ({mode: "mock"} & ExecutionEngineMockOpts)
1722
| {mode: "disabled"};
18-
1923
export const defaultExecutionEngineOpts: ExecutionEngineOpts = defaultExecutionEngineHttpOpts;
2024

21-
export function initializeExecutionEngine(opts: ExecutionEngineOpts, signal: AbortSignal): IExecutionEngine {
25+
export function initializeExecutionEngine(
26+
opts: ExecutionEngineOpts,
27+
modules: ExecutionEngineModules
28+
): IExecutionEngine {
2229
switch (opts.mode) {
2330
case "mock":
2431
return new ExecutionEngineMock(opts);
2532
case "disabled":
2633
return new ExecutionEngineDisabled();
2734
case "http":
2835
default:
29-
return new ExecutionEngineHttp(opts, signal);
36+
return new ExecutionEngineHttp(opts, modules);
3037
}
3138
}

packages/beacon-node/src/metrics/metrics/lodestar.ts

+10
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,11 @@ export function createLodestarMetrics(
10671067
help: "eth1 JsonHttpClient - total count of request errors",
10681068
labelNames: ["routeId"],
10691069
}),
1070+
retryCount: register.gauge<"routeId">({
1071+
name: "lodestar_eth1_http_client_request_retries_total",
1072+
help: "eth1 JsonHttpClient - total count of request retries",
1073+
labelNames: ["routeId"],
1074+
}),
10701075
requestUsedFallbackUrl: register.gauge({
10711076
name: "lodestar_eth1_http_client_request_used_fallback_url_total",
10721077
help: "eth1 JsonHttpClient - total count of requests on fallback url(s)",
@@ -1094,6 +1099,11 @@ export function createLodestarMetrics(
10941099
help: "ExecutionEngineHttp client - total count of request errors",
10951100
labelNames: ["routeId"],
10961101
}),
1102+
retryCount: register.gauge<"routeId">({
1103+
name: "lodestar_execution_engine_http_client_request_retries_total",
1104+
help: "ExecutionEngineHttp client - total count of request retries",
1105+
labelNames: ["routeId"],
1106+
}),
10971107
requestUsedFallbackUrl: register.gauge({
10981108
name: "lodestar_execution_engine_http_client_request_used_fallback_url_total",
10991109
help: "ExecutionEngineHttp client - total count of requests on fallback url(s)",

packages/beacon-node/src/node/nodejs.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ export class BeaconNode {
142142
{config, db, metrics, logger: logger.child(opts.logger.eth1), signal},
143143
anchorState
144144
),
145-
executionEngine: initializeExecutionEngine(opts.executionEngine, signal),
145+
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
146146
executionBuilder: opts.executionBuilder.enabled
147147
? initializeExecutionBuilder(opts.executionBuilder, config)
148148
: undefined,

0 commit comments

Comments
 (0)