Skip to content

Commit

Permalink
fix: attestation pool for electra (#6744)
Browse files Browse the repository at this point in the history
* feat: attestationPool to group by slot by data root by committee index for electra

* fix: gossip validation and assert.notNull() util

* fix: remove light-client stats.html

* fix: lint and check-types
  • Loading branch information
twoeths authored and g11tech committed Jun 25, 2024
1 parent b415167 commit 60fc855
Show file tree
Hide file tree
Showing 18 changed files with 149 additions and 129 deletions.
16 changes: 10 additions & 6 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ export type Endpoints = {
/** HashTreeRoot of AttestationData that validator want's aggregated */
attestationDataRoot: Root;
slot: Slot;
index: number;
committeeIndex: number;
},
{query: {attestation_data_root: string; slot: number; index: number}},
{query: {attestation_data_root: string; slot: number; committeeIndex: number}},
allForks.Attestation,
VersionMeta
>;
Expand Down Expand Up @@ -805,16 +805,20 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
url: "/eth/v1/validator/aggregate_attestation",
method: "GET",
req: {
writeReq: ({attestationDataRoot, slot, index}) => ({
query: {attestation_data_root: toHexString(attestationDataRoot), slot, index},
writeReq: ({attestationDataRoot, slot, committeeIndex}) => ({
query: {attestation_data_root: toHexString(attestationDataRoot), slot, committeeIndex},
}),
parseReq: ({query}) => ({
attestationDataRoot: fromHexString(query.attestation_data_root),
slot: query.slot,
index: query.slot,
committeeIndex: query.slot,
}),
schema: {
query: {attestation_data_root: Schema.StringRequired, slot: Schema.UintRequired, index: Schema.UintRequired},
query: {
attestation_data_root: Schema.StringRequired,
slot: Schema.UintRequired,
committeeIndex: Schema.UintRequired,
},
},
},
resp: {
Expand Down
98 changes: 52 additions & 46 deletions packages/api/test/unit/beacon/testData/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ export const eventTestData: EventData = {
block: "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf",
executionOptimistic: false,
},
[EventType.attestation]: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
[EventType.attestation]: {
version: ForkName.altair,
data: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
},
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit.fromJson({
message: {epoch: "1", validator_index: "1"},
signature:
Expand Down Expand Up @@ -72,44 +75,47 @@ export const eventTestData: EventData = {
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
[EventType.attesterSlashing]: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
[EventType.attesterSlashing]: {
version: ForkName.altair,
data: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
}),
},
[EventType.blsToExecutionChange]: ssz.capella.SignedBLSToExecutionChange.fromJson({
message: {
validator_index: "1",
Expand Down
4 changes: 2 additions & 2 deletions packages/api/test/unit/beacon/testData/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ export const testData: GenericServerTestCases<Endpoints> = {
res: {data: ssz.altair.SyncCommitteeContribution.defaultValue()},
},
getAggregatedAttestation: {
args: {attestationDataRoot: ZERO_HASH, slot: 32000},
res: {data: ssz.phase0.Attestation.defaultValue()},
args: {attestationDataRoot: ZERO_HASH, slot: 32000, index: 2},
res: {data: ssz.phase0.Attestation.defaultValue(), meta: {version: ForkName.phase0}},
},
publishAggregateAndProofs: {
args: {signedAggregateAndProofs: [ssz.phase0.SignedAggregateAndProof.defaultValue()]},
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ export function getBeaconPoolApi({
async getPoolAttestations({slot, committeeIndex}) {
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? attestations[0].data.slot) ?? ForkName.phase0;

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
}

return {data: attestations};
return {data: attestations, meta: {version: fork}};
},

async getPoolAttesterSlashings() {
Expand Down
11 changes: 7 additions & 4 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,23 +1063,26 @@ export function getValidatorApi({
};
},

async getAggregatedAttestation({attestationDataRoot, slot}) {
async getAggregatedAttestation({attestationDataRoot, slot, committeeIndex}) {
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHex(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, dataRootHex);
const aggregate = chain.attestationPool.getAggregate(slot, committeeIndex, dataRootHex);

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
throw new ApiError(
404,
`No aggregated attestation for slot=${slot} committeeIndex=${committeeIndex}, dataRoot=${dataRootHex}`
);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
version: config.getForkName(slot),
meta: {version: config.getForkName(slot)},
};
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
getBlockRootAtSlot,
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef} from "@lodestar/utils";
import {toHex, MapDef, assert} from "@lodestar/utils";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -141,10 +141,8 @@ export class AggregatedAttestationPool {
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
if (committeeIndex === null) {
// this should not happen because attestation should be validated before reaching this
throw Error(`Invalid attestation slot=${slot} committeeIndex=${committeeIndex}`);
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
if (!attestationGroup) {
attestationGroup = new MatchingDataAttestationGroup(committee, attestation.data);
Expand Down
79 changes: 30 additions & 49 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray} from "@chainsafe/ssz";
import {Slot, RootHex, allForks} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Slot, RootHex, allForks, isElectraAttestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
Expand Down Expand Up @@ -36,6 +36,8 @@ type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
* the native aggregation scheme.
Expand All @@ -60,8 +62,8 @@ type DataRootHex = string;
* receives and it can be triggered manually.
*/
export class AttestationPool {
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, AggregateFast>>(
() => new Map<DataRootHex, AggregateFast>()
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>>(
() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>()
);
private lowestPermissibleSlot = 0;

Expand Down Expand Up @@ -117,23 +119,35 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");

// Pre-aggregate the contribution with existing items
const aggregate = aggregateByRoot.get(attDataRootHex);
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
if (aggregateByIndex === undefined) {
aggregateByIndex = new Map<CommitteeIndex, AggregateFast>();
aggregateByRoot.set(attDataRootHex, aggregateByIndex);
}
const aggregate = aggregateByIndex.get(committeeIndex);
if (aggregate) {
// Aggregate mutating
return aggregateAttestationInto(aggregate, attestation);
} else {
// Create new aggregate
aggregateByRoot.set(attDataRootHex, attestationToAggregate(attestation));
aggregateByIndex.set(committeeIndex, attestationToAggregate(attestation));
return InsertOutcome.NewData;
}
}

/**
* For validator API to get an aggregate
*/
getAggregate(slot: Slot, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex);
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
Expand Down Expand Up @@ -166,8 +180,10 @@ export class AttestationPool {

for (const aggregateByRoot of aggregateByRoots) {
if (aggregateByRoot) {
for (const aggFast of aggregateByRoot.values()) {
attestations.push(fastToAttestation(aggFast));
for (const aggFastByIndex of aggregateByRoot.values()) {
for (const aggFast of aggFastByIndex.values()) {
attestations.push(fastToAttestation(aggFast));
}
}
}
}
Expand All @@ -180,35 +196,13 @@ export class AttestationPool {
// - Insert attestations coming from gossip and API

/**
* Aggregate a new contribution into `aggregate` mutating it
* Aggregate a new attestation into `aggregate` mutating it
*/
function aggregateAttestationInto(aggregate: AggregateFast, attestation: allForks.Attestation): InsertOutcome {
const bitIndex = attestation.aggregationBits.getSingleTrueBit();

// Should never happen, attestations are verified against this exact condition before
if (bitIndex === null) {
throw Error("Invalid attestation not exactly one bit set");
}

if ("committeeBits" in attestation && !("committeeBits" in aggregate)) {
throw Error("Attempt to aggregate electra attestation into phase0 attestation");
}

if (!("committeeBits" in attestation) && "committeeBits" in aggregate) {
throw Error("Attempt to aggregate phase0 attestation into electra attestation");
}

if ("committeeBits" in attestation) {
// We assume attestation.committeeBits should already be validated in api and gossip handler and should be non-null
const attestationCommitteeIndex = attestation.committeeBits.getSingleTrueBit();
const aggregateCommitteeIndex = (aggregate as AggregateFastElectra).committeeBits.getSingleTrueBit();

if (attestationCommitteeIndex !== aggregateCommitteeIndex) {
throw Error(
`Committee index mismatched: attestation ${attestationCommitteeIndex} aggregate ${aggregateCommitteeIndex}`
);
}
}
assert.notNull(bitIndex, "Invalid attestation in pool, not exactly one bit set");

if (aggregate.aggregationBits.get(bitIndex) === true) {
return InsertOutcome.AlreadyKnown;
Expand All @@ -226,7 +220,7 @@ function aggregateAttestationInto(aggregate: AggregateFast, attestation: allFork
* Format `contribution` into an efficient `aggregate` to add more contributions in with aggregateContributionInto()
*/
function attestationToAggregate(attestation: allForks.Attestation): AggregateFast {
if ("committeeBits" in attestation) {
if (isElectraAttestation(attestation)) {
return {
data: attestation.data,
// clone because it will be mutated
Expand All @@ -247,18 +241,5 @@ function attestationToAggregate(attestation: allForks.Attestation): AggregateFas
* Unwrap AggregateFast to phase0.Attestation
*/
function fastToAttestation(aggFast: AggregateFast): allForks.Attestation {
if ("committeeBits" in aggFast) {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
committeeBits: aggFast.committeeBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
} else {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
}
return {...aggFast, signature: aggFast.signature.toBytes(PointFormat.compressed)};
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function validateAggregateAndProof(
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NOT_EXACTLY_ONE_COMMITTEE_BIT_SET});
}
// [REJECT] aggregate.data.index == 0
if (attData.index === 0) {
if (attData.index !== 0) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX});
}
} else {
Expand Down
Loading

0 comments on commit 60fc855

Please sign in to comment.