Skip to content

Commit

Permalink
feat: speed up writing to DB
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Jan 11, 2023
1 parent acf3b5a commit b7de04d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 49 deletions.
3 changes: 1 addition & 2 deletions src/duty/duty.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ export class DutyService {
this.checkAll(epoch, stateSlot),
this.getPossibleHighRewardValidators(),
]);
await this.writeEpochMeta(epoch);
await this.writeSummary();
await Promise.all([this.writeEpochMeta(epoch), this.writeSummary()]);
await this.storage.updateEpochProcessing({ epoch, is_stored: true });
return possibleHighRewardVals;
}
Expand Down
52 changes: 26 additions & 26 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,33 @@ export class StateService {
this.logger.log('Processing all validators state');
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const pipeline = chain([
readStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
(data) => {
const state: StateValidatorResponse = data.value;
const index = BigInt(state.index);
const operator = keysIndexed.get(state.validator.pubkey);
this.summary.set(index, {
epoch,
val_id: index,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray()]);
const streamTask = async () =>
new Promise((resolve, reject) => {
pipeline.on('data', (data) => {
const state: StateValidatorResponse = data.value;
const index = BigInt(state.index);
const operator = keysIndexed.get(state.validator.pubkey);
this.summary.set(index, {
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance);
}
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance);
}
return { val_id: state.index, val_pubkey: state.validator.pubkey };
},
]);
await this.storage.writeIndexes(pipeline);
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
});
await streamTask().finally(() => pipeline.destroy());
// todo: change to bigint.sqrt
const baseReward = Math.trunc((64 * 10 ** 9) / Math.trunc(Math.sqrt(Number(activeValidatorsEffectiveBalance))));
this.summary.setMeta({
Expand Down
1 change: 1 addition & 0 deletions src/duty/summary/summary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface ValidatorDutySummary {
epoch: bigint;
///
val_id: bigint;
val_pubkey?: string;
val_nos_id?: number;
val_nos_name?: string;
val_slashed?: boolean;
Expand Down
39 changes: 18 additions & 21 deletions src/storage/clickhouse/clickhouse.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { Duplex } from 'stream';

import { ClickHouseClient, createClient } from '@clickhouse/client';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService, OnModuleInit } from '@nestjs/common';
Expand Down Expand Up @@ -124,29 +122,28 @@ export class ClickhouseService implements OnModuleInit {
return { max: 0n };
}

@TrackTask('write-indexes')
public async writeIndexes(pipeline: Duplex): Promise<void> {
await this.db
.insert({
table: 'validators_index',
values: pipeline,
format: 'JSONEachRow',
})
.finally(() => pipeline.destroy());
}

@TrackTask('write-summary')
public async writeSummary(summary: ValidatorDutySummary[]): Promise<void> {
while (summary.length > 0) {
const chunk = summary.splice(0, this.chunkSize);
await this.retry(
async () =>
await this.db.insert({
table: 'validators_summary',
values: chunk,
format: 'JSONEachRow',
}),
);
await Promise.all([
this.retry(
async () =>
await this.db.insert({
table: 'validators_index',
values: chunk.map((v) => ({ val_id: v.val_id, val_pubkey: v.val_pubkey })),
format: 'JSONEachRow',
}),
),
this.retry(
async () =>
await this.db.insert({
table: 'validators_summary',
values: chunk.map((v) => ({ ...v, val_pubkey: undefined })),
format: 'JSONEachRow',
}),
),
]);
}
}

Expand Down

0 comments on commit b7de04d

Please sign in to comment.