diff --git a/src/blockchain/events.ts b/src/blockchain/events.ts index 620d021c..7c7a1c8d 100644 --- a/src/blockchain/events.ts +++ b/src/blockchain/events.ts @@ -17,10 +17,16 @@ const DEFAULT_POLLING_INTERVAL = 5000 const NEW_EVENT_EVENT_NAME = 'newEvent' const INIT_FINISHED_EVENT_NAME = 'initFinished' const NEW_BLOCK_EVENT_NAME = 'newBlock' -const PROCESSED_BLOCK_KEY = 'lastProcessedBlock' +const LAST_FETCHED_BLOCK_NUMBER_KEY = 'lastFetchedBlockNumber' +const LAST_FETCHED_BLOCK_HASH_KEY = 'lastFetchedBlockHash' +const LAST_PROCESSED_BLOCK_NUMBER_KEY = 'lastProcessedBlockNumber' +const LAST_PROCESSED_BLOCK_HASH_KEY = 'lastProcessedBlockHash' export interface BlockTrackerStore { - [PROCESSED_BLOCK_KEY]?: number + [LAST_FETCHED_BLOCK_NUMBER_KEY]?: number + [LAST_FETCHED_BLOCK_HASH_KEY]?: string + [LAST_PROCESSED_BLOCK_NUMBER_KEY]?: number + [LAST_PROCESSED_BLOCK_HASH_KEY]?: string } export enum EventsEmitterStrategy { @@ -62,12 +68,26 @@ export class BlockTracker { this.store = store } - setLastProcessedBlock (block: number): void { - this.store[PROCESSED_BLOCK_KEY] = block + setLastFetchedBlock (blockNumber: number, blockHash: string): void { + this.store[LAST_FETCHED_BLOCK_HASH_KEY] = blockHash + this.store[LAST_FETCHED_BLOCK_NUMBER_KEY] = blockNumber } - getLastProcessedBlock (): number | undefined { - return this.store[PROCESSED_BLOCK_KEY] + getLastFetchedBlock (): [number?, string?] { + return [this.store[LAST_FETCHED_BLOCK_NUMBER_KEY], this.store[LAST_FETCHED_BLOCK_HASH_KEY]] + } + + setLastProcessedBlockIfHigher (blockNumber: number, blockHash: string): void { + if ((this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY] || -1) > blockNumber) { + return + } + + this.store[LAST_PROCESSED_BLOCK_HASH_KEY] = blockHash + this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY] = blockNumber + } + + getLastProcessedBlock (): [number?, string?] { + return [this.store[LAST_PROCESSED_BLOCK_NUMBER_KEY], this.store[LAST_PROCESSED_BLOCK_HASH_KEY]] } } @@ -125,12 +145,12 @@ export class PollingNewBlockEmitter extends AutoStartStopEventEmitter { private async fetchLastBlockNumber (): Promise { try { - const currentLastBlockNumber = await this.eth.getBlockNumber() + const lastBlock = await this.eth.getBlock('latest') - if (this.lastBlockNumber !== currentLastBlockNumber) { - this.lastBlockNumber = currentLastBlockNumber - this.logger.verbose(`New block ${currentLastBlockNumber}`) - this.emit(NEW_BLOCK_EVENT_NAME, currentLastBlockNumber) + if (this.lastBlockNumber !== lastBlock.number) { + this.lastBlockNumber = lastBlock.number + this.logger.verbose(`New block with number ${lastBlock.number} with hash ${lastBlock.hash}`) + this.emit(NEW_BLOCK_EVENT_NAME, lastBlock) } } catch (e) { this.logger.error(`While fetching latest block error happend: ${e}`) @@ -167,9 +187,9 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter { async start (): Promise { try { // Emit block number right away - const currentLastBlockNumber = await this.eth.getBlockNumber() - this.logger.info(`Current block ${currentLastBlockNumber}`) - this.emit(NEW_BLOCK_EVENT_NAME, currentLastBlockNumber) + const lastBlock = await this.eth.getBlock('latest') + this.logger.verbose(`Current block with number ${lastBlock.number} with hash ${lastBlock.hash}`) + this.emit(NEW_BLOCK_EVENT_NAME, lastBlock) this.subscription = this.eth.subscribe('newBlockHeaders', (error, blockHeader) => { if (error) { @@ -178,8 +198,8 @@ export class ListeningNewBlockEmitter extends AutoStartStopEventEmitter { return } - this.logger.verbose(`New block ${blockHeader.number}`) - this.emit(NEW_BLOCK_EVENT_NAME, blockHeader.number) + this.logger.verbose(`New block with number ${lastBlock.number} with hash ${lastBlock.hash}`) + this.emit(NEW_BLOCK_EVENT_NAME, blockHeader) }) } catch (e) { this.logger.error(e) @@ -250,7 +270,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { * Specifically when this caching service is first launched this it will process past events. */ async init (): Promise { - if (this.blockTracker.getLastProcessedBlock() === undefined) { + if (this.blockTracker.getLastFetchedBlock()[0] === undefined) { const from = this.startingBlock await this.processPastEvents(from, 'latest') } @@ -267,7 +287,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { this.startEvents() if (this.confirmations > 0) { - this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.confirmEvents.bind(this)) + this.newBlockEmitter.on(NEW_BLOCK_EVENT_NAME, this.confirmationRoutine.bind(this)) } } @@ -275,7 +295,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { this.stopEvents() if (this.confirmations > 0) { - this.newBlockEmitter.off(NEW_BLOCK_EVENT_NAME, this.confirmEvents.bind(this)) + this.newBlockEmitter.off(NEW_BLOCK_EVENT_NAME, this.confirmationRoutine.bind(this)) } } @@ -294,20 +314,20 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { * * Before emitting it validates that the Event is still valid on blockchain using the transaction's receipt. * - * @param currentBlockNumber + * @param currentBlock */ - private async confirmEvents (currentBlockNumber: number): Promise { + private async confirmationRoutine (currentBlock: BlockHeader): Promise { try { const dbEvents = await Event.findAll({ where: { - blockNumber: { [Op.lte]: currentBlockNumber - this.confirmations }, + blockNumber: { [Op.lte]: currentBlock.number - this.confirmations }, event: this.events, emitted: false } }) const ethEvents = dbEvents.map(event => JSON.parse(event.content)) as EventData[] - ethEvents.forEach(this.emitEvent.bind(this)) + ethEvents.forEach(this.confirmEvent.bind(this)) this.logger.info(`Confirmed ${ethEvents.length} events.`) // Update DB that events were emitted @@ -318,6 +338,11 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { } } + private confirmEvent (data: EventData): void { + this.blockTracker.setLastProcessedBlockIfHigher(data.blockNumber, data.blockHash) + this.emitEvent(data) + } + protected emitEvent (data: EventData): void { this.logger.debug('Emitting event', [data]) this.emit(NEW_EVENT_EVENT_NAME, data) @@ -386,10 +411,10 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { * Main method for processing events. It should be called after retrieving Events from blockchain. * * @param events - * @param currentBlock + * @param currentBlockNumber */ - protected async processEvents (events: EventData | EventData[], currentBlock?: number): Promise { - currentBlock = currentBlock || await this.eth.getBlockNumber() + protected async processEvents (events: EventData | EventData[], currentBlockNumber?: number): Promise { + currentBlockNumber = currentBlockNumber || await this.eth.getBlockNumber() if (!Array.isArray(events)) { events = [events] @@ -407,7 +432,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { return } - const thresholdBlock = currentBlock - this.confirmations + const thresholdBlock = currentBlockNumber - this.confirmations this.logger.verbose(`Threshold block ${thresholdBlock},`) const eventsToBeConfirmed = events @@ -419,7 +444,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { .filter(event => event.blockNumber <= thresholdBlock) this.logger.info(`${eventsToBeEmitted.length} events to be emitted.`) - eventsToBeEmitted.forEach(this.emitEvent.bind(this)) + eventsToBeEmitted.forEach(this.confirmEvent.bind(this)) } /** @@ -431,10 +456,10 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { async processPastEvents (from: number | string, to: number | string): Promise { await this.semaphore.acquire() try { - const currentBlock = await this.eth.getBlockNumber() + const currentBlock = await this.eth.getBlock('latest') if (to === 'latest') { - to = currentBlock + to = currentBlock.number } this.logger.info(`=> Processing past events from ${from} to ${to}`) @@ -444,8 +469,8 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter { toBlock: to })) - await this.processEvents(events, currentBlock) - this.blockTracker.setLastProcessedBlock(currentBlock) + await this.processEvents(events, currentBlock.number) + this.blockTracker.setLastFetchedBlock(currentBlock.number, currentBlock.hash) const [secondsLapsed] = process.hrtime(startTime) this.logger.info(`=> Finished processing past events in ${secondsLapsed}s`) @@ -472,28 +497,28 @@ export class PollingEventsEmitter extends BaseEventsEmitter { super(eth, contract, events, logger, options) } - async poll (currentBlock: number): Promise { + async poll (currentBlock: BlockHeader): Promise { await this.semaphore.acquire() this.logger.verbose(`Received new block number ${currentBlock}`) try { - const lastProcessedBlock = this.blockTracker.getLastProcessedBlock() as number // undefined is checked in init() + const [lastFetchedBlockNumber, lastFetchedBlockHash] = this.blockTracker.getLastFetchedBlock() // Nothing new, lets fast-forward - if (lastProcessedBlock === currentBlock) { + if (lastFetchedBlockNumber === currentBlock.number) { this.logger.verbose('Nothing new to process') return } - this.logger.info(`Checking new events between blocks ${lastProcessedBlock}-${currentBlock}`) + this.logger.info(`Checking new events between blocks ${lastFetchedBlockNumber}-${currentBlock}`) // TODO: Possible to filter-out the events with "topics" property directly from the node const events = await this.contract.getPastEvents('allEvents', { - fromBlock: lastProcessedBlock + 1, // +1 because both fromBlock and toBlock is "or equal" - toBlock: currentBlock + fromBlock: (lastFetchedBlockNumber as number) + 1, // +1 because both fromBlock and toBlock is "or equal" + toBlock: currentBlock.number }) this.logger.debug('Received events: ', events) - await this.processEvents(events, currentBlock) - this.blockTracker.setLastProcessedBlock(currentBlock) + await this.processEvents(events, currentBlock.number) + this.blockTracker.setLastFetchedBlock(currentBlock.number, currentBlock.hash) } catch (e) { this.logger.error('Error in the processing loop:\n' + JSON.stringify(e, undefined, 2)) } finally { diff --git a/src/blockchain/utils.ts b/src/blockchain/utils.ts index 0d028d65..fc438d10 100644 --- a/src/blockchain/utils.ts +++ b/src/blockchain/utils.ts @@ -19,7 +19,7 @@ export async function getBlockDate (eth: Eth, blockNumber: number): Promise { return init(sequelize, { - 'storage.lastProcessedBlock': 'int', + 'storage.lastFetchedBlock': 'json', 'rates.lastUpdate': 'int', - 'rns.owner.lastProcessedBlock': 'int', - 'rns.reverse.lastProcessedBlock': 'int', - 'rns.placement.lastProcessedBlock': 'int' + 'rns.owner.lastFetchedBlock': 'json', + 'rns.reverse.lastFetchedBlock': 'json', + 'rns.placement.lastFetchedBlock': 'json' }) } diff --git a/test/blockchain/events.spec.ts b/test/blockchain/events.spec.ts index f612649f..409f5932 100644 --- a/test/blockchain/events.spec.ts +++ b/test/blockchain/events.spec.ts @@ -1,4 +1,4 @@ -import { BlockHeader, Eth } from 'web3-eth' +import { BlockHeader, BlockTransactionString, Eth } from 'web3-eth' import { Arg, Substitute } from '@fluffy-spoon/substitute' import sinon from 'sinon' import chai from 'chai' @@ -28,8 +28,13 @@ chai.use(chaiAsPromised) chai.use(dirtyChai) const expect = chai.expect const setImmediatePromise = util.promisify(setImmediate) + const NEW_BLOCK_EVENT = 'newBlock' const DATA_EVENT_NAME = 'newEvent' +const STORE_LAST_FETCHED_BLOCK_NUMBER_KEY = 'lastFetchedBlockNumber' +const STORE_LAST_FETCHED_BLOCK_HASH_KEY = 'lastFetchedBlockHash' +const STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY = 'lastProcessedBlockNumber' +const STORE_LAST_PROCESSED_BLOCK_HASH_KEY = 'lastProcessedBlockHash' function subscribeMock (sequence: Array, interval = 100): (event: string, cb: (err?: Error, blockHeader?: BlockHeader) => void) => void { let counter = 0 @@ -69,6 +74,13 @@ function eventMock (options?: Partial): EventData { return testEvent } +function blockFactory (blockNumber: number, blockHash = '0x123'): BlockTransactionString { + const block = Substitute.for() + block.number.returns!(blockNumber) + block.hash.returns!(blockHash) + return block +} + /** * Dummy implementation for testing BaseEventsEmitter */ @@ -103,51 +115,85 @@ export class DummyEventsEmitter extends BaseEventsEmitter { } } -// TODO: More focused assertions of expected calls. describe('BlockTracker', () => { - const STORE_LAST_PROCESSED_BLOCK_KEY = 'lastProcessedBlock' - it('read initial block from store', function () { - const store = { [STORE_LAST_PROCESSED_BLOCK_KEY]: 111 } + const store = { + [STORE_LAST_FETCHED_BLOCK_HASH_KEY]: '0x123', + [STORE_LAST_FETCHED_BLOCK_NUMBER_KEY]: 10, + [STORE_LAST_PROCESSED_BLOCK_HASH_KEY]: '0x321', + [STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY]: 8 + } + const bt = new BlockTracker(store) + expect(bt.getLastFetchedBlock()).to.eql([10, '0x123']) + expect(bt.getLastProcessedBlock()).to.eql([8, '0x321']) + }) + + it('should save last fetched block', function () { + const store = {} as BlockTrackerStore const bt = new BlockTracker(store) - expect(bt.getLastProcessedBlock()).to.eql(111) + + expect(bt.getLastFetchedBlock()).to.be.eql([undefined, undefined]) + expect(store[STORE_LAST_FETCHED_BLOCK_NUMBER_KEY]).to.be.undefined() + expect(store[STORE_LAST_FETCHED_BLOCK_HASH_KEY]).to.be.undefined() + + bt.setLastFetchedBlock(9, '0x123') + expect(bt.getLastFetchedBlock()).to.eql([9, '0x123']) + expect(store[STORE_LAST_FETCHED_BLOCK_NUMBER_KEY]).to.eql(9) + expect(store[STORE_LAST_FETCHED_BLOCK_HASH_KEY]).to.eql('0x123') }) - it('should save block', function () { + it('should save last processed block only if higher', function () { const store = {} as BlockTrackerStore const bt = new BlockTracker(store) - expect(bt.getLastProcessedBlock()).to.be.undefined() - expect(store[STORE_LAST_PROCESSED_BLOCK_KEY]).to.be.undefined() + expect(bt.getLastProcessedBlock()).to.be.eql([undefined, undefined]) + expect(store[STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY]).to.be.undefined() + expect(store[STORE_LAST_PROCESSED_BLOCK_HASH_KEY]).to.be.undefined() - bt.setLastProcessedBlock(10) - expect(bt.getLastProcessedBlock()).to.eql(10) - expect(store[STORE_LAST_PROCESSED_BLOCK_KEY]).to.eql(10) + bt.setLastProcessedBlockIfHigher(10, '0x123') + expect(bt.getLastProcessedBlock()).to.eql([10, '0x123']) + expect(store[STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY]).to.eql(10) + expect(store[STORE_LAST_PROCESSED_BLOCK_HASH_KEY]).to.eql('0x123') + + bt.setLastProcessedBlockIfHigher(9, '0x123') + expect(bt.getLastProcessedBlock()).to.eql([10, '0x123']) + expect(store[STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY]).to.eql(10) + expect(store[STORE_LAST_PROCESSED_BLOCK_HASH_KEY]).to.eql('0x123') + + bt.setLastProcessedBlockIfHigher(11, '0x1233') + expect(bt.getLastProcessedBlock()).to.eql([11, '0x1233']) + expect(store[STORE_LAST_PROCESSED_BLOCK_NUMBER_KEY]).to.eql(11) + expect(store[STORE_LAST_PROCESSED_BLOCK_HASH_KEY]).to.eql('0x1233') }) }) describe('PollingNewBlockEmitter', () => { it('should immediately emit event', async function () { const spy = sinon.spy() + const block = blockFactory(111) const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlock(Arg.all()).returns(Promise.resolve(block)) const emitter = new PollingNewBlockEmitter(eth, 100) emitter.on(NEW_BLOCK_EVENT, spy) // We have to wait for all previous schedules events in event-loop to finish await setImmediatePromise() - eth.received(1).getBlockNumber() - expect(spy.calledOnceWith(10)).to.be.true('Emitter callback should have been called with 10.') + eth.received(1).getBlock(Arg.all()) + expect(spy.calledOnceWith(block)).to.be.true('Emitter callback should have been called with 10.') + + emitter.off(NEW_BLOCK_EVENT, spy) // Cleanup }) it('should emit only new events', async function () { const spy = sinon.spy() const eth = Substitute.for() - eth.getBlockNumber().returns( - Promise.resolve(10), - Promise.resolve(10), - Promise.resolve(11) + const firstBlock = blockFactory(10) + const secondBlock = blockFactory(11) + eth.getBlock(Arg.all()).returns( + Promise.resolve(firstBlock), + Promise.resolve(blockFactory(10)), + Promise.resolve(secondBlock) ) const emitter = new PollingNewBlockEmitter(eth, 100) @@ -155,18 +201,21 @@ describe('PollingNewBlockEmitter', () => { // Lets wait for 3 events polls await sleep(210) - eth.received(3).getBlockNumber() + eth.received(3).getBlock(Arg.all()) expect(spy.calledTwice).to.be.true('Emitter callback should have been called twice.') - expect(spy.firstCall.calledWithExactly(10)).to.be.true('Emitter callback should have been called first with 10.') - expect(spy.secondCall.calledWithExactly(11)).to.be.true('Emitter callback should have been called second time with 11.') + expect(spy.firstCall.calledWithExactly(firstBlock)).to.be.true('Emitter callback should have been called first with 10.') + expect(spy.secondCall.calledWithExactly(secondBlock)).to.be.true('Emitter callback should have been called second time with 11.') + + emitter.off(NEW_BLOCK_EVENT, spy) // Cleanup }) it('should stop on removeListener', async function () { const spy = sinon.spy() + const block = blockFactory(10) const eth = Substitute.for() - eth.getBlockNumber().returns( - Promise.resolve(10), - Promise.resolve(10) + eth.getBlock(Arg.all()).returns( + Promise.resolve(block), + Promise.resolve(blockFactory(10)) ) const emitter = new PollingNewBlockEmitter(eth, 100) @@ -179,9 +228,9 @@ describe('PollingNewBlockEmitter', () => { // Lets make sure it is off await sleep(110) - eth.received(2).getBlockNumber() + eth.received(2).getBlock(Arg.all()) expect(spy.calledOnce).to.be.true('Emitter callback should have been called once.') - expect(spy.firstCall.calledWithExactly(10)).to.be.true('Emitter callback should have been called first with 10.') + expect(spy.firstCall.calledWithExactly(block)).to.be.true('Emitter callback should have been called first with 10.') }) }) @@ -190,27 +239,29 @@ describe('ListeningNewBlockEmitter', () => { it('should immediately emit event', async function () { const spy = sinon.spy() + const block = blockFactory(10) const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlock(Arg.all()).returns(Promise.resolve(block)) const emitter = new ListeningNewBlockEmitter(eth) emitter.on(NEW_BLOCK_EVENT, spy) // We have to wait for all previous schedules events in event-loop to finish await setImmediatePromise() - eth.received(1).getBlockNumber() - expect(spy.calledOnceWith(10)).to.be.true('Emitter callback should have been called with 10.') + eth.received(1).getBlock(Arg.all()) + expect(spy.calledOnceWith(block)).to.be.true('Emitter callback should have been called with 10.') }) it('should listen for events from blockchain', async function () { const spy = sinon.spy() + const block = blockFactory(9) const block1 = Substitute.for() block1.number.returns!(10) const block2 = Substitute.for() block2.number.returns!(11) const subscribe = subscribeMock([block1, block2], 100) const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(9)) + eth.getBlock(Arg.all()).returns(Promise.resolve(block)) // @ts-ignore eth.subscribe(Arg.all()).mimicks(subscribe) @@ -220,11 +271,11 @@ describe('ListeningNewBlockEmitter', () => { // Lets wait for 3 events fired await sleep(410) - eth.received(1).getBlockNumber() + eth.received(1).getBlock(Arg.all()) expect(spy).to.have.callCount(3) - expect(spy.firstCall).to.be.calledWithExactly(9) - expect(spy.secondCall).to.be.calledWithExactly(10) - expect(spy.thirdCall).to.be.calledWithExactly(11) + expect(spy.firstCall).to.be.calledWithExactly(block) + expect(spy.secondCall).to.be.calledWithExactly(block1) + expect(spy.thirdCall).to.be.calledWithExactly(block2) }) }) @@ -248,7 +299,8 @@ describe('BaseEventsEmitter', () => { ] const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10), Promise.resolve(11)) + eth.getBlockNumber().resolves(11) + eth.getBlock('latest').resolves(blockFactory(11)) const contract = Substitute.for() contract.getPastEvents(Arg.all()).returns(sleep(200, events)) @@ -263,24 +315,26 @@ describe('BaseEventsEmitter', () => { // Directly calling processEvents(), which should be blocked by the processingPastEvents() const createEventPromise = eventsEmitter.createEvent(events) await sleep(50) - eth.received(1).getBlockNumber() + eth.received(1).getBlock('latest') + eth.received(0).getBlockNumber() // After the processingEvents() is finished await createEventPromise - eth.received(2).getBlockNumber() + eth.received(1).getBlock('latest') + eth.received(1).getBlockNumber() }) describe('with confirmations', () => { it('should process past events', async function () { const events = [ - eventMock({ blockNumber: 4, transactionHash: '1', logIndex: 1 }), - eventMock({ blockNumber: 8, transactionHash: '2', logIndex: 1 }), - eventMock({ blockNumber: 9, transactionHash: '3', logIndex: 1 }), - eventMock({ blockNumber: 10, transactionHash: '4', logIndex: 1 }) + eventMock({ blockHash: '0x123', blockNumber: 4, transactionHash: '1', logIndex: 1 }), + eventMock({ blockHash: '0x125', blockNumber: 8, transactionHash: '2', logIndex: 1 }), + eventMock({ blockHash: '0x123', blockNumber: 9, transactionHash: '3', logIndex: 1 }), + eventMock({ blockHash: '0x123', blockNumber: 10, transactionHash: '4', logIndex: 1 }) ] const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlock('latest').resolves(blockFactory(10)) const contract = Substitute.for() contract.getPastEvents(Arg.all()).returns(Promise.resolve(events)) @@ -294,20 +348,21 @@ describe('BaseEventsEmitter', () => { await sleep(100) expect(spy.callCount).to.be.eql(2, 'Expected two events emitted.') - eth.received(1).getBlockNumber() + eth.received(1).getBlock('latest') contract.received(1).getPastEvents(Arg.all()) expect(await Event.count()).to.eql(2) - expect(blockTracker.getLastProcessedBlock()).to.eql(10) + expect(blockTracker.getLastProcessedBlock()).to.eql([8, '0x125']) + expect(blockTracker.getLastFetchedBlock()).to.eql([10, '0x123']) }) it('should process new events', async function () { - const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) - const contract = Substitute.for() - contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) + const eth = Substitute.for() + eth.getBlockNumber().resolves(10) const blockTracker = new BlockTracker({}) + blockTracker.setLastFetchedBlock(3, '') // Leads to no processPastEvents + const newBlockEmitter = new EventEmitter() const options: EventsEmitterOptions = { confirmations: 2, blockTracker, newBlockEmitter } const spy = sinon.spy() @@ -324,19 +379,18 @@ describe('BaseEventsEmitter', () => { expect(await Event.count()).to.eql(2) expect(spy.callCount).to.be.eql(2, 'Expected two events emitted.') - eth.received(2).getBlockNumber() - contract.received(1).getPastEvents(Arg.all()) - expect(blockTracker.getLastProcessedBlock()).to.eql(10) + eth.received(1).getBlockNumber() + contract.received(0).getPastEvents(Arg.all()) }) it('should handle re-emitted events', async function () { - const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) - const contract = Substitute.for() - contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) + const eth = Substitute.for() + eth.getBlockNumber().resolves(10) const blockTracker = new BlockTracker({}) + blockTracker.setLastFetchedBlock(6, '') // Leads to no processPastEvents + const newBlockEmitter = new EventEmitter() const options: EventsEmitterOptions = { confirmations: 2, blockTracker, newBlockEmitter } const spy = sinon.spy() @@ -369,13 +423,13 @@ describe('BaseEventsEmitter', () => { }) it('should handle only re-emitted events that were not already processed in past', async function () { - const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) - const contract = Substitute.for() - contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) + const eth = Substitute.for() + eth.getBlockNumber().resolves(10) const blockTracker = new BlockTracker({}) + blockTracker.setLastFetchedBlock(6, '') // Leads to no processPastEvents + const newBlockEmitter = new EventEmitter() const options: EventsEmitterOptions = { confirmations: 2, blockTracker, newBlockEmitter } const spy = sinon.spy() @@ -409,13 +463,13 @@ describe('BaseEventsEmitter', () => { }) it('should confirm events', async function () { - const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) - const contract = Substitute.for() - contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) + const eth = Substitute.for() + eth.getBlockNumber().resolves(10) const blockTracker = new BlockTracker({}) + blockTracker.setLastFetchedBlock(3, '') // Leads to no processPastEvents + const newBlockEmitter = new EventEmitter() const options: EventsEmitterOptions = { confirmations: 2, blockTracker, newBlockEmitter } const emitterSpy = sinon.spy() @@ -429,58 +483,59 @@ describe('BaseEventsEmitter', () => { blockNumber: 7, transactionHash: '1', logIndex: 1, - content: '{"transactionHash": "1", "blockNumber": 7}' + content: '{"transactionHash": "1", "blockHash": "0x321", "blockNumber": 7}' }, { event: 'testEvent', blockNumber: 8, transactionHash: '2', logIndex: 1, - content: '{"transactionHash": "2", "blockNumber": 8}' + content: '{"transactionHash": "2", "blockHash": "0x321", "blockNumber": 8}' }, { event: 'testEvent', blockNumber: 9, transactionHash: '3', logIndex: 1, - content: '{"transactionHash": "3", "blockNumber": 9}' + content: '{"transactionHash": "3", "blockHash": "0x321", "blockNumber": 9}' }, { event: 'testEvent', blockNumber: 9, transactionHash: '4', logIndex: 1, - content: '{"transactionHash": "4", "blockNumber": 9}' + content: '{"transactionHash": "4", "blockHash": "0x321", "blockNumber": 9}' }, { event: 'testEvent', blockNumber: 10, transactionHash: '5', logIndex: 1, - content: '{"transactionHash": "5", "blockNumber": 10}' + content: '{"transactionHash": "5", "blockHash": "0x321", "blockNumber": 10}' }, { event: 'testEvent', blockNumber: 11, transactionHash: '6', logIndex: 1, - content: '{"transactionHash": "6", "blockNumber": 11}' + content: '{"transactionHash": "6", "blockHash": "0x321", "blockNumber": 11}' } ] await Event.bulkCreate(events) expect(await Event.count({ where: { emitted: true } })).to.eql(0) // Start confirmations process - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) await sleep(500) expect(emitterSpy.callCount).to.be.eql(4, 'Expected four events emitted.') expect(await Event.count({ where: { emitted: true } })).to.eql(4) + expect(blockTracker.getLastProcessedBlock()).to.eql([9, '0x321']) }) it('each emitter should confirm only his events', async function () { const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlockNumber().resolves(10) const contract = Substitute.for() contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) @@ -514,59 +569,59 @@ describe('BaseEventsEmitter', () => { blockNumber: 7, transactionHash: '1', logIndex: 1, - content: '{"transactionHash": "1", "blockNumber": 7, "event": "firstEvent"}' + content: '{"transactionHash": "1", "blockHash": "0x321", "blockNumber": 7, "event": "firstEvent"}' }, { event: 'secondEvent', blockNumber: 8, transactionHash: '2', logIndex: 1, - content: '{"transactionHash": "2", "blockNumber": 8, "event": "secondEvent"}' + content: '{"transactionHash": "2", "blockHash": "0x321", "blockNumber": 8, "event": "secondEvent"}' }, { event: 'firstEvent', blockNumber: 9, transactionHash: '3', logIndex: 1, - content: '{"transactionHash": "3", "blockNumber": 9, "event": "firstEvent"}' + content: '{"transactionHash": "3", "blockHash": "0x321", "blockNumber": 9, "event": "firstEvent"}' }, { event: 'secondEvent', blockNumber: 10, transactionHash: '4', logIndex: 1, - content: '{"transactionHash": "4", "blockNumber": 10, "event": "secondEvent"}' + content: '{"transactionHash": "4", "blockHash": "0x321", "blockNumber": 10, "event": "secondEvent"}' }, { event: 'firstEvent', blockNumber: 11, transactionHash: '5', logIndex: 1, - content: '{"transactionHash": "5", "blockNumber": 11, "event": "firstEvent"}' + content: '{"transactionHash": "5", "blockHash": "0x321", "blockNumber": 11, "event": "firstEvent"}' }, { event: 'firstEvent', blockNumber: 12, transactionHash: '6', logIndex: 1, - content: '{"transactionHash": "6", "blockNumber": 12, "event": "firstEvent"}' + content: '{"transactionHash": "6", "blockHash": "0x321", "blockNumber": 12, "event": "firstEvent"}' }, { event: 'secondEvent', blockNumber: 13, transactionHash: '7', logIndex: 1, - content: '{"transactionHash": "7", "blockNumber": 13, "event": "secondEvent"}' + content: '{"transactionHash": "7", "blockHash": "0x321", "blockNumber": 13, "event": "secondEvent"}' } ] await Event.bulkCreate(events) expect(await Event.count({ where: { emitted: true } })).to.eql(0) // Start confirmations process - firstNewBlockEmitter.emit(NEW_BLOCK_EVENT, 13) + firstNewBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(13)) await sleep(500) - secondNewBlockEmitter.emit(NEW_BLOCK_EVENT, 13) + secondNewBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(13)) await sleep(500) expect(firstEmitterSpy.callCount).to.be.eql(3, 'Expected three firstEvent events emitted.') @@ -589,7 +644,7 @@ describe('BaseEventsEmitter', () => { ] const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlock('latest').resolves(blockFactory(10)) const contract = Substitute.for() contract.getPastEvents(Arg.all()).returns(Promise.resolve(events)) @@ -603,19 +658,19 @@ describe('BaseEventsEmitter', () => { await sleep(100) expect(spy.callCount).to.be.eql(3, 'Expected three events emitted.') - eth.received(1).getBlockNumber() + eth.received(1).getBlock('latest') contract.received(1).getPastEvents(Arg.all()) - expect(blockTracker.getLastProcessedBlock()).to.eql(10) + expect(blockTracker.getLastFetchedBlock()).to.eql([10, '0x123']) }) it('should emits new events', async function () { - const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) - const contract = Substitute.for() - contract.getPastEvents(Arg.all()).returns(Promise.resolve([])) + const eth = Substitute.for() + eth.getBlockNumber().resolves(10) const blockTracker = new BlockTracker({}) + blockTracker.setLastFetchedBlock(6, '') // Leads to no processPastEvents + const newBlockEmitter = new EventEmitter() const options = { blockTracker, newBlockEmitter } const spy = sinon.spy() @@ -633,9 +688,9 @@ describe('BaseEventsEmitter', () => { await sleep(100) // In order to processPastEvents() finish expect(spy.callCount).to.be.eql(3, 'Expected three events emitted.') - expect(blockTracker.getLastProcessedBlock()).to.eql(10) - contract.received(1).getPastEvents(Arg.all()) - eth.received(2).getBlockNumber() + expect(blockTracker.getLastFetchedBlock()).to.eql([6, '']) + contract.received(0).getPastEvents(Arg.all()) + eth.received(1).getBlockNumber() }) }) }) @@ -650,7 +705,7 @@ describe('PollingEventsEmitter', function () { ) const blockTracker = new BlockTracker({}) - blockTracker.setLastProcessedBlock(10) + blockTracker.setLastFetchedBlock(10, '0x123') const newBlockEmitter = new EventEmitter() const options = { blockTracker, newBlockEmitter } @@ -659,14 +714,14 @@ describe('PollingEventsEmitter', function () { eventsEmitter.on(DATA_EVENT_NAME, spy) await setImmediatePromise() - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) await sleep(100) - newBlockEmitter.emit(NEW_BLOCK_EVENT, 12) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(12)) await sleep(100) contract.received(2).getPastEvents(Arg.all()) - expect(blockTracker.getLastProcessedBlock()).to.eql(12) + expect(blockTracker.getLastFetchedBlock()).to.eql([12, '0x123']) expect(spy.callCount).to.eql(2, 'Expected two emitted events') }) @@ -679,7 +734,7 @@ describe('PollingEventsEmitter', function () { ) const blockTracker = new BlockTracker({}) - blockTracker.setLastProcessedBlock(10) + blockTracker.setLastFetchedBlock(10, '0x123') const newBlockEmitter = new EventEmitter() const options = { blockTracker, newBlockEmitter } @@ -688,14 +743,14 @@ describe('PollingEventsEmitter', function () { eventsEmitter.on(DATA_EVENT_NAME, spy) await setImmediatePromise() - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) await sleep(100) - newBlockEmitter.emit(NEW_BLOCK_EVENT, 12) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(12)) await sleep(100) contract.received(2).getPastEvents(Arg.all()) - expect(blockTracker.getLastProcessedBlock()).to.eql(12) + expect(blockTracker.getLastFetchedBlock()).to.eql([12, '0x123']) expect(spy.callCount).to.eql(1) }) @@ -707,7 +762,7 @@ describe('PollingEventsEmitter', function () { ) const blockTracker = new BlockTracker({}) - blockTracker.setLastProcessedBlock(10) + blockTracker.setLastFetchedBlock(10, '0x123') const newBlockEmitter = new EventEmitter() const options = { blockTracker, newBlockEmitter } @@ -716,14 +771,14 @@ describe('PollingEventsEmitter', function () { eventsEmitter.on(DATA_EVENT_NAME, spy) await setImmediatePromise() - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) await sleep(100) - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) // Testing if same block is ignored + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) // Testing if same block is ignored await sleep(100) contract.received(1).getPastEvents(Arg.all()) - expect(blockTracker.getLastProcessedBlock()).to.eql(11) + expect(blockTracker.getLastFetchedBlock()).to.eql([11, '0x123']) expect(spy.callCount).to.eql(1, 'Expected only one emitted event') }) @@ -736,7 +791,7 @@ describe('PollingEventsEmitter', function () { ] const eth = Substitute.for() - eth.getBlockNumber().returns(Promise.resolve(10)) + eth.getBlockNumber().resolves(10) const contract = Substitute.for() contract.getPastEvents(Arg.all()).returns(sleep(200, events), Promise.resolve(events)) @@ -749,7 +804,7 @@ describe('PollingEventsEmitter', function () { eventsEmitter.on(DATA_EVENT_NAME, spy) // Will start processingPastEvents() which will be delayed // Directly calling processEvents(), which should be blocked by the processingPastEvents() - newBlockEmitter.emit(NEW_BLOCK_EVENT, 11) + newBlockEmitter.emit(NEW_BLOCK_EVENT, blockFactory(11)) await sleep(50) contract.received(1).getPastEvents(Arg.all())