From 4c6abda1e1ac3699fce6546bd43aae4333608813 Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 11:53:44 -0600 Subject: [PATCH 1/6] feat: run inserts in batch and in parallel when processing new block Signed-off-by: bestmike007 --- src/datastore/pg-write-store.ts | 415 +++++++++++------- .../importers/new-block-importer.ts | 8 +- src/tests/datastore-tests.ts | 4 +- src/tests/search-tests.ts | 4 +- 4 files changed, 255 insertions(+), 176 deletions(-) diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 397d6c5da7..754d95c15e 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -1,3 +1,5 @@ +import * as assert from 'assert'; +import PQueue from 'p-queue'; import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers'; import { DbBlock, @@ -38,7 +40,6 @@ import { BurnchainRewardInsertValues, TxInsertValues, MempoolTxInsertValues, - MempoolTxQueryResult, SmartContractInsertValues, BnsNameInsertValues, BnsNamespaceInsertValues, @@ -66,10 +67,8 @@ import { setTotalBlockUpdateDataExecutionCost, convertTxQueryResultToDbMempoolTx, markBlockUpdateDataAsNonCanonical, - MEMPOOL_TX_COLUMNS, MICROBLOCK_COLUMNS, parseBlockQueryResult, - parseMempoolTxQueryResult, parseMicroblockQueryResult, parseTxQueryResult, TX_COLUMNS, @@ -256,20 +255,30 @@ export class PgWriteStore extends PgStore { } if ((await this.updateBlock(sql, data.block)) !== 0) { await this.updateMinerRewards(sql, data.minerRewards); - for (const entry of batchedTxData) { - await this.updateTx(sql, entry.tx); - await this.updateStxEvents(sql, entry.tx, entry.stxEvents); - await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents); - await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events); - await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents); - await this.updateFtEvents(sql, entry.tx, entry.ftEvents); - await this.updateNftEvents(sql, entry.tx, entry.nftEvents); - await this.updateSmartContracts(sql, entry.tx, entry.smartContracts); - await this.updateNamespaces(sql, entry.tx, entry.namespaces); - await this.updateNames(sql, entry.tx, entry.names); + if (batchedTxData.length > 0) { + const q = new PQueue({ concurrency: 4 }); + const enqueue = (task: Parameters[0]) => void q.add(task); + enqueue(() => + this.updateTx( + sql, + batchedTxData.map(b => b.tx) + ) + ); + enqueue(() => this.updateStxEvents(sql, batchedTxData)); + enqueue(() => this.updatePrincipalStxTxs(sql, batchedTxData)); + enqueue(() => this.updateSmartContractEvents(sql, batchedTxData)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', batchedTxData)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', batchedTxData)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', batchedTxData)); + enqueue(() => this.updateStxLockEvents(sql, batchedTxData)); + enqueue(() => this.updateFtEvents(sql, batchedTxData)); + for (const entry of batchedTxData) { + enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents)); + enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts)); + enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces)); + enqueue(() => this.updateNames(sql, entry.tx, entry.names)); + } + 
await q.onIdle(); } const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql); if (mempoolGarbageResults.deletedTxs.length > 0) { @@ -746,14 +755,35 @@ export class PgWriteStore extends PgStore { logger.info('Updated block zero boot data', tablesUpdates); } - async updatePoxSyntheticEvents( - sql: PgSqlClient, - tx: DbTx, - poxTable: PoxSyntheticEventTable, - events: DbPoxSyntheticEvent[] - ) { - for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) { - const values = batch.map(event => { + async updatePoxSyntheticEvents< + T extends PoxSyntheticEventTable, + Entry extends { tx: DbTx } & ('pox2_events' extends T + ? { pox2Events: DbPoxSyntheticEvent[] } + : 'pox3_events' extends T + ? { pox3Events: DbPoxSyntheticEvent[] } + : 'pox4_events' extends T + ? { pox4Events: DbPoxSyntheticEvent[] } + : never) + >(sql: PgSqlClient, poxTable: T, entries: Entry[]) { + const values: PoxSyntheticEventInsertValues[] = []; + for (const entry of entries) { + let events: DbPoxSyntheticEvent[] | null = null; + switch (poxTable) { + case 'pox2_events': + assert('pox2Events' in entry); + events = entry.pox2Events; + case 'pox3_events': + assert('pox3Events' in entry); + events = entry.pox3Events; + case 'pox4_events': + assert('pox4Events' in entry); + events = entry.pox4Events; + } + if (events == null) { + throw new Error(`unknown pox table: ${poxTable}`); + } + const tx = entry.tx; + for (const event of events) { const value: PoxSyntheticEventInsertValues = { event_index: event.event_index, tx_id: event.tx_id, @@ -786,6 +816,7 @@ export class PgWriteStore extends PgStore { reward_cycle: null, amount_ustx: null, }; + // Set event-specific columns switch (event.name) { case SyntheticPoxEventName.HandleUnlock: { @@ -862,59 +893,80 @@ export class PgWriteStore extends PgStore { ); } } - return value; - }); - await sql` - INSERT INTO ${sql(poxTable)} ${sql(values)} + values.push(value); + } + } + for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) { + const res = await sql` + INSERT INTO ${sql(String(poxTable))} ${sql(batch)} `; + if (res.count !== batch.length) { + throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); + } } } - async updateStxLockEvents(sql: PgSqlClient, tx: DbTx, events: DbStxLockEvent[]) { - for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) { - const values: StxLockEventInsertValues[] = batch.map(event => ({ - event_index: event.event_index, - tx_id: event.tx_id, - tx_index: event.tx_index, - block_height: event.block_height, - index_block_hash: tx.index_block_hash, - parent_index_block_hash: tx.parent_index_block_hash, - microblock_hash: tx.microblock_hash, - microblock_sequence: tx.microblock_sequence, - microblock_canonical: tx.microblock_canonical, - canonical: event.canonical, - locked_amount: event.locked_amount.toString(), - unlock_height: event.unlock_height, - locked_address: event.locked_address, - contract_name: event.contract_name, - })); - await sql` - INSERT INTO stx_lock_events ${sql(values)} + async updateStxLockEvents( + sql: PgSqlClient, + entries: { tx: DbTx; stxLockEvents: DbStxLockEvent[] }[] + ) { + const values: StxLockEventInsertValues[] = []; + for (const { tx, stxLockEvents } of entries) { + for (const event of stxLockEvents) { + values.push({ + event_index: event.event_index, + tx_id: event.tx_id, + tx_index: event.tx_index, + block_height: event.block_height, + index_block_hash: tx.index_block_hash, + parent_index_block_hash: tx.parent_index_block_hash, + microblock_hash: tx.microblock_hash, + 
microblock_sequence: tx.microblock_sequence, + microblock_canonical: tx.microblock_canonical, + canonical: event.canonical, + locked_amount: event.locked_amount.toString(), + unlock_height: event.unlock_height, + locked_address: event.locked_address, + contract_name: event.contract_name, + }); + } + } + for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) { + const res = await sql` + INSERT INTO stx_lock_events ${sql(batch)} `; + if (res.count !== batch.length) { + throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); + } } } - async updateStxEvents(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) { - for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) { - const values: StxEventInsertValues[] = eventBatch.map(event => ({ - event_index: event.event_index, - tx_id: event.tx_id, - tx_index: event.tx_index, - block_height: event.block_height, - index_block_hash: tx.index_block_hash, - parent_index_block_hash: tx.parent_index_block_hash, - microblock_hash: tx.microblock_hash, - microblock_sequence: tx.microblock_sequence, - microblock_canonical: tx.microblock_canonical, - canonical: event.canonical, - asset_event_type_id: event.asset_event_type_id, - sender: event.sender ?? null, - recipient: event.recipient ?? null, - amount: event.amount, - memo: event.memo ?? null, - })); + async updateStxEvents(sql: PgSqlClient, entries: { tx: DbTx; stxEvents: DbStxEvent[] }[]) { + const values: StxEventInsertValues[] = []; + for (const { tx, stxEvents } of entries) { + for (const event of stxEvents) { + values.push({ + event_index: event.event_index, + tx_id: event.tx_id, + tx_index: event.tx_index, + block_height: event.block_height, + index_block_hash: tx.index_block_hash, + parent_index_block_hash: tx.parent_index_block_hash, + microblock_hash: tx.microblock_hash, + microblock_sequence: tx.microblock_sequence, + microblock_canonical: tx.microblock_canonical, + canonical: event.canonical, + asset_event_type_id: event.asset_event_type_id, + sender: event.sender ?? null, + recipient: event.recipient ?? null, + amount: event.amount, + memo: event.memo ?? null, + }); + } + } + for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { const res = await sql` - INSERT INTO stx_events ${sql(values)} + INSERT INTO stx_events ${sql(eventBatch)} `; if (res.count !== eventBatch.length) { throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`); @@ -926,45 +978,43 @@ export class PgWriteStore extends PgStore { * Update the `principal_stx_tx` table with the latest `tx_id`s that resulted in a STX * transfer relevant to a principal (stx address or contract id). 
* @param sql - DB client - * @param tx - Transaction - * @param events - Transaction STX events + * @param entries - list of tx and stxEvents */ - async updatePrincipalStxTxs(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) { - const insertPrincipalStxTxs = async (principals: string[]) => { - principals = [...new Set(principals)]; // Remove duplicates - const values: PrincipalStxTxsInsertValues[] = principals.map(principal => ({ - principal: principal, - tx_id: tx.tx_id, - block_height: tx.block_height, - index_block_hash: tx.index_block_hash, - microblock_hash: tx.microblock_hash, - microblock_sequence: tx.microblock_sequence, - tx_index: tx.tx_index, - canonical: tx.canonical, - microblock_canonical: tx.microblock_canonical, - })); + async updatePrincipalStxTxs(sql: PgSqlClient, entries: { tx: DbTx; stxEvents: DbStxEvent[] }[]) { + const values: PrincipalStxTxsInsertValues[] = []; + for (const { tx, stxEvents } of entries) { + const principals = new Set( + [ + tx.sender_address, + tx.token_transfer_recipient_address, + tx.contract_call_contract_id, + tx.smart_contract_contract_id, + ].filter((p): p is string => !!p) + ); + for (const event of stxEvents) { + if (event.sender) principals.add(event.sender); + if (event.recipient) principals.add(event.recipient); + } + for (const principal of principals) { + values.push({ + principal: principal, + tx_id: tx.tx_id, + block_height: tx.block_height, + index_block_hash: tx.index_block_hash, + microblock_hash: tx.microblock_hash, + microblock_sequence: tx.microblock_sequence, + tx_index: tx.tx_index, + canonical: tx.canonical, + microblock_canonical: tx.microblock_canonical, + }); + } + } + + for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { await sql` - INSERT INTO principal_stx_txs ${sql(values)} + INSERT INTO principal_stx_txs ${sql(eventBatch)} ON CONFLICT ON CONSTRAINT unique_principal_tx_id_index_block_hash_microblock_hash DO NOTHING `; - }; - // Insert tx data - await insertPrincipalStxTxs( - [ - tx.sender_address, - tx.token_transfer_recipient_address, - tx.contract_call_contract_id, - tx.smart_contract_contract_id, - ].filter((p): p is string => !!p) // Remove undefined - ); - // Insert stx_event data - for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) { - const principals: string[] = []; - for (const event of eventBatch) { - if (event.sender) principals.push(event.sender); - if (event.recipient) principals.push(event.recipient); - } - await insertPrincipalStxTxs(principals); } } @@ -1106,28 +1156,36 @@ export class PgWriteStore extends PgStore { `; } - async updateFtEvents(sql: PgSqlClient, tx: DbTx, events: DbFtEvent[]) { - for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) { - const values: FtEventInsertValues[] = batch.map(event => ({ - event_index: event.event_index, - tx_id: event.tx_id, - tx_index: event.tx_index, - block_height: event.block_height, - index_block_hash: tx.index_block_hash, - parent_index_block_hash: tx.parent_index_block_hash, - microblock_hash: tx.microblock_hash, - microblock_sequence: tx.microblock_sequence, - microblock_canonical: tx.microblock_canonical, - canonical: event.canonical, - asset_event_type_id: event.asset_event_type_id, - sender: event.sender ?? null, - recipient: event.recipient ?? 
null, - asset_identifier: event.asset_identifier, - amount: event.amount.toString(), - })); - await sql` - INSERT INTO ft_events ${sql(values)} + async updateFtEvents(sql: PgSqlClient, entries: { tx: DbTx; ftEvents: DbFtEvent[] }[]) { + const values: FtEventInsertValues[] = []; + for (const { tx, ftEvents } of entries) { + for (const event of ftEvents) { + values.push({ + event_index: event.event_index, + tx_id: event.tx_id, + tx_index: event.tx_index, + block_height: event.block_height, + index_block_hash: tx.index_block_hash, + parent_index_block_hash: tx.parent_index_block_hash, + microblock_hash: tx.microblock_hash, + microblock_sequence: tx.microblock_sequence, + microblock_canonical: tx.microblock_canonical, + canonical: event.canonical, + asset_event_type_id: event.asset_event_type_id, + sender: event.sender ?? null, + recipient: event.recipient ?? null, + asset_identifier: event.asset_identifier, + amount: event.amount.toString(), + }); + } + } + for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) { + const res = await sql` + INSERT INTO ft_events ${sql(batch)} `; + if (res.count !== batch.length) { + throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); + } } } @@ -1226,25 +1284,33 @@ export class PgWriteStore extends PgStore { } } - async updateSmartContractEvents(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) { - for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) { - const values: SmartContractEventInsertValues[] = eventBatch.map(event => ({ - event_index: event.event_index, - tx_id: event.tx_id, - tx_index: event.tx_index, - block_height: event.block_height, - index_block_hash: tx.index_block_hash, - parent_index_block_hash: tx.parent_index_block_hash, - microblock_hash: tx.microblock_hash, - microblock_sequence: tx.microblock_sequence, - microblock_canonical: tx.microblock_canonical, - canonical: event.canonical, - contract_identifier: event.contract_identifier, - topic: event.topic, - value: event.value, - })); + async updateSmartContractEvents( + sql: PgSqlClient, + entries: { tx: DbTx; contractLogEvents: DbSmartContractEvent[] }[] + ) { + const values: SmartContractEventInsertValues[] = []; + for (const { tx, contractLogEvents } of entries) { + for (const event of contractLogEvents) { + values.push({ + event_index: event.event_index, + tx_id: event.tx_id, + tx_index: event.tx_index, + block_height: event.block_height, + index_block_hash: tx.index_block_hash, + parent_index_block_hash: tx.parent_index_block_hash, + microblock_hash: tx.microblock_hash, + microblock_sequence: tx.microblock_sequence, + microblock_canonical: tx.microblock_canonical, + canonical: event.canonical, + contract_identifier: event.contract_identifier, + topic: event.topic, + value: event.value, + }); + } + } + for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { const res = await sql` - INSERT INTO contract_logs ${sql(values)} + INSERT INTO contract_logs ${sql(eventBatch)} `; if (res.count !== eventBatch.length) { throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`); @@ -1534,8 +1600,9 @@ export class PgWriteStore extends PgStore { } } - async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise { - const values: TxInsertValues = { + async updateTx(sql: PgSqlClient, txs: DbTxRaw | DbTxRaw[]): Promise { + if (!Array.isArray(txs)) txs = [txs]; + const values: TxInsertValues[] = txs.map(tx => ({ tx_id: tx.tx_id, raw_tx: tx.raw_tx, tx_index: tx.tx_index, @@ -1591,12 +1658,17 @@ export class PgWriteStore extends PgStore { 
execution_cost_runtime: tx.execution_cost_runtime, execution_cost_write_count: tx.execution_cost_write_count, execution_cost_write_length: tx.execution_cost_write_length, - }; - const result = await sql` - INSERT INTO txs ${sql(values)} - ON CONFLICT ON CONSTRAINT unique_tx_id_index_block_hash_microblock_hash DO NOTHING - `; - return result.count; + })); + + let count = 0; + for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { + const res = await sql` + INSERT INTO txs ${sql(eventBatch)} + ON CONFLICT ON CONSTRAINT unique_tx_id_index_block_hash_microblock_hash DO NOTHING + `; + count += res.count; + } + return count; } async insertDbMempoolTxs( @@ -2088,26 +2160,33 @@ export class PgWriteStore extends PgStore { } } - for (const entry of txs) { - const rowsUpdated = await this.updateTx(sql, entry.tx); - if (rowsUpdated !== 1) { + if (txs.length > 0) { + const rowsUpdated = await this.updateTx( + sql, + txs.map(t => t.tx) + ); + if (rowsUpdated !== txs.length) { throw new Error( - `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}` + `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}` ); } - - await this.updateStxEvents(sql, entry.tx, entry.stxEvents); - await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents); - await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events); - await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events); - await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents); - await this.updateFtEvents(sql, entry.tx, entry.ftEvents); - await this.updateNftEvents(sql, entry.tx, entry.nftEvents, true); - await this.updateSmartContracts(sql, entry.tx, entry.smartContracts); - await this.updateNamespaces(sql, entry.tx, entry.namespaces); - await this.updateNames(sql, entry.tx, entry.names); + const q = new PQueue({ concurrency: 4 }); + const enqueue = (task: Parameters[0]) => void q.add(task); + enqueue(() => this.updateStxEvents(sql, txs)); + enqueue(() => this.updatePrincipalStxTxs(sql, txs)); + enqueue(() => this.updateSmartContractEvents(sql, txs)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', txs)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', txs)); + enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', txs)); + enqueue(() => this.updateStxLockEvents(sql, txs)); + enqueue(() => this.updateFtEvents(sql, txs)); + for (const entry of txs) { + enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents, true)); + enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts)); + enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces)); + enqueue(() => this.updateNames(sql, entry.tx, entry.names)); + } + await q.onIdle(); } } diff --git a/src/event-replay/parquet-based/importers/new-block-importer.ts b/src/event-replay/parquet-based/importers/new-block-importer.ts index ff7e76a005..a0b3ba4f82 100644 --- a/src/event-replay/parquet-based/importers/new-block-importer.ts +++ b/src/event-replay/parquet-based/importers/new-block-importer.ts @@ -376,7 +376,7 @@ const populateBatchInserters = (db: PgWriteStore) => { const insertStxLockEvents = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - await db.updateStxLockEvents(db.sql, entry.tx, 
entry.stxLockEvents); + await db.updateStxLockEvents(db.sql, [entry]); } }; @@ -386,19 +386,19 @@ const populateBatchInserters = (db: PgWriteStore) => { const insertPox2Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox2_events', entry.pox2Events); + await db.updatePoxSyntheticEvents(db.sql, 'pox2_events', [entry]); } }; const insertPox3Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox3_events', entry.pox3Events); + await db.updatePoxSyntheticEvents(db.sql, 'pox3_events', [entry]); } }; const insertPox4Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox4_events', entry.pox4Events); + await db.updatePoxSyntheticEvents(db.sql, 'pox4_events', [entry]); } }; diff --git a/src/tests/datastore-tests.ts b/src/tests/datastore-tests.ts index ce2617c73e..d513f61d5c 100644 --- a/src/tests/datastore-tests.ts +++ b/src/tests/datastore-tests.ts @@ -195,7 +195,7 @@ describe('postgres datastore', () => { createStxLockEvent('addrA', 222n, 1), createStxLockEvent('addrB', 333n, 1), ]; - await db.updateStxLockEvents(client, tx, stxLockEvents); + await db.updateStxLockEvents(client, [{ tx, stxLockEvents }]); await db.updateTx(client, tx); await db.updateTx(client, tx2); @@ -3546,7 +3546,7 @@ describe('postgres datastore', () => { } // insert stx lock events directly - await db.updateStxLockEvents(client, tx1, [stxLockEvent1]); + await db.updateStxLockEvents(client, [{ tx: tx1, stxLockEvents: [stxLockEvent1] }]); const block5: DbBlock = { block_hash: '0x55', diff --git a/src/tests/search-tests.ts b/src/tests/search-tests.ts index 5135630497..c7806dfff3 100644 --- a/src/tests/search-tests.ts +++ b/src/tests/search-tests.ts @@ -773,7 +773,7 @@ describe('search tests', () => { recipient: addr5, sender: 'none', }; - await db.updateFtEvents(client, stxTx1, [ftEvent1]); + await db.updateFtEvents(client, [{ tx: stxTx1, ftEvents: [ftEvent1] }]); // test address as a ft event recipient const searchResult5 = await supertest(api.server).get(`/extended/v1/search/${addr5}`); @@ -801,7 +801,7 @@ describe('search tests', () => { recipient: 'none', sender: addr6, }; - await db.updateFtEvents(client, stxTx1, [ftEvent2]); + await db.updateFtEvents(client, [{ tx: stxTx1, ftEvents: [ftEvent2] }]); // test address as a ft event sender const searchResult6 = await supertest(api.server).get(`/extended/v1/search/${addr6}`); From 74bf509d0fc5825915150a441dd566578fd54bc3 Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 12:53:42 -0600 Subject: [PATCH 2/6] chore: use asserts Signed-off-by: bestmike007 --- src/datastore/pg-write-store.ts | 49 ++++++++++++++------------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 754d95c15e..8234cab286 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -900,9 +900,7 @@ export class PgWriteStore extends PgStore { const res = await sql` INSERT INTO ${sql(String(poxTable))} ${sql(batch)} `; - if (res.count !== batch.length) { - throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); - } + assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`); } } @@ -935,9 +933,7 @@ export class PgWriteStore extends PgStore { const res = await 
sql` INSERT INTO stx_lock_events ${sql(batch)} `; - if (res.count !== batch.length) { - throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); - } + assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`); } } @@ -964,13 +960,11 @@ export class PgWriteStore extends PgStore { }); } } - for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { + for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) { const res = await sql` - INSERT INTO stx_events ${sql(eventBatch)} + INSERT INTO stx_events ${sql(batch)} `; - if (res.count !== eventBatch.length) { - throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`); - } + assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`); } } @@ -1053,9 +1047,10 @@ export class PgWriteStore extends PgStore { ON CONFLICT ON CONSTRAINT unique_name_zonefile_hash_tx_id_index_block_hash DO UPDATE SET zonefile = EXCLUDED.zonefile `; - if (result.count !== zonefileValues.length) { - throw new Error(`Expected ${result.count} zonefile inserts, got ${zonefileValues.length}`); - } + assert( + result.count === zonefileValues.length, + `Expecting ${result.count} zonefile inserts, got ${zonefileValues.length}` + ); } async updateBatchSubdomains( @@ -1111,9 +1106,10 @@ export class PgWriteStore extends PgStore { microblock_sequence = EXCLUDED.microblock_sequence, microblock_canonical = EXCLUDED.microblock_canonical `; - if (result.count !== subdomainValues.length) { - throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`); - } + assert( + result.count === subdomainValues.length, + `Expecting ${subdomainValues.length} subdomain inserts, got ${result.count}` + ); } async resolveBnsSubdomains( @@ -1183,9 +1179,7 @@ export class PgWriteStore extends PgStore { const res = await sql` INSERT INTO ft_events ${sql(batch)} `; - if (res.count !== batch.length) { - throw new Error(`Expected ${batch.length} inserts, got ${res.count}`); - } + assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`); } } @@ -1308,13 +1302,11 @@ export class PgWriteStore extends PgStore { }); } } - for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) { + for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) { const res = await sql` - INSERT INTO contract_logs ${sql(eventBatch)} + INSERT INTO contract_logs ${sql(batch)} `; - if (res.count !== eventBatch.length) { - throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`); - } + assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`); } } @@ -2031,9 +2023,10 @@ export class PgWriteStore extends PgStore { const res = await sql` INSERT INTO token_offering_locked ${sql(lockedInfos, 'address', 'value', 'block')} `; - if (res.count !== lockedInfos.length) { - throw new Error(`Expected ${lockedInfos.length} inserts, got ${res.count}`); - } + assert( + res.count === lockedInfos.length, + `Expecting ${lockedInfos.length} inserts, got ${res.count}` + ); } catch (e: any) { logger.error(e, `Locked Info errors ${e.message}`); throw e; From c2cc1b2ccec8acca4b27eb0740bb09d678d348da Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 13:09:35 -0600 Subject: [PATCH 3/6] chore: make insert concurrency configurable Signed-off-by: bestmike007 --- .env | 4 ++++ src/datastore/pg-write-store.ts | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.env b/.env index 9f6393cee7..8af239eb23 100644 
--- a/.env +++ b/.env @@ -43,6 +43,10 @@ PG_APPLICATION_NAME=stacks-blockchain-api # Limit to how many concurrent connections can be created, defaults to 10 # PG_CONNECTION_POOL_MAX=10 +# Insert concurrency when processing new blocks +# It'd better set it higher, e.g. 8 or 16, if your postgres is running on SSD and multiple CPU cores +# INSERT_CONCURRENCY=4 + # If specified, controls the Stacks Blockchain API mode. The possible values are: # * `readonly`: Runs the API endpoints without an Event Server that listens to events from a node and # writes them to the local database. The API will only read data from the PG database diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 8234cab286..2192be0057 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -96,6 +96,7 @@ import { PgServer, getConnectionArgs, getConnectionConfig } from './connection'; const MIGRATIONS_TABLE = 'pgmigrations'; const INSERT_BATCH_SIZE = 500; +const INSERT_CONCURRENCY = getUintEnvOrDefault('INSERT_CONCURRENCY', 4); const MEMPOOL_STATS_DEBOUNCE_INTERVAL = getUintEnvOrDefault( 'MEMPOOL_STATS_DEBOUNCE_INTERVAL', 1000 @@ -256,7 +257,7 @@ export class PgWriteStore extends PgStore { if ((await this.updateBlock(sql, data.block)) !== 0) { await this.updateMinerRewards(sql, data.minerRewards); if (batchedTxData.length > 0) { - const q = new PQueue({ concurrency: 4 }); + const q = new PQueue({ concurrency: INSERT_CONCURRENCY }); const enqueue = (task: Parameters[0]) => void q.add(task); enqueue(() => this.updateTx( @@ -2163,7 +2164,7 @@ export class PgWriteStore extends PgStore { `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}` ); } - const q = new PQueue({ concurrency: 4 }); + const q = new PQueue({ concurrency: INSERT_CONCURRENCY }); const enqueue = (task: Parameters[0]) => void q.add(task); enqueue(() => this.updateStxEvents(sql, txs)); enqueue(() => this.updatePrincipalStxTxs(sql, txs)); From aff1aa1451e719c9f2e9c0a8fb1dc365ee5a9498 Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 13:32:08 -0600 Subject: [PATCH 4/6] chore: rename env to STACKS_BLOCK_DATA_INSERT_CONCURRENCY and enforce min value Signed-off-by: bestmike007 --- .env | 2 +- src/datastore/pg-write-store.ts | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.env b/.env index 8af239eb23..729650d958 100644 --- a/.env +++ b/.env @@ -45,7 +45,7 @@ PG_APPLICATION_NAME=stacks-blockchain-api # Insert concurrency when processing new blocks # It'd better set it higher, e.g. 8 or 16, if your postgres is running on SSD and multiple CPU cores -# INSERT_CONCURRENCY=4 +# STACKS_BLOCK_DATA_INSERT_CONCURRENCY=4 # If specified, controls the Stacks Blockchain API mode. 
The possible values are: # * `readonly`: Runs the API endpoints without an Event Server that listens to events from a node and diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 2192be0057..608b9ae95c 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -96,7 +96,10 @@ import { PgServer, getConnectionArgs, getConnectionConfig } from './connection'; const MIGRATIONS_TABLE = 'pgmigrations'; const INSERT_BATCH_SIZE = 500; -const INSERT_CONCURRENCY = getUintEnvOrDefault('INSERT_CONCURRENCY', 4); +const STACKS_BLOCK_DATA_INSERT_CONCURRENCY = Math.max( + 1, + getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4) +); const MEMPOOL_STATS_DEBOUNCE_INTERVAL = getUintEnvOrDefault( 'MEMPOOL_STATS_DEBOUNCE_INTERVAL', 1000 @@ -257,7 +260,7 @@ export class PgWriteStore extends PgStore { if ((await this.updateBlock(sql, data.block)) !== 0) { await this.updateMinerRewards(sql, data.minerRewards); if (batchedTxData.length > 0) { - const q = new PQueue({ concurrency: INSERT_CONCURRENCY }); + const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY }); const enqueue = (task: Parameters[0]) => void q.add(task); enqueue(() => this.updateTx( @@ -2164,7 +2167,7 @@ export class PgWriteStore extends PgStore { `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}` ); } - const q = new PQueue({ concurrency: INSERT_CONCURRENCY }); + const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY }); const enqueue = (task: Parameters[0]) => void q.add(task); enqueue(() => this.updateStxEvents(sql, txs)); enqueue(() => this.updatePrincipalStxTxs(sql, txs)); From c75f22057e4c64f7752e03095b8241553a61bacf Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 13:56:57 -0600 Subject: [PATCH 5/6] chore: fix switch-case fall-through Signed-off-by: bestmike007 --- src/datastore/pg-write-store.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 608b9ae95c..4110adf6b5 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -776,18 +776,20 @@ export class PgWriteStore extends PgStore { case 'pox2_events': assert('pox2Events' in entry); events = entry.pox2Events; + break; case 'pox3_events': assert('pox3Events' in entry); events = entry.pox3Events; + break; case 'pox4_events': assert('pox4Events' in entry); events = entry.pox4Events; - } - if (events == null) { - throw new Error(`unknown pox table: ${poxTable}`); + break; + default: + throw new Error(`unknown pox table: ${poxTable}`); } const tx = entry.tx; - for (const event of events) { + for (const event of events ?? []) { const value: PoxSyntheticEventInsertValues = { event_index: event.event_index, tx_id: event.tx_id, From b005dd4efe4df48668950dc9c94f5a20cc2908b2 Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Wed, 10 Jan 2024 14:05:56 -0600 Subject: [PATCH 6/6] chore: fix env entry wording Signed-off-by: bestmike007 --- .env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.env b/.env index 729650d958..5a5fa4bd3e 100644 --- a/.env +++ b/.env @@ -44,7 +44,7 @@ PG_APPLICATION_NAME=stacks-blockchain-api # PG_CONNECTION_POOL_MAX=10 # Insert concurrency when processing new blocks -# It'd better set it higher, e.g. 
8 or 16, if your postgres is running on SSD and multiple CPU cores +# If your PostgreSQL is operating on SSD and has multiple CPU cores, consider raising this value, for instance, to 8 or 16. # STACKS_BLOCK_DATA_INSERT_CONCURRENCY=4 # If specified, controls the Stacks Blockchain API mode. The possible values are:
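
Note: the sketch below is not part of the patches above. It is a minimal, standalone illustration of the pattern the series introduces: accumulate insert rows per table, write them in fixed-size batches with a row-count check, and run the independent table writes in parallel on a p-queue whose concurrency is driven by STACKS_BLOCK_DATA_INSERT_CONCURRENCY. The Row type, insertBatch, insertTable, and the local batchIterate and constant values are simplified stand-ins for this sketch, not the API's real implementations.

import * as assert from 'assert';
import PQueue from 'p-queue';

// Illustrative values for this sketch only; the API reads these through its own helpers.
const INSERT_BATCH_SIZE = 500;
const INSERT_CONCURRENCY = Math.max(
  1,
  Number(process.env.STACKS_BLOCK_DATA_INSERT_CONCURRENCY ?? '4')
);

interface Row {
  id: number;
}

// Yield fixed-size chunks, mirroring how rows are grouped before each INSERT statement.
function* batchIterate<T>(items: T[], size: number): Generator<T[]> {
  for (let i = 0; i < items.length; i += size) {
    yield items.slice(i, i + size);
  }
}

// Hypothetical insert that reports how many rows it wrote (stands in for a SQL INSERT).
async function insertBatch(table: string, rows: Row[]): Promise<number> {
  await new Promise(resolve => setTimeout(resolve, 5)); // simulate I/O latency
  return rows.length;
}

// Flatten-then-batch: write all rows for one table and verify the reported row count.
async function insertTable(table: string, rows: Row[]): Promise<void> {
  for (const batch of batchIterate(rows, INSERT_BATCH_SIZE)) {
    const count = await insertBatch(table, batch);
    assert(count === batch.length, `Expecting ${batch.length} inserts, got ${count}`);
  }
}

async function main(): Promise<void> {
  const rows = Array.from({ length: 1200 }, (_, id) => ({ id }));
  // Independent tables are written concurrently, bounded by the configured limit.
  const q = new PQueue({ concurrency: INSERT_CONCURRENCY });
  const enqueue = (task: () => Promise<void>) => void q.add(task);
  enqueue(() => insertTable('stx_events', rows));
  enqueue(() => insertTable('ft_events', rows));
  enqueue(() => insertTable('contract_logs', rows));
  // Wait for every queued write to finish before the block update is considered complete.
  await q.onIdle();
}

void main();

Raising the concurrency only helps when the database can absorb parallel writers; on a single slow disk the queued inserts contend with each other, which is why the default stays at 4 and the .env comment suggests 8 or 16 only for SSD-backed, multi-core PostgreSQL hosts.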