Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize data cache update mechanism #137

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/routes/bitcoin/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
.default('true')
.describe('Only return confirmed UTXOs'),
min_satoshi: z.coerce.number().optional().describe('The minimum value of the UTXO in satoshi'),
no_cache: z
.enum(['true', 'false'])
.default('false')
.describe('Whether to disable cache to get utxos, default is false'),
}),
response: {
200: z.array(UTXO),
Expand All @@ -85,10 +89,10 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
},
async function (request) {
const { address } = request.params;
const { only_confirmed, min_satoshi } = request.query;
const { only_confirmed, min_satoshi, no_cache } = request.query;

let utxosCache = null;
if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') {
utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address);
await fastify.utxoSyncer.enqueueSyncJob(address);
}
Expand Down
14 changes: 10 additions & 4 deletions src/routes/rgbpp/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
- as a hex string: '0x...' (You can pack by @ckb-lumos/codec blockchain.Script.pack({ "codeHash": "0x...", ... }))
`,
),
no_cache: z
.enum(['true', 'false'])
.default('false')
.describe('Whether to disable cache to get RGB++ assets, default is false'),
}),
response: {
200: z.array(Cell),
Expand All @@ -48,7 +52,7 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
},
async (request) => {
const { btc_address } = request.params;
const { type_script } = request.query;
const { type_script, no_cache } = request.query;

let typeScript: Script | undefined = undefined;
if (type_script) {
Expand All @@ -60,22 +64,24 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
}

let utxosCache = null;
if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') {
utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(btc_address);
await fastify.utxoSyncer.enqueueSyncJob(btc_address);
}
const utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address: btc_address });

let rgbppCache = null;
if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) {
if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE && no_cache !== 'true') {
rgbppCache = await fastify.rgbppCollector.getRgbppCellsFromCache(btc_address);
await fastify.rgbppCollector.enqueueCollectJob(btc_address, utxos);
}

if (rgbppCache) {
fastify.log.debug(`[RGB++] get cells from cache: ${btc_address}`);
if (typeScript) {
return rgbppCache.filter((cell) => serializeScript(cell.cellOutput.type!) === serializeScript(typeScript!));
return rgbppCache.filter(
(cell) => cell.cellOutput.type && serializeScript(cell.cellOutput.type) === serializeScript(typeScript!),
);
}
return rgbppCache;
}
Expand Down
19 changes: 16 additions & 3 deletions src/services/utxo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { z } from 'zod';
import { Job, RepeatOptions } from 'bullmq';
import * as Sentry from '@sentry/node';
import DataCache from './base/data-cache';
import { throttle } from 'lodash';

interface IUTXOSyncRequest {
btcAddress: string;
Expand Down Expand Up @@ -119,12 +120,12 @@ export default class UTXOSyncer extends BaseQueueWorker<IUTXOSyncRequest, IUTXOS
return data.utxos;
}

public async enqueueSyncJob(btcAddress: string) {
private async _enqueueSyncJob(btcAddress: string) {
const jobs = await this.queue.getRepeatableJobs();
const repeatableJob = jobs.find((job) => job.name === btcAddress);

if (repeatableJob) {
// remove the existing repeatable job to update the start date
// so the job will be processed immediately
// Remove the existing repeatable job to update the start date, let the job be processed immediately
this.cradle.logger.info(`[UTXOSyncer] Remove existing repeatable job for ${btcAddress}`);
await this.queue.removeRepeatableByKey(repeatableJob.key);
}
Expand All @@ -143,6 +144,18 @@ export default class UTXOSyncer extends BaseQueueWorker<IUTXOSyncRequest, IUTXOS
);
}

private enqueueSyncJobThrottle = throttle((address) => this._enqueueSyncJob(address), 1000, {
leading: true,
});

/**
* Enqueue a sync job for the btc address, with a throttle to avoid too many jobs being enqueued at the same time
*/
public enqueueSyncJob(btcAddress: string) {
this.cradle.logger.info(`[UTXOSyncer] Enqueue sync job for ${btcAddress}, ${Date.now()}`);
return this.enqueueSyncJobThrottle(btcAddress);
}

public async process(job: Job<IUTXOSyncRequest>): Promise<IUTXOSyncJobReturn> {
try {
const { btcAddress } = job.data;
Expand Down
8 changes: 8 additions & 0 deletions test/services/utxo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ describe('UTXOSyncer', () => {
expect(spy).toHaveBeenCalled();
});

test('enqueueSyncJob: should not be remove repeat job when enqueued duplicate jobs', async () => {
await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3');
const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey');
await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3');
expect(spy).not.toHaveBeenCalled();
});

test('enqueueSyncJob: should be remove repeat job when is exists', async () => {
await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3');
await new Promise((resolve) => setTimeout(resolve, 1100));
const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey');
await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3');
expect(spy).toHaveBeenCalled();
Expand Down
Loading