diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 92859f119..cd1563733 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4524,4 +4524,13 @@ export class PgStore extends BasePgStore { `; if (result.count) return result[0]; } + + async getStacksBlockCountAtPreviousBurnBlock(): Promise { + const result = await this.sql<{ count: string }[]>` + SELECT COUNT(*) AS count + FROM blocks + WHERE burn_block_height = (SELECT burn_block_height - 1 FROM chain_tip) AND canonical = TRUE + `; + return parseInt(result[0]?.count ?? '0'); + } } diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index bf4691e53..156c69504 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -1780,6 +1780,12 @@ export class PgWriteStore extends PgStore { }); } + async updateBurnChainBlockHeight(args: { blockHeight: number }): Promise { + await this.sql` + UPDATE chain_tip SET burn_block_height = GREATEST(${args.blockHeight}, burn_block_height) + `; + } + async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise { const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({ canonical: true, diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 64fbf4f45..d1da30b23 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -130,6 +130,7 @@ async function handleBurnBlockMessage( burnchainBlockHeight: burnBlockMsg.burn_block_height, slotHolders: slotHolders, }); + await db.updateBurnChainBlockHeight({ blockHeight: burnBlockMsg.burn_block_height }); } async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise { @@ -631,18 +632,32 @@ interface EventMessageHandler { handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise | void; } -function createMessageProcessorQueue(): EventMessageHandler { +function createMessageProcessorQueue(db: PgWriteStore): EventMessageHandler { // Create a promise queue so that only one message is handled at a time. const processorQueue = new PQueue({ concurrency: 1 }); - let eventTimer: prom.Histogram<'event'> | undefined; + let metrics: + | { + eventTimer: prom.Histogram; + blocksInPreviousBurnBlock: prom.Gauge; + } + | undefined; if (isProdEnv) { - eventTimer = new prom.Histogram({ - name: 'stacks_event_ingestion_timers', - help: 'Event ingestion timers', - labelNames: ['event'], - buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes - }); + metrics = { + eventTimer: new prom.Histogram({ + name: 'stacks_event_ingestion_timers', + help: 'Event ingestion timers', + labelNames: ['event'], + buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes + }), + blocksInPreviousBurnBlock: new prom.Gauge({ + name: 'stacks_blocks_in_previous_burn_block', + help: 'Number of Stacks blocks produced in the previous burn block', + async collect() { + this.set(await db.getStacksBlockCountAtPreviousBurnBlock()); + }, + }), + }; } const observeEvent = async (event: string, fn: () => Promise) => { @@ -651,7 +666,7 @@ function createMessageProcessorQueue(): EventMessageHandler { await fn(); } finally { const elapsedMs = timer.getElapsed(); - eventTimer?.observe({ event }, elapsedMs); + metrics?.eventTimer.observe({ event }, elapsedMs); } }; @@ -738,7 +753,7 @@ export async function startEventServer(opts: { serverPort?: number; }): Promise { const db = opts.datastore; - const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(); + const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(db); let eventHost = opts.serverHost ?? process.env['STACKS_CORE_EVENT_HOST']; const eventPort = opts.serverPort ?? parseInt(process.env['STACKS_CORE_EVENT_PORT'] ?? '', 10);