diff --git a/src/app.ts b/src/app.ts index 83befd6c..6df6ac6c 100644 --- a/src/app.ts +++ b/src/app.ts @@ -32,7 +32,7 @@ export const services = { export type AppOptions = { appResetCallBack: (...args: any) => void } -export async function appFactory (options: AppOptions): Promise<{ stop: () => void }> { +export async function startApp (options: AppOptions): Promise<{ stop: () => void }> { const app: Application = express(feathers()) logger.verbose('Current configuration: ', config) diff --git a/src/blockchain/reorg-emitter.ts b/src/blockchain/reorg-emitter.ts index d9358a17..ba6e51b9 100644 --- a/src/blockchain/reorg-emitter.ts +++ b/src/blockchain/reorg-emitter.ts @@ -31,9 +31,8 @@ export class ReorgEmitterService implements Partial> { if (!this.timeoutStarted) { setTimeout(() => { - if (this.emit) { - this.emit(REORG_OUT_OF_RANGE_EVENT_NAME, { contracts: this.reorgContract, lastProcessedBlockNumber: this.lastProcessedBlockNumber }) - } + logger.warn(`Reorg outside of confirmation range happens on block number ${lastProcessedBlockNumber} for [${this.reorgContract}] contracts`) + this.emit!(REORG_OUT_OF_RANGE_EVENT_NAME, { contracts: this.reorgContract, lastProcessedBlockNumber: this.lastProcessedBlockNumber }) this.reorgContract = [] this.lastProcessedBlockNumber = 0 }, this.debounceTime) @@ -42,7 +41,5 @@ export class ReorgEmitterService implements Partial> { this.reorgContract = [...this.reorgContract, contractName] this.lastProcessedBlockNumber = lastProcessedBlockNumber - - logger.info(`Reorg happens on block number ${lastProcessedBlockNumber} for ${contractName} contract`) } } diff --git a/src/cli/start.ts b/src/cli/start.ts index 6d0dc360..672bd8f7 100644 --- a/src/cli/start.ts +++ b/src/cli/start.ts @@ -1,10 +1,11 @@ import config from 'config' import { flags } from '@oclif/command' -import { appFactory, services } from '../app' +import { startApp, services } from '../app' import { loggingFactory } from '../logger' import { Flags, Config, SupportedServices, isSupportedServices } from '../definitions' -import { BaseCLICommand, DbBackUpJob } from '../utils' +import { BaseCLICommand } from '../utils' +import DbBackUpJob from '../db-backup' import { ethFactory } from '../blockchain' const logger = loggingFactory('cli:start') @@ -25,10 +26,6 @@ ${formattedServices}` port: flags.integer({ char: 'p', description: 'port to attach the server to' }), db: flags.string({ description: 'database connection URI', env: 'RIFM_DB' }), provider: flags.string({ description: 'blockchain provider connection URI', env: 'RIFM_PROVIDER' }), - purge: flags.boolean({ - char: 'u', - description: 'will purge services that should be lunched (eq. enable/disable is applied)' - }), enable: flags.string({ char: 'e', multiple: true, description: 'enable specific service' }), disable: flags.string({ char: 'd', multiple: true, description: 'disable specific service' }) } @@ -109,7 +106,7 @@ ${formattedServices}` // Promise that resolves when reset callback is called const resetPromise = new Promise(resolve => { - appFactory({ + startApp({ appResetCallBack: () => resolve() }).then(value => { // Lets save the function that stops the app @@ -127,7 +124,7 @@ ${formattedServices}` backUpJob.stop() // Restore DB from backup - await backUpJob.restoreDb(() => undefined) + await backUpJob.restoreDb() // Run pre-cache await this.precache() diff --git a/src/db-backup.ts b/src/db-backup.ts new file mode 100644 index 00000000..da927ab4 --- /dev/null +++ b/src/db-backup.ts @@ -0,0 +1,115 @@ +import fs from 'fs' +import config from 'config' +import path from 'path' +import BigNumber from 'bignumber.js' +import { BlockHeader, Eth } from 'web3-eth' + +import { AutoStartStopEventEmitter, NEW_BLOCK_EVENT_NAME } from './blockchain/new-block-emitters' +import { DbBackUpConfig } from './definitions' +import { getNewBlockEmitter } from './blockchain/utils' + +export type BackUpEntry = { name: string, block: { hash: string, number: BigNumber } } + +export function parseBackUps (backUpName: string): BackUpEntry { + const [block] = backUpName.split('.')[0].split('-') + const [hash, blockNumber] = block.split(':') + + return { + name: backUpName, + block: { number: new BigNumber(blockNumber), hash } + } +} + +export function getBackUps (): BackUpEntry[] { + const backupConfig = config.get('dbBackUp') + + const backups = fs.readdirSync(path.resolve(process.cwd(), `./${backupConfig.path}`)) + + if (backups.length) { + return backups + .map(parseBackUps) + .sort( + (a: Record, b: Record) => + a.block.number.gt(b.block.number) ? -1 : 1 + ) + } + + return [] +} + +export class DbBackUpJob { + readonly newBlockEmitter: AutoStartStopEventEmitter + readonly db: string + readonly eth: Eth + readonly backUpConfig: DbBackUpConfig + + constructor (eth: Eth) { + if (!config.has('dbBackUp')) { + throw new Error('DB Backup config not exist') + } + this.backUpConfig = config.get('dbBackUp') + this.db = config.get('db') + + if (!fs.existsSync(this.backUpConfig.path)) { + fs.mkdirSync(this.backUpConfig.path) + } + + this.eth = eth + this.newBlockEmitter = getNewBlockEmitter(eth) + } + + /** + * Back-up database if blocks condition met + * @return {Promise} + * @param block + */ + private async backupDb (block: BlockHeader): Promise { + const [lastBackUp, previousBackUp] = getBackUps() + + if (!lastBackUp || new BigNumber(block.number).minus(this.backUpConfig.blocks).gte(lastBackUp.block.number)) { + // copy and rename current db + await fs.promises.copyFile(this.db, path.resolve(this.backUpConfig.path, `${block.hash}:${block.number}-${this.db}`)) + + // remove the oldest version + if (previousBackUp) { + await fs.promises.unlink(path.resolve(this.backUpConfig.path, previousBackUp.name)) + } + } + } + + /** + * Restore database backup + * @return {Promise} + */ + public async restoreDb (): Promise { + const backUps = getBackUps() + const [_, oldest] = backUps + + if (backUps.length < 2) { + throw new Error('Should be two backups to be able to restore') + } + + // Check if back up block hash exist after reorg + const block = await this.eth.getBlock(oldest.block.hash).catch(() => false) + + if (!block) { + throw new Error('Invalid backup. Block Hash is not valid!') + } + + // remove current db + await fs.promises.unlink(this.db) + + // restore backup + await fs.promises.copyFile(path.resolve(this.backUpConfig.path, oldest.name), path.resolve(process.cwd(), this.db)) + } + + public run (): void { + this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupDb.bind(this)) + } + + public stop (): void { + this.newBlockEmitter.stop() + } +} + +export default DbBackUpJob diff --git a/src/index.ts b/src/index.ts index fbe3c5a0..ede2759c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ -import { appFactory } from './app' +import { startApp } from './app' (async function (): Promise { - await appFactory({ appResetCallBack: () => { throw new Error('Reset callback not implemented') } }) + await startApp({ appResetCallBack: () => { throw new Error('Reset callback not implemented') } }) })() diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index f766a2b4..706d141b 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -117,6 +117,7 @@ const storage: CachedService = { return { stop: () => { + confirmationService.removeAllListeners() eventsEmitter.stop() } } diff --git a/src/utils.ts b/src/utils.ts index a0ba76df..dcfe9d0a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,10 +3,8 @@ import { Input, OutputFlags } from '@oclif/parser' import { promisify } from 'util' import config from 'config' import fs from 'fs' -import path from 'path' import { hexToAscii } from 'web3-utils' import BigNumber from 'bignumber.js' -import { BlockHeader, Eth } from 'web3-eth' import { Application, @@ -14,11 +12,7 @@ import { isSupportedServices, Logger, SupportedServices, - EventsEmitterOptions, - DbBackUpConfig } from './definitions' -import { AutoStartStopEventEmitter, NEW_BLOCK_EVENT_NAME } from './blockchain/new-block-emitters' -import { getNewBlockEmitter } from './blockchain/utils' const readFile = promisify(fs.readFile) @@ -208,145 +202,3 @@ export abstract class BaseCLICommand extends Command { return Promise.resolve() } } - -type BackUpEntry = { name: string, block: { hash: string, number: BigNumber } } - -function parseBackUps (backUpName: string): BackUpEntry { - const [block] = backUpName.split('.')[0].split('-') - const [hash, blockNumber] = block.split(':') - - return { - name: backUpName, - block: { number: new BigNumber(blockNumber), hash } - } -} - -function getBackUps (): BackUpEntry[] { - const backupConfig = config.get('dbBackUp') - - const backups = fs.readdirSync(path.resolve(__dirname, `../${backupConfig.path}`)) - - if (backups.length) { - return backups - .map(parseBackUps) - .sort( - (a: Record, b: Record) => - a.block.number.gt(b.block.number) ? -1 : 1 - ) - } - - return [] -} - -export class DbBackUpJob { - readonly newBlockEmitter: AutoStartStopEventEmitter - readonly db: string - readonly eth: Eth - readonly backUpConfig: DbBackUpConfig - - constructor (eth: Eth) { - if (!config.has('dbBackUp')) { - throw new Error('DB Backup config not exist') - } - this.backUpConfig = config.get('dbBackUp') - this.db = config.get('db') - - const eventEmittersConfirmations = this.getEventEmittersConfigs() - const invalidConfirmation = eventEmittersConfirmations.find(c => c.config.confirmations && c.config.confirmations > this.backUpConfig.blocks) - - if (invalidConfirmation) { - throw new Error(`Invalid db backup configuration. Number of backup blocks should be greater then confirmation blocks for ${invalidConfirmation.name} service`) - } - - if (!fs.existsSync(this.backUpConfig.path)) { - fs.mkdirSync(this.backUpConfig.path) - } - - this.eth = eth - this.newBlockEmitter = getNewBlockEmitter(eth) - } - - /** - * Back-up database if blocks condition met - * @return {Promise} - * @param block - */ - private async backupDb (block: BlockHeader): Promise { - const [lastBackUp, previousBackUp] = getBackUps() - - if (!lastBackUp || new BigNumber(block.number).minus(this.backUpConfig.blocks).gte(lastBackUp.block.number)) { - // copy and rename current db - await fs.promises.copyFile(this.db, path.resolve(this.backUpConfig.path, `${block.hash}:${block.number}-${this.db}`)) - - // remove the oldest version - if (previousBackUp) { - await fs.promises.unlink(path.resolve(this.backUpConfig.path, previousBackUp.name)) - } - } - } - - /** - * Restore database backup - * @param {(message:any) => void} errorCallback - * @return {Promise} - */ - public async restoreDb (errorCallback: (message: any) => void): Promise { - const backUps = getBackUps() - const [_, oldest] = backUps - - if (backUps.length < 2) { - errorCallback({ code: 1, message: 'Not enough backups' }) - throw new Error('Should be two backups to be able to restore') - } - - // Check if back up block hash exist after reorg - const block = await this.eth.getBlock(oldest.block.hash).catch(() => false) - - if (!block) { - errorCallback({ code: 2, message: 'Invalid backup. Block Hash is not valid!' }) - throw new Error('Invalid backup. Block Hash is not valid!') - } - - // remove current db - await fs.promises.unlink(this.db) - - // restore backup - await fs.promises.copyFile(path.resolve(this.backUpConfig.path, oldest.name), path.resolve(process.cwd(), this.db)) - } - - private getEventEmittersConfigs (): { config: EventsEmitterOptions, name: string }[] { - return Object.values(SupportedServices) - .reduce((acc: { config: EventsEmitterOptions, name: string }[], serviceName: string) => { - if (config.has(serviceName) && config.get(`${serviceName}.enabled`)) { - if (serviceName === SupportedServices.RNS) { - const rnsEmitters = ['owner', 'reverse', 'placement'] - .reduce( - (acc2: any[], contract: string) => { - if (config.has(`${serviceName}.${contract}.eventsEmitter`)) { - const emitterConfig = config.get(`${serviceName}.${contract}.eventsEmitter`) - return [...acc2, { config: emitterConfig, name: `${serviceName}.${contract}` }] - } - return acc2 - }, - [] - ) - return [...acc, ...rnsEmitters] - } - - if (config.has(`${serviceName}.eventsEmitter`)) { - const emitterConfig = config.get(`${serviceName}.eventsEmitter`) - return [...acc, { config: emitterConfig, name: serviceName }] - } - } - return acc - }, []) - } - - public run (): void { - this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupDb.bind(this)) - } - - public stop (): void { - this.newBlockEmitter.stop() - } -} diff --git a/test/db-backup-restore.spec.ts b/test/db-backup-restore.spec.ts index 7c5f1eb6..65470c70 100644 --- a/test/db-backup-restore.spec.ts +++ b/test/db-backup-restore.spec.ts @@ -7,7 +7,7 @@ import sinonChai from 'sinon-chai' import { Eth } from 'web3-eth' import { Substitute, Arg } from '@fluffy-spoon/substitute' -import { DbBackUpJob } from '../src/utils' +import DbBackUpJob from '../src/db-backup' import { blockMock, rmDir, sleep } from './utils' import { NEW_BLOCK_EVENT_NAME } from '../src/blockchain/new-block-emitters' import { DbBackUpConfig } from '../src/definitions' @@ -29,7 +29,7 @@ describe('DB back-up/restore', function () { const errorCallBack = sinon.spy() expect(fs.readdirSync(config.get('dbBackUp').path).length).to.be.eql(0) - await expect(backupJob.restoreDb(errorCallBack)).to.eventually.be.rejectedWith( + await expect(backupJob.restoreDb()).to.eventually.be.rejectedWith( Error, 'Should be two backups to be able to restore' ) @@ -48,7 +48,7 @@ describe('DB back-up/restore', function () { fs.writeFileSync(path.resolve(backupPath, `0x0123:20-${db}`), 'Second') expect(fs.readdirSync(backupPath).length).to.be.eql(2) - await expect(backupJob.restoreDb(errorCallBack)).to.eventually.be.rejectedWith( + await expect(backupJob.restoreDb()).to.eventually.be.rejectedWith( Error, 'Invalid backup. Block Hash is not valid!' ) @@ -68,7 +68,7 @@ describe('DB back-up/restore', function () { fs.writeFileSync(path.resolve(backupPath, `0x01234:20-${db}`), 'Second db backup') expect(fs.readdirSync(backupPath).length).to.be.eql(2) - await backupJob.restoreDb(errorCallBack) + await backupJob.restoreDb() await sleep(1000) expect(errorCallBack.called).to.be.false()