Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

Commit

Permalink
feat: tracking last processed and fetch blocks with their hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
AuHau committed Jun 19, 2020
1 parent a9931d2 commit c3d3ad2
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 154 deletions.
105 changes: 65 additions & 40 deletions src/blockchain/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ const DEFAULT_POLLING_INTERVAL = 5000
const NEW_EVENT_EVENT_NAME = 'newEvent'
const INIT_FINISHED_EVENT_NAME = 'initFinished'
const NEW_BLOCK_EVENT_NAME = 'newBlock'
const PROCESSED_BLOCK_KEY = 'lastProcessedBlock'
const LAST_FETCHED_BLOCK_NUMBER_KEY = 'lastFetchedBlockNumber'
const LAST_FETCHED_BLOCK_HASH_KEY = 'lastFetchedBlockHash'
const LAST_PROCESSED_BLOCK_NUMBER_KEY = 'lastProcessedBlockNumber'
const LAST_PROCESSED_BLOCK_HASH_KEY = 'lastProcessedBlockHash'

export interface BlockTrackerStore {
[PROCESSED_BLOCK_KEY]?: number
[LAST_FETCHED_BLOCK_NUMBER_KEY]?: number
[LAST_FETCHED_BLOCK_HASH_KEY]?: string
[LAST_PROCESSED_BLOCK_NUMBER_KEY]?: number
[LAST_PROCESSED_BLOCK_HASH_KEY]?: string
}

export enum EventsEmitterStrategy {
Expand Down Expand Up @@ -62,12 +68,26 @@ export class BlockTracker {
this.store = store
}

setLastProcessedBlock (block: number): void {
this.store[PROCESSED_BLOCK_KEY] = block
setLastFetchedBlock (blockNumber: number, blockHash: string): void {
this.store[LAST_FETCHED_BLOCK_HASH_KEY] = blockHash
this.store[LAST_FETCHED_BLOCK_NUMBER_KEY] = blockNumber
}

getLastProcessedBlock (): number | undefined {
return this.store[PROCESSED_BLOCK_KEY]
getLastFetchedBlock (): [number?, string?] {
return [this.store[LAST_FETCHED_BLOCK_NUMBER_KEY], this.store[LAST_FETCHED_BLOCK_HASH_KEY]]
}

setLastProcessedBlockIfHigher (blockNumber: number, blockHash: string): void {
if ((this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY] || -1) > blockNumber) {
return
}

this.store[LAST_PROCESSED_BLOCK_HASH_KEY] = blockHash
this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY] = blockNumber
}

getLastProcessedBlock (): [number?, string?] {
return [this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY], this.store[LAST_PROCESSED_BLOCK_HASH_KEY]]
}
}

Expand Down Expand Up @@ -125,12 +145,12 @@ export class PollingNewBlockEmitter extends AutoStartStopEventEmitter {

private async fetchLastBlockNumber (): Promise<void> {
try {
const currentLastBlockNumber = await this.eth.getBlockNumber()
const lastBlock = await this.eth.getBlock('latest')

if (this.lastBlockNumber !== currentLastBlockNumber) {
this.lastBlockNumber = currentLastBlockNumber
this.logger.verbose(`New block ${currentLastBlockNumber}`)
this.emit(NEW_BLOCK_EVENT_NAME, currentLastBlockNumber)
if (this.lastBlockNumber !== lastBlock.number) {
this.lastBlockNumber = lastBlock.number
this.logger.verbose(`New block with number ${lastBlock.number} with hash ${lastBlock.hash}`)
this.emit(NEW_BLOCK_EVENT_NAME, lastBlock)
}
} catch (e) {
this.logger.error(`While fetching latest block error happend: ${e}`)
Expand Down Expand Up @@ -167,9 +187,9 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter {
async start (): Promise<void> {
try {
// Emit block number right away
const currentLastBlockNumber = await this.eth.getBlockNumber()
this.logger.info(`Current block ${currentLastBlockNumber}`)
this.emit(NEW_BLOCK_EVENT_NAME, currentLastBlockNumber)
const lastBlock = await this.eth.getBlock('latest')
this.logger.verbose(`Current block with number ${lastBlock.number} with hash ${lastBlock.hash}`)
this.emit(NEW_BLOCK_EVENT_NAME, lastBlock)

this.subscription = this.eth.subscribe('newBlockHeaders', (error, blockHeader) => {
if (error) {
Expand All @@ -178,8 +198,8 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter {
return
}

this.logger.verbose(`New block ${blockHeader.number}`)
this.emit(NEW_BLOCK_EVENT_NAME, blockHeader.number)
this.logger.verbose(`New block with number ${lastBlock.number} with hash ${lastBlock.hash}`)
this.emit(NEW_BLOCK_EVENT_NAME, blockHeader)
})
} catch (e) {
this.logger.error(e)
Expand Down Expand Up @@ -250,7 +270,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
* Specifically when this caching service is first launched this it will process past events.
*/
async init (): Promise<void> {
if (this.blockTracker.getLastProcessedBlock() === undefined) {
if (this.blockTracker.getLastFetchedBlock()[0] === undefined) {
const from = this.startingBlock
await this.processPastEvents(from, 'latest')
}
Expand All @@ -267,15 +287,15 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
this.startEvents()

if (this.confirmations > 0) {
this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.confirmEvents.bind(this))
this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.confirmationRoutine.bind(this))
}
}

stop (): void {
this.stopEvents()

if (this.confirmations > 0) {
this.newBlockEmitter.off(NEW_BLOCK_EVENT_NAME, this.confirmEvents.bind(this))
this.newBlockEmitter.off(NEW_BLOCK_EVENT_NAME, this.confirmationRoutine.bind(this))
}
}

Expand All @@ -294,20 +314,20 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
*
* Before emitting it validates that the Event is still valid on blockchain using the transaction's receipt.
*
* @param currentBlockNumber
* @param currentBlock
*/
private async confirmEvents (currentBlockNumber: number): Promise<void> {
private async confirmationRoutine (currentBlock: BlockHeader): Promise<void> {
try {
const dbEvents = await Event.findAll({
where: {
blockNumber: { [Op.lte]: currentBlockNumber - this.confirmations },
blockNumber: { [Op.lte]: currentBlock.number - this.confirmations },
event: this.events,
emitted: false
}
})

const ethEvents = dbEvents.map(event => JSON.parse(event.content)) as EventData[]
ethEvents.forEach(this.emitEvent.bind(this))
ethEvents.forEach(this.confirmEvent.bind(this))
this.logger.info(`Confirmed ${ethEvents.length} events.`)

// Update DB that events were emitted
Expand All @@ -318,6 +338,11 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
}
}

private confirmEvent (data: EventData): void {
this.blockTracker.setLastProcessedBlockIfHigher(data.blockNumber, data.blockHash)
this.emitEvent(data)
}

protected emitEvent (data: EventData): void {
this.logger.debug('Emitting event', [data])
this.emit(NEW_EVENT_EVENT_NAME, data)
Expand Down Expand Up @@ -386,10 +411,10 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
* Main method for processing events. It should be called after retrieving Events from blockchain.
*
* @param events
* @param currentBlock
* @param currentBlockNumber
*/
protected async processEvents (events: EventData | EventData[], currentBlock?: number): Promise<void> {
currentBlock = currentBlock || await this.eth.getBlockNumber()
protected async processEvents (events: EventData | EventData[], currentBlockNumber?: number): Promise<void> {
currentBlockNumber = currentBlockNumber || await this.eth.getBlockNumber()

if (!Array.isArray(events)) {
events = [events]
Expand All @@ -407,7 +432,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
return
}

const thresholdBlock = currentBlock - this.confirmations
const thresholdBlock = currentBlockNumber - this.confirmations
this.logger.verbose(`Threshold block ${thresholdBlock},`)

const eventsToBeConfirmed = events
Expand All @@ -419,7 +444,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
.filter(event => event.blockNumber <= thresholdBlock)
this.logger.info(`${eventsToBeEmitted.length} events to be emitted.`)

eventsToBeEmitted.forEach(this.emitEvent.bind(this))
eventsToBeEmitted.forEach(this.confirmEvent.bind(this))
}

/**
Expand All @@ -431,10 +456,10 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
async processPastEvents (from: number | string, to: number | string): Promise<void> {
await this.semaphore.acquire()
try {
const currentBlock = await this.eth.getBlockNumber()
const currentBlock = await this.eth.getBlock('latest')

if (to === 'latest') {
to = currentBlock
to = currentBlock.number
}

this.logger.info(`=> Processing past events from ${from} to ${to}`)
Expand All @@ -444,8 +469,8 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
toBlock: to
}))

await this.processEvents(events, currentBlock)
this.blockTracker.setLastProcessedBlock(currentBlock)
await this.processEvents(events, currentBlock.number)
this.blockTracker.setLastFetchedBlock(currentBlock.number, currentBlock.hash)

const [secondsLapsed] = process.hrtime(startTime)
this.logger.info(`=> Finished processing past events in ${secondsLapsed}s`)
Expand All @@ -472,28 +497,28 @@ export class PollingEventsEmitter extends BaseEventsEmitter {
super(eth, contract, events, logger, options)
}

async poll (currentBlock: number): Promise<void> {
async poll (currentBlock: BlockHeader): Promise<void> {
await this.semaphore.acquire()
this.logger.verbose(`Received new block number ${currentBlock}`)
try {
const lastProcessedBlock = this.blockTracker.getLastProcessedBlock() as number // undefined is checked in init()
const [lastFetchedBlockNumber, lastFetchedBlockHash] = this.blockTracker.getLastFetchedBlock()

// Nothing new, lets fast-forward
if (lastProcessedBlock === currentBlock) {
if (lastFetchedBlockNumber === currentBlock.number) {
this.logger.verbose('Nothing new to process')
return
}

this.logger.info(`Checking new events between blocks ${lastProcessedBlock}-${currentBlock}`)
this.logger.info(`Checking new events between blocks ${lastFetchedBlockNumber}-${currentBlock}`)
// TODO: Possible to filter-out the events with "topics" property directly from the node
const events = await this.contract.getPastEvents('allEvents', {
fromBlock: lastProcessedBlock + 1, // +1 because both fromBlock and toBlock is "or equal"
toBlock: currentBlock
fromBlock: (lastFetchedBlockNumber as number) + 1, // +1 because both fromBlock and toBlock is "or equal"
toBlock: currentBlock.number
})
this.logger.debug('Received events: ', events)

await this.processEvents(events, currentBlock)
this.blockTracker.setLastProcessedBlock(currentBlock)
await this.processEvents(events, currentBlock.number)
this.blockTracker.setLastFetchedBlock(currentBlock.number, currentBlock.hash)
} catch (e) {
this.logger.error('Error in the processing loop:\n' + JSON.stringify(e, undefined, 2))
} finally {
Expand Down
2 changes: 1 addition & 1 deletion src/blockchain/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export async function getBlockDate (eth: Eth, blockNumber: number): Promise<Date

export function isServiceInitialized (serviceName: string): boolean {
const blockTracker = getBlockTracker(`${serviceName}.`)
return blockTracker.getLastProcessedBlock() !== undefined
return blockTracker.getLastFetchedBlock() !== undefined
}

export function getEventsEmitterForService (serviceName: string, eth: Eth, contractAbi: AbiItem[]): EventEmitter {
Expand Down
6 changes: 3 additions & 3 deletions src/services/rns/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ const rns: CachedService = {
logger.info(`Removed ${offersCount} offers entries, ${soldCount} sold domains, ${transferCount} transfers and ${domainsCount} domains`)

const store = getObject()
delete store['rns.placement.lastProcessedBlock']
delete store['rns.reverse.lastProcessedBlock']
delete store['rns.owner.lastProcessedBlock']
delete store['rns.placement.lastFetchedBlock']
delete store['rns.reverse.lastFetchedBlock']
delete store['rns.owner.lastFetchedBlock']
}

}
Expand Down
2 changes: 1 addition & 1 deletion src/services/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const storage: CachedService = {
logger.info(`Removed ${priceCount} billing plans entries, ${offersCount} offers and ${agreementsCount} agreements`)

const store = getObject()
delete store['storage.lastProcessedBlock']
delete store['storage.lastFetchedBlockNumber']
},

precache
Expand Down
8 changes: 4 additions & 4 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import type { Sequelize } from 'sequelize'

export function initStore (sequelize: Sequelize): Promise<void> {
return init(sequelize, {
'storage.lastProcessedBlock': 'int',
'storage.lastFetchedBlock': 'json',
'rates.lastUpdate': 'int',
'rns.owner.lastProcessedBlock': 'int',
'rns.reverse.lastProcessedBlock': 'int',
'rns.placement.lastProcessedBlock': 'int'
'rns.owner.lastFetchedBlock': 'json',
'rns.reverse.lastFetchedBlock': 'json',
'rns.placement.lastFetchedBlock': 'json'
})
}

Expand Down
Loading

0 comments on commit c3d3ad2

Please sign in to comment.