Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

Commit

Permalink
feat: expose new-block event (#221)
Browse files Browse the repository at this point in the history
* feat: expose new-block event using enw-block-emitter

* chore: fix lint

* feat: expose new-block event using new-block-emitter

* feat: add new-block custom event to new-block-emitter-service

* feat: add to channel

* chore: add separated channel for new-block-emitter event

* feat: implement find method for new-block-emitter service

* feat: store full block object in store
  • Loading branch information
nduchak authored Jul 29, 2020
1 parent ff8e131 commit c7b73db
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
8 changes: 7 additions & 1 deletion config/default.json5
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
// it is removed from database.
// Such parameter is needed for a REST API where a host could miss that an event has
// full confirmations as it could be removed from the DB before the endpoint is queried.
waitBlockCountBeforeConfirmationRemoved: 20
waitBlockCountBeforeConfirmationRemoved: 20,

// Specify behavior of NewBlockEmitter, that detects new blocks on blockchain.
newBlockEmitter: {
// If to use polling strategy, if false then listening is used.
polling: true
}
},

log: {
Expand Down
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ export async function appFactory (): Promise<Application> {

// Custom general services
app.configure(sequelize)
app.configure(blockchain)
app.configure(configureStore)
app.configure(blockchain)
app.configure(healthcheck)

/**********************************************************/
Expand Down
35 changes: 30 additions & 5 deletions src/blockchain/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import Eth from 'web3-eth'
import config from 'config'
import Eth, { BlockHeader } from 'web3-eth'
import { getObject } from 'sequelize-store'

import { ConfirmatorService } from './confirmator'
import { Application, ServiceAddresses } from '../definitions'
import { loggingFactory } from '../logger'
import { NEW_BLOCK_EVENT_NAME, NewBlockEmitterService } from './new-block-emitters'
import { getNewBlockEmitter } from './utils'
import { waitForReadyApp } from '../utils'

const logger = loggingFactory('blockchain')

Expand All @@ -14,23 +18,44 @@ export function ethFactory (): Eth {
return new Eth(provider)
}

const CHANNEL_NAME = 'confirmations'
const CONFIRMATION_CHANNEL = 'confirmations'
const NEW_BLOCK_EMITTER_CHANNEL = 'new-block'

function channelSetup (app: Application): void {
if (typeof app.channel !== 'function') {
// If no real-time functionality has been configured just return
return
}
app.on('connection', (connection: any) => {
app.channel(CHANNEL_NAME).join(connection)
app.channel(CONFIRMATION_CHANNEL).join(connection)
app.channel(NEW_BLOCK_EMITTER_CHANNEL).join(connection)
})
app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CHANNEL_NAME))
app.service(ServiceAddresses.CONFIRMATIONS).publish(() => app.channel(CONFIRMATION_CHANNEL))
app.service(ServiceAddresses.NEW_BLOCK_EMITTER).publish(() => app.channel(NEW_BLOCK_EMITTER_CHANNEL))
}

export default function (app: Application): void {
function subscribeAndEmitNewBlocks (app: Application): void {
const eth = app.get('eth')
const store = getObject()
const newBlockEmitter = getNewBlockEmitter(eth)
const newBlockEmitterService = app.service(ServiceAddresses.NEW_BLOCK_EMITTER)

// Subscribe for new blocks
newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, (block: BlockHeader) => {
logger.debug('New block: ', block)
newBlockEmitterService?.emit(NEW_BLOCK_EVENT_NAME, block)
store['blockchain.lastFetchedBlock'] = block
})
}

export default async function (app: Application): Promise<void> {
await waitForReadyApp(app)
const eth = ethFactory()

app.set('eth', eth)
app.use(ServiceAddresses.CONFIRMATIONS, new ConfirmatorService(eth))
app.use(ServiceAddresses.NEW_BLOCK_EMITTER, new NewBlockEmitterService())

subscribeAndEmitNewBlocks(app)
channelSetup(app)
}
25 changes: 23 additions & 2 deletions src/blockchain/new-block-emitters.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { BlockHeader, Eth } from 'web3-eth'
import { loggingFactory } from '../logger'
import { Subscription } from 'web3-core-subscriptions'
import { EventEmitter } from 'events'
import { Logger } from '../definitions'
import { getObject } from 'sequelize-store'
import { ServiceMethods } from '@feathersjs/feathers'

import { loggingFactory } from '../logger'
import type { Logger } from '../definitions'

const DEFAULT_POLLING_INTERVAL = 5000
export const NEW_BLOCK_EVENT_NAME = 'newBlock'
Expand Down Expand Up @@ -127,3 +130,21 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter {
this.subscription?.unsubscribe(error => { this.logger.error(error) })
}
}

export class NewBlockEmitterService implements Partial<ServiceMethods<any>> {
emit?: Function
events: string[]

constructor () {
this.events = [NEW_BLOCK_EVENT_NAME]
}

// eslint-disable-next-line require-await
async find (): Promise<BlockHeader> {
return this.getLastBlock()
}

getLastBlock (): BlockHeader {
return getObject()['blockchain.lastFetchedBlock'] as BlockHeader
}
}
11 changes: 11 additions & 0 deletions src/blockchain/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { loggingFactory } from '../logger'
import eventsEmitterFactory, { EventsEmitterOptions } from './events'
import { NewBlockEmitterOptions } from '../definitions'
import { BlockTracker } from './block-tracker'
import { AutoStartStopEventEmitter, ListeningNewBlockEmitter, PollingNewBlockEmitter } from './new-block-emitters'

function getBlockTracker (keyPrefix?: string): BlockTracker {
const store = getObject(keyPrefix)
Expand Down Expand Up @@ -44,3 +45,13 @@ export function getEventsEmitterForService (serviceName: string, eth: Eth, contr

return eventsEmitterFactory(eth, contract, eventsToListen, options)
}

export function getNewBlockEmitter (eth: Eth): AutoStartStopEventEmitter {
const newBlockEmitterOptions = config.get<NewBlockEmitterOptions>('blockchain.newBlockEmitter')

if (newBlockEmitterOptions.polling) {
return new PollingNewBlockEmitter(eth, newBlockEmitterOptions.pollingInterval)
} else {
return new ListeningNewBlockEmitter(eth)
}
}
5 changes: 4 additions & 1 deletion src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { AgreementService, OfferService } from './services/storage'
import type { RatesService } from './services/rates'
import type { RnsBaseService } from './services/rns'
import { ConfirmatorService } from './blockchain/confirmator'
import { NewBlockEmitterService } from './blockchain/new-block-emitters'

export enum SupportedServices {
STORAGE = 'storage',
Expand All @@ -26,7 +27,8 @@ export enum ServiceAddresses {
STORAGE_OFFERS = '/storage/v0/offers',
STORAGE_AGREEMENTS = '/storage/v0/agreements',
XR = '/rates/v0/',
CONFIRMATIONS = '/confirmations'
CONFIRMATIONS = '/confirmations',
NEW_BLOCK_EMITTER = '/new-block'
}

// A mapping of service names to types. Will be extended in service files.
Expand All @@ -38,6 +40,7 @@ interface ServiceTypes {
[ServiceAddresses.RNS_SOLD]: RnsBaseService & ServiceAddons<any>
[ServiceAddresses.RNS_OFFERS]: RnsBaseService & ServiceAddons<any>
[ServiceAddresses.CONFIRMATIONS]: ConfirmatorService & ServiceAddons<any>
[ServiceAddresses.NEW_BLOCK_EMITTER]: NewBlockEmitterService & ServiceAddons<any>
}

// The application instance type that will be used everywhere else
Expand Down
1 change: 1 addition & 0 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Sequelize } from 'sequelize'

export function initStore (sequelize: Sequelize): Promise<void> {
return init(sequelize, {
'blockchain.lastFetchedBlock': 'json',
'storage.lastFetchedBlockNumber': 'int',
'storage.lastFetchedBlockHash': 'string',
'storage.lastProcessedBlockNumber': 'int',
Expand Down

0 comments on commit c7b73db

Please sign in to comment.