Skip to content

Commit

Permalink
fix: fix get committees streams (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko authored Mar 26, 2023
1 parent 5b560d1 commit c6c7609
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,21 +403,20 @@ export class ConsensusProviderService {
const readStream = got.stream.get(urljoin(apiURL, subUrl), {
timeout: { ...REQUEST_TIMEOUT_POLICY_MS, response: this.config.get('CL_API_GET_RESPONSE_TIMEOUT') },
});
const promisedStream = async () =>
new Promise((resolve, reject) => {
readStream.on('response', (r: Response) => {
if (r.statusCode != 200) reject(new HTTPError(r));
resolve(readStream);
});
readStream.on('error', (e) => reject(e));
})
.then((r: Request) => r)
.catch((e) => {
if (e instanceof HTTPError) {
throw new ResponseError(errRequest(<string>e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
return await promisedStream();

return new Promise((resolve, reject) => {
readStream.on('response', (r: Response) => {
if (r.statusCode != 200) reject(new HTTPError(r));
resolve(readStream);
});
readStream.on('error', (e) => reject(e));
})
.then((r: Request) => r)
.catch((e) => {
if (e instanceof HTTPError) {
throw new ResponseError(errRequest(<string>e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
}
}
58 changes: 40 additions & 18 deletions src/duty/attestation/attestation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,30 +165,52 @@ export class AttestationService {
@TrackTask('get-attestation-committees')
protected async getAttestationCommittees(stateSlot: Slot): Promise<Map<string, number[]>> {
const committees = new Map<string, number[]>();
const processCommittees = async (epoch: Epoch) => {
const stream = await this.clClient.getAttestationCommitteesInfo(stateSlot, epoch);
const pipeline = chain([stream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 5 })]);
pipeline.on('data', async (batch) => {
for (const data of batch) {
const committee: AttestationCommitteeInfo = data.value;
// validator doesn't attests by default
committee.validators.forEach((index) =>
this.summary.epoch(epoch).set({ epoch: epoch, val_id: Number(index), att_happened: false }),
);
committees.set(
`${committee.index}_${committee.slot}`,
committee.validators.map((v) => Number(v)),
);
}
await unblock();
});
const [prevStream, currStream] = await allSettled([
this.clClient.getAttestationCommitteesInfo(stateSlot, this.processedEpoch - 1),
this.clClient.getAttestationCommitteesInfo(stateSlot, this.processedEpoch),
]);
const prevPipeline = chain([
prevStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 5 }),
(batch) => processBatch(this.processedEpoch - 1, batch),
]);
const currPipeline = chain([
currStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 5 }),
(batch) => processBatch(this.processedEpoch, batch),
]);

const processBatch = (epoch: Epoch, batch) => {
for (const data of batch) {
const committee: AttestationCommitteeInfo = data.value;
// validator doesn't attests by default
committee.validators.forEach((index) =>
this.summary.epoch(epoch).set({ epoch: epoch, val_id: Number(index), att_happened: false }),
);
committees.set(
`${committee.index}_${committee.slot}`,
committee.validators.map((v) => Number(v)),
);
}
return batch;
};

const pipelineFinish = async (pipeline) => {
pipeline.on('data', (batch) => batch);
return new Promise((resolve, reject) => {
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
}).finally(() => pipeline.destroy());
};

await allSettled([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]);
await allSettled([pipelineFinish(prevPipeline), pipelineFinish(currPipeline)]);

return committees;
}
}
4 changes: 1 addition & 3 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { ConfigService } from 'common/config';
import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { bigNumberSqrt } from 'common/functions/bigNumberSqrt';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand Down Expand Up @@ -40,7 +39,7 @@ export class StateService {
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 1000 })]);
pipeline.on('data', async (batch) => {
pipeline.on('data', (batch) => {
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
Expand All @@ -61,7 +60,6 @@ export class StateService {
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
}
}
await unblock();
});
await new Promise((resolve, reject) => {
pipeline.on('error', (error) => reject(error));
Expand Down

0 comments on commit c6c7609

Please sign in to comment.