Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

Commit

Permalink
feat: semaphore support
Browse files Browse the repository at this point in the history
  • Loading branch information
AuHau committed Mar 11, 2020
1 parent 75b8ce2 commit 4dbfbd0
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 24 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,3 @@ coverage

# Dependency directory
node_modules

# Local dev DB
db.sqlite
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"@feathersjs/feathers": "^4.5.1",
"@feathersjs/socketio": "^4.5.1",
"@rsksmart/rif-martketplace-storage-pinning": "github:rsksmart/rif-martketplace-storage-pinning#feat/npm_package_preparations",
"async-sema": "^3.1.0",
"colors": "^1.4.0",
"compression": "^1.7.4",
"conf": "^6.2.0",
Expand Down
45 changes: 28 additions & 17 deletions src/blockchain/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { EventEmitter } from 'events'
import { NotImplemented } from '@feathersjs/errors'
import { Op } from 'sequelize'
import { Logger } from 'winston'
import { Sema } from 'async-sema'

import { factory } from '../logger'
import Event, { EventInterface } from '../models/event.model'
Expand All @@ -23,7 +24,7 @@ export interface PollingOptions {
}

export enum EventsEmitterStrategy {
POLLING= 1,
POLLING = 1,
LISTENING,
}

Expand Down Expand Up @@ -178,6 +179,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
protected readonly events: string[]
protected readonly contract: Contract
protected readonly eth: Eth
protected readonly semaphore: Sema
private readonly confirmations: number
private isInitialized = false

Expand All @@ -187,6 +189,7 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
this.contract = contract
this.events = events
this.confirmations = options?.confirmations || 0
this.semaphore = new Sema(1) // Allow only one caller

if (options?.blockTracker) {
if (options.blockTracker instanceof BlockTracker) {
Expand Down Expand Up @@ -227,9 +230,9 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
this.isInitialized = true
}

async start (): Promise<void> {
start (): void {
if (!this.isInitialized) {
await this.init()
this.init()
}

this.startEvents()
Expand Down Expand Up @@ -362,24 +365,29 @@ export abstract class BaseEventsEmitter extends AutoStartStopEventEmitter {
* @param to
*/
async processPastEvents (from: number | string, to: number | string): Promise<void> {
const currentBlock = await this.eth.getBlockNumber()
await this.semaphore.acquire()
try {
const currentBlock = await this.eth.getBlockNumber()

if (to === 'latest') {
to = currentBlock
}
if (to === 'latest') {
to = currentBlock
}

this.logger.info(`=> Processing past events from ${from} to ${to}`)
const startTime = process.hrtime()
const events = (await this.contract.getPastEvents('allEvents', {
fromBlock: from,
toBlock: to
}))
this.logger.info(`=> Processing past events from ${from} to ${to}`)
const startTime = process.hrtime()
const events = (await this.contract.getPastEvents('allEvents', {
fromBlock: from,
toBlock: to
}))

await this.processEvents(events, currentBlock)
this.blockTracker.setLastProcessedBlock(currentBlock)
await this.processEvents(events, currentBlock)
this.blockTracker.setLastProcessedBlock(currentBlock)

const [secondsLapsed] = process.hrtime(startTime)
this.logger.info(`=> Finished processing past events in ${secondsLapsed}s`)
const [secondsLapsed] = process.hrtime(startTime)
this.logger.info(`=> Finished processing past events in ${secondsLapsed}s`)
} finally {
this.semaphore.release()
}
}
}

Expand All @@ -400,6 +408,7 @@ export class PollingEventsEmitter extends BaseEventsEmitter {
}

async poll (currentBlock: number): Promise<void> {
await this.semaphore.acquire()
this.logger.info(`Received new block number ${currentBlock}`)
try {
const lastProcessedBlock = this.blockTracker.getLastProcessedBlock()
Expand All @@ -421,6 +430,8 @@ export class PollingEventsEmitter extends BaseEventsEmitter {
this.blockTracker.setLastProcessedBlock(currentBlock)
} catch (e) {
this.logger.error('Error in the processing loop:\n' + JSON.stringify(e, undefined, 2))
} finally {
this.semaphore.release()
}
}

Expand Down
75 changes: 71 additions & 4 deletions test/blockchain/events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class StoreMock implements Store {
}
}

function sleep (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
function sleep<T> (ms: number, ...args: T[]): Promise<T> {
return new Promise(resolve => setTimeout(() => resolve(...args), ms))
}

function subscribeMock (sequence: Array<Error | BlockHeader>, interval = 100): (event: string, cb: (err?: Error, blockHeader?: BlockHeader) => void) => void {
Expand Down Expand Up @@ -95,8 +95,13 @@ export class DummyEventsEmitter extends BaseEventsEmitter {
super(eth, contract, events, logger, options)
}

createEvent (data: EventData | EventData[]): Promise<void> {
return this.processEvents(data)
async createEvent (data: EventData | EventData[]): Promise<void> {
await this.semaphore.acquire()
try {
return this.processEvents(data)
} finally {
this.semaphore.release()
}
}

startEvents (): void {
Expand Down Expand Up @@ -246,6 +251,37 @@ describe('BaseEventsEmitter', () => {
await sequelize.sync({ force: true })
})

it('should wait for previous processing finished', async function () {
const events = [
eventMock({ blockNumber: 4, transactionHash: '1', logIndex: 1 }),
eventMock({ blockNumber: 8, transactionHash: '2', logIndex: 1 }),
eventMock({ blockNumber: 9, transactionHash: '3', logIndex: 1 }),
eventMock({ blockNumber: 10, transactionHash: '4', logIndex: 1 })
]

const eth = Substitute.for<Eth>()
eth.getBlockNumber().returns(Promise.resolve(10), Promise.resolve(11))

const contract = Substitute.for<Contract>()
contract.getPastEvents(Arg.all()).returns(sleep(200, events))

const blockTracker = new BlockTracker(new StoreMock())
const newBlockEmitter = new EventEmitter()
const options = { blockTracker, newBlockEmitter }
const spy = sinon.spy()
const eventsEmitter = new DummyEventsEmitter(eth, contract, ['testEvent'], options)
eventsEmitter.on(DATA_EVENT_NAME, spy) // Will start processingPastEvents() which will be delayed

// Directly calling processEvents(), which should be blocked by the processingPastEvents()
const createEventPromise = eventsEmitter.createEvent(events)
await sleep(50)
eth.received(1).getBlockNumber()

// After the processingEvents() is finished
await createEventPromise
eth.received(2).getBlockNumber()
})

describe('with confirmations', () => {
it('should process past events', async function () {
const events = [
Expand Down Expand Up @@ -525,4 +561,35 @@ describe('PollingEventsEmitter', function () {
expect(blockTracker.getLastProcessedBlock()).to.eql(11)
expect(spy.getCalls().length).to.eql(1, 'Expected only one emitted event')
})

it('should wait for previous processing finished', async function () {
const events = [
eventMock({ blockNumber: 4, transactionHash: '1', logIndex: 1 }),
eventMock({ blockNumber: 8, transactionHash: '2', logIndex: 1 }),
eventMock({ blockNumber: 9, transactionHash: '3', logIndex: 1 }),
eventMock({ blockNumber: 10, transactionHash: '4', logIndex: 1 })
]

const eth = Substitute.for<Eth>()
eth.getBlockNumber().returns(Promise.resolve(10))

const contract = Substitute.for<Contract>()
contract.getPastEvents(Arg.all()).returns(sleep(200, events), Promise.resolve(events))

const blockTracker = new BlockTracker(new StoreMock())
const newBlockEmitter = new EventEmitter()
const options = { blockTracker, newBlockEmitter }
const spy = sinon.spy()
const eventsEmitter = new PollingEventsEmitter(eth, contract, ['testEvent'], options)
eventsEmitter.on(DATA_EVENT_NAME, spy) // Will start processingPastEvents() which will be delayed

// Directly calling processEvents(), which should be blocked by the processingPastEvents()
newBlockEmitter.emit(NEW_BLOCK_EVENT, 11)
await sleep(50)
contract.received(1).getPastEvents(Arg.all())

// After the processingEvents() is finished
await sleep(500)
contract.received(2).getPastEvents(Arg.all())
})
})

0 comments on commit 4dbfbd0

Please sign in to comment.