Skip to content

Commit

Permalink
client: Enhance skeleton sync to process batches of new payloads and …
Browse files Browse the repository at this point in the history
…fcUs (#2309)

* client: Enhance skeleton sync to process batches of new payloads and fcUs

* beacon sync run loop as now block execution callback is outside lock

* set chain hardfork and log chain head

* make flow concurrency safe

* reset linked on backstep

* fix skeleton specs

* remove run with unlock

* move locking to execution from vm

* handle edge case

* Stub out peers in peerpool

* improve condition check

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* improve wording

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* improve wording

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* improve wording

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* improve wording

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* improve wording

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>

* incorporate feedback

* restore waiting for interval clearing

* clearify return value

Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com>
  • Loading branch information
g11tech and acolytec3 authored Oct 24, 2022
1 parent e65e00f commit 72a0736
Show file tree
Hide file tree
Showing 9 changed files with 870 additions and 590 deletions.
8 changes: 8 additions & 0 deletions packages/client/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ export interface ConfigOptions {
*/
maxFetcherJobs?: number

/**
* Max outgoing multi-peer requests by the fetcher at any given time
*/
maxFetcherRequests?: number

/**
* Number of peers needed before syncing
*
Expand Down Expand Up @@ -262,6 +267,7 @@ export class Config {
public static readonly PORT_DEFAULT = 30303
public static readonly MAXPERREQUEST_DEFAULT = 50
public static readonly MAXFETCHERJOBS_DEFAULT = 100
public static readonly MAXFETCHERREQUESTS_DEFAULT = 5
public static readonly MINPEERS_DEFAULT = 1
public static readonly MAXPEERS_DEFAULT = 25
public static readonly DNSADDR_DEFAULT = '8.8.8.8'
Expand All @@ -285,6 +291,7 @@ export class Config {
public readonly txLookupLimit: number
public readonly maxPerRequest: number
public readonly maxFetcherJobs: number
public readonly maxFetcherRequests: number
public readonly minPeers: number
public readonly maxPeers: number
public readonly dnsAddr: string
Expand Down Expand Up @@ -330,6 +337,7 @@ export class Config {
this.txLookupLimit = options.txLookupLimit ?? 2350000
this.maxPerRequest = options.maxPerRequest ?? Config.MAXPERREQUEST_DEFAULT
this.maxFetcherJobs = options.maxFetcherJobs ?? Config.MAXFETCHERJOBS_DEFAULT
this.maxFetcherRequests = options.maxPerRequest ?? Config.MAXFETCHERREQUESTS_DEFAULT
this.minPeers = options.minPeers ?? Config.MINPEERS_DEFAULT
this.maxPeers = options.maxPeers ?? Config.MAXPEERS_DEFAULT
this.dnsAddr = options.dnsAddr ?? Config.DNSADDR_DEFAULT
Expand Down
176 changes: 103 additions & 73 deletions packages/client/lib/execution/vmexecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { ConsensusType, Hardfork } from '@ethereumjs/common'
import { DefaultStateManager } from '@ethereumjs/statemanager'
import { Trie } from '@ethereumjs/trie'
import { bufferToHex } from '@ethereumjs/util'
import { Lock, bufferToHex } from '@ethereumjs/util'
import { VM } from '@ethereumjs/vm'

import { Event } from '../types'
Expand All @@ -23,6 +23,7 @@ import type { Block } from '@ethereumjs/block'
import type { RunBlockOpts, TxReceipt } from '@ethereumjs/vm'

export class VMExecution extends Execution {
private _lock = new Lock()
public vm: VM
public hardfork: string = ''

Expand Down Expand Up @@ -69,31 +70,50 @@ export class VMExecution extends Execution {
}
}

/**
* Run a function after acquiring a lock. It is implied that we have already
* initialized the module (or we are calling this from the init function, like
* `_setCanonicalGenesisBlock`)
* @param action - function to run after acquiring a lock
* @hidden
*/
private async runWithLock<T>(action: () => Promise<T>): Promise<T> {
try {
await this._lock.acquire()
const value = await action()
return value
} finally {
this._lock.release()
}
}

/**
* Initializes VM execution. Must be called before run() is called
*/
async open(): Promise<void> {
await this.vm.init()
if (typeof this.vm.blockchain.getIteratorHead !== 'function') {
throw new Error('cannot get iterator head: blockchain has no getIteratorHead function')
}
const headBlock = await this.vm.blockchain.getIteratorHead()
const { number } = headBlock.header
if (typeof this.vm.blockchain.getTotalDifficulty !== 'function') {
throw new Error('cannot get iterator head: blockchain has no getTotalDifficulty function')
}
const td = await this.vm.blockchain.getTotalDifficulty(headBlock.header.hash())
this.config.execCommon.setHardforkByBlockNumber(number, td)
this.hardfork = this.config.execCommon.hardfork()
this.config.logger.info(`Initializing VM execution hardfork=${this.hardfork}`)
if (number === BigInt(0)) {
if (typeof this.vm.blockchain.genesisState !== 'function') {
throw new Error('cannot get iterator head: blockchain has no genesisState function')
return this.runWithLock<void>(async () => {
await this.vm.init()
if (typeof this.vm.blockchain.getIteratorHead !== 'function') {
throw new Error('cannot get iterator head: blockchain has no getIteratorHead function')
}
await this.vm.eei.generateCanonicalGenesis(this.vm.blockchain.genesisState())
}
// TODO: Should a run be started to execute any left over blocks?
// void this.run()
const headBlock = await this.vm.blockchain.getIteratorHead()
const { number } = headBlock.header
if (typeof this.vm.blockchain.getTotalDifficulty !== 'function') {
throw new Error('cannot get iterator head: blockchain has no getTotalDifficulty function')
}
const td = await this.vm.blockchain.getTotalDifficulty(headBlock.header.hash())
this.config.execCommon.setHardforkByBlockNumber(number, td)
this.hardfork = this.config.execCommon.hardfork()
this.config.logger.info(`Initializing VM execution hardfork=${this.hardfork}`)
if (number === BigInt(0)) {
if (typeof this.vm.blockchain.genesisState !== 'function') {
throw new Error('cannot get iterator head: blockchain has no genesisState function')
}
await this.vm.eei.generateCanonicalGenesis(this.vm.blockchain.genesisState())
}
// TODO: Should a run be started to execute any left over blocks?
// void this.run()
})
}

/**
Expand All @@ -105,27 +125,29 @@ export class VMExecution extends Execution {
* @param receipts If we built this block, pass the receipts to not need to run the block again
*/
async runWithoutSetHead(opts: RunBlockOpts, receipts?: TxReceipt[]): Promise<void> {
const { block } = opts
if (receipts === undefined) {
const result = await this.vm.runBlock(opts)
receipts = result.receipts
}
if (receipts !== undefined) {
// Save receipts
this.pendingReceipts?.set(block.hash().toString('hex'), receipts)
}
// Bypass updating head by using blockchain db directly
const [hash, num] = [block.hash(), block.header.number]
const td =
(await this.chain.getTd(block.header.parentHash, block.header.number - BigInt(1))) +
block.header.difficulty
return this.runWithLock<void>(async () => {
const { block } = opts
if (receipts === undefined) {
const result = await this.vm.runBlock(opts)
receipts = result.receipts
}
if (receipts !== undefined) {
// Save receipts
this.pendingReceipts?.set(block.hash().toString('hex'), receipts)
}
// Bypass updating head by using blockchain db directly
const [hash, num] = [block.hash(), block.header.number]
const td =
(await this.chain.getTd(block.header.parentHash, block.header.number - BigInt(1))) +
block.header.difficulty

await this.chain.blockchain.dbManager.batch([
DBSetTD(td, num, hash),
...DBSetBlockOrHeader(block),
DBSetHashToNumber(hash, num),
...DBSaveLookups(hash, num),
])
await this.chain.blockchain.dbManager.batch([
DBSetTD(td, num, hash),
...DBSetBlockOrHeader(block),
DBSetHashToNumber(hash, num),
...DBSaveLookups(hash, num),
])
})
}

/**
Expand All @@ -134,25 +156,27 @@ export class VMExecution extends Execution {
* @param blocks Array of blocks to save pending receipts and set the last block as the head
*/
async setHead(blocks: Block[]): Promise<void> {
const vmHeadBlock = blocks[blocks.length - 1]
if (!(await this.vm.stateManager.hasStateRoot(vmHeadBlock.header.stateRoot))) {
// If we set blockchain iterator to somewhere where we don't have stateroot
// execution run will always fail
throw Error(
`vmHeadBlock's stateRoot not found number=${vmHeadBlock.header.number} root=${short(
vmHeadBlock.header.stateRoot
)}`
)
}
await this.chain.blockchain.setIteratorHead('vm', vmHeadBlock.hash())
await this.chain.putBlocks(blocks, true)
for (const block of blocks) {
const receipts = this.pendingReceipts?.get(block.hash().toString('hex'))
if (receipts) {
void this.receiptsManager?.saveReceipts(block, receipts)
this.pendingReceipts?.delete(block.hash().toString('hex'))
return this.runWithLock<void>(async () => {
const vmHeadBlock = blocks[blocks.length - 1]
if (!(await this.vm.stateManager.hasStateRoot(vmHeadBlock.header.stateRoot))) {
// If we set blockchain iterator to somewhere where we don't have stateroot
// execution run will always fail
throw Error(
`vmHeadBlock's stateRoot not found number=${vmHeadBlock.header.number} root=${short(
vmHeadBlock.header.stateRoot
)}`
)
}
}
await this.chain.putBlocks(blocks, true)
for (const block of blocks) {
const receipts = this.pendingReceipts?.get(block.hash().toString('hex'))
if (receipts) {
void this.receiptsManager?.saveReceipts(block, receipts)
this.pendingReceipts?.delete(block.hash().toString('hex'))
}
}
await this.chain.blockchain.setIteratorHead('vm', vmHeadBlock.hash())
})
}

/**
Expand Down Expand Up @@ -232,15 +256,17 @@ export class VMExecution extends Execution {
skipBlockValidation = true
}

// we are skipping header validation because the block has been picked from the
// blockchain and header should have already been validated while putBlock
const result = await this.vm.runBlock({
block,
root: parentState,
skipBlockValidation,
skipHeaderValidation: true,
await this.runWithLock<void>(async () => {
// we are skipping header validation because the block has been picked from the
// blockchain and header should have already been validated while putBlock
const result = await this.vm.runBlock({
block,
root: parentState,
skipBlockValidation,
skipHeaderValidation: true,
})
void this.receiptsManager?.saveReceipts(block, result.receipts)
})
void this.receiptsManager?.saveReceipts(block, result.receipts)
txCounter += block.transactions.length
// set as new head block
headBlock = block
Expand Down Expand Up @@ -297,6 +323,8 @@ export class VMExecution extends Execution {
)
numExecuted = await this.vmPromise

// TODO: one should update the iterator head later as this is dangerous for the blockchain and can cause
// problems in concurrent execution
if (errorBlock !== undefined) {
await this.chain.blockchain.setIteratorHead(
'vm',
Expand Down Expand Up @@ -350,12 +378,14 @@ export class VMExecution extends Execution {
* Stop VM execution. Returns a promise that resolves once its stopped.
*/
async stop(): Promise<boolean> {
if (this.vmPromise) {
// ensure that we wait that the VM finishes executing the block (and flushing the trie cache)
await this.vmPromise
}
await this.stateDB?.close()
await super.stop()
await this.runWithLock<void>(async () => {
if (this.vmPromise) {
// ensure that we wait that the VM finishes executing the block (and flushing the trie cache)
await this.vmPromise
}
await this.stateDB?.close()
await super.stop()
})
return true
}

Expand Down
63 changes: 41 additions & 22 deletions packages/client/lib/rpc/modules/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ export class Engine {
return response
}

// This optimistic lookup keeps skeleton updated even if for e.g. beacon sync might not have
// been intialized here but a batch of blocks new payloads arrive, most likely during sync
// We still can't switch to beacon sync here especially if the chain is pre merge and there
// is pow block which this client would like to mint and attempt proposing it
const optimisticLookup = await this.service.beaconSync?.extendChain(block)

const blockExists = await validBlock(toBuffer(blockHash), this.chain)
if (blockExists) {
const isBlockExecuted = await this.vm.stateManager.hasStateRoot(blockExists.header.stateRoot)
Expand All @@ -406,11 +412,6 @@ export class Engine {

try {
const parent = await this.chain.getBlock(toBuffer(parentHash))
const isBlockExecuted = await this.vm.stateManager.hasStateRoot(parent.header.stateRoot)
// If the parent is not executed throw an error, it will be caught and return SYNCING or ACCEPTED.
if (!isBlockExecuted) {
throw new Error(`Parent block not yet executed number=${parent.header.number}`)
}
if (!parent._common.gteHardfork(Hardfork.Merge)) {
const validTerminalBlock = await validateTerminalBlock(parent, this.chain)
if (!validTerminalBlock) {
Expand All @@ -422,14 +423,15 @@ export class Engine {
return response
}
}
} catch (error: any) {
if (!this.service.beaconSync && !this.config.disableBeaconSync) {
await this.service.switchToBeaconSync()
const isBlockExecuted = await this.vm.stateManager.hasStateRoot(parent.header.stateRoot)
// If the parent is not executed throw an error, it will be caught and return SYNCING or ACCEPTED.
if (!isBlockExecuted) {
throw new Error(`Parent block not yet executed number=${parent.header.number}`)
}
} catch (error: any) {
const status =
(await this.service.beaconSync?.extendChain(block)) === true
? Status.SYNCING
: Status.ACCEPTED
// If the transitioned to beacon sync and this block can extend beacon chain then
optimisticLookup === true ? Status.SYNCING : Status.ACCEPTED
if (status === Status.ACCEPTED) {
// Stash the block for a potential forced forkchoice update to it later.
this.remoteBlocks.set(block.hash().toString('hex'), block)
Expand Down Expand Up @@ -499,6 +501,13 @@ export class Engine {
const { headBlockHash, finalizedBlockHash, safeBlockHash } = params[0]
const payloadAttributes = params[1]

// It is possible that newPayload didnt start beacon sync as the payload it was asked to
// evaluate didn't require syncing beacon. This can happen if the EL<>CL starts and CL
// starts from a bit behind like how lodestar does
if (!this.service.beaconSync && !this.config.disableBeaconSync) {
await this.service.switchToBeaconSync()
}

/*
* Process head block
*/
Expand All @@ -519,16 +528,19 @@ export class Engine {
const response = { payloadStatus, payloadId: null }
return response
} else {
this.config.logger.debug(
`Forkchoice requested sync to new head number=${headBlock.header.number} hash=${short(
headBlock.hash()
)}`
)
await this.service.beaconSync?.setHead(headBlock)
this.remoteBlocks.delete(headBlockHash.slice(2))
}
}

// Always keep beaconSync skeleton updated so that it stays updated with any skeleton sync
// requirements that might come later because of reorg or CL restarts
this.config.logger.debug(
`Forkchoice requested update to new head number=${headBlock.header.number} hash=${short(
headBlock.hash()
)}`
)
await this.service.beaconSync?.setHead(headBlock)

// Only validate this as terminal block if this block's difficulty is non-zero,
// else this is a PoS block but its hardfork could be indeterminable if the skeleton
// is not yet connected.
Expand All @@ -547,16 +559,23 @@ export class Engine {
}
}

const isHeadExecuted = await this.vm.stateManager.hasStateRoot(headBlock.header.stateRoot)
if (!isHeadExecuted) {
// execution has not yet caught up, so lets just return sync
const payloadStatus = {
status: Status.SYNCING,
latestValidHash: null,
validationError: null,
}
const response = { payloadStatus, payloadId: null }
return response
}

const vmHeadHash = this.chain.headers.latest!.hash()
if (!vmHeadHash.equals(headBlock.hash())) {
let parentBlocks: Block[] = []
if (this.chain.headers.latest && this.chain.headers.latest.number < headBlock.header.number) {
try {
const parent = await this.chain.getBlock(toBuffer(headBlock.header.parentHash))
const isBlockExecuted = await this.vm.stateManager.hasStateRoot(parent.header.stateRoot)
if (!isBlockExecuted) {
throw new Error(`Parent block not yet executed number=${parent.header.number}`)
}
parentBlocks = await recursivelyFindParents(
vmHeadHash,
headBlock.header.parentHash,
Expand Down
Loading

0 comments on commit 72a0736

Please sign in to comment.