Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

Commit

Permalink
chore: add stop method for rate service
Browse files Browse the repository at this point in the history
refactor db-backup job
  • Loading branch information
nduchak committed Aug 21, 2020
1 parent 55be7e6 commit e1fcb73
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 46 deletions.
5 changes: 3 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ export async function appFactory (options: AppOptions): Promise<{ stop: () => vo
/**********************************************************/
// Configure each services

const servicePromises: Promise<void>[] = []
const servicePromises: Promise<{ stop: () => void }>[] = []
for (const service of Object.values(services)) {
app.configure((app) => servicePromises.push(errorHandler(service.initialize, logger)(app)))
}

// Wait for services to initialize
await Promise.all(servicePromises)
const servicesInstances = await Promise.all(servicePromises)

// Log errors in hooks
app.hooks({
Expand Down Expand Up @@ -97,6 +97,7 @@ export async function appFactory (options: AppOptions): Promise<{ stop: () => vo
return {
stop: () => {
server.close()
servicesInstances.forEach(service => service.stop())
}
}
}
5 changes: 3 additions & 2 deletions src/cli/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { appFactory, services } from '../app'
import { loggingFactory } from '../logger'
import { Flags, Config, SupportedServices, isSupportedServices } from '../definitions'
import { BaseCLICommand, DbBackUpJob, restoreDb } from '../utils'
import { sequelizeFactory } from '../sequelize'
import Event from '../blockchain/event.model'
import { getNewBlockEmitter } from '../blockchain/utils'
import { ethFactory } from '../blockchain'

const logger = loggingFactory('cli:start')

Expand Down Expand Up @@ -108,7 +109,7 @@ ${formattedServices}`
let stopCallback = (() => { throw new Error('No stop callback was assigned!') }) as () => void

// Run backup job
const backUpJob = new DbBackUpJob()
const backUpJob = new DbBackUpJob(getNewBlockEmitter(ethFactory()))
backUpJob.run()

// Promise that resolves when reset callback is called
Expand Down
8 changes: 8 additions & 0 deletions src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,21 @@ export interface BlockchainServiceOptions {
newBlockEmitter?: NewBlockEmitterOptions
}

export interface DbBackUpConfig {
blocks: number
path: string
}

export interface Config {
host?: string
port?: number

// DB URI to connect to database
db?: string

// DB backup config
dbBackUp?: DbBackUpConfig

log?: {
level?: string
filter?: string
Expand Down
8 changes: 5 additions & 3 deletions src/services/rates/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ const logger = loggingFactory(SERVICE_NAME)

const storage: CachedService = {
// eslint-disable-next-line require-await
async initialize (app: Application): Promise<void> {
async initialize (app: Application): Promise<{ stop: () => void }> {
if (!config.get<boolean>('rates.enabled')) {
logger.info('Rates service: disabled')
return
return { stop: () => undefined }
}
logger.info('Rates service: enabled')

Expand All @@ -30,7 +30,9 @@ const storage: CachedService = {

// Start periodical refresh
const updatePeriod = config.get<number>(CONFIG_UPDATE_PERIOD) * 1000 // Converting seconds to ms
setInterval(() => updater().catch(logger.error), updatePeriod)
const intervalId = setInterval(() => updater().catch(logger.error), updatePeriod)

return { stop: () => clearInterval(intervalId) }
},

async purge (): Promise<void> {
Expand Down
106 changes: 67 additions & 39 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ import { hexToAscii } from 'web3-utils'
import BigNumber from 'bignumber.js'
import { BlockHeader } from 'web3-eth'

import { Application, Config, isSupportedServices, Logger, SupportedServices } from './definitions'
import { getNewBlockEmitter } from './blockchain/utils'
import {
Application,
Config,
isSupportedServices,
Logger,
SupportedServices,
EventsEmitterOptions,
DbBackUpConfig
} from './definitions'
import { ethFactory } from './blockchain'
import { AutoStartStopEventEmitter, NEW_BLOCK_EVENT_NAME } from './blockchain/new-block-emitters'
import { services } from './app'
Expand Down Expand Up @@ -216,9 +223,9 @@ function parseBackUps (backUpName: string): BackUpEntry {
}

function getBackUps (): BackUpEntry[] {
const backupConfig = config.get('dbBackUp') as { path: string }
const backupConfig = config.get<DbBackUpConfig>('dbBackUp')

const backups = fs.readdirSync(path.resolve(__dirname, '../../' + backupConfig.path))
const backups = fs.readdirSync(path.resolve(__dirname, '../' + backupConfig.path))

if (backups.length) {
return backups
Expand All @@ -232,54 +239,22 @@ function getBackUps (): BackUpEntry[] {
return []
}

export class DbBackUpJob {
private newBlockEmitter: AutoStartStopEventEmitter

constructor () {
this.newBlockEmitter = getNewBlockEmitter(ethFactory())
}

private backupHandler (block: BlockHeader): void {
const db = config.get<string>('db')
const backupConfig = config.get('dbBackUp') as { path: string, blocks: number }
const [lastBackUp, previousBackUp] = getBackUps()

if (block.number - backupConfig.blocks >= lastBackUp.block.number) {
// copy and rename current db
fs.copyFileSync(db, backupConfig.path)
fs.renameSync(path.resolve(backupConfig.path, db), path.resolve(backupConfig.path, `${block.hash}:${block.number}-${db}`))

// remove the oldest version
if (previousBackUp) fs.unlinkSync(path.resolve(backupConfig.path, previousBackUp.name))
}
}

public run (): void {
const newBlockEmitter = getNewBlockEmitter(ethFactory())
newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupHandler)
}

public stop (): void {
this.newBlockEmitter.stop()
}
}

export async function restoreDb (): Promise<void> {
const db = config.get<string>('db')
const backupConfig = config.get<{ path: string }>('dbBackUp')
const backupConfig = config.get<DbBackUpConfig>('dbBackUp')

const eth = ethFactory()
const backUps = getBackUps()
const [latest, previousBackUp] = backUps

if (backUps.length < 2) {
// TODO Notify devOps
// TODO Notify devOps
throw new Error('Back up not exist')
}

// Check if back up last processed block hash is valid
if (!(await eth.getBlock(backUps[1].block.hash))) {
// TODO Notify devOps
// TODO Notify devOps
throw new Error('Invalid backup. Hash is not exist!')
}

Expand All @@ -297,3 +272,56 @@ export async function restoreDb (): Promise<void> {
toBePrecache.map((service) => services[service].precache(eth))
)
}

export class DbBackUpJob {
private newBlockEmitter: AutoStartStopEventEmitter
private backUpConfig: DbBackUpConfig

constructor (newBlockEmitter: AutoStartStopEventEmitter) {
if (!config.has('dbBackUp')) throw new Error('DB Backup config not exist')
this.backUpConfig = config.get<{ blocks: number, path: string }>('dbBackUp')

const eventEmittersConfirmations = this.getEventEmittersConfigs()
const invalidConfirmation = eventEmittersConfirmations.find(c => c.config.confirmations && c.config.confirmations > this.backUpConfig.blocks)

if (invalidConfirmation) {
throw new Error(`Invalid db backup configuration. Number of backup blocks should be greater then confirmation blocks for ${invalidConfirmation.name} service`)
}

this.newBlockEmitter = newBlockEmitter
}

private backupHandler (block: BlockHeader): void {
const db = config.get<string>('db')
const backupConfig = config.get('dbBackUp') as { path: string, blocks: number }
const [lastBackUp, previousBackUp] = getBackUps()

if (block.number - backupConfig.blocks >= lastBackUp.block.number) {
// copy and rename current db
fs.copyFileSync(db, backupConfig.path)
fs.renameSync(path.resolve(backupConfig.path, db), path.resolve(backupConfig.path, `${block.hash}:${block.number}-${db}`))

// remove the oldest version
if (previousBackUp) fs.unlinkSync(path.resolve(backupConfig.path, previousBackUp.name))
}
}

private getEventEmittersConfigs (): { config: EventsEmitterOptions, name: string }[] {
return Object.values(SupportedServices)
.reduce((acc: { config: EventsEmitterOptions, name: string }[], serviceName: string) => {
if (config.has(`${serviceName}.eventsEmitter`)) {
const emitterConfig = config.get<EventsEmitterOptions>(`${serviceName}.eventsEmitter`)
return [...acc, { config: emitterConfig, name: serviceName }]
}
return acc
}, [])
}

public run (): void {
this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.backupHandler)
}

public stop (): void {
this.newBlockEmitter.stop()
}
}

0 comments on commit e1fcb73

Please sign in to comment.