From 394226604fc9ebf8ff41451a9bc6ed8b7d08a3fd Mon Sep 17 00:00:00 2001 From: ahonn Date: Fri, 17 May 2024 12:12:04 +1000 Subject: [PATCH 1/5] fix: skip add repeatable job if the next run is too soon --- src/services/utxo.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/services/utxo.ts b/src/services/utxo.ts index 0192ba38..6ada231e 100644 --- a/src/services/utxo.ts +++ b/src/services/utxo.ts @@ -122,9 +122,14 @@ export default class UTXOSyncer extends BaseQueueWorker job.name === btcAddress); + if (repeatableJob) { - // remove the existing repeatable job to update the start date - // so the job will be processed immediately + // Skip adding repeatable job if the next run is too soon + if (repeatableJob.next < Date.now() + this.cradle.env.UTXO_SYNC_REPEAT_BASE_DURATION) { + this.cradle.logger.info(`[UTXOSyncer] Skip adding repeatable job for ${btcAddress}, next run is too soon`); + return repeatableJob; + } + // Remove the existing repeatable job to update the start date, let the job be processed immediately this.cradle.logger.info(`[UTXOSyncer] Remove existing repeatable job for ${btcAddress}`); await this.queue.removeRepeatableByKey(repeatableJob.key); } From 6dfb6587be134929fe1734a5acd08b84261c5d51 Mon Sep 17 00:00:00 2001 From: ahonn Date: Fri, 17 May 2024 12:54:01 +1000 Subject: [PATCH 2/5] feat: add no_cache query params to disable utxo/cells data cache --- src/routes/bitcoin/address.ts | 8 ++++++-- src/routes/rgbpp/address.ts | 10 +++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/routes/bitcoin/address.ts b/src/routes/bitcoin/address.ts index bfdcc64f..b43af0bb 100644 --- a/src/routes/bitcoin/address.ts +++ b/src/routes/bitcoin/address.ts @@ -77,6 +77,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType .default('true') .describe('Only return confirmed UTXOs'), min_satoshi: z.coerce.number().optional().describe('The minimum value of the UTXO in satoshi'), + no_cache: z + .enum(['true', 'false']) + .default('false') + .describe('Whether to disable cache to get utxos, default is false'), }), response: { 200: z.array(UTXO), @@ -85,10 +89,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }, async function (request) { const { address } = request.params; - const { only_confirmed, min_satoshi } = request.query; + const { only_confirmed, min_satoshi, no_cache } = request.query; let utxosCache = null; - if (env.UTXO_SYNC_DATA_CACHE_ENABLE) { + if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') { utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address); await fastify.utxoSyncer.enqueueSyncJob(address); } diff --git a/src/routes/rgbpp/address.ts b/src/routes/rgbpp/address.ts index a0902c8c..906e5344 100644 --- a/src/routes/rgbpp/address.ts +++ b/src/routes/rgbpp/address.ts @@ -40,6 +40,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType - as a hex string: '0x...' (You can pack by @ckb-lumos/codec blockchain.Script.pack({ "codeHash": "0x...", ... })) `, ), + no_cache: z + .enum(['true', 'false']) + .default('false') + .describe('Whether to disable cache to get RGB++ assets, default is false'), }), response: { 200: z.array(Cell), @@ -48,7 +52,7 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }, async (request) => { const { btc_address } = request.params; - const { type_script } = request.query; + const { type_script, no_cache } = request.query; let typeScript: Script | undefined = undefined; if (type_script) { @@ -60,14 +64,14 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType } let utxosCache = null; - if (env.UTXO_SYNC_DATA_CACHE_ENABLE) { + if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') { utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(btc_address); await fastify.utxoSyncer.enqueueSyncJob(btc_address); } const utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address: btc_address }); let rgbppCache = null; - if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) { + if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE && no_cache !== 'true') { rgbppCache = await fastify.rgbppCollector.getRgbppCellsFromCache(btc_address); await fastify.rgbppCollector.enqueueCollectJob(btc_address, utxos); } From 3103f27659d2005a3fe0d534618475543aee5dfb Mon Sep 17 00:00:00 2001 From: ahonn Date: Fri, 17 May 2024 12:54:35 +1000 Subject: [PATCH 3/5] feat: add throttle to avoid too many jobs being enqueued at the same time --- src/services/utxo.ts | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/services/utxo.ts b/src/services/utxo.ts index 6ada231e..b91ca8ab 100644 --- a/src/services/utxo.ts +++ b/src/services/utxo.ts @@ -6,6 +6,7 @@ import { z } from 'zod'; import { Job, RepeatOptions } from 'bullmq'; import * as Sentry from '@sentry/node'; import DataCache from './base/data-cache'; +import { throttle } from 'lodash'; interface IUTXOSyncRequest { btcAddress: string; @@ -119,16 +120,11 @@ export default class UTXOSyncer extends BaseQueueWorker job.name === btcAddress); if (repeatableJob) { - // Skip adding repeatable job if the next run is too soon - if (repeatableJob.next < Date.now() + this.cradle.env.UTXO_SYNC_REPEAT_BASE_DURATION) { - this.cradle.logger.info(`[UTXOSyncer] Skip adding repeatable job for ${btcAddress}, next run is too soon`); - return repeatableJob; - } // Remove the existing repeatable job to update the start date, let the job be processed immediately this.cradle.logger.info(`[UTXOSyncer] Remove existing repeatable job for ${btcAddress}`); await this.queue.removeRepeatableByKey(repeatableJob.key); @@ -148,6 +144,18 @@ export default class UTXOSyncer extends BaseQueueWorker this._enqueueSyncJob(address), 1000, { + leading: true, + }); + + /** + * Enqueue a sync job for the btc address, with a throttle to avoid too many jobs being enqueued at the same time + */ + public enqueueSyncJob(btcAddress: string) { + this.cradle.logger.info(`[UTXOSyncer] Enqueue sync job for ${btcAddress}, ${Date.now()}`); + return this.enqueueSyncJobThrottle(btcAddress); + } + public async process(job: Job): Promise { try { const { btcAddress } = job.data; From e9908d90ff38db7f33dd66055d2263547afeaad0 Mon Sep 17 00:00:00 2001 From: ahonn Date: Fri, 17 May 2024 14:07:03 +1000 Subject: [PATCH 4/5] test: update tests --- test/services/utxo.test.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/services/utxo.test.ts b/test/services/utxo.test.ts index 08e1855f..83af452e 100644 --- a/test/services/utxo.test.ts +++ b/test/services/utxo.test.ts @@ -43,8 +43,16 @@ describe('UTXOSyncer', () => { expect(spy).toHaveBeenCalled(); }); + test('enqueueSyncJob: should not be remove repeat job when enqueued duplicate jobs', async () => { + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey'); + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + expect(spy).not.toHaveBeenCalled(); + }); + test('enqueueSyncJob: should be remove repeat job when is exists', async () => { await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + await new Promise((resolve) => setTimeout(resolve, 1100)); const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey'); await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); expect(spy).toHaveBeenCalled(); From 9487426582802dad12a46b29041e1cb022ba0e1e Mon Sep 17 00:00:00 2001 From: ahonn Date: Fri, 17 May 2024 14:13:58 +1000 Subject: [PATCH 5/5] fix: fix rgbpp address assets type_script filter --- src/routes/rgbpp/address.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/routes/rgbpp/address.ts b/src/routes/rgbpp/address.ts index 906e5344..9601843a 100644 --- a/src/routes/rgbpp/address.ts +++ b/src/routes/rgbpp/address.ts @@ -79,7 +79,9 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType if (rgbppCache) { fastify.log.debug(`[RGB++] get cells from cache: ${btc_address}`); if (typeScript) { - return rgbppCache.filter((cell) => serializeScript(cell.cellOutput.type!) === serializeScript(typeScript!)); + return rgbppCache.filter( + (cell) => cell.cellOutput.type && serializeScript(cell.cellOutput.type) === serializeScript(typeScript!), + ); } return rgbppCache; }