Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix canonical reset of the chain by the skeleton #3078

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions packages/client/src/rpc/modules/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ export class Engine {
this.invalidBlocks.delete(blockHash.slice(2))

// newpayloadv3 comes with parentBeaconBlockRoot out of the payload
const { block, error } = await assembleBlock(
const { block: headBlock, error } = await assembleBlock(
{
...payload,
// ExecutionPayload only handles undefined
Expand All @@ -666,7 +666,7 @@ export class Engine {
this.chain,
this.chainCache
)
if (block === undefined || error) {
if (headBlock === undefined || error !== undefined) {
let response = error
if (!response) {
const validationError = `Error assembling block during init`
Expand All @@ -678,14 +678,14 @@ export class Engine {
return response
}

if (block.common.isActivatedEIP(4844)) {
if (headBlock.common.isActivatedEIP(4844)) {
let validationError: string | null = null
if (blobVersionedHashes === undefined || blobVersionedHashes === null) {
validationError = `Error verifying blobVersionedHashes: received none`
} else {
// Collect versioned hashes in the flat array `txVersionedHashes` to match with received
const txVersionedHashes = []
for (const tx of block.transactions) {
for (const tx of headBlock.transactions) {
if (tx instanceof BlobEIP4844Transaction) {
for (const vHash of tx.blobVersionedHashes) {
txVersionedHashes.push(vHash)
Expand Down Expand Up @@ -725,13 +725,13 @@ export class Engine {
return response
}

this.connectionManager.updatePayloadStats(block)
this.connectionManager.updatePayloadStats(headBlock)

const hardfork = block.common.hardfork()
const hardfork = headBlock.common.hardfork()
if (hardfork !== this.lastNewPayloadHF && this.lastNewPayloadHF !== '') {
this.config.logger.info(
`Hardfork change along new payload block number=${block.header.number} hash=${short(
block.hash()
`Hardfork change along new payload block number=${headBlock.header.number} hash=${short(
headBlock.hash()
)} old=${this.lastNewPayloadHF} new=${hardfork}`
)
}
Expand Down Expand Up @@ -764,9 +764,9 @@ export class Engine {

// validate 4844 transactions and fields as these validations generally happen on putBlocks
// when parent is confirmed to be in the chain. But we can do it here early
if (block.common.isActivatedEIP(4844)) {
if (headBlock.common.isActivatedEIP(4844)) {
try {
block.validateBlobTransactions(parent.header)
headBlock.validateBlobTransactions(parent.header)
} catch (error: any) {
const validationError = `Invalid 4844 transactions: ${error}`
const latestValidHash = await validHash(
Expand All @@ -788,14 +788,13 @@ export class Engine {
throw new Error(`Parent block not yet executed number=${parent.header.number}`)
}
} catch (error: any) {
const optimisticLookup = !(await this.skeleton.setHead(block, false))
// Stash the block for a potential forced forkchoice update to it later.
this.remoteBlocks.set(bytesToUnprefixedHex(headBlock.hash()), headBlock)

const optimisticLookup = !(await this.skeleton.setHead(headBlock, false))
const status =
// If the transitioned to beacon sync and this block can extend beacon chain then
optimisticLookup === true ? Status.SYNCING : Status.ACCEPTED
if (status === Status.ACCEPTED) {
// Stash the block for a potential forced forkchoice update to it later.
this.remoteBlocks.set(bytesToUnprefixedHex(block.hash()), block)
}
const response = { status, validationError: null, latestValidHash: null }
return response
}
Expand All @@ -807,8 +806,8 @@ export class Engine {
//
// Call skeleton.setHead without forcing head change to return if the block is reorged or not
// Do optimistic lookup if not reorged
const optimisticLookup = !(await this.skeleton.setHead(block, false))
this.remoteBlocks.set(bytesToUnprefixedHex(block.hash()), block)
const optimisticLookup = !(await this.skeleton.setHead(headBlock, false))
this.remoteBlocks.set(bytesToUnprefixedHex(headBlock.hash()), headBlock)

// we should check if the block exists executed in remoteBlocks or in chain as a check since stateroot
// exists in statemanager is not sufficient because an invalid crafted block with valid block hash with
Expand All @@ -829,13 +828,13 @@ export class Engine {
let blocks: Block[]
try {
// find parents till vmHead but limit lookups till engineParentLookupMaxDepth
blocks = await recursivelyFindParents(vmHead.hash(), block.header.parentHash, this.chain)
blocks = await recursivelyFindParents(vmHead.hash(), headBlock.header.parentHash, this.chain)
} catch (error) {
const response = { status: Status.SYNCING, latestValidHash: null, validationError: null }
return response
}

blocks.push(block)
blocks.push(headBlock)

let lastBlock: Block
try {
Expand All @@ -861,7 +860,18 @@ export class Engine {
setHardfork: this.chain.headers.td,
})
: false

// if can't be executed then return syncing/accepted
if (!executed) {
this.config.logger.debug(
`Skipping block(s) execution for headBlock=${headBlock.header.number} hash=${short(
headBlock.hash()
)} : pendingBlocks=${blocks.length - i}(limit=${
this.chain.config.engineNewpayloadMaxExecute
}) transactions=${block.transactions.length}(limit=${
this.chain.config.engineNewpayloadMaxTxsExecute
}) executionBusy=${this.execution.running}`
)
// determind status to be returned depending on if block could extend chain or not
const status = optimisticLookup === true ? Status.SYNCING : Status.ACCEPTED
const response = { status, latestValidHash: null, validationError: null }
Expand All @@ -874,7 +884,11 @@ export class Engine {
} catch (error) {
const validationError = `Error verifying block while running: ${error}`
this.config.logger.error(validationError)
const latestValidHash = await validHash(block.header.parentHash, this.chain, this.chainCache)
const latestValidHash = await validHash(
headBlock.header.parentHash,
this.chain,
this.chainCache
)
const response = { status: Status.INVALID, latestValidHash, validationError }
this.invalidBlocks.set(blockHash.slice(2), error as Error)
this.remoteBlocks.delete(blockHash.slice(2))
Expand All @@ -891,7 +905,7 @@ export class Engine {

const response = {
status: Status.VALID,
latestValidHash: bytesToHex(block.hash()),
latestValidHash: bytesToHex(headBlock.hash()),
validationError: null,
}
return response
Expand Down
15 changes: 11 additions & 4 deletions packages/client/src/service/fullethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ export class FullEthereumService extends Service {
} else {
if (this.config.chainCommon.gteHardfork(Hardfork.Paris) === true) {
if (!this.config.disableBeaconSync) {
void this.switchToBeaconSync()
// skip opening the beacon synchronizer before everything else (chain, execution etc)
// as it resets and messes up the entire chain
void this.switchToBeaconSync(true)
}
this.config.logger.info(`Post-merge 🐼 client mode: run with CL client.`)
} else {
Expand Down Expand Up @@ -120,7 +122,7 @@ export class FullEthereumService extends Service {
/**
* Helper to switch to {@link BeaconSynchronizer}
*/
async switchToBeaconSync() {
async switchToBeaconSync(skipOpen: boolean = false) {
if (this.synchronizer instanceof FullSynchronizer) {
await this.synchronizer.stop()
await this.synchronizer.close()
Expand All @@ -136,12 +138,13 @@ export class FullEthereumService extends Service {
execution: this.execution,
skeleton: this.skeleton!,
})
await this.synchronizer.open()
if (!skipOpen) {
await this.synchronizer.open()
}
}

async open() {
if (this.synchronizer !== undefined) {
await this.skeleton?.open()
this.config.logger.info(
`Preparing for sync using FullEthereumService with ${
this.synchronizer instanceof BeaconSynchronizer
Expand Down Expand Up @@ -170,6 +173,10 @@ export class FullEthereumService extends Service {
}
if (txs[0].length > 0) this.txPool.sendNewTxHashes(txs, [peer])
})

// skeleton needs to be opened before synchronizers are opened
// but after chain is opened, which skeleton.open() does internally
await this.skeleton?.open()
await super.open()
await this.execution.open()
this.txPool.open()
Expand Down
32 changes: 26 additions & 6 deletions packages/client/src/service/skeleton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,23 @@ export class Skeleton extends MetaDBManager {
}

async open() {
// make sure to open chain before this can be opened
await this.chain.open()

await this.runWithLock<void>(async () => {
await this.getSyncStatus()
this.logSyncStatus('Read')
this.started = new Date().getTime()
})
}

async close() {
await this.runWithLock<void>(async () => {
await this.writeSyncStatus()
this.started = 0
})
}

/**
* Returns true if the skeleton chain is linked to canonical
*/
Expand Down Expand Up @@ -312,6 +322,10 @@ export class Skeleton extends MetaDBManager {
*/
async setHead(head: Block, force = true, init = false, reorgthrow = false): Promise<boolean> {
return this.runWithLock<boolean>(async () => {
if (this.started === 0) {
throw Error(`skeleton setHead called before being opened`)
}

this.config.logger.debug(
`New skeleton head announced number=${head.header.number} hash=${short(
head.hash()
Expand Down Expand Up @@ -343,12 +357,12 @@ export class Skeleton extends MetaDBManager {
this.config.logger.info(
`Created new subchain head=${s.head} tail=${s.tail} next=${short(s.next)}`
)
// Reset the filling of canonical head from tail only on tail reorg
this.status.canonicalHeadReset = true
} else {
// Only the head differed, tail is preserved
subchain.head = head.header.number
}
// Reset the filling of canonical head from tail on reorg
this.status.canonicalHeadReset = true
}
// Put this block irrespective of the force
await this.putBlock(head)
Expand Down Expand Up @@ -433,8 +447,8 @@ export class Skeleton extends MetaDBManager {

// subchains are useful if subChain1Head is in skeleton only and its tail correct
const subChain1Head = await this.getBlock(this.status.progress.subchains[1].head, true)
// tail lookup can be from skeleton or chain
const subChain1Tail = await this.getBlock(this.status.progress.subchains[1].tail)
// tail lookup also needs to be from skeleton because we set resetCanonicalHead true if merged
const subChain1Tail = await this.getBlock(this.status.progress.subchains[1].tail, true)
if (
subChain1Head === undefined ||
subChain1Tail === undefined ||
Expand Down Expand Up @@ -480,9 +494,14 @@ export class Skeleton extends MetaDBManager {
*/
async putBlocks(blocks: Block[]): Promise<number> {
return this.runWithLock<number>(async () => {
// if no subchain or linked chain throw error as this will exit the fetcher
if (this.status.progress.subchains.length === 0) {
throw Error(`Skeleton no subchain set for sync`)
}
if (this.status.linked) {
throw Error(`Chain already linked`)
}

let merged = false
let tailUpdated = false
this.config.logger.debug(
Expand Down Expand Up @@ -719,6 +738,7 @@ export class Skeleton extends MetaDBManager {
})
break
}

canonicalHead += BigInt(numBlocksInserted)
this.fillLogIndex += numBlocksInserted
// Delete skeleton block to clean up as we go, if block is fetched and chain is linked
Expand Down Expand Up @@ -832,8 +852,8 @@ export class Skeleton extends MetaDBManager {

private logSyncStatus(logPrefix: string): void {
this.config.logger.debug(
`${logPrefix} sync status linked=${
this.status.linked
`${logPrefix} sync status linked=${this.status.linked} canonicalHeadReset=${
this.status.canonicalHeadReset
} subchains=${this.status.progress.subchains
.map((s) => `[head=${s.head} tail=${s.tail} next=${short(s.next)}]`)
.join(',')}`
Expand Down
3 changes: 2 additions & 1 deletion packages/client/test/rpc/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ export async function setupChain(genesisFile: any, chainName = 'dev', clientOpts

const { chain } = client
const service = client.services.find((s) => s.name === 'eth') as FullEthereumService
const { execution } = service
const { execution, skeleton } = service

await chain.open()
await skeleton?.open()
await execution?.open()
await chain.update()
return { chain, common, execution: execution!, server, service, blockchain }
Expand Down
14 changes: 14 additions & 0 deletions packages/client/test/sync/skeleton.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ const block51 = Block.fromBlockData(
{ common }
)

describe('[Skeleton]/ startup ', () => {
it('starts the chain when starting the skeleton', async () => {
const config = new Config({
common,
})
const chain = await Chain.create({ config })
const skeleton = new Skeleton({ chain, config, metaDB: new MemoryLevel() })
assert.equal(chain.opened, false, 'chain is not started')
await skeleton.open()
assert.equal(chain.opened, true, 'chain is opened by skeleton')
})
})

describe('[Skeleton] / initSync', async () => {
// Tests various sync initializations based on previous leftovers in the database
// and announced heads.
Expand Down Expand Up @@ -229,6 +242,7 @@ describe('[Skeleton] / initSync', async () => {
})
}
})

describe('[Skeleton] / setHead', async () => {
// Tests that a running skeleton sync can be extended with properly linked up
// headers but not with side chains.
Expand Down