Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up altair block processing 2x #3115

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ OK: 16/16 Fail: 0/16 Skip: 0/16
+ latest_block_root OK
```
OK: 3/3 Fail: 0/3 Skip: 0/3
## Block pool altair processing [Preset: mainnet]
```diff
+ Invalid signatures [Preset: mainnet] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet]
```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
Expand Down Expand Up @@ -165,7 +170,7 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
## Gossip validation [Preset: mainnet]
```diff
+ Any committee index is valid OK
+ Validation sanity OK
+ validateAttestation OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Gossip validation - Extra
Expand Down Expand Up @@ -366,4 +371,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1

---TOTAL---
OK: 206/208 Fail: 0/208 Skip: 2/208
OK: 207/209 Fail: 0/209 Skip: 2/209
6 changes: 6 additions & 0 deletions beacon_chain/consensus_object_pools/block_pools_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ type
onFinHappened*: OnFinalizedCallback
## On finalization callback

headSyncCommittees*: SyncCommitteeCache ##\
## A cache of the sync committees, as they appear in the head state -
## using the head state is slightly wrong - if a reorg deeper than
## EPOCHS_PER_SYNC_COMMITTEE_PERIOD is happening, some valid sync
## committee messages will be rejected

EpochKey* = object
## The epoch key fully determines the shuffling for proposers and
## committees in a beacon state - the epoch level information in the state
Expand Down
39 changes: 22 additions & 17 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func init*(
finalized_checkpoint: getStateField(state.data, finalized_checkpoint),
shuffled_active_validator_indices:
cache.get_shuffled_active_validator_indices(state.data, epoch)
)
)

for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] = get_beacon_proposer_index(
Expand Down Expand Up @@ -548,6 +548,10 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# have a cache
dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq())

withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)

info "Block dag initialized",
head = shortLog(headRef),
finalizedHead = shortLog(dag.finalizedHead),
Expand Down Expand Up @@ -1037,8 +1041,8 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
dagPruneDur = Moment.now() - startTick

iterator syncSubcommittee*(
syncCommittee: openArray[ValidatorPubKey],
subcommitteeIdx: SyncSubcommitteeIndex): ValidatorPubKey =
syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex): ValidatorIndex =
var
i = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE
onePastEndIdx = min(syncCommittee.len, i + SYNC_SUBCOMMITTEE_SIZE)
Expand All @@ -1060,36 +1064,33 @@ iterator syncSubcommitteePairs*(
inc i

func syncCommitteeParticipants*(dag: ChainDAGRef,
slot: Slot): seq[ValidatorPubKey] =
slot: Slot): seq[ValidatorIndex] =
withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
let
period = sync_committee_period(slot)
curPeriod = sync_committee_period(state.data.slot)

if period == curPeriod:
@(state.data.current_sync_committee.pubkeys.data)
@(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1:
@(state.data.current_sync_committee.pubkeys.data)
@(dag.headSyncCommittees.next_sync_committee)
else: @[]
else:
@[]

func getSubcommitteePositionsAux(
dag: ChainDAGRef,
syncCommittee: openarray[ValidatorPubKey],
syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex,
validatorIdx: uint64): seq[uint64] =
# TODO Can we avoid the key conversions by getting a compressed key
# out of ImmutableValidatorData2? If we had this, we can define
# the function `dag.validatorKeyBytes` and use it here.
let validatorKey = dag.validatorKey(validatorIdx)
if validatorKey.isNone():
return @[]
let validatorPubKey = validatorKey.get().toPubKey

for pos, key in toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx)):
if validatorPubKey == key:
if validatorIdx == uint64(key):
result.add uint64(pos)

func getSubcommitteePositions*(dag: ChainDAGRef,
Expand All @@ -1102,31 +1103,31 @@ func getSubcommitteePositions*(dag: ChainDAGRef,
period = sync_committee_period(slot)
curPeriod = sync_committee_period(state.data.slot)

template search(syncCommittee: openarray[ValidatorPubKey]): seq[uint64] =
template search(syncCommittee: openArray[ValidatorIndex]): seq[uint64] =
dag.getSubcommitteePositionsAux(
syncCommittee, subcommitteeIdx, validatorIdx)

if period == curPeriod:
search(state.data.current_sync_committee.pubkeys.data)
search(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1:
search(state.data.current_sync_committee.pubkeys.data)
search(dag.headSyncCommittees.next_sync_committee)
else: @[]
else:
@[]

template syncCommitteeParticipants*(
dag: ChainDAGRef,
slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorPubKey] =
subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorIndex] =
toSeq(syncSubcommittee(dag.syncCommitteeParticipants(slot), subcommitteeIdx))

iterator syncCommitteeParticipants*(
dag: ChainDAGRef,
slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex,
aggregationBits: SyncCommitteeAggregationBits): ValidatorPubKey =
aggregationBits: SyncCommitteeAggregationBits): ValidatorIndex =
for pos, valIdx in pairs(dag.syncCommitteeParticipants(slot, subcommitteeIdx)):
if aggregationBits[pos]:
if pos < aggregationBits.bits and aggregationBits[pos]:
yield valIdx

func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool =
Expand Down Expand Up @@ -1207,6 +1208,10 @@ proc updateHead*(

dag.db.putHeadBlock(newHead.root)

withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)

let
finalizedHead = newHead.atEpochStart(
getStateField(dag.headState.data, finalized_checkpoint).epoch)
Expand Down
16 changes: 11 additions & 5 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ proc validateSyncCommitteeMessage*(

ok((positionsInSubcommittee, cookedSignature.get()))

# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.5/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
proc validateSignedContributionAndProof*(
dag: ChainDAGRef,
syncCommitteeMsgPool: var SyncCommitteeMsgPool,
Expand Down Expand Up @@ -855,16 +855,22 @@ proc validateSignedContributionAndProof*(
initialized = false
syncCommitteeSlot = msg.message.contribution.slot + 1

for validatorPubKey in dag.syncCommitteeParticipants(
for validatorIndex in dag.syncCommitteeParticipants(
syncCommitteeSlot,
committeeIdx,
msg.message.contribution.aggregation_bits):
let validatorPubKey = validatorPubKey.loadWithCache.get
let validatorPubKey = dag.validatorKey(validatorIndex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous use of the cache here was quite important for performance. @mratsim, is dag.validatorKey(index) considered fast enough? (it's based internally on loadValid which in turn is based on fromBytesKnownOnCurve).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block_sim is a good benchmark for this, because it simulates the full network traffic around sync committees and their aggregation. The ncli_db bench just replays blocks, so it covers only the sync aggregate verification code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableValidatorData2 stores uncompressed pubkeys that are quite fast to load - this was introduced a while ago, giving a decent performance boost: d859bc1

We're using this same approach for attestations and everything else, pretty much - if we're going to update it, we should update it across the board by changing the backend storage for validatorKey - sync message validation was the outlier here, and loadWithCache is in general a problematic construct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posting before/after block_sim results will be a convincing evidence that there is no performance regression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a benchmark for pubkeys and signature deserialization.
On a 11th gen Intel it's 14 µs per key.

image

status-im/nim-blscurve#126

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that said, it's likely a valid optimization point regardless, to load the data into cooked pubkeys on startup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure 13440 is relevant here - different data set, multiple flaws in methodology

Copy link
Contributor

@zah zah Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if loading uncompressed keys is so fast, loading them on start-up shouldn't be a problem either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate PR though, since it affects attestations significantly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not validatorPubKey.isSome():
# This should never happen (!)
warn "Invalid validator index in committee cache",
validatorIndex
return errIgnore("SignedContributionAndProof: Invalid committee cache")

if not initialized:
initialized = true
committeeAggKey.init(validatorPubKey)
committeeAggKey.init(validatorPubKey.get())
else:
committeeAggKey.aggregate(validatorPubKey)
committeeAggKey.aggregate(validatorPubKey.get())

if not initialized:
# [REJECT] The contribution has participants
Expand Down
44 changes: 40 additions & 4 deletions beacon_chain/spec/beaconstate.nim
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ proc initialize_beacon_state_from_eth1*(
deposits.len)
state.eth1_deposit_index = deposits.lenu64

var pubkeyToIndex = initTable[ValidatorPubKey, int]()
var pubkeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
for idx, deposit in deposits:
let
pubkey = deposit.pubkey
Expand All @@ -249,7 +249,7 @@ proc initialize_beacon_state_from_eth1*(
do:
if skipBlsValidation in flags or
verify_deposit_signature(cfg, deposit):
pubkeyToIndex[pubkey] = state.validators.len
pubkeyToIndex[pubkey] = ValidatorIndex(state.validators.len)
if not state.validators.add(get_validator_from_deposit(deposit)):
raiseAssert "too many validators"
if not state.balances.add(amount):
Expand Down Expand Up @@ -707,6 +707,9 @@ func get_next_sync_committee_keys(state: altair.BeaconState | merge.BeaconState)
## Return the sequence of sync committee indices (which may include
## duplicate indices) for the next sync committee, given a ``state`` at a
## sync committee period boundary.
# The sync committe depends on seed and effective balance - it can
# thus only be computed for the current epoch of the state, after balance
# updates have been performed

let epoch = get_current_epoch(state) + 1

Expand Down Expand Up @@ -744,9 +747,9 @@ proc get_next_sync_committee*(state: altair.BeaconState | merge.BeaconState):
# see signatures_batch, TODO shouldn't be here
# Deposit processing ensures all keys are valid
var attestersAgg: AggregatePublicKey
attestersAgg.init(res.pubkeys.data[0].loadWithCache().get)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with using the cache here?

Copy link
Member Author

@arnetheduck arnetheduck Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache would only be used once per day for the 512 lookups and uses quite a lot of memory (absolute minimum 144 bytes per validator + hashset and threadvar overhead which is significant) - thanks to the index cache (about 4kb fixed size), the cost no longer matches the benefit. There are no other uses of this cache outside of test cases (I'd generally remove it).

There's an additional optimization that can be done to pre-seed the statecache with the index cache as long as the states line up, but that's for a separate PR - the main benefit is during replays where many blocks need to be applied - the creation of the index mapping is the elephant in the room.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the keys would end up in the cache anyway because the cache was also used in gossip validation. I guess it's now possible to introduce a more precise eviction of the cache around the update points of the SyncCommitteeCache (or the cooked keys could be stored in the SyncCommitteeCache, but this seems potentially sub-optimal if we end up needing the same validators keys in other caches).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have two caches of the same data, where one would be enough: loadWithCache and validatorKey - the latter is tied to our database storage and should be preferred, also because constructing it is faster: loadWithCache works from compressed keys. If anything, we could keep cooked keys in the in-memory representation that backs validatorKey

attestersAgg.init(res.pubkeys.data[0].load().get)
for i in 1 ..< res.pubkeys.data.len:
attestersAgg.aggregate(res.pubkeys.data[i].loadWithCache().get)
attestersAgg.aggregate(res.pubkeys.data[i].load().get)

res.aggregate_pubkey = finish(attestersAgg).toPubKey()
res
Expand Down Expand Up @@ -922,3 +925,36 @@ func latest_block_root*(state: ForkyBeaconState, state_root: Eth2Digest): Eth2Di

func latest_block_root*(state: ForkyHashedBeaconState): Eth2Digest =
latest_block_root(state.data, state.root)

func get_sync_committee_cache*(
state: altair.BeaconState | merge.BeaconState, cache: var StateCache):
SyncCommitteeCache =
let period = state.slot.sync_committee_period()

cache.sync_committees.withValue(period, v) do:
return v[]

var
s = toHashSet(state.current_sync_committee.pubkeys.data)

for pk in state.next_sync_committee.pubkeys.data:
s.incl(pk)

var pubkeyIndices: Table[ValidatorPubKey, ValidatorIndex]
for i, v in state.validators:
if v.pubkey in s:
pubkeyIndices[v.pubkey] = i.ValidatorIndex

var res: SyncCommitteeCache
try:
for i in 0..<res.current_sync_committee.len():
res.current_sync_committee[i] =
pubkeyIndices[state.current_sync_committee.pubkeys[i]]
res.next_sync_committee[i] =
pubkeyIndices[state.next_sync_committee.pubkeys[i]]
except KeyError:
raiseAssert "table constructed just above"

cache.sync_committees[period] = res

res
4 changes: 3 additions & 1 deletion beacon_chain/spec/datatypes/altair.nim
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ type
## - ProposerSlashing (SignedBeaconBlockHeader)
## - AttesterSlashing (IndexedAttestation)
## - SignedVoluntaryExits
## - SyncAggregate
##
## However:
## - ETH1Data (Deposits) can contain invalid BLS signatures
##
## The block state transition has NOT been verified
Expand All @@ -373,7 +375,7 @@ type
voluntary_exits*: List[TrustedSignedVoluntaryExit, Limit MAX_VOLUNTARY_EXITS]

# [New in Altair]
sync_aggregate*: SyncAggregate # TODO TrustedSyncAggregate after batching
sync_aggregate*: TrustedSyncAggregate

SyncnetBits* = BitArray[SYNC_COMMITTEE_SUBNET_COUNT]

Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/datatypes/base.nim
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,17 @@ type
message*: AggregateAndProof
signature*: ValidatorSig

SyncCommitteeCache* = object
current_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]
next_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]

# This doesn't know about forks or branches in the DAG. It's for straight,
# linear chunks of the chain.
StateCache* = object
shuffled_active_validator_indices*:
Table[Epoch, seq[ValidatorIndex]]
beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]]
sync_committees*: Table[SyncCommitteePeriod, SyncCommitteeCache]

# This matches the mutable state of the Solidity deposit contract
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/solidity_deposit_contract/deposit_contract.sol
Expand Down
5 changes: 1 addition & 4 deletions beacon_chain/spec/eth2_apis/eth2_json_rpc_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ proc toJsonHex(data: openArray[byte]): string =

proc fromJson*(n: JsonNode, argName: string, result: var ValidatorPubKey) {.raises: [Defect, ValueError].} =
n.kind.expect(JString, argName)
var tmp = ValidatorPubKey.fromHex(n.getStr()).tryGet()
if not tmp.loadWithCache().isSome():
raise (ref ValueError)(msg: "Invalid public BLS key")
result = tmp
result = ValidatorPubKey.fromHex(n.getStr()).tryGet()

proc `%`*(pubkey: ValidatorPubKey): JsonNode =
newJString(toJsonHex(toRaw(pubkey)))
Expand Down
44 changes: 44 additions & 0 deletions beacon_chain/spec/signatures_batch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,48 @@ proc collectSignatureSets*(
volex.message.epoch,
DOMAIN_VOLUNTARY_EXIT)

block:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For documentation it would be nice to have a comment section
7. SyncAggregate

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8e445dc - there's not much to write though, like proposer signature, it's just a single field with a signature in it

# 7. SyncAggregate
# ----------------------------------------------------
withState(state):
when stateFork >= BeaconStateFork.Altair and
(signed_block is altair.SignedBeaconBlock or
signed_block is merge.SignedBeaconBlock):
let
current_sync_committee =
state.data.get_sync_committee_cache(cache).current_sync_committee

var inited = false
var attestersAgg{.noInit.}: AggregatePublicKey
for i in 0 ..< current_sync_committee.len:
if signed_block.message.body.sync_aggregate.sync_committee_bits[i]:
let key = validatorKeys.load(current_sync_committee[i])
if not key.isSome():
return err("Invalid key cache")

if not inited: # first iteration
attestersAgg.init(key.get())
inited = true
else:
attestersAgg.aggregate(key.get())

if not inited:
if signed_block.message.body.sync_aggregate.sync_committee_signature !=
default(CookedSig).toValidatorSig():
return err("process_sync_aggregate: empty sync aggregates need signature of point at infinity")
else:
let
attesters = finish(attestersAgg)
previous_slot = max(state.data.slot, Slot(1)) - 1

sigs.addSignatureSet(
attesters,
get_block_root_at_slot(state.data, previous_slot),
signed_block.message.body.sync_aggregate.sync_committee_signature.loadOrExit(
"process_sync_aggregate: cannot load signature"),
state.data.fork,
state.data.genesis_validators_root,
previous_slot.epoch,
DOMAIN_SYNC_COMMITTEE)

ok()
Loading