From da3a95fbc8ce7f687a1fb11ebb1c027287bd720d Mon Sep 17 00:00:00 2001 From: Lyka Labrada Date: Tue, 7 Nov 2023 09:59:47 +0800 Subject: [PATCH 1/6] chore(whale-api): add a retry for missing previous block txns --- .../src/module.indexer/rpc.block.provider.ts | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/apps/whale-api/src/module.indexer/rpc.block.provider.ts b/apps/whale-api/src/module.indexer/rpc.block.provider.ts index 369ba308d6..95d04a2b0c 100644 --- a/apps/whale-api/src/module.indexer/rpc.block.provider.ts +++ b/apps/whale-api/src/module.indexer/rpc.block.provider.ts @@ -90,7 +90,28 @@ export class RPCBlockProvider { if (await RPCBlockProvider.isBestChain(indexed, nextBlock)) { await this.index(nextBlock) } else { - await this.invalidate(indexed.hash, indexed.height) + // Retry indexing the previous block before invalidating it + const MAX_RETRIES = 3 + let prevBlockIndexedSuccessfully = false + let prevBlockRetryCount = 0 + + while (!prevBlockIndexedSuccessfully && prevBlockRetryCount < MAX_RETRIES) { + try { + this.logger.log(`Retrying indexing block ${indexed.height} - Attempt ${prevBlockRetryCount + 1}`) + const previousBlock = await this.client.blockchain.getBlock(indexed.hash, 2) + await this.index(previousBlock) + prevBlockIndexedSuccessfully = true + } catch (error) { + this.logger.error(`Error indexing the previous block - ${indexed.height}`) + prevBlockRetryCount++ + } + } + + if (!prevBlockIndexedSuccessfully) { + // If all retries for indexing the previous block fail, invalidate it + await this.invalidate(indexed.hash, indexed.height) + this.logger.error('All retries for indexing the previous block have failed. The block has been invalidated.') + } } return true } @@ -100,7 +121,7 @@ export class RPCBlockProvider { * @param {defid.Block} nextBlock to check previous block hash */ private static async isBestChain (indexed: Block, nextBlock: defid.Block): Promise { - return nextBlock.previousblockhash === indexed.hash + return indexed.hash === nextBlock.previousblockhash } public async indexGenesis (): Promise { From a3b83c3838c6fdf623124e4d9fd42854e3c7c3ae Mon Sep 17 00:00:00 2001 From: Lyka Labrada Date: Wed, 8 Nov 2023 13:34:07 +0800 Subject: [PATCH 2/6] set indexing to true if status mapper is set to error --- .../src/module.indexer/rpc.block.provider.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/whale-api/src/module.indexer/rpc.block.provider.ts b/apps/whale-api/src/module.indexer/rpc.block.provider.ts index 95d04a2b0c..13a92085c0 100644 --- a/apps/whale-api/src/module.indexer/rpc.block.provider.ts +++ b/apps/whale-api/src/module.indexer/rpc.block.provider.ts @@ -44,6 +44,15 @@ export class RPCBlockProvider { } try { + const status = await this.statusMapper.get() + this.logger.log(`[Start Cycle] - status: ${JSON.stringify(status)}`) + if (status?.status === Status.ERROR) { + this.logger.log('Setting status mapper to previous block...') + const currentBlock = await this.client.blockchain.getBlock(status.hash, 2) + const prevBlock = await this.client.blockchain.getBlock(currentBlock.previousblockhash, 2) + await this.statusMapper.put(prevBlock.hash, prevBlock.height, Status.INDEXING) + } + await this.cleanup() this.indexing = await this.synchronize() } catch (err) { @@ -88,8 +97,10 @@ export class RPCBlockProvider { const nextBlock = await this.client.blockchain.getBlock(nextHash, 2) if (await RPCBlockProvider.isBestChain(indexed, nextBlock)) { + this.logger.log(`[Synchronize] BEST CHAIN. Indexing next block ${nextBlock.height}...`) await this.index(nextBlock) } else { + this.logger.log(`[Synchronize] NOT BEST CHAIN. Indexing prev block ${indexed.height}...`) // Retry indexing the previous block before invalidating it const MAX_RETRIES = 3 let prevBlockIndexedSuccessfully = false From 020c5ae3241d5e95cab6848dd19db658ea3781c0 Mon Sep 17 00:00:00 2001 From: Lyka Labrada Date: Fri, 10 Nov 2023 13:03:44 +0800 Subject: [PATCH 3/6] add a new REINDEX status --- .../src/module.indexer/rpc.block.provider.ts | 18 ++++++++++++------ apps/whale-api/src/module.indexer/status.ts | 3 ++- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/apps/whale-api/src/module.indexer/rpc.block.provider.ts b/apps/whale-api/src/module.indexer/rpc.block.provider.ts index 13a92085c0..b35a09b2e2 100644 --- a/apps/whale-api/src/module.indexer/rpc.block.provider.ts +++ b/apps/whale-api/src/module.indexer/rpc.block.provider.ts @@ -6,6 +6,7 @@ import { Block, BlockMapper } from '../module.model/block' import { IndexStatusMapper, Status } from './status' import { waitForCondition } from '@defichain/testcontainers/dist/utils' import { blockchain as defid, RpcApiError } from '@defichain/jellyfish-api-core' +import { NotFoundIndexerError } from './error' @Injectable() export class RPCBlockProvider { @@ -45,11 +46,11 @@ export class RPCBlockProvider { try { const status = await this.statusMapper.get() - this.logger.log(`[Start Cycle] - status: ${JSON.stringify(status)}`) - if (status?.status === Status.ERROR) { - this.logger.log('Setting status mapper to previous block...') + this.logger.log(`[Start Indexing] - status: ${status?.status ?? ''}`) + if (status?.status === Status.REINDEX) { const currentBlock = await this.client.blockchain.getBlock(status.hash, 2) const prevBlock = await this.client.blockchain.getBlock(currentBlock.previousblockhash, 2) + this.logger.log(`Reindexing previous block ${prevBlock.height} ...`) await this.statusMapper.put(prevBlock.hash, prevBlock.height, Status.INDEXING) } @@ -97,10 +98,10 @@ export class RPCBlockProvider { const nextBlock = await this.client.blockchain.getBlock(nextHash, 2) if (await RPCBlockProvider.isBestChain(indexed, nextBlock)) { - this.logger.log(`[Synchronize] BEST CHAIN. Indexing next block ${nextBlock.height}...`) + this.logger.log(`[Synchronize - best chain] Indexing next block ${nextBlock.height}...`) await this.index(nextBlock) } else { - this.logger.log(`[Synchronize] NOT BEST CHAIN. Indexing prev block ${indexed.height}...`) + this.logger.log(`[Synchronize - not the best chain] Indexing prev block ${indexed.height}...`) // Retry indexing the previous block before invalidating it const MAX_RETRIES = 3 let prevBlockIndexedSuccessfully = false @@ -153,6 +154,7 @@ export class RPCBlockProvider { switch (status.status) { case Status.INVALIDATED: case Status.INDEXED: + case Status.REINDEX: return } @@ -181,7 +183,11 @@ export class RPCBlockProvider { await this.indexer.invalidate(hash) await this.statusMapper.put(hash, height, Status.INVALIDATED) } catch (err) { - await this.statusMapper.put(hash, height, Status.ERROR) + if (err instanceof NotFoundIndexerError) { + await this.statusMapper.put(hash, height, Status.REINDEX) + } else { + await this.statusMapper.put(hash, height, Status.ERROR) + } throw err } } diff --git a/apps/whale-api/src/module.indexer/status.ts b/apps/whale-api/src/module.indexer/status.ts index d170a9ad9a..f3d5751a27 100644 --- a/apps/whale-api/src/module.indexer/status.ts +++ b/apps/whale-api/src/module.indexer/status.ts @@ -36,7 +36,8 @@ export enum Status { INDEXED = 'INDEXED', INVALIDATING = 'INVALIDATING', INVALIDATED = 'INVALIDATED', - ERROR = 'ERROR' + ERROR = 'ERROR', + REINDEX = 'REINDEX' } export interface IndexStatus extends Model { From 4528c67603d5d033850b6072be7c46be3fb1d7e4 Mon Sep 17 00:00:00 2001 From: Lyka Labrada Date: Fri, 10 Nov 2023 13:42:31 +0800 Subject: [PATCH 4/6] remove index retry for previous block --- .../src/module.indexer/rpc.block.provider.ts | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/apps/whale-api/src/module.indexer/rpc.block.provider.ts b/apps/whale-api/src/module.indexer/rpc.block.provider.ts index b35a09b2e2..0b55829c58 100644 --- a/apps/whale-api/src/module.indexer/rpc.block.provider.ts +++ b/apps/whale-api/src/module.indexer/rpc.block.provider.ts @@ -45,15 +45,6 @@ export class RPCBlockProvider { } try { - const status = await this.statusMapper.get() - this.logger.log(`[Start Indexing] - status: ${status?.status ?? ''}`) - if (status?.status === Status.REINDEX) { - const currentBlock = await this.client.blockchain.getBlock(status.hash, 2) - const prevBlock = await this.client.blockchain.getBlock(currentBlock.previousblockhash, 2) - this.logger.log(`Reindexing previous block ${prevBlock.height} ...`) - await this.statusMapper.put(prevBlock.hash, prevBlock.height, Status.INDEXING) - } - await this.cleanup() this.indexing = await this.synchronize() } catch (err) { @@ -98,32 +89,9 @@ export class RPCBlockProvider { const nextBlock = await this.client.blockchain.getBlock(nextHash, 2) if (await RPCBlockProvider.isBestChain(indexed, nextBlock)) { - this.logger.log(`[Synchronize - best chain] Indexing next block ${nextBlock.height}...`) await this.index(nextBlock) } else { - this.logger.log(`[Synchronize - not the best chain] Indexing prev block ${indexed.height}...`) - // Retry indexing the previous block before invalidating it - const MAX_RETRIES = 3 - let prevBlockIndexedSuccessfully = false - let prevBlockRetryCount = 0 - - while (!prevBlockIndexedSuccessfully && prevBlockRetryCount < MAX_RETRIES) { - try { - this.logger.log(`Retrying indexing block ${indexed.height} - Attempt ${prevBlockRetryCount + 1}`) - const previousBlock = await this.client.blockchain.getBlock(indexed.hash, 2) - await this.index(previousBlock) - prevBlockIndexedSuccessfully = true - } catch (error) { - this.logger.error(`Error indexing the previous block - ${indexed.height}`) - prevBlockRetryCount++ - } - } - - if (!prevBlockIndexedSuccessfully) { - // If all retries for indexing the previous block fail, invalidate it - await this.invalidate(indexed.hash, indexed.height) - this.logger.error('All retries for indexing the previous block have failed. The block has been invalidated.') - } + await this.invalidate(indexed.hash, indexed.height) } return true } From 388306540e01dabffe3744c30be41481fae1ae6b Mon Sep 17 00:00:00 2001 From: Lyka Labrada Date: Fri, 10 Nov 2023 14:09:50 +0800 Subject: [PATCH 5/6] add log inside not best chain case --- apps/whale-api/src/module.indexer/rpc.block.provider.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/whale-api/src/module.indexer/rpc.block.provider.ts b/apps/whale-api/src/module.indexer/rpc.block.provider.ts index 0b55829c58..bb66b2d231 100644 --- a/apps/whale-api/src/module.indexer/rpc.block.provider.ts +++ b/apps/whale-api/src/module.indexer/rpc.block.provider.ts @@ -91,6 +91,7 @@ export class RPCBlockProvider { if (await RPCBlockProvider.isBestChain(indexed, nextBlock)) { await this.index(nextBlock) } else { + this.logger.error(`indexedBlock{height: ${indexed.height}, hash: ${indexed.hash}}, nextBlock{height: ${nextBlock.height}, prevHash: ${nextBlock.previousblockhash}}`) await this.invalidate(indexed.hash, indexed.height) } return true From 9c6356fb88951b750a6830f5373dc74d7def8c74 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 10 Nov 2023 15:19:10 +0800 Subject: [PATCH 6/6] use log instead err, retry findPair to update pathmapping --- .../src/module.indexer/model/dftx/composite.swap.ts | 7 +------ .../module.indexer/model/dftx/pool.pair.path.mapping.ts | 9 +++++++-- .../whale-api/src/module.indexer/model/dftx/pool.swap.ts | 8 +------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/apps/whale-api/src/module.indexer/model/dftx/composite.swap.ts b/apps/whale-api/src/module.indexer/model/dftx/composite.swap.ts index e8358b973a..29306bc03d 100644 --- a/apps/whale-api/src/module.indexer/model/dftx/composite.swap.ts +++ b/apps/whale-api/src/module.indexer/model/dftx/composite.swap.ts @@ -4,7 +4,6 @@ import { RawBlock } from '../_abstract' import { Inject, Injectable } from '@nestjs/common' import { NetworkName } from '@defichain/jellyfish-network' import BigNumber from 'bignumber.js' -import { IndexerError } from '../../error' import { PoolPairPathMapping } from './pool.pair.path.mapping' import { PoolSwapIndexer } from './pool.swap' @@ -51,10 +50,6 @@ export class CompositeSwapIndexer extends DfTxIndexer { const poolSwap = compositeSwap.poolSwap const pair = await this.poolPairPathMapping.findPair(poolSwap.fromTokenId, poolSwap.toTokenId) - if (pair !== undefined) { - return [{ id: Number(pair.id) }] - } - - throw new IndexerError(`Pool for pair ${poolSwap.fromTokenId}, ${poolSwap.toTokenId} not found in PoolPairPathMapping`) + return [{ id: Number(pair.id) }] } } diff --git a/apps/whale-api/src/module.indexer/model/dftx/pool.pair.path.mapping.ts b/apps/whale-api/src/module.indexer/model/dftx/pool.pair.path.mapping.ts index 050c52b94f..e32506b4b1 100644 --- a/apps/whale-api/src/module.indexer/model/dftx/pool.pair.path.mapping.ts +++ b/apps/whale-api/src/module.indexer/model/dftx/pool.pair.path.mapping.ts @@ -1,8 +1,9 @@ -import { Injectable } from '@nestjs/common' +import { Injectable, Logger } from '@nestjs/common' import { DeFiDCache, PoolPairInfoWithId } from '../../../module.api/cache/defid.cache' @Injectable() export class PoolPairPathMapping { + private readonly logger = new Logger(PoolPairPathMapping.name) private readonly paths: Record = {} constructor ( @@ -10,13 +11,17 @@ export class PoolPairPathMapping { ) { } - async findPair (tokenA: number, tokenB: number): Promise { + async findPair (tokenA: number, tokenB: number): Promise { const pair = this.paths[`${tokenA}-${tokenB}`] if (pair !== undefined) { return pair } await this.updateMapping() + if (this.paths[`${tokenA}-${tokenB}`] === undefined) { + this.logger.error(`Pool for pair ${tokenA}, ${tokenB} not found in PoolPairPathMapping`) + await this.findPair(tokenA, tokenB) + } return this.paths[`${tokenA}-${tokenB}`] } diff --git a/apps/whale-api/src/module.indexer/model/dftx/pool.swap.ts b/apps/whale-api/src/module.indexer/model/dftx/pool.swap.ts index 881ff707b4..c7b91afcb9 100644 --- a/apps/whale-api/src/module.indexer/model/dftx/pool.swap.ts +++ b/apps/whale-api/src/module.indexer/model/dftx/pool.swap.ts @@ -9,7 +9,6 @@ import { HexEncoder } from '../../../module.model/_hex.encoder' import { PoolSwapAggregatedMapper } from '../../../module.model/pool.swap.aggregated' import { AggregatedIntervals } from './pool.swap.aggregated' import { PoolPairInfoWithId } from '../../../module.api/cache/defid.cache' -import { IndexerError } from '../../error' import { PoolPairPathMapping } from './pool.pair.path.mapping' @Injectable() @@ -89,11 +88,6 @@ export class PoolSwapIndexer extends DfTxIndexer { } async getPair (tokenA: number, tokenB: number): Promise { - const pair = await this.poolPairPathMapping.findPair(tokenA, tokenB) - if (pair !== undefined) { - return pair - } - - throw new IndexerError(`Pool for pair ${tokenA}, ${tokenB} not found in PoolPairPathMapping`) + return await this.poolPairPathMapping.findPair(tokenA, tokenB) } }