diff --git a/.circleci/config.yml b/.circleci/config.yml
index bded65f0..18cdd20f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -41,6 +41,8 @@ jobs:
         type: string
     docker:
       - image: circleci/node:<< parameters.version >>
+        environment:
+          ALLOW_CONFIG_MUTATIONS: true
     steps:
       - checkout
       - run:
diff --git a/.tasegir.js b/.tasegir.js
index d04bdb31..e22d76de 100644
--- a/.tasegir.js
+++ b/.tasegir.js
@@ -4,7 +4,7 @@ module.exports = {
   },
   depCheck: {
     ignore: [
-      'tasegir', 'reflect-metadata', '@types/*', 'sqlite3', '@oclif/*',
+      'cross-env', 'tasegir', 'reflect-metadata', '@types/*', 'sqlite3', '@oclif/*',
     ]
   }
 }
diff --git a/config/default.json5 b/config/default.json5
index 1367b641..b05290a7 100644
--- a/config/default.json5
+++ b/config/default.json5
@@ -1,6 +1,12 @@
 // For full syntax see /src/types.ts::Config interface
 {
-  db: 'sqlite:db.sqlite',
+  db: 'db.sqlite',
+
+  // DB back-up config
+  dbBackUp: {
+    blocks: 1440, // 12 hours * 60 * 60 / 30s (new block every 30s) ==> backed up twice a day
+    path: 'db-backups'
+  },
 
   // CORS setting, please consult https://expressjs.com/en/resources/middleware/cors.html for more details
   cors: {
diff --git a/config/test.json5 b/config/test.json5
index 79b3ffd7..bbfb89d8 100644
--- a/config/test.json5
+++ b/config/test.json5
@@ -1,5 +1,6 @@
 {
-  db: 'sqlite://db_test.sqlite',
+  db: 'db_test.sqlite',
+  dbBackUp: { blocks: 10, path: 'db-backups' },
   blockchain: {
     provider: 'ws://localhost:8545',
     waitBlockCountBeforeConfirmationRemoved: 10
diff --git a/package-lock.json b/package-lock.json
index 0dc2741f..c6e2477d 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -7590,6 +7590,58 @@
         "sha.js": "^2.4.8"
       }
     },
+    "cross-env": {
+      "version": "7.0.2",
+      "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.2.tgz",
+      "integrity": "sha512-KZP/bMEOJEDCkDQAyRhu3RL2ZO/SUVrxQVI0G3YEQ+OLbRA3c6zgixe8Mq8a/z7+HKlNEjo8oiLUs8iRijY2Rw==",
+      "dev": true,
+      "requires": {
+        "cross-spawn": "^7.0.1"
+      },
+      "dependencies": {
+        "cross-spawn": {
+          "version": "7.0.3",
+          "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
+          "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==",
+          "dev": true,
+          "requires": {
+            "path-key": "^3.1.0",
+            "shebang-command": "^2.0.0",
+            "which": "^2.0.1"
+          }
+        },
+        "path-key": {
+          "version": "3.1.1",
+          "resolved": "https://registry.npmjs.org/path-key/-/path-key-3.1.1.tgz",
+          "integrity": "sha512-ojmeN0qd+y0jszEtoY48r0Peq5dwMEkIlCOu6Q5f41lfkswXuKtYrhgoTpLnyIcHm24Uhqx+5Tqm2InSwLhE6Q==",
+          "dev": true
+        },
+        "shebang-command": {
+          "version": "2.0.0",
+          "resolved": "https://registry.npmjs.org/shebang-command/-/shebang-command-2.0.0.tgz",
+          "integrity": "sha512-kHxr2zZpYtdmrN1qDjrrX/Z1rR1kG8Dx+gkpK1G4eXmvXswmcE1hTWBWYUzlraYw1/yZp6YuDY77YtvbN0dmDA==",
+          "dev": true,
+          "requires": {
+            "shebang-regex": "^3.0.0"
+          }
+        },
+        "shebang-regex": {
+          "version": "3.0.0",
+          "resolved": "https://registry.npmjs.org/shebang-regex/-/shebang-regex-3.0.0.tgz",
+          "integrity": "sha512-7++dFhtcx3353uBaq8DDR4NuxBetBzC7ZQOhmTQInHEd6bSrXdiEyzCvG07Z44UYdLShWUyXt5M/yhz8ekcb1A==",
+          "dev": true
+        },
+        "which": {
+          "version": "2.0.2",
+          "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",
+          "integrity": "sha512-BLI3Tl1TW3Pvl70l3yq3Y64i+awpwXqsGBYWkkqMtnbXgrMD+yj7rhW0kuEDxzJaYXGjEW5ogapKNMEKNMjibA==",
+          "dev": true,
+          "requires": {
+            "isexe": "^2.0.0"
+          }
+        }
+      }
+    },
     "cross-spawn": {
       "version": "5.1.0",
       "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-5.1.0.tgz",
diff --git a/package.json b/package.json
index 1ac1c668..2c476f3e 100644
--- a/package.json
+++ b/package.json
@@ -44,7 +44,7 @@
     "types-check": "tasegir types-check",
     "lint": "tasegir lint",
     "release": "tasegir release",
-    "test": "tasegir test --target node",
+    "test": "cross-env ALLOW_CONFIG_MUTATIONS=true tasegir test --target node",
     "start": "tasegir run --watch ./src/index.ts"
   },
   "dependencies": {
@@ -113,6 +113,7 @@
     "@types/validator": "^13.0.0",
     "chai": "^4.2.0",
     "chai-as-promised": "^7.1.1",
+    "cross-env": "^7.0.2",
    "dirty-chai": "^2.0.1",
    "sinon": "^9.0.1",
    "sinon-chai": "^3.5.0",
diff --git a/src/app.ts b/src/app.ts
index 5908ed6d..03a00dce 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -7,7 +7,7 @@ import feathers from '@feathersjs/feathers'
 import express from '@feathersjs/express'
 import socketio from '@feathersjs/socketio'
 
-import { Application, SupportedServices } from './definitions'
+import { Application, ServiceAddresses, SupportedServices } from './definitions'
 import { loggingFactory } from './logger'
 import sequelize from './sequelize'
 import blockchain from './blockchain'
@@ -20,6 +20,7 @@ import authentication from './services/authentication'
 import storage from './services/storage'
 import rates from './services/rates'
 import rns from './services/rns'
+import { REORG_OUT_OF_RANGE_EVENT_NAME } from './blockchain/events'
 
 const logger = loggingFactory()
 
@@ -29,7 +30,9 @@ export const services = {
   [SupportedServices.RNS]: rns
 }
 
-export async function appFactory (): Promise<Application> {
+export type AppOptions = { appResetCallBack: (...args: any) => void }
+
+export async function appFactory (options: AppOptions): Promise<{ app: Application, stop: () => void }> {
   const app: Application = express(feathers())
 
   logger.verbose('Current configuration: ', config)
@@ -58,13 +61,13 @@
 /**********************************************************/
 
   // Configure each services
-  const servicePromises: Promise<void>[] = []
+  const servicePromises: Promise<{ stop: () => void }>[] = []
   for (const service of Object.values(services)) {
    app.configure((app) => servicePromises.push(errorHandler(service.initialize, logger)(app)))
  }
 
   // Wait for services to initialize
-  await Promise.all(servicePromises)
+  const servicesInstances = await Promise.all(servicePromises)
 
   // Log errors in hooks
   app.hooks({
@@ -77,5 +80,12 @@
   app.use(express.notFound())
   app.use(express.errorHandler({ logger }))
 
-  return app
+  // Subscribe for reorg event
+  const reorgService = app.service(ServiceAddresses.REORG_EMITTER)
+  reorgService.on(REORG_OUT_OF_RANGE_EVENT_NAME, () => {
+    // Wait 5 seconds to be sure that the reorg event is received by connected services
+    setTimeout(() => options.appResetCallBack(), 5000)
+  })
+
+  return { app, stop: () => servicesInstances.forEach(service => service.stop()) }
 }
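`appFactory` now returns a stop handle alongside the app and signals through `appResetCallBack` when a reorg lands outside the confirmation range. A condensed consumer, mirroring what `src/cli/start.ts` below does (a sketch only; import paths illustrative, error handling omitted):

```ts
import config from 'config'
import { appFactory } from './src/app'

async function main (): Promise<void> {
  // Assigned once appFactory resolves; the callback only fires later
  let stop: () => void = () => { /* noop until assigned */ }

  const { app, stop: stopApp } = await appFactory({
    // Invoked (after the 5 s grace period above) when the cached state
    // can no longer be trusted and the app must be rebuilt
    appResetCallBack: () => {
      stop()
      // ...restore the DB backup, precache and start over (see src/cli/start.ts)
    }
  })
  stop = stopApp

  app.listen(config.get<number>('port'))
}
```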
diff --git a/src/blockchain/index.ts b/src/blockchain/index.ts
index 229ed838..9aa2d408 100644
--- a/src/blockchain/index.ts
+++ b/src/blockchain/index.ts
@@ -8,6 +8,7 @@ import { loggingFactory } from '../logger'
 import { NEW_BLOCK_EVENT_NAME, NewBlockEmitterService } from './new-block-emitters'
 import { getNewBlockEmitter } from './utils'
 import { waitForReadyApp } from '../utils'
+import { ReorgEmitterService } from './reorg-emitter'
 
 const logger = loggingFactory('blockchain')
 
@@ -20,6 +21,7 @@ export function ethFactory (): Eth {
 
 const CONFIRMATION_CHANNEL = 'confirmations'
 const NEW_BLOCK_EMITTER_CHANNEL = 'new-block'
+const REORG_CHANNEL = 'reorg'
 
 function channelSetup (app: Application): void {
   if (typeof app.channel !== 'function') {
@@ -29,9 +31,11 @@ function channelSetup (app: Application): void {
   app.on('connection', (connection: any) => {
     app.channel(CONFIRMATION_CHANNEL).join(connection)
     app.channel(NEW_BLOCK_EMITTER_CHANNEL).join(connection)
+    app.channel(REORG_CHANNEL).join(connection)
   })
   app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CONFIRMATION_CHANNEL))
   app.service(ServiceAddresses.NEW_BLOCK_EMITTER).publish(() => app.channel(NEW_BLOCK_EMITTER_CHANNEL))
+  app.service(ServiceAddresses.REORG_EMITTER).publish(() => app.channel(REORG_CHANNEL))
 }
 
 function subscribeAndEmitNewBlocks (app: Application): void {
@@ -55,6 +59,7 @@ export default async function (app: Application): Promise<void> {
   app.set('eth', eth)
   app.use(ServiceAddresses.CONFIRMATIONS, new ConfirmatorService(eth))
   app.use(ServiceAddresses.NEW_BLOCK_EMITTER, new NewBlockEmitterService())
+  app.use(ServiceAddresses.REORG_EMITTER, new ReorgEmitterService())
 
   subscribeAndEmitNewBlocks(app)
   channelSetup(app)
diff --git a/src/blockchain/reorg-emitter.ts b/src/blockchain/reorg-emitter.ts
new file mode 100644
index 00000000..ba6e51b9
--- /dev/null
+++ b/src/blockchain/reorg-emitter.ts
@@ -0,0 +1,45 @@
+import { ServiceMethods } from '@feathersjs/feathers'
+
+import { loggingFactory } from '../logger'
+import { REORG_OUT_OF_RANGE_EVENT_NAME } from './events'
+
+const logger = loggingFactory('blockchain:reorg-service')
+const DEFAULT_DEBOUNCE_TIME = 5000
+
+export class ReorgEmitterService implements Partial<ServiceMethods<any>> {
+  private readonly debounceTime: number
+  private reorgContract: string[] = []
+  private lastProcessedBlockNumber = 0
+  private timeoutStarted = false
+  emit?: Function
+  events: string[]
+
+  constructor (debounceTime?: number) {
+    this.debounceTime = debounceTime || DEFAULT_DEBOUNCE_TIME
+    this.events = [REORG_OUT_OF_RANGE_EVENT_NAME]
+  }
+
+  // eslint-disable-next-line require-await
+  async get (): Promise<void> {
+    return Promise.resolve()
+  }
+
+  emitReorg (lastProcessedBlockNumber: number, contractName: string): void {
+    if (!this.emit) {
+      throw new Error('ReorgEmitterService invalid setup. Missing \'emit\' function')
+    }
+
+    if (!this.timeoutStarted) {
+      setTimeout(() => {
+        logger.warn(`Reorg outside of confirmation range happened at block number ${lastProcessedBlockNumber} for [${this.reorgContract}] contracts`)
+        this.emit!(REORG_OUT_OF_RANGE_EVENT_NAME, { contracts: this.reorgContract, lastProcessedBlockNumber: this.lastProcessedBlockNumber })
+        this.reorgContract = []
+        this.lastProcessedBlockNumber = 0
+      }, this.debounceTime)
+      this.timeoutStarted = true
+    }
+
+    this.reorgContract = [...this.reorgContract, contractName]
+    this.lastProcessedBlockNumber = lastProcessedBlockNumber
+  }
+}
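`ReorgEmitterService` debounces: the first `emitReorg` call arms a single timer, subsequent calls within `debounceTime` only accumulate contract names, and one aggregated event fires when the timer elapses. A rough sketch of the expected behaviour (Feathers normally mixes `emit` in at registration time; here it is stubbed for illustration, and the import path is illustrative):

```ts
import { ReorgEmitterService } from './src/blockchain/reorg-emitter'

const service = new ReorgEmitterService(100) // 100 ms debounce window
service.emit = (event: string, payload: unknown) => console.log(event, payload)

service.emitReorg(10, 'storage')    // arms the timer
service.emitReorg(12, 'rns.owner')  // piggybacks on the same window

// ~100 ms later a single REORG_OUT_OF_RANGE_EVENT_NAME event fires with
// { contracts: ['storage', 'rns.owner'], lastProcessedBlockNumber: 12 }
```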
diff --git a/src/blockchain/utils.ts b/src/blockchain/utils.ts
index 905b05f6..88401f8a 100644
--- a/src/blockchain/utils.ts
+++ b/src/blockchain/utils.ts
@@ -1,11 +1,10 @@
 import { AbiItem, keccak256 } from 'web3-utils'
 import Eth from 'web3-eth'
-import { EventEmitter } from 'events'
 import config from 'config'
 import { getObject } from 'sequelize-store'
 
 import { loggingFactory } from '../logger'
-import eventsEmitterFactory, { EventsEmitterOptions } from './events'
+import eventsEmitterFactory, { BaseEventsEmitter, EventsEmitterOptions } from './events'
 import { NewBlockEmitterOptions } from '../definitions'
 import { BlockTracker } from './block-tracker'
 import { AutoStartStopEventEmitter, ListeningNewBlockEmitter, PollingNewBlockEmitter } from './new-block-emitters'
@@ -29,7 +28,7 @@ export function isServiceInitialized (serviceName: string): boolean {
   return blockTracker.getLastFetchedBlock()[0] !== undefined
 }
 
-export function getEventsEmitterForService (serviceName: string, eth: Eth, contractAbi: AbiItem[]): EventEmitter {
+export function getEventsEmitterForService (serviceName: string, eth: Eth, contractAbi: AbiItem[]): BaseEventsEmitter {
   const contractAddresses = config.get(`${serviceName}.contractAddress`)
   const contract = new eth.Contract(contractAbi, contractAddresses)
   const logger = loggingFactory(`${serviceName}:blockchain`)
diff --git a/src/cli/start.ts b/src/cli/start.ts
index 944c153f..003c4ce8 100644
--- a/src/cli/start.ts
+++ b/src/cli/start.ts
@@ -5,8 +5,8 @@ import { appFactory, services } from '../app'
 import { loggingFactory } from '../logger'
 import { Flags, Config, SupportedServices, isSupportedServices } from '../definitions'
 import { BaseCLICommand } from '../utils'
-import { sequelizeFactory } from '../sequelize'
-import Event from '../blockchain/event.model'
+import DbBackUpJob from '../db-backup'
+import { ethFactory } from '../blockchain'
 
 const logger = loggingFactory('cli:start')
 
@@ -26,10 +26,6 @@ ${formattedServices}`
     port: flags.integer({ char: 'p', description: 'port to attach the server to' }),
     db: flags.string({ description: 'database connection URI', env: 'RIFM_DB' }),
     provider: flags.string({ description: 'blockchain provider connection URI', env: 'RIFM_PROVIDER' }),
-    purge: flags.boolean({
-      char: 'u',
-      description: 'will purge services that should be lunched (eq. enable/disable is applied)'
-    }),
     enable: flags.string({ char: 'e', multiple: true, description: 'enable specific service' }),
     disable: flags.string({ char: 'd', multiple: true, description: 'disable specific service' })
   }
@@ -84,17 +80,12 @@ ${formattedServices}`
     return output
   }
 
-  private async purge (): Promise<void> {
-    const toBePurgedServices = (Object.keys(services) as Array<SupportedServices>)
+  public async precache () {
+    const toBePrecache = (Object.keys(services) as Array<SupportedServices>)
       .filter(service => config.get(`${service}.enabled`))
-
-    logger.info(`Purging services: ${toBePurgedServices.join(', ')}`)
-
     await Promise.all(
-      toBePurgedServices.map((service) => services[service].purge())
+      toBePrecache.map((service) => services[service].precache(ethFactory()))
     )
-
-    await Event.destroy({ where: {}, truncate: true, cascade: true })
   }
 
   async run (): Promise<void> {
@@ -103,23 +94,57 @@ ${formattedServices}`
     const { flags } = this.parse(StartServer)
 
     const configOverrides = this.buildConfigObject(flags)
     config.util.extendDeep(config, configOverrides)
 
-    if (flags.purge) {
-      sequelizeFactory()
-      await this.purge()
-    }
-
-    const app = await appFactory()
-    const port = config.get('port')
-    const server = app.listen(port)
-
-    process.on('unhandledRejection', err =>
-      logger.error(`Unhandled Rejection at: ${err}`)
-    )
-
-    server.on('listening', () =>
-      this.log('Server started on port %d', port)
-    )
+    const backUpJob = new DbBackUpJob(ethFactory())
+    // An infinite loop which you can exit only with SIGINT/SIGKILL
+    while (true) {
+      let stopCallback: () => void = () => {
+        throw new Error('No stop callback was assigned!')
+      }
-    return Promise.resolve()
+
+      // Run the backup job
+      backUpJob.run()
+
+      // Promise that resolves when the reset callback is called
+      const resetPromise = new Promise<void>(resolve => {
+        appFactory({
+          appResetCallBack: () => resolve()
+        }).then(({ app, stop }) => {
+          // Let's save the function that stops the app
+          stopCallback = stop
+
+          // Start server
+          const port = config.get('port')
+          const server = app.listen(port)
+
+          server.on('listening', () =>
+            logger.info(`Server started on port ${port}`)
+          )
+
+          process.on('unhandledRejection', err =>
+            logger.error(`Unhandled Rejection at: ${err}`)
+          )
+        })
+      })
+
+      // Let's see if we have to restart the app at some point, most probably because of
+      // a reorg outside of the confirmation range.
+      await resetPromise
+
+      logger.warn('Reorg detected outside of confirmation range. Rebuilding the service\'s state!')
+      logger.info('Stopping the app')
+      stopCallback()
+      backUpJob.stop()
+
+      // Restore DB from backup
+      await backUpJob.restoreDb().catch(e => {
+        // TODO: send notification to devops
+        logger.error(e)
+      })
+
+      // Run pre-cache
+      await this.precache()
+
+      logger.info('Restarting the app')
+    }
  }
}
diff --git a/src/db-backup.ts b/src/db-backup.ts
new file mode 100644
index 00000000..ffd76768
--- /dev/null
+++ b/src/db-backup.ts
@@ -0,0 +1,115 @@
+import fs from 'fs'
+import config from 'config'
+import path from 'path'
+import BigNumber from 'bignumber.js'
+import { BlockHeader, Eth } from 'web3-eth'
+
+import { AutoStartStopEventEmitter, NEW_BLOCK_EVENT_NAME } from './blockchain/new-block-emitters'
+import { DbBackUpConfig } from './definitions'
+import { getNewBlockEmitter } from './blockchain/utils'
+
+export type BackUpEntry = { name: string, block: { hash: string, number: BigNumber } }
+
+export function parseBackUps (backUpName: string): BackUpEntry {
+  const [block] = backUpName.split('.')[0].split('-')
+  const [hash, blockNumber] = block.split(':')
+
+  return {
+    name: backUpName,
+    block: { number: new BigNumber(blockNumber), hash }
+  }
+}
+
+export function getBackUps (): BackUpEntry[] {
+  const backupConfig = config.get<DbBackUpConfig>('dbBackUp')
+
+  const backups = fs.readdirSync(path.resolve(backupConfig.path))
+
+  if (backups.length) {
+    return backups
+      .map(parseBackUps)
+      .sort(
+        (a: Record<string, any>, b: Record<string, any>) =>
+          a.block.number.gt(b.block.number) ? -1 : 1
+      )
+  }
+
+  return []
+}
+
+export class DbBackUpJob {
+  readonly newBlockEmitter: AutoStartStopEventEmitter
+  readonly db: string
+  readonly eth: Eth
+  readonly backUpConfig: DbBackUpConfig
+
+  constructor (eth: Eth) {
+    if (!config.has('dbBackUp')) {
+      throw new Error('DB Backup config does not exist')
+    }
+    this.backUpConfig = config.get<DbBackUpConfig>('dbBackUp')
+    this.db = config.get('db')
+
+    if (!fs.existsSync(path.resolve(this.backUpConfig.path))) {
+      fs.mkdirSync(path.resolve(this.backUpConfig.path))
+    }
+
+    this.eth = eth
+    this.newBlockEmitter = getNewBlockEmitter(eth)
+  }
+
+  /**
+   * Back up the database if the blocks condition is met
+   * @return {Promise<void>}
+   * @param block
+   */
+  private async backupDb (block: BlockHeader): Promise<void> {
+    const [lastBackUp, previousBackUp] = getBackUps()
+
+    if (!lastBackUp || new BigNumber(block.number).minus(this.backUpConfig.blocks).gte(lastBackUp.block.number)) {
+      // Copy and rename the current db
+      await fs.promises.copyFile(this.db, path.resolve(this.backUpConfig.path, `${block.hash}:${block.number}-${this.db}`))
+
+      // Remove the oldest version
+      if (previousBackUp) {
+        await fs.promises.unlink(path.resolve(this.backUpConfig.path, previousBackUp.name))
+      }
+    }
+  }
+
+  /**
+   * Restore the database backup
+   * @return {Promise<void>}
+   */
+  public async restoreDb (): Promise<void> {
+    const backUps = getBackUps()
+    const [_, oldest] = backUps
+
+    if (backUps.length < 2) {
+      throw new Error('Should be two backups to be able to restore')
+    }
+
+    // Check if the backed-up block hash still exists after the reorg
+    const block = await this.eth.getBlock(oldest.block.hash).catch(() => false)
+
+    if (!block) {
+      throw new Error('Invalid backup. Block Hash is not valid!')
+    }
+
+    // Remove the current db
+    await fs.promises.unlink(this.db)
+
+    // Restore the backup
+    await fs.promises.copyFile(path.resolve(this.backUpConfig.path, oldest.name), path.resolve(process.cwd(), this.db))
+  }
+
+  public run (): void {
+    this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupDb.bind(this))
+  }
+
+  public stop (): void {
+    this.newBlockEmitter.stop()
+  }
+}
+
+export default DbBackUpJob
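Backups are plain copies of the SQLite file, named `<blockHash>:<blockNumber>-<dbFileName>` so a snapshot can later be matched against the chain. A quick round-trip through the naming scheme (values illustrative, import path too):

```ts
import { parseBackUps } from './src/db-backup'

const entry = parseBackUps('0xabc:1440-db.sqlite')

entry.name                    // '0xabc:1440-db.sqlite'
entry.block.hash              // '0xabc'
entry.block.number.toNumber() // 1440 (stored as a BigNumber)
```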
diff --git a/src/definitions.ts b/src/definitions.ts
index 0bca6994..85fccc9b 100644
--- a/src/definitions.ts
+++ b/src/definitions.ts
@@ -9,6 +9,7 @@ import type { RatesService } from './services/rates'
 import type { RnsBaseService } from './services/rns'
 import { ConfirmatorService } from './blockchain/confirmator'
 import { NewBlockEmitterService } from './blockchain/new-block-emitters'
+import { ReorgEmitterService } from './blockchain/reorg-emitter'
 
 export enum SupportedServices {
   STORAGE = 'storage',
@@ -28,7 +29,8 @@ export enum ServiceAddresses {
   STORAGE_AGREEMENTS = '/storage/v0/agreements',
   XR = '/rates/v0/',
   CONFIRMATIONS = '/confirmations',
-  NEW_BLOCK_EMITTER = '/new-block'
+  NEW_BLOCK_EMITTER = '/new-block',
+  REORG_EMITTER = '/reorg'
 }
 
 // A mapping of service names to types. Will be extended in service files.
@@ -41,6 +43,7 @@ interface ServiceTypes {
   [ServiceAddresses.RNS_OFFERS]: RnsBaseService & ServiceAddons<any>
   [ServiceAddresses.CONFIRMATIONS]: ConfirmatorService & ServiceAddons<any>
   [ServiceAddresses.NEW_BLOCK_EMITTER]: NewBlockEmitterService & ServiceAddons<any>
+  [ServiceAddresses.REORG_EMITTER]: ReorgEmitterService & ServiceAddons<any>
 }
 
 // The application instance type that will be used everywhere else
@@ -49,7 +52,7 @@ export type Application = ExpressFeathers<ServiceTypes>;
 export interface CachedService {
   precache (eth?: Eth): Promise<void>
   purge (): Promise<void>
-  initialize (app: Application): Promise<void>
+  initialize (app: Application): Promise<{ stop: () => void }>
 }
 
 export enum RatesProvider {
@@ -107,6 +110,11 @@ export interface BlockchainServiceOptions {
   newBlockEmitter?: NewBlockEmitterOptions
 }
 
+export interface DbBackUpConfig {
+  blocks: number
+  path: string
+}
+
 export interface Config {
   host?: string
   port?: number
@@ -114,6 +122,9 @@ export interface Config {
   // DB URI to connect to database
   db?: string
 
+  // DB backup config
+  dbBackUp?: DbBackUpConfig
+
   log?: {
     level?: string
     filter?: string
diff --git a/src/index.ts b/src/index.ts
index 90becdcf..be44c66c 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,18 +1,19 @@
-import { loggingFactory } from './logger'
 import { appFactory } from './app'
 import config from 'config'
+import { loggingFactory } from './logger'
 
 (async function (): Promise<void> {
   const logger = loggingFactory()
-  const app = await appFactory()
+  const { app } = await appFactory({ appResetCallBack: () => { throw new Error('Reset callback not implemented') } })
 
+  // Start server
   const port = config.get('port')
   const server = app.listen(port)
 
-  process.on('unhandledRejection', err =>
-    logger.error(`Unhandled Rejection at: ${err}`)
+  server.on('listening', () =>
+    logger.info(`Server started on port ${port}`)
   )
 
-  server.on('listening', () =>
-    logger.info('Feathers application started on port %d', port)
+  process.on('unhandledRejection', err =>
+    logger.error(`Unhandled Rejection at: ${err}`)
   )
 })()
diff --git a/src/sequelize.ts b/src/sequelize.ts
index 48e6d0bb..f63656bf 100644
--- a/src/sequelize.ts
+++ b/src/sequelize.ts
@@ -31,7 +31,7 @@ export function sequelizeFactory (): Sequelize {
     transactionType: 'IMMEDIATE'
   }
 
-  return new Sequelize(config.get('db'), dbSettings)
+  return new Sequelize(`sqlite:${config.get('db')}`, dbSettings)
 }
 
 export function BigNumberStringType (propName: string): Partial<ModelAttributeColumnOptions> {
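The `db` config entry is now a bare file name instead of a `sqlite:` URI because two consumers read it: `sequelizeFactory` needs a connection URI, while `DbBackUpJob` needs a plain path it can copy and unlink. A sketch of the two derivations (paths illustrative):

```ts
import path from 'path'
import config from 'config'

const db = config.get<string>('db') // e.g. 'db.sqlite'

// src/sequelize.ts re-adds the dialect prefix for the connection:
const uri = `sqlite:${db}` // -> 'sqlite:db.sqlite'

// src/db-backup.ts treats the very same value as a copyable file:
const backupFile = path.resolve('db-backups', `0xabc:1440-${db}`)
```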
diff --git a/src/services/rates/index.ts b/src/services/rates/index.ts
index 03c21e01..8fa120d3 100644
--- a/src/services/rates/index.ts
+++ b/src/services/rates/index.ts
@@ -16,10 +16,10 @@ const logger = loggingFactory(SERVICE_NAME)
 
 const storage: CachedService = {
   // eslint-disable-next-line require-await
-  async initialize (app: Application): Promise<void> {
+  async initialize (app: Application): Promise<{ stop: () => void }> {
     if (!config.get('rates.enabled')) {
       logger.info('Rates service: disabled')
-      return
+      return { stop: () => undefined }
     }
     logger.info('Rates service: enabled')
 
@@ -30,7 +30,9 @@ const storage: CachedService = {
 
     // Start periodical refresh
     const updatePeriod = config.get<number>(CONFIG_UPDATE_PERIOD) * 1000 // Converting seconds to ms
-    setInterval(() => updater().catch(logger.error), updatePeriod)
+    const intervalId = setInterval(() => updater().catch(logger.error), updatePeriod)
+
+    return { stop: () => clearInterval(intervalId) }
   },
 
   async purge (): Promise<void> {
diff --git a/src/services/rns/index.ts b/src/services/rns/index.ts
index 94fe1e7e..4bdc0ab3 100644
--- a/src/services/rns/index.ts
+++ b/src/services/rns/index.ts
@@ -10,6 +10,7 @@ import type { EventData } from 'web3-eth-contract'
 import type { AbiItem } from 'web3-utils'
 import { ethFactory } from '../../blockchain'
 import { getEventsEmitterForService, isServiceInitialized } from '../../blockchain/utils'
+import { REORG_OUT_OF_RANGE_EVENT_NAME } from '../../blockchain/events'
 import { ServiceAddresses } from '../../definitions'
 import type { Application, CachedService } from '../../definitions'
 import { loggingFactory } from '../../logger'
@@ -88,10 +89,10 @@ async function precache (eth?: Eth): Promise<void> {
 
 const rns: CachedService = {
   // eslint-disable-next-line require-await
-  async initialize (app: Application): Promise<void> {
+  async initialize (app: Application): Promise<{ stop: () => void }> {
     if (!config.get('rns.enabled')) {
       logger.info('RNS service: disabled')
-      return Promise.resolve()
+      return { stop: () => undefined }
     }
     logger.info('RNS service: enabled')
 
@@ -102,6 +103,8 @@ const rns: CachedService = {
     app.use(ServiceAddresses.RNS_SOLD, new RnsBaseService({ Model: SoldDomain }))
     app.use(ServiceAddresses.RNS_OFFERS, new RnsBaseService({ Model: DomainOffer }))
 
+    const reorgEmitterService = app.service(ServiceAddresses.REORG_EMITTER)
+
     const domains = app.service(ServiceAddresses.RNS_DOMAINS)
     const sold = app.service(ServiceAddresses.RNS_SOLD)
     const offers = app.service(ServiceAddresses.RNS_OFFERS)
@@ -128,6 +131,7 @@ const rns: CachedService = {
     })
     rnsEventsEmitter.on('newConfirmation', (data) => confirmationService.emit('newConfirmation', data))
     rnsEventsEmitter.on('invalidConfirmation', (data) => confirmationService.emit('invalidConfirmation', data))
+    rnsEventsEmitter.on(REORG_OUT_OF_RANGE_EVENT_NAME, (blockNumber: number) => reorgEmitterService.emitReorg(blockNumber, 'rns.owner'))
 
     const rnsReverseEventsEmitter = getEventsEmitterForService('rns.reverse', eth, rnsReverseContractAbi.abi as AbiItem[])
     rnsReverseEventsEmitter.on('newEvent', errorHandler(eventProcessor(loggingFactory('rns.reverse:processor'), eth, services), logger))
@@ -136,6 +140,7 @@ const rns: CachedService = {
     })
     rnsReverseEventsEmitter.on('newConfirmation', (data) => confirmationService.emit('newConfirmation', data))
     rnsReverseEventsEmitter.on('invalidConfirmation', (data) => confirmationService.emit('invalidConfirmation', data))
+    rnsReverseEventsEmitter.on(REORG_OUT_OF_RANGE_EVENT_NAME, (blockNumber: number) => reorgEmitterService.emitReorg(blockNumber, 'rns.reverse'))
 
     const rnsPlacementsEventsEmitter = getEventsEmitterForService('rns.placement', eth, simplePlacementsContractAbi.abi as AbiItem[])
     rnsPlacementsEventsEmitter.on('newEvent', errorHandler(eventProcessor(loggingFactory('rns.placement:processor'), eth, services), logger))
@@ -144,8 +149,16 @@ const rns: CachedService = {
     })
     rnsPlacementsEventsEmitter.on('newConfirmation', (data) => confirmationService.emit('newConfirmation', data))
     rnsPlacementsEventsEmitter.on('invalidConfirmation', (data) => confirmationService.emit('invalidConfirmation', data))
-
-    return Promise.resolve()
+    rnsPlacementsEventsEmitter.on(REORG_OUT_OF_RANGE_EVENT_NAME, (blockNumber: number) => reorgEmitterService.emitReorg(blockNumber, 'rns.placement'))
+
+    return {
+      stop: () => {
+        rnsPlacementsEventsEmitter.stop()
+        rnsReverseEventsEmitter.stop()
+        rnsEventsEmitter.stop()
+        confirmationService.removeAllListeners()
+      }
+    }
   },
 
   precache,
diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts
index 357a2878..706d141b 100644
--- a/src/services/storage/index.ts
+++ b/src/services/storage/index.ts
@@ -8,6 +8,7 @@ import { AbiItem } from 'web3-utils'
 
 import { ethFactory } from '../../blockchain'
 import { getEventsEmitterForService, isServiceInitialized } from '../../blockchain/utils'
+import { REORG_OUT_OF_RANGE_EVENT_NAME } from '../../blockchain/events'
 import { Application, CachedService, ServiceAddresses } from '../../definitions'
 import { loggingFactory } from '../../logger'
 import { errorHandler, waitForReadyApp } from '../../utils'
@@ -72,10 +73,10 @@ function precache (possibleEth?: Eth): Promise<void> {
 }
 
 const storage: CachedService = {
-  async initialize (app: Application): Promise<void> {
+  async initialize (app: Application): Promise<{ stop: () => void }> {
     if (!config.get('storage.enabled')) {
       logger.info('Storage service: disabled')
-      return
+      return { stop: () => undefined }
     }
     logger.info('Storage service: enabled')
 
@@ -95,6 +96,8 @@ const storage: CachedService = {
     app.configure(storageChannels)
 
+    const reorgEmitterService = app.service(ServiceAddresses.REORG_EMITTER)
+
     // We require services to be precached before running server
     if (!isServiceInitialized(SERVICE_NAME)) {
       return logger.critical('Storage service is not initialized! Run precache command.')
     }
@@ -110,6 +113,14 @@ const storage: CachedService = {
     })
     eventsEmitter.on('newConfirmation', (data) => confirmationService.emit('newConfirmation', data))
     eventsEmitter.on('invalidConfirmation', (data) => confirmationService.emit('invalidConfirmation', data))
+    eventsEmitter.on(REORG_OUT_OF_RANGE_EVENT_NAME, (blockNumber: number) => reorgEmitterService.emitReorg(blockNumber, 'storage'))
+
+    return {
+      stop: () => {
+        confirmationService.removeAllListeners()
+        eventsEmitter.stop()
+      }
+    }
   },
 
   async purge (): Promise<void> {
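Every `CachedService.initialize` now resolves to a stop handle instead of `void`; collecting those handles is what lets `appFactory` tear the whole app down when a reset is requested. The contract in isolation (a minimal sketch; the interval stands in for whatever emitters and listeners a real service registers, and the import path is illustrative):

```ts
import { Application, CachedService } from './src/definitions'

const example: Pick<CachedService, 'initialize'> = {
  async initialize (app: Application): Promise<{ stop: () => void }> {
    // ...register feathers services and event emitters here...
    const intervalId = setInterval(() => { /* periodic work */ }, 1000)

    // Everything started above must be undone in stop(), so that
    // src/cli/start.ts can cleanly rebuild the app after a deep reorg
    return { stop: () => clearInterval(intervalId) }
  }
}
```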
diff --git a/src/utils.ts b/src/utils.ts
index e642f833..34cb7d35 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -1,14 +1,20 @@
 import { Command, flags } from '@oclif/command'
 import { Input, OutputFlags } from '@oclif/parser'
-import { readFile as readFileCb } from 'fs'
 import { promisify } from 'util'
 import config from 'config'
+import fs from 'fs'
 import { hexToAscii } from 'web3-utils'
 import BigNumber from 'bignumber.js'
 
-import { Application, Config, isSupportedServices, Logger, SupportedServices } from './definitions'
+import {
+  Application,
+  Config,
+  isSupportedServices,
+  Logger,
+  SupportedServices
+} from './definitions'
 
-const readFile = promisify(readFileCb)
+const readFile = promisify(fs.readFile)
 
 /**
  * Bignumber.js utils functions
@@ -115,7 +121,7 @@ export function validateServices (args: string[], onlyEnabledForAll = false): SupportedServices[] {
  * @param fn
  * @param logger
  */
-export function errorHandler (fn: (...args: any[]) => Promise<void>, logger: Logger): (...args: any[]) => Promise<void> {
+export function errorHandler (fn: (...args: any[]) => Promise<any>, logger: Logger): (...args: any[]) => Promise<any> {
   return (...args) => {
     return fn(...args).catch(err => logger.error(err))
   }
@@ -166,12 +172,12 @@ export abstract class BaseCLICommand extends Command {
     )
   }
 
-  async loadConfig (path?: string): Promise<Config> {
-    if (!path) {
+  async loadConfig (configPath?: string): Promise<Config> {
+    if (!configPath) {
       return {}
     }
 
-    const data = await readFile(path, 'utf-8')
+    const data = await readFile(configPath, 'utf-8')
     return JSON.parse(data) as Config
   }
diff --git a/test/cli.spec.ts b/test/cli.spec.ts
new file mode 100644
index 00000000..fc6057a0
--- /dev/null
+++ b/test/cli.spec.ts
@@ -0,0 +1,111 @@
+import sinon from 'sinon'
+import config from 'config'
+import path from 'path'
+import * as AppModule from '../src/app'
+import chai from 'chai'
+
+import RnsService from '../src/services/rns'
+import StorageService from '../src/services/storage'
+import { unlinkSync, copyFileSync, mkdirSync } from 'fs'
+import { sequelizeFactory } from '../src/sequelize'
+import { initStore } from '../src/store'
+import { getEndPromise } from 'sequelize-store'
+import StartCommand from '../src/cli/start'
+import { rmDir, sleep } from './utils'
+import sinonChai from 'sinon-chai'
+import * as blockchainUtils from '../src/blockchain'
+import { Eth } from 'web3-eth'
+import Rate from '../src/services/rates/rates.model'
+import { Application, DbBackUpConfig } from '../src/definitions'
+
+chai.use(sinonChai)
+const expect = chai.expect
+
+describe('CLI', function () {
+  this.timeout(10000)
+
+  after(() => {
+    sinon.reset()
+    rmDir(config.get<DbBackUpConfig>('dbBackUp').path)
+  })
+
+  before(async () => {
+    // Prepare DB and set it to be used
+    const dbPath = path.join(__dirname, '..', '..', config.get<string>('db'))
+    try {
+      unlinkSync(dbPath)
+    } catch (e) {
+      // Ignore "not found" errors
+      if (e.code !== 'ENOENT') {
+        throw e
+      }
+    }
+
+    // Init the DB
+    const sequelize = sequelizeFactory()
+    // const migration = new Migration(sequelize)
+    // await migration.up()
+    await sequelize.sync({ force: true })
+    await initStore(sequelize)
+    await getEndPromise()
+  })
+
+  it('should restart when appResetCallback is triggered', async () => {
+    // Create backups
+    const db = config.get<string>('db')
+    const backupPath = path.resolve(process.cwd(), config.get<string>('dbBackUp.path'))
+    mkdirSync(config.get<DbBackUpConfig>('dbBackUp').path)
+    copyFileSync(db, path.resolve(backupPath, `0x0123:10-${db}`))
+    copyFileSync(db, path.resolve(backupPath, `0x0123:20-${db}`))
+
+    // Create a fake rate
+    await Rate.create({
+      token: '123456789012345',
+      usd: 1,
+      eur: 1,
+      btc: 1,
+      ars: 1,
+      cny: 1,
+      krw: 1,
+      jpy: 1
+    })
+
+    expect(await Rate.findByPk('123456789012345')).to.be.ok()
+
+    // Mock the dependencies
+    let appResetCallback = (() => {
+      throw new Error('AppResetCallback was not assigned!')
+    }) as () => void
+    const stopSpy = sinon.spy()
+    const initAppStub = sinon.stub(AppModule, 'appFactory')
+    initAppStub.callsFake((options: AppModule.AppOptions): Promise<{ app: Application, stop: () => void }> => {
+      appResetCallback = options.appResetCallBack
+
+      return Promise.resolve({ app: {} as Application, stop: stopSpy })
+    })
+    sinon.stub(blockchainUtils, 'ethFactory').returns({
+      getBlock: sinon.stub().resolves(true)
+    } as unknown as Eth)
+    const rnsPrecacheStub = sinon.stub(RnsService, 'precache').resolves()
+    const storagePrecacheStub = sinon.stub(StorageService, 'precache').resolves()
+
+    // Launch the Daemon
+    // @ts-ignore
+    StartCommand.run([]).then(() => null, (e) => expect.fail(e))
+
+    await sleep(3000)
+    //
+    appResetCallback() // Trigger reset
+    //
+    await sleep(3000)
+
+    // Precache called
+    expect(rnsPrecacheStub.called).to.be.true()
+    expect(storagePrecacheStub.called).to.be.true()
+
+    // Restart the connection after the db was restored
+    sequelizeFactory()
+
+    // The db was restored (the fake rate is gone)
+    expect(await Rate.findByPk('123456789012345')).to.be.eql(null)
+
+    rmDir(backupPath)
+    sinon.reset()
+  })
+})
diff --git a/test/db-backup-restore.spec.ts b/test/db-backup-restore.spec.ts
new file mode 100644
index 00000000..ae420110
--- /dev/null
+++ b/test/db-backup-restore.spec.ts
@@ -0,0 +1,196 @@
+import path from 'path'
+import fs from 'fs'
+import config from 'config'
+import chai from 'chai'
+import sinonChai from 'sinon-chai'
+import { Eth } from 'web3-eth'
+import { Substitute, Arg } from '@fluffy-spoon/substitute'
+
+import DbBackUpJob from '../src/db-backup'
+import { blockMock, rmDir, sleep } from './utils'
+import { NEW_BLOCK_EVENT_NAME } from '../src/blockchain/new-block-emitters'
+import { DbBackUpConfig } from '../src/definitions'
+
+chai.use(sinonChai)
+const expect = chai.expect
+
+describe('DB back-up/restore', function () {
+  const configBackUp = { db: config.get('db'), dbBackUp: config.get('dbBackUp') }
+
+  describe('Restore', () => {
+    before(() => fs.writeFileSync(config.get<string>('db'), 'Initial DB'))
+    after(() => fs.unlinkSync(config.get<string>('db')))
+    afterEach(() => rmDir(config.get<DbBackUpConfig>('dbBackUp').path))
+
+    it('should throw if there are not enough backups', async () => {
+      const eth = Substitute.for<Eth>()
+      const backupJob = new DbBackUpJob(eth)
+
+      expect(fs.readdirSync(config.get<DbBackUpConfig>('dbBackUp').path).length).to.be.eql(0)
+      await expect(backupJob.restoreDb()).to.eventually.be.rejectedWith(
+        Error,
+        'Should be two backups to be able to restore'
+      )
+    })
+
+    it('should throw if the backup block hash does not exist after reorg', async () => {
+      const eth = Substitute.for<Eth>()
+      const backupPath = config.get<DbBackUpConfig>('dbBackUp').path
+      const db = config.get<string>('db')
+
+      eth.getBlock(Arg.all()).rejects(new Error('Not found'))
+      const backupJob = new DbBackUpJob(eth)
+      fs.writeFileSync(path.resolve(backupPath, `0x0123:10-${db}`), 'First ')
+      fs.writeFileSync(path.resolve(backupPath, `0x0123:20-${db}`), 'Second')
+
+      expect(fs.readdirSync(backupPath).length).to.be.eql(2)
+      await expect(backupJob.restoreDb()).to.eventually.be.rejectedWith(
+        Error,
+        'Invalid backup. Block Hash is not valid!'
+      )
+    })
+
+    it('should restore database', async () => {
+      const eth = Substitute.for<Eth>()
+      const backupPath = config.get<DbBackUpConfig>('dbBackUp').path
+      const db = config.get<string>('db')
+
+      const backupJob = new DbBackUpJob(eth)
+      eth.getBlock('0x0123').resolves(blockMock(10))
+
+      fs.writeFileSync(path.resolve(backupPath, `0x0123:10-${db}`), 'First db backup')
+      fs.writeFileSync(path.resolve(backupPath, `0x01234:20-${db}`), 'Second db backup')
+
+      expect(fs.readdirSync(backupPath).length).to.be.eql(2)
+      await backupJob.restoreDb()
+      await sleep(1000)
+
+      expect(fs.readFileSync(db).toString()).to.be.eql('First db backup')
+    })
+  })
+
+  describe('Back Up Job', () => {
+    before(() => fs.writeFileSync(config.get<string>('db'), 'Initial DB'))
+    after(() => fs.unlinkSync(config.get<string>('db')))
+    afterEach(() => rmDir(config.get<DbBackUpConfig>('dbBackUp').path))
+
+    it('should throw error if "dbBackUp" is not in config', () => {
+      // @ts-ignore
+      config.util.extendDeep(config, { dbBackUp: undefined })
+
+      expect(config.has('dbBackUp')).to.be.false()
+
+      expect(() => new DbBackUpJob(Substitute.for<Eth>())).to.throw(
+        Error,
+        'DB Backup config does not exist'
+      )
+
+      // @ts-ignore
+      config.util.extendDeep(config, configBackUp)
+    })
+
+    it('should create the back-up folder if it does not exist', () => {
+      const dbPath = config.get<string>('dbBackUp.path')
+
+      expect(fs.existsSync(dbPath)).to.be.false()
+
+      const job = new DbBackUpJob(Substitute.for<Eth>())
+
+      expect(fs.existsSync(dbPath)).to.be.true()
+    })
+
+    it('should make a backup if none exists yet', async () => {
+      const eth = Substitute.for<Eth>()
+      eth.getBlock(Arg.all()).returns(Promise.resolve(blockMock(10)))
+      const backUpPath = config.get<DbBackUpConfig>('dbBackUp').path
+
+      const job = new DbBackUpJob(eth)
+
+      job.run()
+      await sleep(300)
+
+      // Should have one db back-up already
+      const files = fs.readdirSync(backUpPath)
+      expect(files.length).to.be.eql(1)
+      expect(files).to.be.eql([`0x123:${10}-${config.get('db')}`])
+    })
+
+    it('should not make a backup if the blocks condition is not met', async () => {
+      const eth = Substitute.for<Eth>()
+      eth.getBlock(Arg.all()).returns(Promise.resolve(blockMock(10)))
+      const backUpPath = config.get<DbBackUpConfig>('dbBackUp').path
+
+      const job = new DbBackUpJob(eth)
+
+      job.run()
+      await sleep(300)
+
+      // Should have one db back-up already
+      const files = fs.readdirSync(backUpPath)
+      expect(files.length).to.be.eql(1)
+      expect(files).to.be.eql([`0x123:${10}-${config.get('db')}`])
+
+      // Should skip this block as it does not meet the condition
+      job.newBlockEmitter.emit(NEW_BLOCK_EVENT_NAME, blockMock(13))
+      await sleep(300)
+
+      const files2 = fs.readdirSync(backUpPath)
+      expect(files2.length).to.be.eql(1)
+      expect(files2).to.be.eql([`0x123:${10}-${config.get('db')}`])
+    })
+
+    it('should add a second backup', async () => {
+      const eth = Substitute.for<Eth>()
+      eth.getBlock(Arg.all()).returns(Promise.resolve(blockMock(10)))
+      const backups = []
+      const backUpPath = path.resolve(config.get<DbBackUpConfig>('dbBackUp').path)
+      const job = new DbBackUpJob(eth)
+
+      job.run()
+      await sleep(300)
+
+      // Should have one db back-up already
+      const files = fs.readdirSync(backUpPath)
+      backups.push(`0x123:${10}-${config.get('db')}`)
+      expect(files.length).to.be.eql(1)
+      expect(files).to.be.eql(backups)
+
+      // Should add another backup
+      job.newBlockEmitter.emit(NEW_BLOCK_EVENT_NAME, blockMock(30))
+      await sleep(300)
+
+      const files2 = fs.readdirSync(backUpPath)
+      backups.push(`0x123:${30}-${config.get('db')}`)
+      expect(files2.length).to.be.eql(2)
+      expect(files2).to.be.eql(backups)
+    })
+
+    it('should replace the oldest back-up with a fresh one', async () => {
+      const eth = Substitute.for<Eth>()
+      eth.getBlock(Arg.all()).returns(Promise.resolve(blockMock(10)))
+      const backups = []
+      const backUpPath = config.get<DbBackUpConfig>('dbBackUp').path
+      const job = new DbBackUpJob(eth)
+
+      job.run()
+      await sleep(300)
+
+      // Should have one db back-up already
+      const files = fs.readdirSync(backUpPath)
+      backups.push(`0x123:${10}-${config.get('db')}`)
+      expect(files.length).to.be.eql(1)
+      expect(files).to.be.eql(backups)
+
+      // Should add another backup
+      job.newBlockEmitter.emit(NEW_BLOCK_EVENT_NAME, blockMock(30))
+      await sleep(300)
+
+      const files2 = fs.readdirSync(backUpPath)
+      backups.push(`0x123:${30}-${config.get('db')}`)
+      expect(files2.length).to.be.eql(2)
+      expect(files2).to.be.eql(backups)
+
+      // Should replace the oldest backup with a fresh one
+      job.newBlockEmitter.emit(NEW_BLOCK_EVENT_NAME, blockMock(45))
+      await sleep(300)
+
+      const files3 = fs.readdirSync(backUpPath)
+      backups.push(`0x123:${45}-${config.get('db')}`)
+      expect(files3.length).to.be.eql(2)
+      expect(files3).to.be.eql(backups.slice(1))
+    })
+  })
+})
diff --git a/test/utils.ts b/test/utils.ts
index 1dd20985..842d2cee 100644
--- a/test/utils.ts
+++ b/test/utils.ts
@@ -1,6 +1,11 @@
 import { BlockHeader, BlockTransactionString, TransactionReceipt } from 'web3-eth'
 import { Substitute } from '@fluffy-spoon/substitute'
 import { EventData } from 'web3-eth-contract'
+import fs from 'fs'
+import path from 'path'
+import config from 'config'
+
+import { DbBackUpConfig } from '../src/definitions'
 
 export function sleep<T> (ms: number, ...args: T[]): Promise<T> {
   return new Promise(resolve => setTimeout(() => resolve(...args), ms))
@@ -70,3 +75,13 @@ export function blockMock (blockNumber: number, blockHash = '0x123', options: Partial<BlockTransactionString> = {}): BlockTransactionString {
   block.hash.returns!(blockHash)
   return block
 }
+
+export function rmDir (folder: string) {
+  if (fs.existsSync(folder)) {
+    for (const file of fs.readdirSync(folder)) {
+      fs.unlinkSync(path.join(folder, file))
+    }
+
+    fs.rmdirSync(config.get<DbBackUpConfig>('dbBackUp').path, { recursive: true })
+  }
+}
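The back-up tests pin down the rotation policy: at most two snapshots are kept, a new one is taken once the head has advanced `dbBackUp.blocks` past the newest snapshot, and `restoreDb()` deliberately picks the older of the two, since the newest one may already contain state from the abandoned branch. Schematically, with `blocks: 10` as in `config/test.json5` (block numbers taken from the tests above):

```ts
// block 10 -> backups: ['0x123:10-db.sqlite']             (first snapshot)
// block 13 -> unchanged: 13 - 10 < 10, condition not met
// block 30 -> backups: ['0x123:10-...', '0x123:30-...']   (second slot filled)
// block 45 -> backups: ['0x123:30-...', '0x123:45-...']   (oldest dropped)
//
// restoreDb() then verifies that the hash of the *older* snapshot
// ('0x123:30-...') still exists on-chain and copies it back over db.sqlite.
```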