Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

feat: reorgs outside of confirmations range handling #273

Merged
merged 27 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
55be7e6
chore: draft for backup/restore of db
nduchak Aug 21, 2020
e1fcb73
chore: add stop method for rate service
nduchak Aug 21, 2020
626cec7
chore: use async fs methods for copy/rename/delete in db restore/backup
nduchak Aug 21, 2020
cb236cf
chore: fix linter
nduchak Aug 21, 2020
4ff6cfd
feat: use bignumber for db backup
nduchak Aug 24, 2020
0699b59
feat: fix get emitters config method
nduchak Aug 24, 2020
ca21a7c
feat: finish backup tests
nduchak Aug 24, 2020
0a88d6a
feat: add restore db tests
nduchak Aug 24, 2020
466b556
chore: fix lint
nduchak Aug 24, 2020
df64345
chore: small adjustment
nduchak Aug 25, 2020
127dc78
chore: fix some code smells
nduchak Aug 25, 2020
754b2bc
feat: add basic reorg emitter service
nduchak Aug 26, 2020
b6badf2
feat: add timeout for emitting reorgs
nduchak Aug 26, 2020
efca52b
feat: add reorg handler to app.js
nduchak Aug 26, 2020
ee097d7
feat: precache after db recovery
nduchak Aug 27, 2020
9f825d2
feat: rename event name for reorg-emitter to reorg-out-of-range
nduchak Aug 27, 2020
c6b3120
feat: add cli start reset tests
nduchak Aug 28, 2020
7b76fd4
chore: test ci
nduchak Aug 28, 2020
3133aa0
chore: fix linter
nduchak Aug 28, 2020
e17acb1
chore: add config mutation env var to circleci
nduchak Aug 28, 2020
5adcd90
chore: fix index file
nduchak Aug 31, 2020
fea1cd8
chore: increase timeout for CLI test
nduchak Aug 31, 2020
3e7e4e9
chore: add db back-up to default config
nduchak Sep 1, 2020
eb11972
chore: polish configs
nduchak Sep 1, 2020
05d6918
chore: fix pr comments
nduchak Sep 4, 2020
8ff476f
chore: fix tests
nduchak Sep 4, 2020
13e1a7c
feat: fix most of pr comments
nduchak Sep 7, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ jobs:
type: string
docker:
- image: circleci/node:<< parameters.version >>
environment:
ALLOW_CONFIG_MUTATIONS: true
steps:
- checkout
- run:
Expand Down
2 changes: 1 addition & 1 deletion .tasegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module.exports = {
},
depCheck: {
ignore: [
'tasegir', 'reflect-metadata', '@types/*', 'sqlite3', '@oclif/*',
'cross-env', 'tasegir', 'reflect-metadata', '@types/*', 'sqlite3', '@oclif/*',
]
}
}
5 changes: 4 additions & 1 deletion config/default.json5
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// For full syntax see /src/types.ts::Config interface
{
db: 'sqlite:db.sqlite',
db: 'db.sqlite',

// DB back up config
dbBackUp: { blocks: 10, path: 'db-backups' },
nduchak marked this conversation as resolved.
Show resolved Hide resolved

// CORS setting, please consult https://expressjs.com/en/resources/middleware/cors.html for more details
cors: {
Expand Down
2 changes: 1 addition & 1 deletion config/test.json5
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
db: 'sqlite://db_test.sqlite',
db: 'db_test.sqlite',
blockchain: {
provider: 'ws://localhost:8545',
waitBlockCountBeforeConfirmationRemoved: 10
Expand Down
52 changes: 52 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"types-check": "tasegir types-check",
"lint": "tasegir lint",
"release": "tasegir release",
"test": "tasegir test --target node",
"test": "cross-env ALLOW_CONFIG_MUTATIONS=true tasegir test --target node",
"start": "tasegir run --watch ./src/index.ts"
},
"dependencies": {
Expand Down Expand Up @@ -113,6 +113,7 @@
"@types/validator": "^13.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"cross-env": "^7.0.2",
"dirty-chai": "^2.0.1",
"sinon": "^9.0.1",
"sinon-chai": "^3.5.0",
Expand Down
37 changes: 32 additions & 5 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import feathers from '@feathersjs/feathers'
import express from '@feathersjs/express'
import socketio from '@feathersjs/socketio'

import { Application, SupportedServices } from './definitions'
import { Application, ServiceAddresses, SupportedServices } from './definitions'
import { loggingFactory } from './logger'
import sequelize from './sequelize'
import blockchain from './blockchain'
Expand All @@ -20,6 +20,7 @@ import authentication from './services/authentication'
import storage from './services/storage'
import rates from './services/rates'
import rns from './services/rns'
import { REORG_OUT_OF_RANGE_EVENT_NAME } from './blockchain/events'

const logger = loggingFactory()

Expand All @@ -29,7 +30,9 @@ export const services = {
[SupportedServices.RNS]: rns
}

export async function appFactory (): Promise<Application> {
export type AppOptions = { appResetCallBack: (...args: any) => void }

export async function startApp (options: AppOptions): Promise<{ stop: () => void }> {
const app: Application = express(feathers())

logger.verbose('Current configuration: ', config)
Expand Down Expand Up @@ -58,13 +61,13 @@ export async function appFactory (): Promise<Application> {
/**********************************************************/
// Configure each services

const servicePromises: Promise<void>[] = []
const servicePromises: Promise<{ stop: () => void }>[] = []
for (const service of Object.values(services)) {
app.configure((app) => servicePromises.push(errorHandler(service.initialize, logger)(app)))
}

// Wait for services to initialize
await Promise.all(servicePromises)
const servicesInstances = await Promise.all(servicePromises)

// Log errors in hooks
app.hooks({
Expand All @@ -77,5 +80,29 @@ export async function appFactory (): Promise<Application> {
app.use(express.notFound())
app.use(express.errorHandler({ logger }))

return app
// Start server
const port = config.get('port')
const server = app.listen(port)

server.on('listening', () =>
logger.info(`Server started on port ${port}`)
)

process.on('unhandledRejection', err =>
logger.error(`Unhandled Rejection at: ${err}`)
)

// Subscribe for reorg event
const reorgService = app.service(ServiceAddresses.REORG_EMITTER)
reorgService.on(REORG_OUT_OF_RANGE_EVENT_NAME, () => {
// wait 5 seconds to be sure that reorg event received by connected services
setTimeout(() => options.appResetCallBack(), 5000)
})

return {
stop: () => {
server.close()
servicesInstances.forEach(service => service.stop())
}
}
}
5 changes: 5 additions & 0 deletions src/blockchain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { loggingFactory } from '../logger'
import { NEW_BLOCK_EVENT_NAME, NewBlockEmitterService } from './new-block-emitters'
import { getNewBlockEmitter } from './utils'
import { waitForReadyApp } from '../utils'
import { ReorgEmitterService } from './reorg-emitter'

const logger = loggingFactory('blockchain')

Expand All @@ -20,6 +21,7 @@ export function ethFactory (): Eth {

const CONFIRMATION_CHANNEL = 'confirmations'
const NEW_BLOCK_EMITTER_CHANNEL = 'new-block'
const REORG_CHANNEL = 'reorg'

function channelSetup (app: Application): void {
if (typeof app.channel !== 'function') {
Expand All @@ -29,9 +31,11 @@ function channelSetup (app: Application): void {
app.on('connection', (connection: any) => {
app.channel(CONFIRMATION_CHANNEL).join(connection)
app.channel(NEW_BLOCK_EMITTER_CHANNEL).join(connection)
app.channel(REORG_CHANNEL).join(connection)
})
app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CONFIRMATION_CHANNEL))
app.service(ServiceAddresses.NEW_BLOCK_EMITTER).publish(() => app.channel(NEW_BLOCK_EMITTER_CHANNEL))
app.service(ServiceAddresses.REORG_EMITTER).publish(() => app.channel(REORG_CHANNEL))
}

function subscribeAndEmitNewBlocks (app: Application): void {
Expand All @@ -55,6 +59,7 @@ export default async function (app: Application): Promise<void> {
app.set('eth', eth)
app.use(ServiceAddresses.CONFIRMATIONS, new ConfirmatorService(eth))
app.use(ServiceAddresses.NEW_BLOCK_EMITTER, new NewBlockEmitterService())
app.use(ServiceAddresses.REORG_EMITTER, new ReorgEmitterService())

subscribeAndEmitNewBlocks(app)
channelSetup(app)
Expand Down
45 changes: 45 additions & 0 deletions src/blockchain/reorg-emitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { ServiceMethods } from '@feathersjs/feathers'

import { loggingFactory } from '../logger'
import { REORG_OUT_OF_RANGE_EVENT_NAME } from './events'

const logger = loggingFactory('blockchain:reorg-service')
const DEFAULT_DEBOUNCE_TIME = 5000

export class ReorgEmitterService implements Partial<ServiceMethods<any>> {
private readonly debounceTime: number
private reorgContract: string[] = []
private lastProcessedBlockNumber = 0
private timeoutStarted = false
emit?: Function
events: string[]

constructor (debounceTime?: number) {
this.debounceTime = debounceTime || DEFAULT_DEBOUNCE_TIME
this.events = [REORG_OUT_OF_RANGE_EVENT_NAME]
}

// eslint-disable-next-line require-await
async get (): Promise<void> {
return Promise.resolve()
}

emitReorg (lastProcessedBlockNumber: number, contractName: string): void {
nduchak marked this conversation as resolved.
Show resolved Hide resolved
if (!this.emit) {
throw new Error('ReorgEmitterService invalid setup. Missing \'emit\' function')
}

if (!this.timeoutStarted) {
setTimeout(() => {
logger.warn(`Reorg outside of confirmation range happens on block number ${lastProcessedBlockNumber} for [${this.reorgContract}] contracts`)
this.emit!(REORG_OUT_OF_RANGE_EVENT_NAME, { contracts: this.reorgContract, lastProcessedBlockNumber: this.lastProcessedBlockNumber })
this.reorgContract = []
this.lastProcessedBlockNumber = 0
}, this.debounceTime)
this.timeoutStarted = true
}

this.reorgContract = [...this.reorgContract, contractName]
this.lastProcessedBlockNumber = lastProcessedBlockNumber
}
}
5 changes: 2 additions & 3 deletions src/blockchain/utils.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { AbiItem, keccak256 } from 'web3-utils'
import Eth from 'web3-eth'
import { EventEmitter } from 'events'
import config from 'config'
import { getObject } from 'sequelize-store'

import { loggingFactory } from '../logger'
import eventsEmitterFactory, { EventsEmitterOptions } from './events'
import eventsEmitterFactory, { BaseEventsEmitter, EventsEmitterOptions } from './events'
import { NewBlockEmitterOptions } from '../definitions'
import { BlockTracker } from './block-tracker'
import { AutoStartStopEventEmitter, ListeningNewBlockEmitter, PollingNewBlockEmitter } from './new-block-emitters'
Expand All @@ -29,7 +28,7 @@ export function isServiceInitialized (serviceName: string): boolean {
return blockTracker.getLastFetchedBlock()[0] !== undefined
}

export function getEventsEmitterForService (serviceName: string, eth: Eth, contractAbi: AbiItem[]): EventEmitter {
export function getEventsEmitterForService (serviceName: string, eth: Eth, contractAbi: AbiItem[]): BaseEventsEmitter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the BaseEvemtsEmitter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the stop method from BaseEvemtsEmitter

const contractAddresses = config.get<string>(`${serviceName}.contractAddress`)
const contract = new eth.Contract(contractAbi, contractAddresses)
const logger = loggingFactory(`${serviceName}:blockchain`)
Expand Down
68 changes: 39 additions & 29 deletions src/cli/start.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import config from 'config'
import { flags } from '@oclif/command'

import { appFactory, services } from '../app'
import { startApp, services } from '../app'
import { loggingFactory } from '../logger'
import { Flags, Config, SupportedServices, isSupportedServices } from '../definitions'
import { BaseCLICommand } from '../utils'
import { sequelizeFactory } from '../sequelize'
import Event from '../blockchain/event.model'
import DbBackUpJob from '../db-backup'
import { ethFactory } from '../blockchain'

const logger = loggingFactory('cli:start')

Expand All @@ -26,10 +26,6 @@ ${formattedServices}`
port: flags.integer({ char: 'p', description: 'port to attach the server to' }),
db: flags.string({ description: 'database connection URI', env: 'RIFM_DB' }),
provider: flags.string({ description: 'blockchain provider connection URI', env: 'RIFM_PROVIDER' }),
purge: flags.boolean({
char: 'u',
description: 'will purge services that should be lunched (eq. enable/disable is applied)'
}),
enable: flags.string({ char: 'e', multiple: true, description: 'enable specific service' }),
disable: flags.string({ char: 'd', multiple: true, description: 'disable specific service' })
}
Expand Down Expand Up @@ -84,17 +80,12 @@ ${formattedServices}`
return output
}

private async purge (): Promise<void> {
const toBePurgedServices = (Object.keys(services) as Array<keyof typeof services>)
public async precache () {
const toBePrecache = (Object.keys(services) as Array<keyof typeof services>)
.filter(service => config.get<boolean>(`${service}.enabled`))

logger.info(`Purging services: ${toBePurgedServices.join(', ')}`)

await Promise.all(
toBePurgedServices.map((service) => services[service].purge())
toBePrecache.map((service) => services[service].precache(ethFactory()))
)

await Event.destroy({ where: {}, truncate: true, cascade: true })
}

async run (): Promise<void> {
Expand All @@ -103,23 +94,42 @@ ${formattedServices}`
const configOverrides = this.buildConfigObject(flags)
config.util.extendDeep(config, configOverrides)

if (flags.purge) {
nduchak marked this conversation as resolved.
Show resolved Hide resolved
sequelizeFactory()
await this.purge()
}
const backUpJob = new DbBackUpJob(ethFactory())
// An infinite loop which you can exit only with SIGINT/SIGKILL
while (true) {
let stopCallback: () => void = () => {
throw new Error('No stop callback was assigned!')
}

const app = await appFactory()
const port = config.get('port')
const server = app.listen(port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I don't really like moving the server start&listening logic to the appFactory(). I don't actually think that it has to be there right? What was your motivation here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this shape it is not a "factory" anymore...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to make the same as we had it in pinner, as we use the same algorithm for reorgs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmmm. I see you renamed it to startApp. I am still against this refactor. While I understand your motivation, the Pinner and Cache are different things, using different setups (Cache uses Feathers and it is a Web Server, Pinner is a specific custom-built daemon service). Yes, we use the same mechanism for handling reorgs, but the context is a bit different. You can still use this "algorithm" even with the original approach of appFactory()...

I think that bootstrapping the Feather's app and actually starting the listening on a port should be kept separate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted back

// Run backup job
backUpJob.run()

process.on('unhandledRejection', err =>
logger.error(`Unhandled Rejection at: ${err}`)
)
// Promise that resolves when reset callback is called
const resetPromise = new Promise(resolve => {
startApp({
appResetCallBack: () => resolve()
}).then(value => {
// Lets save the function that stops the app
stopCallback = value.stop
})
})

server.on('listening', () =>
this.log('Server started on port %d', port)
)
// Let see if we have to restart the app at some point most probably because
// reorgs outside of confirmation range.
await resetPromise

logger.warn('Reorg detected outside of confirmation range. Rebuilding the service\'s state!')
logger.info('Stopping service')
nduchak marked this conversation as resolved.
Show resolved Hide resolved
stopCallback()
backUpJob.stop()

// Restore DB from backup
await backUpJob.restoreDb()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm so if the restore procedure fails for some reason then Cache craches, right? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's true. For that case, we should notify DevOps that the restore process fails


return Promise.resolve()
// Run pre-cache
await this.precache()

logger.info('Restarting the app')
}
}
}
Loading