diff --git a/config/default.json5 b/config/default.json5 index e5e8e161..be4ef32b 100644 --- a/config/default.json5 +++ b/config/default.json5 @@ -15,7 +15,13 @@ // it is removed from database. // Such parameter is needed for a REST API where a host could miss that an event has // full confirmations as it could be removed from the DB before the endpoint is queried. - waitBlockCountBeforeConfirmationRemoved: 20 + waitBlockCountBeforeConfirmationRemoved: 20, + + // Specify behavior of NewBlockEmitter, that detects new blocks on blockchain. + newBlockEmitter: { + // If to use polling strategy, if false then listening is used. + polling: true + } }, log: { diff --git a/src/app.ts b/src/app.ts index 309934b5..5908ed6d 100644 --- a/src/app.ts +++ b/src/app.ts @@ -51,8 +51,8 @@ export async function appFactory (): Promise { // Custom general services app.configure(sequelize) - app.configure(blockchain) app.configure(configureStore) + app.configure(blockchain) app.configure(healthcheck) /**********************************************************/ diff --git a/src/blockchain/index.ts b/src/blockchain/index.ts index ef989938..229ed838 100644 --- a/src/blockchain/index.ts +++ b/src/blockchain/index.ts @@ -1,9 +1,13 @@ -import Eth from 'web3-eth' import config from 'config' +import Eth, { BlockHeader } from 'web3-eth' +import { getObject } from 'sequelize-store' import { ConfirmatorService } from './confirmator' import { Application, ServiceAddresses } from '../definitions' import { loggingFactory } from '../logger' +import { NEW_BLOCK_EVENT_NAME, NewBlockEmitterService } from './new-block-emitters' +import { getNewBlockEmitter } from './utils' +import { waitForReadyApp } from '../utils' const logger = loggingFactory('blockchain') @@ -14,7 +18,8 @@ export function ethFactory (): Eth { return new Eth(provider) } -const CHANNEL_NAME = 'confirmations' +const CONFIRMATION_CHANNEL = 'confirmations' +const NEW_BLOCK_EMITTER_CHANNEL = 'new-block' function channelSetup (app: Application): void { if (typeof app.channel !== 'function') { @@ -22,15 +27,35 @@ function channelSetup (app: Application): void { return } app.on('connection', (connection: any) => { - app.channel(CHANNEL_NAME).join(connection) + app.channel(CONFIRMATION_CHANNEL).join(connection) + app.channel(NEW_BLOCK_EMITTER_CHANNEL).join(connection) }) - app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CHANNEL_NAME)) + app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CONFIRMATION_CHANNEL)) + app.service(ServiceAddresses.NEW_BLOCK_EMITTER).publish(() => app.channel(NEW_BLOCK_EMITTER_CHANNEL)) } -export default function (app: Application): void { +function subscribeAndEmitNewBlocks (app: Application): void { + const eth = app.get('eth') + const store = getObject() + const newBlockEmitter = getNewBlockEmitter(eth) + const newBlockEmitterService = app.service(ServiceAddresses.NEW_BLOCK_EMITTER) + + // Subscribe for new blocks + newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, (block: BlockHeader) => { + logger.debug('New block: ', block) + newBlockEmitterService?.emit(NEW_BLOCK_EVENT_NAME, block) + store['blockchain.lastFetchedBlock'] = block + }) +} + +export default async function (app: Application): Promise { + await waitForReadyApp(app) const eth = ethFactory() + app.set('eth', eth) app.use(ServiceAddresses.CONFIRMATIONS, new ConfirmatorService(eth)) + app.use(ServiceAddresses.NEW_BLOCK_EMITTER, new NewBlockEmitterService()) + subscribeAndEmitNewBlocks(app) channelSetup(app) } diff --git a/src/blockchain/new-block-emitters.ts b/src/blockchain/new-block-emitters.ts index c42d66c5..dba6a37c 100644 --- a/src/blockchain/new-block-emitters.ts +++ b/src/blockchain/new-block-emitters.ts @@ -1,8 +1,11 @@ import { BlockHeader, Eth } from 'web3-eth' -import { loggingFactory } from '../logger' import { Subscription } from 'web3-core-subscriptions' import { EventEmitter } from 'events' -import { Logger } from '../definitions' +import { getObject } from 'sequelize-store' +import { ServiceMethods } from '@feathersjs/feathers' + +import { loggingFactory } from '../logger' +import type { Logger } from '../definitions' const DEFAULT_POLLING_INTERVAL = 5000 export const NEW_BLOCK_EVENT_NAME = 'newBlock' @@ -127,3 +130,21 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter { this.subscription?.unsubscribe(error => { this.logger.error(error) }) } } + +export class NewBlockEmitterService implements Partial> { + emit?: Function + events: string[] + + constructor () { + this.events = [NEW_BLOCK_EVENT_NAME] + } + + // eslint-disable-next-line require-await + async find (): Promise { + return this.getLastBlock() + } + + getLastBlock (): BlockHeader { + return getObject()['blockchain.lastFetchedBlock'] as BlockHeader + } +} diff --git a/src/blockchain/utils.ts b/src/blockchain/utils.ts index abe89248..74a08fc6 100644 --- a/src/blockchain/utils.ts +++ b/src/blockchain/utils.ts @@ -8,6 +8,7 @@ import { loggingFactory } from '../logger' import eventsEmitterFactory, { EventsEmitterOptions } from './events' import { NewBlockEmitterOptions } from '../definitions' import { BlockTracker } from './block-tracker' +import { AutoStartStopEventEmitter, ListeningNewBlockEmitter, PollingNewBlockEmitter } from './new-block-emitters' function getBlockTracker (keyPrefix?: string): BlockTracker { const store = getObject(keyPrefix) @@ -44,3 +45,13 @@ export function getEventsEmitterForService (serviceName: string, eth: Eth, contr return eventsEmitterFactory(eth, contract, eventsToListen, options) } + +export function getNewBlockEmitter (eth: Eth): AutoStartStopEventEmitter { + const newBlockEmitterOptions = config.get('blockchain.newBlockEmitter') + + if (newBlockEmitterOptions.polling) { + return new PollingNewBlockEmitter(eth, newBlockEmitterOptions.pollingInterval) + } else { + return new ListeningNewBlockEmitter(eth) + } +} diff --git a/src/definitions.ts b/src/definitions.ts index 85414a07..0425f165 100644 --- a/src/definitions.ts +++ b/src/definitions.ts @@ -8,6 +8,7 @@ import type { AgreementService, OfferService } from './services/storage' import type { RatesService } from './services/rates' import type { RnsBaseService } from './services/rns' import { ConfirmatorService } from './blockchain/confirmator' +import { NewBlockEmitterService } from './blockchain/new-block-emitters' export enum SupportedServices { STORAGE = 'storage', @@ -26,7 +27,8 @@ export enum ServiceAddresses { STORAGE_OFFERS = '/storage/v0/offers', STORAGE_AGREEMENTS = '/storage/v0/agreements', XR = '/rates/v0/', - CONFIRMATIONS = '/confirmations' + CONFIRMATIONS = '/confirmations', + NEW_BLOCK_EMITTER = '/new-block' } // A mapping of service names to types. Will be extended in service files. @@ -38,6 +40,7 @@ interface ServiceTypes { [ServiceAddresses.RNS_SOLD]: RnsBaseService & ServiceAddons [ServiceAddresses.RNS_OFFERS]: RnsBaseService & ServiceAddons [ServiceAddresses.CONFIRMATIONS]: ConfirmatorService & ServiceAddons + [ServiceAddresses.NEW_BLOCK_EMITTER]: NewBlockEmitterService & ServiceAddons } // The application instance type that will be used everywhere else diff --git a/src/store.ts b/src/store.ts index 58a71844..d200006e 100644 --- a/src/store.ts +++ b/src/store.ts @@ -4,6 +4,7 @@ import type { Sequelize } from 'sequelize' export function initStore (sequelize: Sequelize): Promise { return init(sequelize, { + 'blockchain.lastFetchedBlock': 'json', 'storage.lastFetchedBlockNumber': 'int', 'storage.lastFetchedBlockHash': 'string', 'storage.lastProcessedBlockNumber': 'int',