diff --git a/.env b/.env
index 5a5fa4bd3e..9f6393cee7 100644
--- a/.env
+++ b/.env
@@ -43,10 +43,6 @@ PG_APPLICATION_NAME=stacks-blockchain-api
 # Limit to how many concurrent connections can be created, defaults to 10
 # PG_CONNECTION_POOL_MAX=10

-# Insert concurrency when processing new blocks
-# If your PostgreSQL is operating on SSD and has multiple CPU cores, consider raising this value, for instance, to 8 or 16.
-# STACKS_BLOCK_DATA_INSERT_CONCURRENCY=4
-
 # If specified, controls the Stacks Blockchain API mode. The possible values are:
 # * `readonly`: Runs the API endpoints without an Event Server that listens to events from a node and
 #   writes them to the local database. The API will only read data from the PG database
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index 0de9723d16..86ee6f62e0 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -1,5 +1,3 @@
-import * as assert from 'assert';
-import PQueue from 'p-queue';
 import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers';
 import {
   DbBlock,
@@ -40,6 +38,7 @@ import {
   BurnchainRewardInsertValues,
   TxInsertValues,
   MempoolTxInsertValues,
+  MempoolTxQueryResult,
   SmartContractInsertValues,
   BnsNameInsertValues,
   BnsNamespaceInsertValues,
@@ -67,8 +66,10 @@ import {
   setTotalBlockUpdateDataExecutionCost,
   convertTxQueryResultToDbMempoolTx,
   markBlockUpdateDataAsNonCanonical,
+  MEMPOOL_TX_COLUMNS,
   MICROBLOCK_COLUMNS,
   parseBlockQueryResult,
+  parseMempoolTxQueryResult,
   parseMicroblockQueryResult,
   parseTxQueryResult,
   TX_COLUMNS,
@@ -95,10 +96,6 @@ import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';

 const MIGRATIONS_TABLE = 'pgmigrations';
 const INSERT_BATCH_SIZE = 500;
-const STACKS_BLOCK_DATA_INSERT_CONCURRENCY = Math.max(
-  1,
-  getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4)
-);
 const MEMPOOL_STATS_DEBOUNCE_INTERVAL = getUintEnvOrDefault(
   'MEMPOOL_STATS_DEBOUNCE_INTERVAL',
   1000
@@ -258,30 +255,20 @@ export class PgWriteStore extends PgStore {
       }
       if ((await this.updateBlock(sql, data.block)) !== 0) {
         await this.updateMinerRewards(sql, data.minerRewards);
-        if (batchedTxData.length > 0) {
-          const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY });
-          const enqueue = (task: Parameters<PQueue['add']>[0]) => void q.add(task);
-          enqueue(() =>
-            this.updateTx(
-              sql,
-              batchedTxData.map(b => b.tx)
-            )
-          );
-          enqueue(() => this.updateStxEvents(sql, batchedTxData));
-          enqueue(() => this.updatePrincipalStxTxs(sql, batchedTxData));
-          enqueue(() => this.updateSmartContractEvents(sql, batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', batchedTxData));
-          enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', batchedTxData));
-          enqueue(() => this.updateStxLockEvents(sql, batchedTxData));
-          enqueue(() => this.updateFtEvents(sql, batchedTxData));
-          for (const entry of batchedTxData) {
-            enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents));
-            enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
-            enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
-            enqueue(() => this.updateNames(sql, entry.tx, entry.names));
-          }
-          await q.onIdle();
+        for (const entry of batchedTxData) {
+          await this.updateTx(sql, entry.tx);
+          await this.updateStxEvents(sql, entry.tx, entry.stxEvents);
+          await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents);
+          await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events);
+          await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents);
+          await this.updateFtEvents(sql, entry.tx, entry.ftEvents);
+          await this.updateNftEvents(sql, entry.tx, entry.nftEvents);
+          await this.updateSmartContracts(sql, entry.tx, entry.smartContracts);
+          await this.updateNamespaces(sql, entry.tx, entry.namespaces);
+          await this.updateNames(sql, entry.tx, entry.names);
         }
         const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
         if (mempoolGarbageResults.deletedTxs.length > 0) {
@@ -758,37 +745,14 @@ export class PgWriteStore extends PgStore {
     logger.info('Updated block zero boot data', tablesUpdates);
   }

-  async updatePoxSyntheticEvents<
-    T extends PoxSyntheticEventTable,
-    Entry extends { tx: DbTx } & ('pox2_events' extends T
-      ? { pox2Events: DbPoxSyntheticEvent[] }
-      : 'pox3_events' extends T
-      ? { pox3Events: DbPoxSyntheticEvent[] }
-      : 'pox4_events' extends T
-      ? { pox4Events: DbPoxSyntheticEvent[] }
-      : never)
-  >(sql: PgSqlClient, poxTable: T, entries: Entry[]) {
-    const values: PoxSyntheticEventInsertValues[] = [];
-    for (const entry of entries) {
-      let events: DbPoxSyntheticEvent[] | null = null;
-      switch (poxTable) {
-        case 'pox2_events':
-          assert('pox2Events' in entry);
-          events = entry.pox2Events;
-          break;
-        case 'pox3_events':
-          assert('pox3Events' in entry);
-          events = entry.pox3Events;
-          break;
-        case 'pox4_events':
-          assert('pox4Events' in entry);
-          events = entry.pox4Events;
-          break;
-        default:
-          throw new Error(`unknown pox table: ${poxTable}`);
-      }
-      const tx = entry.tx;
-      for (const event of events ?? []) {
+  async updatePoxSyntheticEvents(
+    sql: PgSqlClient,
+    tx: DbTx,
+    poxTable: PoxSyntheticEventTable,
+    events: DbPoxSyntheticEvent[]
+  ) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values = batch.map(event => {
         const value: PoxSyntheticEventInsertValues = {
           event_index: event.event_index,
           tx_id: event.tx_id,
@@ -821,7 +785,6 @@ export class PgWriteStore extends PgStore {
           reward_cycle: null,
           amount_ustx: null,
         };
-
         // Set event-specific columns
         switch (event.name) {
           case SyntheticPoxEventName.HandleUnlock: {
@@ -898,78 +861,63 @@ export class PgWriteStore extends PgStore {
             );
           }
         }
-        values.push(value);
-      }
-    }
-    for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
-      const res = await sql`
-        INSERT INTO ${sql(String(poxTable))} ${sql(batch)}
+        return value;
+      });
+      await sql`
+        INSERT INTO ${sql(poxTable)} ${sql(values)}
       `;
-      assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`);
     }
   }

-  async updateStxLockEvents(
-    sql: PgSqlClient,
-    entries: { tx: DbTx; stxLockEvents: DbStxLockEvent[] }[]
-  ) {
-    const values: StxLockEventInsertValues[] = [];
-    for (const { tx, stxLockEvents } of entries) {
-      for (const event of stxLockEvents) {
-        values.push({
-          event_index: event.event_index,
-          tx_id: event.tx_id,
-          tx_index: event.tx_index,
-          block_height: event.block_height,
-          index_block_hash: tx.index_block_hash,
-          parent_index_block_hash: tx.parent_index_block_hash,
-          microblock_hash: tx.microblock_hash,
-          microblock_sequence: tx.microblock_sequence,
-          microblock_canonical: tx.microblock_canonical,
-          canonical: event.canonical,
-          locked_amount: event.locked_amount.toString(),
-          unlock_height: event.unlock_height,
-          locked_address: event.locked_address,
-          contract_name: event.contract_name,
-        });
-      }
-    }
-    for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
-      const res = await sql`
-        INSERT INTO stx_lock_events ${sql(batch)}
+  async updateStxLockEvents(sql: PgSqlClient, tx: DbTx, events: DbStxLockEvent[]) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: StxLockEventInsertValues[] = batch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        locked_amount: event.locked_amount.toString(),
+        unlock_height: event.unlock_height,
+        locked_address: event.locked_address,
+        contract_name: event.contract_name,
+      }));
+      await sql`
+        INSERT INTO stx_lock_events ${sql(values)}
       `;
-      assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`);
     }
   }

-  async updateStxEvents(sql: PgSqlClient, entries: { tx: DbTx; stxEvents: DbStxEvent[] }[]) {
-    const values: StxEventInsertValues[] = [];
-    for (const { tx, stxEvents } of entries) {
-      for (const event of stxEvents) {
-        values.push({
-          event_index: event.event_index,
-          tx_id: event.tx_id,
-          tx_index: event.tx_index,
-          block_height: event.block_height,
-          index_block_hash: tx.index_block_hash,
-          parent_index_block_hash: tx.parent_index_block_hash,
-          microblock_hash: tx.microblock_hash,
-          microblock_sequence: tx.microblock_sequence,
-          microblock_canonical: tx.microblock_canonical,
-          canonical: event.canonical,
-          asset_event_type_id: event.asset_event_type_id,
-          sender: event.sender ?? null,
-          recipient: event.recipient ?? null,
-          amount: event.amount,
-          memo: event.memo ?? null,
-        });
-      }
-    }
-    for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
+  async updateStxEvents(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) {
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: StxEventInsertValues[] = eventBatch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        asset_event_type_id: event.asset_event_type_id,
+        sender: event.sender ?? null,
+        recipient: event.recipient ?? null,
+        amount: event.amount,
+        memo: event.memo ?? null,
+      }));
       const res = await sql`
-        INSERT INTO stx_events ${sql(batch)}
+        INSERT INTO stx_events ${sql(values)}
       `;
-      assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`);
+      if (res.count !== eventBatch.length) {
+        throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`);
+      }
     }
   }

@@ -977,43 +925,45 @@
    * Update the `principal_stx_tx` table with the latest `tx_id`s that resulted in a STX
    * transfer relevant to a principal (stx address or contract id).
    * @param sql - DB client
-   * @param entries - list of tx and stxEvents
+   * @param tx - Transaction
+   * @param events - Transaction STX events
    */
-  async updatePrincipalStxTxs(sql: PgSqlClient, entries: { tx: DbTx; stxEvents: DbStxEvent[] }[]) {
-    const values: PrincipalStxTxsInsertValues[] = [];
-    for (const { tx, stxEvents } of entries) {
-      const principals = new Set(
-        [
-          tx.sender_address,
-          tx.token_transfer_recipient_address,
-          tx.contract_call_contract_id,
-          tx.smart_contract_contract_id,
-        ].filter((p): p is string => !!p)
-      );
-      for (const event of stxEvents) {
-        if (event.sender) principals.add(event.sender);
-        if (event.recipient) principals.add(event.recipient);
-      }
-      for (const principal of principals) {
-        values.push({
-          principal: principal,
-          tx_id: tx.tx_id,
-          block_height: tx.block_height,
-          index_block_hash: tx.index_block_hash,
-          microblock_hash: tx.microblock_hash,
-          microblock_sequence: tx.microblock_sequence,
-          tx_index: tx.tx_index,
-          canonical: tx.canonical,
-          microblock_canonical: tx.microblock_canonical,
-        });
-      }
-    }
-
-    for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) {
+  async updatePrincipalStxTxs(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) {
+    const insertPrincipalStxTxs = async (principals: string[]) => {
+      principals = [...new Set(principals)]; // Remove duplicates
+      const values: PrincipalStxTxsInsertValues[] = principals.map(principal => ({
+        principal: principal,
+        tx_id: tx.tx_id,
+        block_height: tx.block_height,
+        index_block_hash: tx.index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        tx_index: tx.tx_index,
+        canonical: tx.canonical,
+        microblock_canonical: tx.microblock_canonical,
+      }));
       await sql`
-        INSERT INTO principal_stx_txs ${sql(eventBatch)}
+        INSERT INTO principal_stx_txs ${sql(values)}
         ON CONFLICT ON CONSTRAINT unique_principal_tx_id_index_block_hash_microblock_hash DO NOTHING
       `;
+    };
+    // Insert tx data
+    await insertPrincipalStxTxs(
+      [
+        tx.sender_address,
+        tx.token_transfer_recipient_address,
+        tx.contract_call_contract_id,
+        tx.smart_contract_contract_id,
+      ].filter((p): p is string => !!p) // Remove undefined
+    );
+    // Insert stx_event data
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const principals: string[] = [];
+      for (const event of eventBatch) {
+        if (event.sender) principals.push(event.sender);
+        if (event.recipient) principals.push(event.recipient);
+      }
+      await insertPrincipalStxTxs(principals);
     }
   }

@@ -1052,10 +1002,9 @@
       ON CONFLICT ON CONSTRAINT unique_name_zonefile_hash_tx_id_index_block_hash DO UPDATE SET
         zonefile = EXCLUDED.zonefile
     `;
-    assert(
-      result.count === zonefileValues.length,
-      `Expecting ${result.count} zonefile inserts, got ${zonefileValues.length}`
-    );
+    if (result.count !== zonefileValues.length) {
+      throw new Error(`Expected ${zonefileValues.length} zonefile inserts, got ${result.count}`);
+    }
   }

   async updateBatchSubdomains(
@@ -1111,10 +1060,9 @@
         microblock_sequence = EXCLUDED.microblock_sequence,
         microblock_canonical = EXCLUDED.microblock_canonical
     `;
-    assert(
-      result.count === subdomainValues.length,
-      `Expecting ${subdomainValues.length} subdomain inserts, got ${result.count}`
-    );
+    if (result.count !== subdomainValues.length) {
+      throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`);
+    }
   }

   async resolveBnsSubdomains(
@@ -1134,34 +1082,28 @@
     });
   }

-  async updateFtEvents(sql: PgSqlClient, entries: { tx: DbTx; ftEvents: DbFtEvent[] }[]) {
-    const values: FtEventInsertValues[] = [];
-    for (const { tx, ftEvents } of entries) {
-      for (const event of ftEvents) {
-        values.push({
-          event_index: event.event_index,
-          tx_id: event.tx_id,
-          tx_index: event.tx_index,
-          block_height: event.block_height,
-          index_block_hash: tx.index_block_hash,
-          parent_index_block_hash: tx.parent_index_block_hash,
-          microblock_hash: tx.microblock_hash,
-          microblock_sequence: tx.microblock_sequence,
-          microblock_canonical: tx.microblock_canonical,
-          canonical: event.canonical,
-          asset_event_type_id: event.asset_event_type_id,
-          sender: event.sender ?? null,
-          recipient: event.recipient ?? null,
-          asset_identifier: event.asset_identifier,
-          amount: event.amount.toString(),
-        });
-      }
-    }
-    for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
-      const res = await sql`
-        INSERT INTO ft_events ${sql(batch)}
+  async updateFtEvents(sql: PgSqlClient, tx: DbTx, events: DbFtEvent[]) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: FtEventInsertValues[] = batch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        asset_event_type_id: event.asset_event_type_id,
+        sender: event.sender ?? null,
+        recipient: event.recipient ?? null,
+        asset_identifier: event.asset_identifier,
+        amount: event.amount.toString(),
+      }));
+      await sql`
+        INSERT INTO ft_events ${sql(values)}
       `;
-      assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`);
     }
   }

@@ -1260,35 +1202,29 @@
     }
   }

-  async updateSmartContractEvents(
-    sql: PgSqlClient,
-    entries: { tx: DbTx; contractLogEvents: DbSmartContractEvent[] }[]
-  ) {
-    const values: SmartContractEventInsertValues[] = [];
-    for (const { tx, contractLogEvents } of entries) {
-      for (const event of contractLogEvents) {
-        values.push({
-          event_index: event.event_index,
-          tx_id: event.tx_id,
-          tx_index: event.tx_index,
-          block_height: event.block_height,
-          index_block_hash: tx.index_block_hash,
-          parent_index_block_hash: tx.parent_index_block_hash,
-          microblock_hash: tx.microblock_hash,
-          microblock_sequence: tx.microblock_sequence,
-          microblock_canonical: tx.microblock_canonical,
-          canonical: event.canonical,
-          contract_identifier: event.contract_identifier,
-          topic: event.topic,
-          value: event.value,
-        });
-      }
-    }
-    for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
+  async updateSmartContractEvents(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) {
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: SmartContractEventInsertValues[] = eventBatch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        contract_identifier: event.contract_identifier,
+        topic: event.topic,
+        value: event.value,
+      }));
       const res = await sql`
-        INSERT INTO contract_logs ${sql(batch)}
+        INSERT INTO contract_logs ${sql(values)}
       `;
-      assert(res.count === batch.length, `Expecting ${batch.length} inserts, got ${res.count}`);
+      if (res.count !== eventBatch.length) {
+        throw new Error(`Expected ${eventBatch.length} inserts, got ${res.count}`);
+      }
     }
   }

@@ -1576,9 +1512,8 @@
     }
   }

-  async updateTx(sql: PgSqlClient, txs: DbTxRaw | DbTxRaw[]): Promise<number> {
-    if (!Array.isArray(txs)) txs = [txs];
-    const values: TxInsertValues[] = txs.map(tx => ({
+  async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
+    const values: TxInsertValues = {
       tx_id: tx.tx_id,
       raw_tx: tx.raw_tx,
       tx_index: tx.tx_index,
@@ -1634,17 +1569,12 @@
       execution_cost_runtime: tx.execution_cost_runtime,
       execution_cost_write_count: tx.execution_cost_write_count,
       execution_cost_write_length: tx.execution_cost_write_length,
-    }));
-
-    let count = 0;
-    for (const eventBatch of batchIterate(values, INSERT_BATCH_SIZE)) {
-      const res = await sql`
-        INSERT INTO txs ${sql(eventBatch)}
-        ON CONFLICT ON CONSTRAINT unique_tx_id_index_block_hash_microblock_hash DO NOTHING
-      `;
-      count += res.count;
-    }
-    return count;
+    };
+    const result = await sql`
+      INSERT INTO txs ${sql(values)}
+      ON CONFLICT ON CONSTRAINT unique_tx_id_index_block_hash_microblock_hash DO NOTHING
+    `;
+    return result.count;
   }

   async insertDbMempoolTxs(
@@ -2007,10 +1937,9 @@
       const res = await sql`
         INSERT INTO token_offering_locked ${sql(lockedInfos, 'address', 'value', 'block')}
       `;
-      assert(
-        res.count === lockedInfos.length,
-        `Expecting ${lockedInfos.length} inserts, got ${res.count}`
-      );
+      if (res.count !== lockedInfos.length) {
+        throw new Error(`Expected ${lockedInfos.length} inserts, got ${res.count}`);
+      }
     } catch (e: any) {
       logger.error(e, `Locked Info errors ${e.message}`);
       throw e;
@@ -2137,33 +2066,26 @@
       }
     }

-    if (txs.length > 0) {
-      const rowsUpdated = await this.updateTx(
-        sql,
-        txs.map(t => t.tx)
-      );
-      if (rowsUpdated !== txs.length) {
+    for (const entry of txs) {
+      const rowsUpdated = await this.updateTx(sql, entry.tx);
+      if (rowsUpdated !== 1) {
         throw new Error(
-          `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}, expecting ${txs.length}`
+          `Unexpected amount of rows updated for microblock tx insert: ${rowsUpdated}`
         );
       }
-      const q = new PQueue({ concurrency: STACKS_BLOCK_DATA_INSERT_CONCURRENCY });
-      const enqueue = (task: Parameters<PQueue['add']>[0]) => void q.add(task);
-      enqueue(() => this.updateStxEvents(sql, txs));
-      enqueue(() => this.updatePrincipalStxTxs(sql, txs));
-      enqueue(() => this.updateSmartContractEvents(sql, txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox2_events', txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox3_events', txs));
-      enqueue(() => this.updatePoxSyntheticEvents(sql, 'pox4_events', txs));
-      enqueue(() => this.updateStxLockEvents(sql, txs));
-      enqueue(() => this.updateFtEvents(sql, txs));
-      for (const entry of txs) {
-        enqueue(() => this.updateNftEvents(sql, entry.tx, entry.nftEvents, true));
-        enqueue(() => this.updateSmartContracts(sql, entry.tx, entry.smartContracts));
-        enqueue(() => this.updateNamespaces(sql, entry.tx, entry.namespaces));
-        enqueue(() => this.updateNames(sql, entry.tx, entry.names));
-      }
-      await q.onIdle();
+
+      await this.updateStxEvents(sql, entry.tx, entry.stxEvents);
+      await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents);
+      await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events);
+      await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents);
+      await this.updateFtEvents(sql, entry.tx, entry.ftEvents);
+      await this.updateNftEvents(sql, entry.tx, entry.nftEvents, true);
+      await this.updateSmartContracts(sql, entry.tx, entry.smartContracts);
+      await this.updateNamespaces(sql, entry.tx, entry.namespaces);
+      await this.updateNames(sql, entry.tx, entry.names);
     }
   }
diff --git a/src/event-replay/parquet-based/importers/new-block-importer.ts b/src/event-replay/parquet-based/importers/new-block-importer.ts
index a0b3ba4f82..ff7e76a005 100644
--- a/src/event-replay/parquet-based/importers/new-block-importer.ts
+++ b/src/event-replay/parquet-based/importers/new-block-importer.ts
@@ -376,7 +376,7 @@ const populateBatchInserters = (db: PgWriteStore) => {

   const insertStxLockEvents = async (dbData: DataStoreBlockUpdateData) => {
     for (const entry of dbData.txs) {
-      await db.updateStxLockEvents(db.sql, [entry]);
+      await db.updateStxLockEvents(db.sql, entry.tx, entry.stxLockEvents);
     }
   };

@@ -386,19 +386,19 @@ const populateBatchInserters = (db: PgWriteStore) => {

   const insertPox2Events = async (dbData: DataStoreBlockUpdateData) => {
     for (const entry of dbData.txs) {
-      await db.updatePoxSyntheticEvents(db.sql, 'pox2_events', [entry]);
+      await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox2_events', entry.pox2Events);
     }
   };

   const insertPox3Events = async (dbData: DataStoreBlockUpdateData) => {
     for (const entry of dbData.txs) {
-      await db.updatePoxSyntheticEvents(db.sql, 'pox3_events', [entry]);
+      await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox3_events', entry.pox3Events);
     }
   };

   const insertPox4Events = async (dbData: DataStoreBlockUpdateData) => {
     for (const entry of dbData.txs) {
-      await db.updatePoxSyntheticEvents(db.sql, 'pox4_events', [entry]);
+      await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox4_events', entry.pox4Events);
     }
   };
diff --git a/src/tests/datastore-tests.ts b/src/tests/datastore-tests.ts
index 9faac56268..e5621e9bf4 100644
--- a/src/tests/datastore-tests.ts
+++ b/src/tests/datastore-tests.ts
@@ -195,7 +195,7 @@ describe('postgres datastore', () => {
       createStxLockEvent('addrA', 222n, 1),
       createStxLockEvent('addrB', 333n, 1),
     ];
-    await db.updateStxLockEvents(client, [{ tx, stxLockEvents }]);
+    await db.updateStxLockEvents(client, tx, stxLockEvents);
     await db.updateTx(client, tx);
     await db.updateTx(client, tx2);

@@ -3546,7 +3546,7 @@ describe('postgres datastore', () => {
     }

     // insert stx lock events directly
-    await db.updateStxLockEvents(client, [{ tx: tx1, stxLockEvents: [stxLockEvent1] }]);
+    await db.updateStxLockEvents(client, tx1, [stxLockEvent1]);

     const block5: DbBlock = {
       block_hash: '0x55',
diff --git a/src/tests/search-tests.ts b/src/tests/search-tests.ts
index ada372a422..312c63de06 100644
--- a/src/tests/search-tests.ts
+++ b/src/tests/search-tests.ts
@@ -773,7 +773,7 @@ describe('search tests', () => {
       recipient: addr5,
       sender: 'none',
     };
-    await db.updateFtEvents(client, [{ tx: stxTx1, ftEvents: [ftEvent1] }]);
+    await db.updateFtEvents(client, stxTx1, [ftEvent1]);

     // test address as a ft event recipient
     const searchResult5 = await supertest(api.server).get(`/extended/v1/search/${addr5}`);
@@ -801,7 +801,7 @@ describe('search tests', () => {
       recipient: 'none',
       sender: addr6,
     };
-    await db.updateFtEvents(client, [{ tx: stxTx1, ftEvents: [ftEvent2] }]);
+    await db.updateFtEvents(client, stxTx1, [ftEvent2]);

     // test address as a ft event sender
     const searchResult6 = await supertest(api.server).get(`/extended/v1/search/${addr6}`);