Skip to content

Commit

Permalink
add quick-scan using BFS to the repair tool
Browse files Browse the repository at this point in the history
  • Loading branch information
igorls committed Nov 20, 2024
1 parent dde7390 commit 0388a7b
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 52 deletions.
238 changes: 187 additions & 51 deletions src/cli/hyp-repair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import {existsSync, mkdirSync, writeFileSync} from 'fs';
import {HyperionBlock} from './repair-cli/interfaces.js';
import {
getBlocks,
getFirstIndexedBlock,
getLastIndexedBlock,
initESClient,
readChainConfig,
readConnectionConfig,
} from './repair-cli/functions.js';

import {getFirstIndexedBlock} from "../indexer/helpers/common_functions.js";


import {WebSocket} from 'ws';

const progressBar = new cliProgress.SingleBar(
Expand All @@ -25,51 +27,54 @@ const program = new Command();
const errorRanges: any[] = [];
let pendingBlock: HyperionBlock | null = null;

const missingBlocks: {
let missingBlocks: {
start: number;
end: number;
count: number;
}[] = [];

async function run(
client: Client,
rpc: JsonRpc,
indexName: string,
blockInit: number,
lastRequestedBlock: number,
firstBlock: number,
qtdTotal: any,
loop: any = 1
) {
let blockInitial = blockInit;
let blockFinal = blockInitial - qtdTotal;
// run in reverse order
let blockInitial = lastRequestedBlock;
let finalBlock = blockInitial - qtdTotal;
const tRef = process.hrtime.bigint();
progressBar.start(loop, 0);
for (let i: any = 1; i <= loop; i++) {
if (blockFinal < firstBlock) {
blockFinal = firstBlock;
if (finalBlock < firstBlock) {
finalBlock = firstBlock;
}
try {
const result = await getBlocks(
client,
indexName,
blockInitial,
blockFinal,
finalBlock,
qtdTotal
);
let {hits: {hits}} = result;
const blocks = hits.map((obj: any) => obj._source);
await findForksOnRange(blocks, rpc);
blockInitial = blockFinal;
blockFinal = blockInitial - qtdTotal;
blockInitial = finalBlock;
finalBlock = blockInitial - qtdTotal;
progressBar.update(i);
} catch (e: any) {
console.log('Error: ', e);
progressBar.stop();
console.log('\nError getting blocks from ES: ', e.message);
process.exit(1);
}
}
progressBar.stop();

if (errorRanges.length === 0 && missingBlocks.length === 0) {
console.log(`\n 🎉 🎉 No forked or missing blocks found between the range ${blockInitial} and ${blockFinal}`);
console.log(`\n 🎉 🎉 No forked or missing blocks found between the range ${blockInitial} and ${lastRequestedBlock}`);
} else {
if (errorRanges.length > 0) {
console.log('\n=========== Forked Ranges ===========');
Expand Down Expand Up @@ -111,13 +116,14 @@ async function findForksOnRange(blocks: HyperionBlock[], rpc: JsonRpc) {
i++;
// console.log('previous -> ', previousBlock.block_num);
if (previousBlock.block_num !== currentBlockNumber - 1) {
console.log(
`\nBlock number mismatch, expected: ${currentBlockNumber - 1
} got ${previousBlock.block_num}`
);
// console.log(
// `\nBlock number mismatch, expected: ${currentBlockNumber - 1
// } got ${previousBlock.block_num}`
// );
missingBlocks.push({
start: previousBlock.block_num + 1,
end: currentBlockNumber - 1,
count: currentBlockNumber - previousBlock.block_num - 1
});
continue;
}
Expand Down Expand Up @@ -151,6 +157,33 @@ async function findForksOnRange(blocks: HyperionBlock[], rpc: JsonRpc) {
}
}

/**
 * Writes the collected scan results to JSON files and prints the command
 * that repairs each kind of problem.
 *
 * Reads the module-level `errorRanges` (forked ranges) and `missingBlocks`
 * (missing ranges). Nothing is written or printed when both are empty.
 *
 * @param chain - chain name, used in the default file name and in the printed command
 * @param firstBlock - first scanned block; `firstBlock + 2` is used in the default file name
 * @param lastBlock - last scanned block, used in the default file name
 * @param args - parsed CLI options; `args.outFile`, when set, is used as the output path prefix
 */
function processResults(chain: string, firstBlock: number, lastBlock: number, args: any): void {
    // one entry per report: file-name tag, data to persist, and the sub-command that consumes it
    const reports = [
        {kind: 'forked', ranges: errorRanges, command: 'repair'},
        {kind: 'missing', ranges: missingBlocks, command: 'fill-missing'},
    ];

    if (reports.every((report) => report.ranges.length === 0)) {
        return;
    }

    if (!existsSync('.repair')) {
        mkdirSync('.repair');
    }

    for (const {kind, ranges, command} of reports) {
        if (ranges.length === 0) {
            continue;
        }
        const path = args.outFile
            ? `${args.outFile}-${kind}-blocks.json`
            : `.repair/${chain}-${firstBlock + 2}-${lastBlock}-${kind}-blocks.json`;
        // write first, so the command is only suggested once the file actually exists
        writeFileSync(path, JSON.stringify(ranges));
        console.log(`Run the following command to repair ${kind} blocks: \n\nhyp-repair ${command} ${chain} ${path}`);
    }
}

async function scanChain(chain: string, args: any) {
const chainConfig = readChainConfig(chain);
const config = readConnectionConfig();
Expand All @@ -163,15 +196,7 @@ async function scanChain(chain: string, args: any) {
process.exit();
}

console.log(`
__ _ _ __ __
/ / __ __ ___ ___ ____ (_) ___ ___ ____ ___ ___ ___ _ (_) ____ / /_ ___ ___ / /
/ _ \\ / // / / _ \\/ -_) / __/ / / / _ \\ / _ \\ / __// -_) / _ \\/ _ \`/ / / / __/ / __// _ \\/ _ \\ / /
/_//_/ \\_, / / .__/\\__/ /_/ /_/ \\___//_//_/ /_/ \\__/ / .__/\\_,_/ /_/ /_/ \\__/ \\___/\\___//_/
/___/ /_/ /_/
`);


printHeader();

const blockIndex = `${chain}-block-${chainConfig.settings.index_version}`;

Expand All @@ -192,7 +217,7 @@ async function scanChain(chain: string, args: any) {
// reduce by 2 to account for the fork end validation
firstBlock = args.first - 2;
} else {
firstBlock = await getFirstIndexedBlock(client, blockIndex);
firstBlock = await getFirstIndexedBlock(client, chain, chainConfig.settings.index_partition_size);
}

if (args.last) {
Expand All @@ -203,7 +228,7 @@ async function scanChain(chain: string, args: any) {

const totalBlocks = lastBlock - firstBlock;

let batchSize = 1000;
let batchSize = 2000;
if (args.batch) {
batchSize = args.batch;
}
Expand All @@ -218,28 +243,7 @@ async function scanChain(chain: string, args: any) {
await run(client, jsonRpc, blockIndex, lastBlock, firstBlock, batchSize, numberOfBatches);
console.log(`Finished checking forked blocks!`);

if (errorRanges.length > 0 || missingBlocks.length > 0) {
if (!existsSync('.repair')) {
mkdirSync('.repair');
}
}

if (errorRanges.length > 0) {
let path = `.repair/${chain}-${firstBlock + 2}-${lastBlock}-forked-blocks.json`;
if (args.outFile) {
path = args.outFile + '-forked-blocks.json';
}
writeFileSync(path, JSON.stringify(errorRanges));
}

if (missingBlocks.length > 0) {
let path = `.repair/${chain}-${firstBlock + 2
}-${lastBlock}-missing-blocks.json`;
if (args.outFile) {
path = args.outFile + '-missing-blocks.json';
}
writeFileSync(path, JSON.stringify(missingBlocks));
}
processResults(chain, firstBlock, lastBlock, args);
}

async function repairMissing(chain: string, file: string, args: any) {
Expand Down Expand Up @@ -799,7 +803,6 @@ async function repairChain(chain: string, file: string, args: any) {
`Deleted ${deleteBlocks} blocks, ${deleteActions} actions, ${deleteDeltas} deltas and ${deleteAbis} ABIs`
);
}

await fillMissingBlocksFromFile(args.host, chain, file, args.dry);
}

Expand Down Expand Up @@ -873,6 +876,134 @@ function viewFile(file: string) {
console.table(parsed);
}

/**
 * Prints the ASCII-art banner of the tool to stdout.
 *
 * Called by both `scanChain` and `quickScanChain` so the banner is defined once.
 * The template literal is whitespace-sensitive: do not re-indent its lines.
 */
function printHeader() {
console.log(`
__ _ _ __ __
/ / __ __ ___ ___ ____ (_) ___ ___ ____ ___ ___ ___ _ (_) ____ / /_ ___ ___ / /
/ _ \\ / // / / _ \\/ -_) / __/ / / / _ \\ / _ \\ / __// -_) / _ \\/ _ \`/ / / / __/ / __// _ \\/ _ \\ / /
/_//_/ \\_, / / .__/\\__/ /_/ /_/ \\___//_//_/ /_/ \\__/ / .__/\\_,_/ /_/ /_/ \\__/ \\___/\\___//_/
/___/ /_/ /_/
`);
}

/**
 * Quick scan for missing blocks on the block index of a chain.
 *
 * Resolves the block range to inspect (from `args.first` / `args.last`, or from
 * the first/last indexed blocks when they are not supplied), delegates the
 * search to `searchMissingBlocks`, merges adjacent missing ranges, prints a
 * summary and finally hands the results to `processResults`.
 *
 * Exits the process with status 1 when ElasticSearch cannot be reached or the
 * requested range is invalid.
 *
 * @param chain - chain name, used to read the chain config and build the block index name
 * @param args - parsed CLI options; `first` and `last` are read here, the whole object is forwarded to `processResults`
 */
async function quickScanChain(chain: string, args: any): Promise<void> {
    const scanStart = process.hrtime.bigint();
    const chainConfig = readChainConfig(chain);
    const config = readConnectionConfig();
    const client = initESClient(config);
    // the RPC client previously built here was never used by the quick scan, so it is no longer created
    const ping = await client.ping();

    if (!ping) {
        console.log('Could not connect to ElasticSearch');
        process.exit(1);
    }

    printHeader();

    const blockIndex = `${chain}-block-${chainConfig.settings.index_version}`;

    console.log(`Using block index: ${blockIndex}`);

    // CLI option values may arrive as strings; coerce once so every later
    // comparison and arithmetic operation is numeric rather than lexicographic.
    const requestedFirst = args.first ? Number(args.first) : undefined;
    const requestedLast = args.last ? Number(args.last) : undefined;

    if (
        (requestedFirst !== undefined && !Number.isInteger(requestedFirst)) ||
        (requestedLast !== undefined && !Number.isInteger(requestedLast))
    ) {
        console.log('First and last block must be integers');
        process.exit(1);
    }

    if (requestedFirst !== undefined && requestedLast !== undefined && requestedFirst > requestedLast) {
        console.log('First block must be less than last block');
        process.exit(1);
    }

    let firstBlock;
    let lastBlock;

    if (requestedFirst !== undefined) {
        // reduce by 2 to account for the fork end validation
        // NOTE(review): the quick scan performs no fork validation, and for first <= 2 this
        // produces a start below block 1 - confirm whether the offset is wanted here.
        firstBlock = requestedFirst - 2;
    } else {
        const lookupStart = process.hrtime.bigint();
        firstBlock = await getFirstIndexedBlock(client, chain, chainConfig.settings.index_partition_size);
        console.log(`First block search time: ${Number(process.hrtime.bigint() - lookupStart) / 1000000}ms`);
    }

    if (requestedLast !== undefined) {
        lastBlock = requestedLast;
    } else {
        const lookupStart = process.hrtime.bigint();
        lastBlock = await getLastIndexedBlock(client, blockIndex);
        console.log(`Last block search time: ${Number(process.hrtime.bigint() - lookupStart) / 1000000}ms`);
    }

    // both ends of the range are inclusive
    const totalBlocks = lastBlock - firstBlock + 1;

    console.log('Range:', firstBlock, lastBlock);
    console.log('Total Blocks:', totalBlocks);

    await searchMissingBlocks(client, blockIndex, firstBlock, lastBlock, totalBlocks);

    if (missingBlocks.length > 0) {
        // merge missing ranges: the search emits ranges in ascending order, so
        // a range that starts right after the previous one ends is contiguous
        const mergedRanges: { start: number; end: number; count: number }[] = [];
        let {start, end} = missingBlocks[0];
        for (const range of missingBlocks.slice(1)) {
            if (range.start === end + 1) {
                end = range.end;
            } else {
                mergedRanges.push({start, end, count: end - start + 1});
                ({start, end} = range);
            }
        }
        mergedRanges.push({start, end, count: end - start + 1});
        console.log(mergedRanges);
        missingBlocks = mergedRanges;
        const totalMissingBlocks = mergedRanges.reduce((acc, range) => acc + range.count, 0);
        console.log(`\n ⚠️ ⚠️ Found ${totalMissingBlocks} missing blocks between the range ${firstBlock} and ${lastBlock}`);
    } else {
        console.log(`\n 🎉 🎉 No missing blocks found between the range ${firstBlock} and ${lastBlock}`);
    }
    console.log(`\nTotal time: ${Number(process.hrtime.bigint() - scanStart) / 1000000}ms`);

    processResults(chain, firstBlock, lastBlock, args);
}

/**
 * Recursively locates ranges of missing blocks inside `[firstBlock, lastBlock]`.
 *
 * Counts the documents of `blockIndex` whose `block_num` lies in the inclusive
 * range. When the count is zero the whole range is recorded in the module-level
 * `missingBlocks`; when it is lower than `totalBlocks` the range is split in
 * half and each half is searched in turn; otherwise nothing is recorded.
 *
 * @param client - client used to run the count query
 * @param blockIndex - name of the index holding the blocks
 * @param firstBlock - first block of the range (inclusive)
 * @param lastBlock - last block of the range (inclusive)
 * @param totalBlocks - number of blocks expected in the range
 */
async function searchMissingBlocks(client: Client,
                                   blockIndex: string,
                                   firstBlock: number,
                                   lastBlock: number,
                                   totalBlocks: number): Promise<void> {

    // an empty range contains nothing to search and must not be reported as missing
    if (firstBlock > lastBlock) {
        return;
    }

    // count the total number of indexed blocks
    const totalIndexedBlocks = await client.count({
        index: blockIndex,
        query: {
            bool: {
                must: [{
                    range: {block_num: {gte: firstBlock, lte: lastBlock}}
                }]
            }
        }
    });

    if (totalIndexedBlocks.count === 0) {
        // the whole range is missing; both ends are inclusive, hence the + 1
        missingBlocks.push({start: firstBlock, end: lastBlock, count: lastBlock - firstBlock + 1});
        return;
    }

    const missedBlocks = totalBlocks - totalIndexedBlocks.count;
    if (missedBlocks > 0) {
        // split the range in half
        const middleBlock = Math.floor((lastBlock + firstBlock) / 2);
        // The halves are awaited one after the other (left first) so that ranges are
        // appended to missingBlocks in ascending order.
        await searchMissingBlocks(client, blockIndex, firstBlock, middleBlock, middleBlock - firstBlock + 1);
        await searchMissingBlocks(client, blockIndex, middleBlock + 1, lastBlock, lastBlock - middleBlock);
    }
}

// Commander Logic

program
Expand All @@ -882,14 +1013,19 @@ program

program
.command('scan <chain>')
.description('scan for forked blocks')
.description('scan for missing and forked blocks')
.option('-d, --dry', 'dry-run, do not delete or repair blocks')
.option('-o, --out-file <file>', 'forked-blocks.json output file')
.option('-f, --first <number>', 'initial block to start validation')
.option('-l, --last <number>', 'last block to validate')
.option('-b, --batch <number>', 'batch size to process')
.action(scanChain);

// quick-scan: quickScanChain reads args.first / args.last and processResults reads
// args.outFile, so the matching options are registered here. Block numbers are
// parsed to integers up front so the action receives numbers, not raw strings.
program
    .command('quick-scan <chain>')
    .description('scan for missing blocks using binary tree search')
    .option('-o, --out-file <file>', 'output file prefix for the missing-blocks.json report')
    .option('-f, --first <number>', 'initial block to start validation', (value: string) => Number.parseInt(value, 10))
    .option('-l, --last <number>', 'last block to validate', (value: string) => Number.parseInt(value, 10))
    .action(quickScanChain);

program
.command('repair <chain> <file>')
.description('repair forked blocks')
Expand Down
3 changes: 2 additions & 1 deletion src/cli/repair-cli/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {readFileSync} from "node:fs";
import {Client} from "@elastic/elasticsearch";
import {HyperionBlock} from "./interfaces.js";
import path from "path";
import {HyperionConfig} from "../../interfaces/hyperionConfig.js";

export function readConnectionConfig() {
const connectionsConfigPath = path.join(import.meta.dirname, '../../../', 'config/connections.json');
Expand All @@ -23,7 +24,7 @@ export function initESClient(config: any) {
});
}

export function readChainConfig(chain: string) {
export function readChainConfig(chain: string): HyperionConfig {
const chainConfigPath = path.join(import.meta.dirname, '../../../', `config/chains/${chain}.config.json`);
const file = readFileSync(chainConfigPath, 'utf8');
return JSON.parse(file);
Expand Down

0 comments on commit 0388a7b

Please sign in to comment.