diff --git a/src/datastore/helpers.ts b/src/datastore/helpers.ts
index a5aabbaa1e..fba14fdaa8 100644
--- a/src/datastore/helpers.ts
+++ b/src/datastore/helpers.ts
@@ -1,4 +1,4 @@
-import { parseEnum, unwrapOptionalProp } from '../helpers';
+import { getUintEnvOrDefault, parseEnum, unwrapOptionalProp } from '../helpers';
 import {
   BlockQueryResult,
   ContractTxQueryResult,
@@ -66,6 +66,7 @@ import { PgStoreEventEmitter } from './pg-store-event-emitter';
 import { SyntheticPoxEventName } from '../pox-helpers';
 import { logger } from '../logger';
 import { PgSqlClient } from '@hirosystems/api-toolkit';
+import PQueue from 'p-queue';

 export const TX_COLUMNS = [
   'tx_id',
@@ -1335,3 +1336,21 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
     restoredMempoolTxs: 0,
   };
 }
+
+/**
+ * Priority queue for parallel Postgres write query execution. This helps performance because it
+ * parallelizes the work postgres.js has to do when serializing JS types to PG types.
+ */
+export class PgWriteQueue {
+  readonly queue: PQueue;
+  constructor() {
+    const concurrency = Math.max(1, getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4));
+    this.queue = new PQueue({ concurrency, autoStart: true });
+  }
+  enqueue(task: Parameters<PQueue['add']>[0]): void {
+    void this.queue.add(task);
+  }
+  done(): Promise<void> {
+    return this.queue.onIdle();
+  }
+}
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index 0de9723d16..e43f72626e 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -1,5 +1,4 @@
 import * as assert from 'assert';
-import PQueue from 'p-queue';
 import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers';
 import {
   DbBlock,
@@ -75,6 +74,7 @@ import {
   TX_METADATA_TABLES,
   validateZonefileHash,
   newReOrgUpdatedEntities,
+  PgWriteQueue,
 } from './helpers';
 import { PgNotifier } from './pg-notifier';
 import { MIGRATIONS_DIR, PgStore } from './pg-store';
@@ -95,10 +95,6 @@ import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';

 const MIGRATIONS_TABLE = 'pgmigrations';
 const INSERT_BATCH_SIZE = 500;
-const STACKS_BLOCK_DATA_INSERT_CONCURRENCY = Math.max(
-  1,
-  getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4)
-);
 const MEMPOOL_STATS_DEBOUNCE_INTERVAL = getUintEnvOrDefault(
   'MEMPOOL_STATS_DEBOUNCE_INTERVAL',
   1000
@@ -194,11 +190,7 @@ export class PgWriteStore extends PgStore {
         markBlockUpdateDataAsNonCanonical(data);
       } else {
         const txIds = data.txs.map(d => d.tx.tx_id);
-        const pruneRes = await this.pruneMempoolTxs(sql, txIds);
-        if (pruneRes.removedTxs.length > 0)
-          logger.debug(
-            `Removed ${pruneRes.removedTxs.length} txs from mempool table during new block ingestion`
-          );
+        await this.pruneMempoolTxs(sql, txIds);
       }
       setTotalBlockUpdateDataExecutionCost(data);

@@ -209,8 +201,8 @@ export class PgWriteStore extends PgStore {
         return !insertedMicroblockHashes.has(entry.tx.microblock_hash);
       });

-      // When processing an immediately-non-canonical block, do not orphan and possible existing microblocks
-      // which may be still considered canonical by the canonical block at this height.
+      // When processing an immediately-non-canonical block, do not orphan any possible existing
+      // microblocks which may still be considered canonical by the canonical block at this height.
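+
+      // Editorial note (illustrative sketch, not part of the original change): the per-table
+      // writes later in this method are fanned out through the PgWriteQueue helper added above
+      // in src/datastore/helpers.ts, all sharing the same `sql` transaction, e.g.:
+      //   const q = new PgWriteQueue();
+      //   q.enqueue(() => this.updateStxEvents(sql, batchedTxData));
+      //   q.enqueue(() => this.updateFtEvents(sql, batchedTxData));
+      //   await q.done(); // resolves once every enqueued task has finished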
       if (isCanonical) {
         const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
           sql,
@@ -226,7 +218,8 @@ export class PgWriteStore extends PgStore {
           }
         );

-        // Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
+        // Identify any micro-orphaned txs that also didn't make it into this anchor block, and
+        // restore them into the mempool
         const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
           tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
         );
@@ -238,7 +231,8 @@ export class PgWriteStore extends PgStore {
           logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
         });

-        // Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
+        // Clear accepted microblock txs from the anchor-block update data to avoid duplicate
+        // inserts.
         batchedTxData = batchedTxData.filter(entry => {
           const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
           return !matchingTx;
@@ -257,37 +251,35 @@ export class PgWriteStore extends PgStore {
         }
       }
       if ((await this.updateBlock(sql, data.block)) !== 0) {
-        await this.updateMinerRewards(sql, data.minerRewards);
+        const q = new PgWriteQueue();
+        q.enqueue(() => this.updateMinerRewards(sql, data.minerRewards));
         if (batchedTxData.length > 0) {
-          const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY });
-          const enqueue = (task: Parameters<PQueue['add']>[0]) => void q.add(task);
-          enqueue(() =>
+          q.enqueue(() =>
             this.updateTx(
               sql,
               batchedTxData.map(b => b.tx)
             )
           );
-          enqueue(() => this.updateStxEvents(sql, batchedTxData));
-          enqueue(() => this.updatePrincipalStxTxs(sql, batchedTxData));
-          enqueue(() => this.updateSmartContractEvents(sql, batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', batchedTxData));
-          enqueue(() => this.updateStxLockEvents(sql, batchedTxData));
-          enqueue(() => this.updateFtEvents(sql, batchedTxData));
+          q.enqueue(() => this.updateStxEvents(sql, batchedTxData));
+          q.enqueue(() => this.updatePrincipalStxTxs(sql, batchedTxData));
+          q.enqueue(() => this.updateSmartContractEvents(sql, batchedTxData));
+          q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', batchedTxData));
+          q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', batchedTxData));
+          q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', batchedTxData));
+          q.enqueue(() => this.updateStxLockEvents(sql, batchedTxData));
+          q.enqueue(() => this.updateFtEvents(sql, batchedTxData));
           for (const entry of batchedTxData) {
-            enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents));
-            enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
-            enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
-            enqueue(() => this.updateNames(sql, entry.tx, entry.names));
+            q.enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents));
+            q.enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
+            q.enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
+            q.enqueue(() => this.updateNames(sql, entry.tx, entry.names));
           }
-          await q.onIdle();
-        }
-        const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
-        if (mempoolGarbageResults.deletedTxs.length > 0) {
-          logger.debug(`Garbage collected ${mempoolGarbageResults.deletedTxs.length} mempool txs`);
         }
-        garbageCollectedMempoolTxs = mempoolGarbageResults.deletedTxs;
+        q.enqueue(async () => {
+          const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
+          garbageCollectedMempoolTxs = mempoolGarbageResults.deletedTxs;
+        });
+        await q.done();
       }

       if (!this.isEventReplay) {
@@ -2138,32 +2130,32 @@ export class PgWriteStore extends PgStore {
     }

     if (txs.length > 0) {
-      const rowsUpdated = await this.updateTx(
-        sql,
-        txs.map(t => t.tx)
-      );
-      if (rowsUpdated !== txs.length) {
-        throw new Error(
-          `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}`
+      const q = new PgWriteQueue();
+      q.enqueue(async () => {
+        const rowsUpdated = await this.updateTx(
+          sql,
+          txs.map(t => t.tx)
         );
-      }
-      const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY });
-      const enqueue = (task: Parameters<PQueue['add']>[0]) => void q.add(task);
-      enqueue(() => this.updateStxEvents(sql, txs));
-      enqueue(() => this.updatePrincipalStxTxs(sql, txs));
-      enqueue(() => this.updateSmartContractEvents(sql, txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', txs));
-      enqueue(() => this.updateStxLockEvents(sql, txs));
-      enqueue(() => this.updateFtEvents(sql, txs));
+        if (rowsUpdated !== txs.length)
+          throw new Error(
+            `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}`
+          );
+      });
+      q.enqueue(() => this.updateStxEvents(sql, txs));
+      q.enqueue(() => this.updatePrincipalStxTxs(sql, txs));
+      q.enqueue(() => this.updateSmartContractEvents(sql, txs));
+      q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', txs));
+      q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', txs));
+      q.enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', txs));
+      q.enqueue(() => this.updateStxLockEvents(sql, txs));
+      q.enqueue(() => this.updateFtEvents(sql, txs));
       for (const entry of txs) {
-        enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents, true));
-        enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
-        enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
-        enqueue(() => this.updateNames(sql, entry.tx, entry.names));
+        q.enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents, true));
+        q.enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
+        q.enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
+        q.enqueue(() => this.updateNames(sql, entry.tx, entry.names));
       }
-      await q.onIdle();
+      await q.done();
     }
   }

@@ -2491,6 +2483,8 @@ export class PgWriteStore extends PgStore {
       )
       SELECT tx_id FROM pruned
     `;
+    const txIds = deletedTxResults.map(r => r.tx_id);
+    if (txIds.length > 0) logger.debug(`Garbage collected ${txIds.length} mempool txs`);
     return { deletedTxs: deletedTxResults.map(r => r.tx_id) };
   }

@@ -2500,178 +2494,199 @@ export class PgWriteStore extends PgStore {
     canonical: boolean,
     updatedEntities: ReOrgUpdatedEntities
   ): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
-    const txResult = await sql<{ tx_id: string }[]>`
-      UPDATE txs
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-      RETURNING tx_id
-    `;
-    const txIds = txResult.map(row => row.tx_id);
-    if (canonical) {
-      updatedEntities.markedCanonical.txs += txResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.txs += txResult.count;
-    }
-    if (txResult.count)
-      await sql`
-        UPDATE principal_stx_txs
+    const result: { txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] } = {
+      txsMarkedCanonical: [],
+      txsMarkedNonCanonical: [],
+    };
+
+    const q = new PgWriteQueue();
+    q.enqueue(async () => {
+      const txResult = await sql<{ tx_id: string }[]>`
+        UPDATE txs
         SET canonical = ${canonical}
-        WHERE tx_id IN ${sql(txIds)}
-          AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+        RETURNING tx_id
       `;
+      const txIds = txResult.map(row => row.tx_id);
+      if (canonical) {
+        updatedEntities.markedCanonical.txs += txResult.count;
+        result.txsMarkedCanonical = txIds;
+      } else {
+        updatedEntities.markedNonCanonical.txs += txResult.count;
+        result.txsMarkedNonCanonical = txIds;
+      }
+      if (txResult.count)
+        await sql`
+          UPDATE principal_stx_txs
+          SET canonical = ${canonical}
+          WHERE tx_id IN ${sql(txIds)}
+            AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+        `;
+    });
+    q.enqueue(async () => {
+      const minerRewardResults = await sql`
+        UPDATE miner_rewards
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.minerRewards += minerRewardResults.count;
+      } else {
+        updatedEntities.markedNonCanonical.minerRewards += minerRewardResults.count;
+      }
+    });
+    q.enqueue(async () => {
+      const stxLockResults = await sql`
+        UPDATE stx_lock_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.stxLockEvents += stxLockResults.count;
+      } else {
+        updatedEntities.markedNonCanonical.stxLockEvents += stxLockResults.count;
+      }
+    });
+    q.enqueue(async () => {
+      const stxResults = await sql`
+        UPDATE stx_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.stxEvents += stxResults.count;
+      } else {
+        updatedEntities.markedNonCanonical.stxEvents += stxResults.count;
+      }
+    });
+    q.enqueue(async () => {
+      const ftResult = await sql`
+        UPDATE ft_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.ftEvents += ftResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.ftEvents += ftResult.count;
+      }
+    });
+    q.enqueue(async () => {
+      const nftResult = await sql`
+        UPDATE nft_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.nftEvents += nftResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.nftEvents += nftResult.count;
+      }
+      if (nftResult.count)
+        await this.updateNftCustodyFromReOrg(sql, {
+          index_block_hash: indexBlockHash,
+          microblocks: [],
+        });
+    });
+    q.enqueue(async () => {
+      const pox2Result = await sql`
+        UPDATE pox2_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.pox2Events += pox2Result.count;
+      } else {
+        updatedEntities.markedNonCanonical.pox2Events += pox2Result.count;
+      }
+    });
+    q.enqueue(async () => {
+      const pox3Result = await sql`
+        UPDATE pox3_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.pox3Events += pox3Result.count;
+      } else {
+        updatedEntities.markedNonCanonical.pox3Events += pox3Result.count;
+      }
+    });
+    q.enqueue(async () => {
+      const pox4Result = await sql`
+        UPDATE pox4_events
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.pox4Events += pox4Result.count;
+      } else {
+        updatedEntities.markedNonCanonical.pox4Events += pox4Result.count;
+      }
+    });
+    q.enqueue(async () => {
+      const contractLogResult = await sql`
+        UPDATE contract_logs
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.contractLogs += contractLogResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.contractLogs += contractLogResult.count;
+      }
+    });
+    q.enqueue(async () => {
+      const smartContractResult = await sql`
+        UPDATE smart_contracts
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.smartContracts += smartContractResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.smartContracts += smartContractResult.count;
+      }
+    });
+    q.enqueue(async () => {
+      const nameResult = await sql`
+        UPDATE names
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.names += nameResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.names += nameResult.count;
+      }
+    });
+    q.enqueue(async () => {
+      const namespaceResult = await sql`
+        UPDATE namespaces
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.namespaces += namespaceResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.namespaces += namespaceResult.count;
+      }
+    });
+    q.enqueue(async () => {
+      const subdomainResult = await sql`
+        UPDATE subdomains
+        SET canonical = ${canonical}
+        WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
+      `;
+      if (canonical) {
+        updatedEntities.markedCanonical.subdomains += subdomainResult.count;
+      } else {
+        updatedEntities.markedNonCanonical.subdomains += subdomainResult.count;
+      }
+    });
+    await q.done();
-    const minerRewardResults = await sql`
-      UPDATE miner_rewards
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.minerRewards += minerRewardResults.count;
-    } else {
-      updatedEntities.markedNonCanonical.minerRewards += minerRewardResults.count;
-    }
-
-    const stxLockResults = await sql`
-      UPDATE stx_lock_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.stxLockEvents += stxLockResults.count;
-    } else {
-      updatedEntities.markedNonCanonical.stxLockEvents += stxLockResults.count;
-    }
-
-    const stxResults = await sql`
-      UPDATE stx_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.stxEvents += stxResults.count;
-    } else {
-      updatedEntities.markedNonCanonical.stxEvents += stxResults.count;
-    }
-
-    const ftResult = await sql`
-      UPDATE ft_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.ftEvents += ftResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.ftEvents += ftResult.count;
-    }
-
-    const nftResult = await sql`
-      UPDATE nft_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.nftEvents += nftResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.nftEvents += nftResult.count;
-    }
-    if (nftResult.count)
-      await this.updateNftCustodyFromReOrg(sql, {
-        index_block_hash: indexBlockHash,
-        microblocks: [],
-      });
-
-    const pox2Result = await sql`
-      UPDATE pox2_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.pox2Events += pox2Result.count;
-    } else {
-      updatedEntities.markedNonCanonical.pox2Events += pox2Result.count;
-    }
-
-    const pox3Result = await sql`
-      UPDATE pox3_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.pox3Events += pox3Result.count;
-    } else {
-      updatedEntities.markedNonCanonical.pox3Events += pox3Result.count;
-    }
-
-    const pox4Result = await sql`
-      UPDATE pox4_events
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.pox4Events += pox4Result.count;
-    } else {
-      updatedEntities.markedNonCanonical.pox4Events += pox4Result.count;
-    }
-
-    const contractLogResult = await sql`
-      UPDATE contract_logs
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.contractLogs += contractLogResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.contractLogs += contractLogResult.count;
-    }
-
-    const smartContractResult = await sql`
-      UPDATE smart_contracts
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.smartContracts += smartContractResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.smartContracts += smartContractResult.count;
-    }
-
-    const nameResult = await sql`
-      UPDATE names
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.names += nameResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.names += nameResult.count;
-    }
-
-    const namespaceResult = await sql`
-      UPDATE namespaces
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.namespaces += namespaceResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.namespaces += namespaceResult.count;
-    }
-
-    const subdomainResult = await sql`
-      UPDATE subdomains
-      SET canonical = ${canonical}
-      WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
-    `;
-    if (canonical) {
-      updatedEntities.markedCanonical.subdomains += subdomainResult.count;
-    } else {
-      updatedEntities.markedNonCanonical.subdomains += subdomainResult.count;
-    }
-
-    return {
-      txsMarkedCanonical: canonical ? txIds : [],
-      txsMarkedNonCanonical: canonical ? [] : txIds,
-    };
+    return result;
   }

   async restoreOrphanedChain(