Merge 0a5c670 into f62bc13
matthewkeil authored Feb 27, 2024
2 parents f62bc13 + 0a5c670 commit b5dacd0
Showing 24 changed files with 470 additions and 226 deletions.
packages/beacon-node/package.json (2 changes: 1 addition & 1 deletion)
@@ -93,7 +93,7 @@
},
"dependencies": {
"@chainsafe/as-sha256": "^0.4.1",
"@chainsafe/bls": "7.1.3",
"@chainsafe/bls": "ChainSafe/bls#e8bbc1a2dd803294c24a4db2ed2c9e05b714b92d",
"@chainsafe/blst": "^0.2.9",
"@chainsafe/discv5": "^9.0.0",
"@chainsafe/enr": "^3.0.0",
packages/beacon-node/src/chain/bls/index.ts (1 change: 1 addition & 0 deletions)
@@ -1,4 +1,5 @@
export type {IBlsVerifier} from "./interface.js";
export type {BlsMultiThreadWorkerPoolModules, JobQueueItemType} from "./multithread/index.js";
export {BlsPoolType} from "./multithread/types.js";
export {BlsMultiThreadWorkerPool} from "./multithread/index.js";
export {BlsSingleThreadVerifier} from "./singleThread.js";
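The newly exported BlsPoolType drives the pool selection added later in this commit. A minimal sketch of what the enum in ./multithread/types.ts presumably contains, inferred only from its usage below (the string values are an assumption, not verbatim from this commit):

export enum BlsPoolType {
  // verification jobs are sent to the "threads" worker pool
  workers = "workers",
  // verification runs on the libuv threadpool via async blst bindings
  libuv = "libuv",
}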
packages/beacon-node/src/chain/bls/maybeBatch.ts (52 changes: 31 additions & 21 deletions)
@@ -1,29 +1,18 @@
import {CoordType, PublicKey} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {WorkRequestSet} from "./multithread/types.js";
import {deserializeSet} from "./multithread/utils.js";

const MIN_SET_COUNT_TO_BATCH = 2;

export type SignatureSetDeserialized = {
publicKey: PublicKey;
message: Uint8Array;
signature: Uint8Array;
};

/**
* Verify signatures sets with batch verification or regular core verify depending on the set count.
* Abstracted in a separate file to be consumed by the threaded pool and the main thread implementation.
*/
export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[]): boolean {
export function verifySignatureSetsMaybeBatch(sets: WorkRequestSet[]): boolean {
try {
const deserialized = sets.map(deserializeSet);
if (sets.length >= MIN_SET_COUNT_TO_BATCH) {
return bls.Signature.verifyMultipleSignatures(
sets.map((s) => ({
publicKey: s.publicKey,
message: s.message,
// true = validate signature
signature: bls.Signature.fromBytes(s.signature, CoordType.affine, true),
}))
);
return bls.Signature.verifyMultipleSignatures(deserialized);
}

// .every on an empty array returns true
@@ -32,11 +21,32 @@ export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[]):
}

// If too few signature sets verify them without batching
return sets.every((set) => {
// true = validate signature
const sig = bls.Signature.fromBytes(set.signature, CoordType.affine, true);
return sig.verify(set.publicKey, set.message);
});
return deserialized.every(({message, publicKey, signature}) => signature.verify(publicKey, message));
} catch (_) {
// A signature could be malformed, in that case fromBytes throws error
// blst-ts `verifyMultipleSignatures` is also a fallible operation if mul_n_aggregate fails
// see https://github.com/ChainSafe/blst-ts/blob/b1ba6333f664b08e5c50b2b0d18c4f079203962b/src/lib.ts#L291
return false;
}
}

export async function asyncVerifySignatureSetsMaybeBatch(sets: WorkRequestSet[]): Promise<boolean> {
try {
const deserialized = sets.map(deserializeSet);
if (sets.length >= MIN_SET_COUNT_TO_BATCH) {
return await bls.asyncVerifyMultipleSignatures(deserialized);
}

// .every on an empty array returns true
if (sets.length === 0) {
throw Error("Empty signature sets");
}

const promises = await Promise.all(
deserialized.map(({message, publicKey, signature}) => bls.asyncVerify(message, publicKey, signature))
);
// If too few signature sets verify them without batching
return promises.every((isValid) => isValid);
} catch (_) {
// A signature could be malformed, in that case fromBytes throws error
// blst-ts `verifyMultipleSignatures` is also a fallible operation if mul_n_aggregate fails
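For context, a small self-contained sketch of the batch branch above, using only @chainsafe/bls calls that already appear in this diff; the set objects mirror the deserialized shape that deserializeSet presumably returns:

import bls from "@chainsafe/bls";

// With MIN_SET_COUNT_TO_BATCH (2) or more sets, verifySignatureSetsMaybeBatch takes
// the verifyMultipleSignatures branch instead of verifying each set individually.
const message = new Uint8Array(32).fill(1);
const sets = Array.from({length: 4}, () => {
  const secretKey = bls.SecretKey.fromKeygen();
  return {publicKey: secretKey.toPublicKey(), message, signature: secretKey.sign(message)};
});

console.log(bls.Signature.verifyMultipleSignatures(sets)); // true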
packages/beacon-node/src/chain/bls/multithread/index.ts (158 changes: 106 additions & 52 deletions)
@@ -17,8 +17,8 @@ import {IBlsVerifier, VerifySignatureOpts} from "../interface.js";
import {getAggregatedPubkey, getAggregatedPubkeysCount} from "../utils.js";
import {verifySignatureSetsMaybeBatch} from "../maybeBatch.js";
import {LinkedList} from "../../../util/array.js";
import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode, WorkResultError} from "./types.js";
import {chunkifyMaximizeChunkSize} from "./utils.js";
import {BlsPoolType, BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode} from "./types.js";
import {chunkifyMaximizeChunkSize, getJobResultError} from "./utils.js";
import {defaultPoolSize} from "./poolSize.js";
import {
JobQueueItem,
@@ -28,6 +28,11 @@ import {
jobItemSigSets,
jobItemWorkReq,
} from "./jobItem.js";
import {asyncVerifyManySignatureSets} from "./verifyManySignatureSets.js";

// defaultPoolSize should return core count - 1 to keep main thread on core. We
// also have network thread that should not be choked so we subtract 1 more
const IDEAL_POOL_SIZE = Math.max(defaultPoolSize - 1, 1);
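defaultPoolSize comes from ./poolSize.js, which is not shown in this diff; a hypothetical sketch of what it could compute (the real module may differ), with IDEAL_POOL_SIZE above then reserving one more thread for the network thread:

import os from "node:os";

// Reserve one core for the main thread; never drop below a single worker.
export const defaultPoolSize = Math.max(os.cpus().length - 1, 1);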

// Worker constructor consider the path relative to the current working directory
const workerDir = process.env.NODE_ENV === "test" ? "../../../../lib/chain/bls/multithread" : "./";
Expand All @@ -38,6 +43,7 @@ export type BlsMultiThreadWorkerPoolModules = {
};

export type BlsMultiThreadWorkerPoolOptions = {
blsPoolType?: BlsPoolType;
blsVerifyAllMultiThread?: boolean;
};

@@ -103,6 +109,8 @@ type WorkerDescriptor = {
status: WorkerStatus;
};

type WorkRequestHandler = (workReqs: BlsWorkReq[]) => Promise<BlsWorkResult>;

/**
* Wraps "threads" library thread pool queue system with the goals:
* - Complete total outstanding jobs in total minimum time possible.
@@ -115,8 +123,11 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
private readonly logger: Logger;
private readonly metrics: Metrics | null;

private readonly blsPoolType: BlsPoolType;
private readonly blsPoolSize: number;

private readonly format: PointFormat;
private readonly workers: WorkerDescriptor[];
private readonly workers: WorkerDescriptor[] = [];
private readonly jobs = new LinkedList<JobQueueItem>();
private bufferedJobs: {
jobs: LinkedList<JobQueueItem>;
@@ -137,12 +148,29 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

// TODO: Allow to customize implementation
const implementation = bls.implementation;

// Use compressed for herumi for now.
// The worker is not able to deserialize from uncompressed
// `Error: err _wrapDeserialize`
if (implementation === "herumi" && options.blsPoolType === BlsPoolType.libuv) {
this.logger.info("Herumi BLS implementation selected. Using Worker pool instead of libuv pool.");
options.blsPoolType = BlsPoolType.workers;
}
this.blsPoolType = options.blsPoolType ?? BlsPoolType.libuv;
this.format = implementation === "blst-native" ? PointFormat.uncompressed : PointFormat.compressed;
this.workers = this.createWorkers(implementation, blsPoolSize);

this.logger.info(`Starting BLS with blsPoolType: ${this.blsPoolType}`);
if (this.blsPoolType === BlsPoolType.workers) {
this.blsPoolSize = IDEAL_POOL_SIZE;
// Use compressed for herumi for now.
// The worker is not able to deserialize from uncompressed
// `Error: err _wrapDeserialize`
this.workers = this.createWorkers(implementation, this.blsPoolSize);
this.logger.info(`BLS Worker pool size: ${this.workers.length}`);
} else {
const uvThreadPoolSize = Number(process.env.UV_THREADPOOL_SIZE);
// default libuv pool size is 4. no way to set or get it programmatically
// after startup. By this point it should have already been started so do
// not want to attempt to override ENV as it will likely be ignored.
this.blsPoolSize = isNaN(uvThreadPoolSize) ? 4 : uvThreadPoolSize;
this.logger.info(`BLS libuv pool size: ${this.blsPoolSize}`);
}
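The libuv threadpool is sized once, when the process first touches it, so raising it for this pool type has to happen via the environment before the node starts; a small sketch (ours) of the same detection in isolation, with an illustrative launch command in the comment:

// UV_THREADPOOL_SIZE must be exported before startup, e.g. `UV_THREADPOOL_SIZE=8 node beacon.js`
// (the path is illustrative); assigning process.env.UV_THREADPOOL_SIZE afterwards will not
// resize the pool. libuv's documented default is 4.
const libuvPoolSize = Number(process.env.UV_THREADPOOL_SIZE) || 4;
console.log(`libuv threadpool size: ${libuvPoolSize}`);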

if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => {
@@ -176,8 +204,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
try {
return verifySignatureSetsMaybeBatch(
sets.map((set) => ({
publicKey: getAggregatedPubkey(set),
message: set.signingRoot.valueOf(),
publicKey: getAggregatedPubkey(set),
signature: set.signature,
}))
);
@@ -252,16 +280,18 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}
this.jobs.clear();

// Terminate all workers. await to ensure no workers are left hanging
await Promise.all(
Array.from(this.workers.entries()).map(([id, worker]) =>
// NOTE: 'threads' has not yet updated types, and NodeJS complains with
// [DEP0132] DeprecationWarning: Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.
(worker.worker.terminate() as unknown as Promise<void>).catch((e: Error) => {
this.logger.error("Error terminating worker", {id}, e);
})
)
);
if (this.blsPoolType === BlsPoolType.workers) {
// Terminate all workers. await to ensure no workers are left hanging
await Promise.all(
Array.from(this.workers.entries()).map(([id, worker]) =>
// NOTE: 'threads' has not yet updated types, and NodeJS complains with
// [DEP0132] DeprecationWarning: Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.
(worker.worker.terminate() as unknown as Promise<void>).catch((e: Error) => {
this.logger.error("Error terminating worker", {id}, e);
})
)
);
}
}

private createWorkers(implementation: Implementation, poolSize: number): WorkerDescriptor[] {
@@ -313,13 +343,14 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// TODO: Consider if limiting queue size is necessary here.
// It would be bad to reject signatures because the node is slow.
// However, if the worker communication broke jobs won't ever finish

if (
this.workers.length > 0 &&
this.workers[0].status.code === WorkerStatusCode.initializationError &&
this.workers.every((worker) => worker.status.code === WorkerStatusCode.initializationError)
) {
return job.reject(this.workers[0].status.error);
if (this.blsPoolType === BlsPoolType.workers) {
if (
this.workers.length > 0 &&
this.workers[0].status.code === WorkerStatusCode.initializationError &&
this.workers.every((worker) => worker.status.code === WorkerStatusCode.initializationError)
) {
return job.reject(this.workers[0].status.error);
}
}

// Append batchable sets to `bufferedJobs`, starting a timeout to push them into `jobs`.
Expand All @@ -334,8 +365,9 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
timeout: setTimeout(this.runBufferedJobs, MAX_BUFFER_WAIT_MS),
};
}
const jobs = job.opts.priority ? this.bufferedJobs.prioritizedJobs : this.bufferedJobs.jobs;
jobs.push(job);

job.opts.priority ? this.bufferedJobs.prioritizedJobs.push(job) : this.bufferedJobs.jobs.push(job);

this.bufferedJobs.sigCount += jobItemSigSets(job);
if (this.bufferedJobs.sigCount > MAX_BUFFERED_SIGS) {
clearTimeout(this.bufferedJobs.timeout);
@@ -357,25 +389,42 @@
}

/**
* Potentially submit jobs to an idle worker, only if there's a worker and jobs
* Prepare work and send to either workerpool or to libuv threadpool
*/
private runJob = async (): Promise<void> => {
if (this.closed) {
return;
}
// Prepare work package
const prepared = this.prepareWork();
if (prepared.length === 0) {
return;
}

if (this.blsPoolType === BlsPoolType.workers) {
await this.runJobWorkerPool(prepared);
} else {
await this.runJobLibuv(prepared);
}

// Potentially run a new job
setTimeout(this.runJob, 0);
};

private runJobLibuv = async (jobs: JobQueueItem[]): Promise<void> => {
await this._runJob(jobs, asyncVerifyManySignatureSets);
};
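asyncVerifyManySignatureSets is imported from ./verifyManySignatureSets.js, which is not shown in this excerpt. A conceptual sketch (ours, with a simplified result shape and the assumption that each work request carries a sets array) of how the libuv path can fan work out through the async helper added in maybeBatch.ts:

import {asyncVerifySignatureSetsMaybeBatch} from "../maybeBatch.js";
import type {WorkRequestSet} from "./types.js";

// Each request is verified independently; the async blst bindings push the pairing
// work onto the libuv threadpool, keeping the main thread responsive.
async function verifyManySketch(workReqs: {sets: WorkRequestSet[]}[]): Promise<boolean[]> {
  return Promise.all(workReqs.map((req) => asyncVerifySignatureSetsMaybeBatch(req.sets)));
}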

/**
* Potentially submit jobs to an idle worker, only if there's a worker and jobs
*/
private runJobWorkerPool = async (jobs: JobQueueItem[]): Promise<void> => {
// Find idle worker
const worker = this.workers.find((worker) => worker.status.code === WorkerStatusCode.idle);
if (!worker || worker.status.code !== WorkerStatusCode.idle) {
return;
}

// Prepare work package
const jobsInput = this.prepareWork();
if (jobsInput.length === 0) {
return;
}

// TODO: After sending the work to the worker the main thread can drop the job arguments
// and free-up memory, only needs to keep the job's Promise handlers.
// Maybe it's not useful since all data referenced in jobs is likely referenced by others
@@ -384,6 +433,16 @@
worker.status = {code: WorkerStatusCode.running, workerApi};
this.workersBusy++;

await this._runJob(jobs, workerApi.verifyManySignatureSets);

worker.status = {code: WorkerStatusCode.idle, workerApi};
this.workersBusy--;
};

/**
* Potentially submit jobs to an idle worker, only if there's a worker and jobs
*/
private _runJob = async (jobsInput: JobQueueItem[], runWorkRequests: WorkRequestHandler): Promise<void> => {
try {
let startedJobsDefault = 0;
let startedJobsSameMessage = 0;
Expand All @@ -398,8 +457,11 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
let workReq: BlsWorkReq;
try {
// Note: This can throw, must be handled per-job.
// Pubkey and signature aggregation is defered here
workReq = jobItemWorkReq(job, this.format, this.metrics);
// Pubkey and signature aggregation is deferred here
//
// Note: Will ensure pubkeys and sigs are serialized for workers (to cross worker
// boundaries) and remain deserialized for libuv for performance reasons
workReq = jobItemWorkReq(this.blsPoolType === BlsPoolType.libuv, job, this.format, this.metrics);
} catch (e) {
this.metrics?.blsThreadPool.errorAggregateSignatureSetsCount.inc({type: job.type});

@@ -441,7 +503,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// Only downside is that the job promise may be resolved twice, but that's not an issue

const [jobStartSec, jobStartNs] = process.hrtime();
const workResult = await workerApi.verifyManySignatureSets(workReqs);
const workResult = await runWorkRequests(workReqs);
const [jobEndSec, jobEndNs] = process.hrtime();
const {workerId, batchRetries, batchSigsSuccess, workerStartTime, workerEndTime, results} = workResult;

@@ -492,8 +554,10 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
const latencyToWorkerSec = workerStartSec - jobStartSec + (workerStartNs - jobStartNs) / 1e9;
const latencyFromWorkerSec = jobEndSec - workerEndSec + Number(jobEndNs - workerEndNs) / 1e9;

if (workerId) {
this.metrics?.blsThreadPool.jobsWorkerTime.inc({workerId}, workerJobTimeSec);
}
this.metrics?.blsThreadPool.timePerSigSet.observe(workerJobTimeSec / startedSigSets);
this.metrics?.blsThreadPool.jobsWorkerTime.inc({workerId}, workerJobTimeSec);
this.metrics?.blsThreadPool.latencyToWorker.observe(latencyToWorkerSec);
this.metrics?.blsThreadPool.latencyFromWorker.observe(latencyFromWorkerSec);
this.metrics?.blsThreadPool.successJobsSignatureSetsCount.inc(successCount);
@@ -510,12 +574,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
job.reject(e as Error);
}
}

worker.status = {code: WorkerStatusCode.idle, workerApi};
this.workersBusy--;

// Potentially run a new job
setTimeout(this.runJob, 0);
};

/**
@@ -528,6 +586,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
while (totalSigs < MAX_SIGNATURE_SETS_PER_JOB) {
const job = this.jobs.shift();
if (!job) {
// TODO: (matthewkeil) should this pull from buffer.prioritizedJobs and
// then buffer.jobs until full run?
break;
}

@@ -556,7 +616,7 @@

private retryJobItemSameMessage(job: JobQueueItemSameMessage): void {
// Create new jobs for each pubkey set, and Promise.all all the results
for (const j of jobItemSameMessageToMultiSet(job)) {
for (const j of jobItemSameMessageToMultiSet(this.blsPoolType === BlsPoolType.libuv, job)) {
if (j.opts.priority) {
this.jobs.unshift(j);
} else {
Expand All @@ -578,9 +638,3 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
);
}
}

function getJobResultError(jobResult: WorkResultError | null, i: number): Error {
const workerError = jobResult ? Error(jobResult.error.message) : Error(`No jobResult for index ${i}`);
if (jobResult?.error?.stack) workerError.stack = jobResult.error.stack;
return workerError;
}