From e1fcb731e271d98cefcd94c2f26c02f80ccb6222 Mon Sep 17 00:00:00 2001 From: Nazar Duchak Date: Fri, 21 Aug 2020 18:12:03 +0300 Subject: [PATCH] chore: add stop method for rate service refactor db-backup job --- src/app.ts | 5 +- src/cli/start.ts | 5 +- src/definitions.ts | 8 +++ src/services/rates/index.ts | 8 ++- src/utils.ts | 106 +++++++++++++++++++++++------------- 5 files changed, 86 insertions(+), 46 deletions(-) diff --git a/src/app.ts b/src/app.ts index a6b7a4d0..17713576 100644 --- a/src/app.ts +++ b/src/app.ts @@ -60,13 +60,13 @@ export async function appFactory (options: AppOptions): Promise<{ stop: () => vo /**********************************************************/ // Configure each services - const servicePromises: Promise[] = [] + const servicePromises: Promise<{ stop: () => void }>[] = [] for (const service of Object.values(services)) { app.configure((app) => servicePromises.push(errorHandler(service.initialize, logger)(app))) } // Wait for services to initialize - await Promise.all(servicePromises) + const servicesInstances = await Promise.all(servicePromises) // Log errors in hooks app.hooks({ @@ -97,6 +97,7 @@ export async function appFactory (options: AppOptions): Promise<{ stop: () => vo return { stop: () => { server.close() + servicesInstances.forEach(service => service.stop()) } } } diff --git a/src/cli/start.ts b/src/cli/start.ts index 34fec31d..846c1062 100644 --- a/src/cli/start.ts +++ b/src/cli/start.ts @@ -5,8 +5,9 @@ import { appFactory, services } from '../app' import { loggingFactory } from '../logger' import { Flags, Config, SupportedServices, isSupportedServices } from '../definitions' import { BaseCLICommand, DbBackUpJob, restoreDb } from '../utils' -import { sequelizeFactory } from '../sequelize' import Event from '../blockchain/event.model' +import { getNewBlockEmitter } from '../blockchain/utils' +import { ethFactory } from '../blockchain' const logger = loggingFactory('cli:start') @@ -108,7 +109,7 @@ ${formattedServices}` let stopCallback = (() => { throw new Error('No stop callback was assigned!') }) as () => void // Run backup job - const backUpJob = new DbBackUpJob() + const backUpJob = new DbBackUpJob(getNewBlockEmitter(ethFactory())) backUpJob.run() // Promise that resolves when reset callback is called diff --git a/src/definitions.ts b/src/definitions.ts index 4e7a0199..cb9268a5 100644 --- a/src/definitions.ts +++ b/src/definitions.ts @@ -107,6 +107,11 @@ export interface BlockchainServiceOptions { newBlockEmitter?: NewBlockEmitterOptions } +export interface DbBackUpConfig { + blocks: number + path: string +} + export interface Config { host?: string port?: number @@ -114,6 +119,9 @@ export interface Config { // DB URI to connect to database db?: string + // DB backup config + dbBackUp?: DbBackUpConfig + log?: { level?: string filter?: string diff --git a/src/services/rates/index.ts b/src/services/rates/index.ts index 03c21e01..8fa120d3 100644 --- a/src/services/rates/index.ts +++ b/src/services/rates/index.ts @@ -16,10 +16,10 @@ const logger = loggingFactory(SERVICE_NAME) const storage: CachedService = { // eslint-disable-next-line require-await - async initialize (app: Application): Promise { + async initialize (app: Application): Promise<{ stop: () => void }> { if (!config.get('rates.enabled')) { logger.info('Rates service: disabled') - return + return { stop: () => undefined } } logger.info('Rates service: enabled') @@ -30,7 +30,9 @@ const storage: CachedService = { // Start periodical refresh const updatePeriod = config.get(CONFIG_UPDATE_PERIOD) * 1000 // Converting seconds to ms - setInterval(() => updater().catch(logger.error), updatePeriod) + const intervalId = setInterval(() => updater().catch(logger.error), updatePeriod) + + return { stop: () => clearInterval(intervalId) } }, async purge (): Promise { diff --git a/src/utils.ts b/src/utils.ts index 7c1e54ae..280f8ed2 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -8,8 +8,15 @@ import { hexToAscii } from 'web3-utils' import BigNumber from 'bignumber.js' import { BlockHeader } from 'web3-eth' -import { Application, Config, isSupportedServices, Logger, SupportedServices } from './definitions' -import { getNewBlockEmitter } from './blockchain/utils' +import { + Application, + Config, + isSupportedServices, + Logger, + SupportedServices, + EventsEmitterOptions, + DbBackUpConfig +} from './definitions' import { ethFactory } from './blockchain' import { AutoStartStopEventEmitter, NEW_BLOCK_EVENT_NAME } from './blockchain/new-block-emitters' import { services } from './app' @@ -216,9 +223,9 @@ function parseBackUps (backUpName: string): BackUpEntry { } function getBackUps (): BackUpEntry[] { - const backupConfig = config.get('dbBackUp') as { path: string } + const backupConfig = config.get('dbBackUp') - const backups = fs.readdirSync(path.resolve(__dirname, '../../' + backupConfig.path)) + const backups = fs.readdirSync(path.resolve(__dirname, '../' + backupConfig.path)) if (backups.length) { return backups @@ -232,54 +239,22 @@ function getBackUps (): BackUpEntry[] { return [] } -export class DbBackUpJob { - private newBlockEmitter: AutoStartStopEventEmitter - - constructor () { - this.newBlockEmitter = getNewBlockEmitter(ethFactory()) - } - - private backupHandler (block: BlockHeader): void { - const db = config.get('db') - const backupConfig = config.get('dbBackUp') as { path: string, blocks: number } - const [lastBackUp, previousBackUp] = getBackUps() - - if (block.number - backupConfig.blocks >= lastBackUp.block.number) { - // copy and rename current db - fs.copyFileSync(db, backupConfig.path) - fs.renameSync(path.resolve(backupConfig.path, db), path.resolve(backupConfig.path, `${block.hash}:${block.number}-${db}`)) - - // remove the oldest version - if (previousBackUp) fs.unlinkSync(path.resolve(backupConfig.path, previousBackUp.name)) - } - } - - public run (): void { - const newBlockEmitter = getNewBlockEmitter(ethFactory()) - newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupHandler) - } - - public stop (): void { - this.newBlockEmitter.stop() - } -} - export async function restoreDb (): Promise { const db = config.get('db') - const backupConfig = config.get<{ path: string }>('dbBackUp') + const backupConfig = config.get('dbBackUp') const eth = ethFactory() const backUps = getBackUps() const [latest, previousBackUp] = backUps if (backUps.length < 2) { - // TODO Notify devOps + // TODO Notify devOps throw new Error('Back up not exist') } // Check if back up last processed block hash is valid if (!(await eth.getBlock(backUps[1].block.hash))) { - // TODO Notify devOps + // TODO Notify devOps throw new Error('Invalid backup. Hash is not exist!') } @@ -297,3 +272,56 @@ export async function restoreDb (): Promise { toBePrecache.map((service) => services[service].precache(eth)) ) } + +export class DbBackUpJob { + private newBlockEmitter: AutoStartStopEventEmitter + private backUpConfig: DbBackUpConfig + + constructor (newBlockEmitter: AutoStartStopEventEmitter) { + if (!config.has('dbBackUp')) throw new Error('DB Backup config not exist') + this.backUpConfig = config.get<{ blocks: number, path: string }>('dbBackUp') + + const eventEmittersConfirmations = this.getEventEmittersConfigs() + const invalidConfirmation = eventEmittersConfirmations.find(c => c.config.confirmations && c.config.confirmations > this.backUpConfig.blocks) + + if (invalidConfirmation) { + throw new Error(`Invalid db backup configuration. Number of backup blocks should be greater then confirmation blocks for ${invalidConfirmation.name} service`) + } + + this.newBlockEmitter = newBlockEmitter + } + + private backupHandler (block: BlockHeader): void { + const db = config.get('db') + const backupConfig = config.get('dbBackUp') as { path: string, blocks: number } + const [lastBackUp, previousBackUp] = getBackUps() + + if (block.number - backupConfig.blocks >= lastBackUp.block.number) { + // copy and rename current db + fs.copyFileSync(db, backupConfig.path) + fs.renameSync(path.resolve(backupConfig.path, db), path.resolve(backupConfig.path, `${block.hash}:${block.number}-${db}`)) + + // remove the oldest version + if (previousBackUp) fs.unlinkSync(path.resolve(backupConfig.path, previousBackUp.name)) + } + } + + private getEventEmittersConfigs (): { config: EventsEmitterOptions, name: string }[] { + return Object.values(SupportedServices) + .reduce((acc: { config: EventsEmitterOptions, name: string }[], serviceName: string) => { + if (config.has(`${serviceName}.eventsEmitter`)) { + const emitterConfig = config.get(`${serviceName}.eventsEmitter`) + return [...acc, { config: emitterConfig, name: serviceName }] + } + return acc + }, []) + } + + public run (): void { + this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupHandler) + } + + public stop (): void { + this.newBlockEmitter.stop() + } +}