Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: double accounting #138

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/common/functions/allSettled.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// wait for all promises to resolve and throws if any error occurs
export async function allSettled(values: Promise<any>[]): Promise<any[]> {
const results = await Promise.allSettled(values);
const failed = results.filter((r: PromiseSettledResult<any>) => r.status == 'rejected');
if (failed.length > 0) {
throw new global.AggregateError(
failed.map((r: PromiseRejectedResult) => r.reason),
failed.flatMap((r: any) => Array.from(r.reason.message, r.reason.stack || '')).join('\n'),
);
}

return results.map((r: PromiseFulfilledResult<any>) => r.value);
}
3 changes: 2 additions & 1 deletion src/duty/attestation/attestation.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class AttestationMetrics {
this.logger.log('Calculating attestation metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([
await allSettled([
this.perfectAttestationsLastEpoch(),
this.missedAttestationsLastEpoch(),
this.highIncDelayAttestationsLastEpoch(),
Expand Down
8 changes: 7 additions & 1 deletion src/duty/attestation/attestation.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { unblock } from 'common/functions/unblock';
import { PrometheusService } from 'common/prometheus';

import { SummaryService } from '../summary';
Expand Down Expand Up @@ -42,6 +43,8 @@ export class AttestationRewards {
Math.trunc(perfect.source * epochMeta.state.base_reward * 32 * sourceParticipation) +
Math.trunc(perfect.target * epochMeta.state.base_reward * 32 * targetParticipation) +
Math.trunc(perfect.head * epochMeta.state.base_reward * 32 * headParticipation);
const maxBatchSize = 10000;
let index = 0;
for (const v of this.summary.epoch(epoch).values()) {
// Calculate attestation rewards from previous epoch
const pv = this.summary.epoch(epoch - 1).get(v.val_id);
Expand Down Expand Up @@ -88,7 +91,10 @@ export class AttestationRewards {
att_missed_reward,
att_penalty,
});
index++;
if (index % maxBatchSize == 0) {
await unblock();
}
}
return true;
}
}
5 changes: 3 additions & 2 deletions src/duty/attestation/attestation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { streamArray } from 'stream-json/streamers/StreamArray';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
Expand Down Expand Up @@ -67,7 +68,7 @@ export class AttestationService {

protected async processAttestation(epoch: Epoch, attestation: SlotAttestation, committee: number[]) {
const attestationFlags = { source: [], target: [], head: [] };
const [canonHead, canonTarget, canonSource] = await Promise.all([
const [canonHead, canonTarget, canonSource] = await allSettled([
this.getCanonSlotRoot(attestation.slot),
this.getCanonSlotRoot(attestation.target_epoch * this.slotsInEpoch),
this.getCanonSlotRoot(attestation.source_epoch * this.slotsInEpoch),
Expand Down Expand Up @@ -177,7 +178,7 @@ export class AttestationService {
}).finally(() => pipeline.destroy());
};

await Promise.all([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]);
await allSettled([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]);
return committees;
}
}
5 changes: 3 additions & 2 deletions src/duty/duty.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { ClickhouseService } from '../storage';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class DutyMetrics {
@TrackTask('calc-all-duties-metrics')
public async calculate(epoch: Epoch, possibleHighRewardValidators: string[]): Promise<any> {
this.logger.log('Calculating duties metrics of user validators');
await Promise.all([
await allSettled([
this.withPossibleHighReward(epoch, possibleHighRewardValidators),
this.stateMetrics.calculate(epoch),
this.withdrawalsMetrics.calculate(epoch),
Expand All @@ -45,7 +46,7 @@ export class DutyMetrics {
}

private async withPossibleHighReward(epoch: Epoch, possibleHighRewardValidators: string[]): Promise<void> {
await Promise.all([
await allSettled([
this.attestationMetrics.calculate(epoch, possibleHighRewardValidators),
this.proposeMetrics.calculate(epoch, possibleHighRewardValidators),
this.syncMetrics.calculate(epoch, possibleHighRewardValidators),
Expand Down
3 changes: 2 additions & 1 deletion src/duty/duty.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { AttestationRewards } from './attestation';
Expand All @@ -26,6 +27,6 @@ export class DutyRewards {
// todo: 'Slashed' case
// todo: 'Inactivity leak' case
this.logger.log('Calculate rewards for all duties');
await Promise.all([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]);
await allSettled([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]);
}
}
24 changes: 13 additions & 11 deletions src/duty/duty.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConfigService } from 'common/config';
import { BlockHeaderResponse, ConsensusProviderService } from 'common/eth-providers';
import { BlockCacheService } from 'common/eth-providers/consensus-provider/block-cache';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
Expand Down Expand Up @@ -41,18 +42,19 @@ export class DutyService {
) {}

public async checkAndWrite({ epoch, stateSlot }: { epoch: Epoch; stateSlot: Slot }): Promise<string[]> {
// Prefetch will be done before main checks because duty by state requests are heavy
// and while we wait for their responses we fetch blocks and headers.
// If for some reason prefetch task will be slower than duty by state requests,
// blocks and headers will be fetched inside tasks of checks
const [, , possibleHighRewardVals] = await Promise.all([
this.prefetch(epoch),
const [, , possibleHighRewardVals] = await allSettled([
// Prefetch will be done before main checks because duty by state requests are heavy
// and while we wait for their responses we fetch blocks and headers.
// If for some reason prefetch task will be slower than duty by state requests,
// blocks and headers will be fetched inside tasks of checks
// so, it can be optional when failed
this.prefetch(epoch).catch(() => undefined),
this.checkAll(epoch, stateSlot),
// optional task to get possible high reward validators for head epoch
// Optional task to get possible high reward validators for head epoch
// it's nice to have but not critical
this.getPossibleHighRewardValidators().catch(() => []),
]);
await Promise.all([this.writeEpochMeta(epoch), this.writeSummary(epoch)]);
await allSettled([this.writeEpochMeta(epoch), this.writeSummary(epoch)]);
this.summary.clear();
await this.storage.updateEpochProcessing({ epoch, is_stored: true });
return possibleHighRewardVals;
Expand All @@ -62,7 +64,7 @@ export class DutyService {
protected async checkAll(epoch: Epoch, stateSlot: Slot): Promise<any> {
this.summary.clear();
this.logger.log('Checking duties of validators');
await Promise.all([
await allSettled([
this.state.check(epoch, stateSlot),
this.attestation.check(epoch, stateSlot),
this.sync.check(epoch, stateSlot),
Expand All @@ -85,7 +87,7 @@ export class DutyService {
const toFetch = slots.map((s) => [this.clClient.getBlockHeader(s), this.clClient.getBlockInfo(s)]).flat();
while (toFetch.length > 0) {
const chunk = toFetch.splice(0, 32);
await Promise.all(chunk);
await allSettled(chunk);
}
}

Expand All @@ -95,7 +97,7 @@ export class DutyService {
const headEpoch = Math.trunc(actualSlotHeader.header.message.slot / this.config.get('FETCH_INTERVAL_SLOTS'));
this.logger.log('Getting possible high reward validator indexes');
const propDependentRoot = await this.clClient.getDutyDependentRoot(headEpoch);
const [sync, prop] = await Promise.all([
const [sync, prop] = await allSettled([
this.clClient.getSyncCommitteeInfo('finalized', headEpoch),
this.clClient.getCanonicalProposerDuties(headEpoch, propDependentRoot),
]);
Expand Down
3 changes: 2 additions & 1 deletion src/duty/propose/propose.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand All @@ -24,7 +25,7 @@ export class ProposeMetrics {
this.logger.log('Calculating propose metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]);
await allSettled([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]);
}

private async goodProposes() {
Expand Down
3 changes: 2 additions & 1 deletion src/duty/state/state.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { Owner, PrometheusService, PrometheusValStatus, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { LidoSourceService } from 'common/validators-registry/lido-source';
Expand All @@ -29,7 +30,7 @@ export class StateMetrics {
this.logger.log('Calculating state metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([
await allSettled([
this.operatorsIdentifies(),
this.nosStats(),
this.userValidatorsStats(),
Expand Down
45 changes: 25 additions & 20 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import { chain } from 'stream-chain';
import { parser } from 'stream-json';
import { pick } from 'stream-json/filters/Pick';
import { streamArray } from 'stream-json/streamers/StreamArray';
import { batch } from 'stream-json/utils/Batch';

import { ConfigService } from 'common/config';
import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { bigNumberSqrt } from 'common/functions/bigNumberSqrt';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';

import { bigNumberSqrt } from '../../common/functions/bigNumberSqrt';
import { SummaryService } from '../summary';

@Injectable()
Expand All @@ -37,27 +39,30 @@ export class StateService {
this.logger.log('Processing all validators state');
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray()]);
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 10000 })]);
await new Promise((resolve, reject) => {
pipeline.on('data', async (data) => {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
pipeline.on('data', async (batch) => {
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
}
}
await unblock();
});
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
Expand Down
3 changes: 2 additions & 1 deletion src/duty/summary/summary.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand Down Expand Up @@ -32,7 +33,7 @@ export class SummaryMetrics {
this.logger.log('Calculating propose metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([this.userRewards(), this.avgChainRewards(), this.common()]);
await allSettled([this.userRewards(), this.avgChainRewards(), this.common()]);
}

private async common() {
Expand Down
5 changes: 3 additions & 2 deletions src/duty/sync/sync.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand All @@ -28,7 +29,7 @@ export class SyncMetrics {
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();

await Promise.all([
await allSettled([
this.userAvgSyncPercent(),
this.otherAvgSyncPercent(),
this.operatorAvgSyncPercents(),
Expand All @@ -55,7 +56,7 @@ export class SyncMetrics {

private async syncParticipation(possibleHighRewardValidators: string[]) {
const chainAvgSyncPercent = await this.chainAvgSyncPercent();
await Promise.all([
await allSettled([
this.goodSyncParticipationLastEpoch(chainAvgSyncPercent),
this.badSyncParticipationLastEpoch(chainAvgSyncPercent),
this.badSyncParticipationLastNEpoch(chainAvgSyncPercent),
Expand Down
3 changes: 1 addition & 2 deletions src/duty/sync/sync.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class SyncRewards {
protected readonly summary: SummaryService,
) {}

public calculate(epoch: Epoch) {
public async calculate(epoch: Epoch) {
const epochMeta = this.summary.epoch(epoch).getMeta();
let sync_earned_reward = 0;
let sync_missed_reward = 0;
Expand All @@ -30,6 +30,5 @@ export class SyncRewards {

this.summary.epoch(epoch).set({ epoch, val_id: v.val_id, sync_earned_reward, sync_penalty, sync_missed_reward });
}
return true;
}
}
3 changes: 2 additions & 1 deletion src/duty/withdrawal/withdrawals.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand All @@ -29,7 +30,7 @@ export class WithdrawalsMetrics {
this.logger.log('Calculating withdrawals metrics');
this.processedEpoch = epoch;
this.operators = this.registryService.getOperators();
await Promise.all([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]);
await allSettled([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]);
}

private async userNodeOperatorsWithdrawalsStats() {
Expand Down
5 changes: 3 additions & 2 deletions src/duty/withdrawal/withdrawals.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { BlockInfoResponse, ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';

import { range } from '../../common/functions/range';
import { SummaryService } from '../summary';

@Injectable()
Expand All @@ -30,7 +31,7 @@ export class WithdrawalsService {
const firstSlotInEpoch = epoch * slotsInEpoch;
const slots: number[] = range(firstSlotInEpoch, firstSlotInEpoch + slotsInEpoch);
const toFetch = slots.map((s) => this.clClient.getBlockInfo(s));
const blocks = (await Promise.all(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[];
const blocks = (await allSettled(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[];
for (const block of blocks) {
const withdrawals = block.message.body.execution_payload.withdrawals ?? [];
for (const withdrawal of withdrawals) {
Expand Down
Loading