Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Add subscription subprovider to handle websocket pub/sub #207

Merged
merged 6 commits into from
Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function Web3ProviderEngine(opts) {
// block polling
const skipInitLockProvider = { sendAsync: self._handleAsync.bind(self) }
const blockTrackerProvider = opts.blockTrackerProvider || skipInitLockProvider
self._blockTracker = new EthBlockTracker({
self._blockTracker = opts.blockTracker || new EthBlockTracker({
provider: blockTrackerProvider,
pollingInterval: opts.pollingInterval || 4000,
})
Expand Down
57 changes: 42 additions & 15 deletions subproviders/filters.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const inherits = require('util').inherits
const ethUtil = require('ethereumjs-util')
const Subprovider = require('./subprovider.js')
const Stoplight = require('../util/stoplight.js')
const EventEmitter = require('events').EventEmitter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is correct and fine but you can also just do

const EventEmitter = require('events')


module.exports = FilterSubprovider

Expand Down Expand Up @@ -125,7 +126,7 @@ FilterSubprovider.prototype.newLogFilter = function(opts, cb) {
var blockHandler = function(block, cb){
self._logsForBlock(block, function(err, logs){
if (err) return cb(err)
logs.forEach(newLogHandler)
newLogHandler(logs)
cb()
})
}
Expand All @@ -147,7 +148,7 @@ FilterSubprovider.prototype.newPendingTransactionFilter = function(cb) {
var blockHandler = function(block, cb){
self._txHashesForBlock(block, function(err, txs){
if (err) return cb(err)
txs.forEach(newTxHandler)
newTxHandler(txs)
cb()
})
}
Expand Down Expand Up @@ -205,6 +206,8 @@ FilterSubprovider.prototype.uninstallFilter = function(filterId, cb) {
return
}

self.filters[filterId].removeAllListeners()

var destroyHandler = self.filterDestroyHandlers[filterId]
delete self.filters[filterId]
delete self.asyncBlockHandlers[filterId]
Expand Down Expand Up @@ -292,9 +295,12 @@ FilterSubprovider.prototype._txHashesForBlock = function(block, cb) {
// BlockFilter
//

inherits(BlockFilter, EventEmitter)

function BlockFilter(opts) {
// console.log('BlockFilter - new')
const self = this
EventEmitter.apply(self)
self.type = 'block'
self.engine = opts.engine
self.blockNumber = opts.blockNumber
Expand All @@ -306,6 +312,7 @@ BlockFilter.prototype.update = function(block){
const self = this
var blockHash = bufferToHex(block.hash)
self.updates.push(blockHash)
self.emit('data', block)
}

BlockFilter.prototype.getChanges = function(){
Expand All @@ -325,9 +332,12 @@ BlockFilter.prototype.clearChanges = function(){
// LogFilter
//

inherits(LogFilter, EventEmitter)

function LogFilter(opts) {
// console.log('LogFilter - new')
const self = this
EventEmitter.apply(self)
self.type = 'log'
self.fromBlock = opts.fromBlock || 'latest'
self.toBlock = opts.toBlock || 'latest'
Expand Down Expand Up @@ -376,15 +386,22 @@ LogFilter.prototype.validateLog = function(log){
return topicsMatch
}

LogFilter.prototype.update = function(log){
LogFilter.prototype.update = function(logs){
// console.log('LogFilter - update')
const self = this
// validate filter match
var validated = self.validateLog(log)
if (!validated) return
// add to results
self.updates.push(log)
self.allResults.push(log)
var validLogs = []
logs.forEach(function(log) {
var validated = self.validateLog(log)
if (!validated) return
// add to results
validLogs.push(log)
self.updates.push(log)
self.allResults.push(log)
})
if (validLogs.length > 0) {
self.emit('data', validLogs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this also need the null error event?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the only consumer of the event is the SubscriptionSubprovider - would rather emit error in that case.

}
}

LogFilter.prototype.getChanges = function(){
Expand All @@ -411,9 +428,12 @@ LogFilter.prototype.clearChanges = function(){
// PendingTxFilter
//

inherits(PendingTransactionFilter, EventEmitter)

function PendingTransactionFilter(){
// console.log('PendingTransactionFilter - new')
const self = this
EventEmitter.apply(self)
self.type = 'pendingTx'
self.updates = []
self.allResults = []
Expand All @@ -424,15 +444,22 @@ PendingTransactionFilter.prototype.validateUnique = function(tx){
return self.allResults.indexOf(tx) === -1
}

PendingTransactionFilter.prototype.update = function(tx){
PendingTransactionFilter.prototype.update = function(txs){
// console.log('PendingTransactionFilter - update')
const self = this
// validate filter match
var validated = self.validateUnique(tx)
if (!validated) return
// add to results
self.updates.push(tx)
self.allResults.push(tx)
var validTxs = []
txs.forEach(function (tx) {
// validate filter match
var validated = self.validateUnique(tx)
if (!validated) return
// add to results
validTxs.push(tx)
self.updates.push(tx)
self.allResults.push(tx)
})
if (validTxs.length > 0) {
self.emit('data', validTxs)
}
}

PendingTransactionFilter.prototype.getChanges = function(){
Expand Down
135 changes: 135 additions & 0 deletions subproviders/subscriptions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
const EventEmitter = require('events').EventEmitter
const FilterSubprovider = require('./filters.js')
const from = require('../util/rpc-hex-encoding.js')
const inherits = require('util').inherits
const utils = require('ethereumjs-util')

function SubscriptionSubprovider(opts) {
const self = this

opts = opts || {}

EventEmitter.apply(this, Array.prototype.slice.call(arguments))
FilterSubprovider.apply(this, [opts])

this.subscriptions = {}
}

inherits(SubscriptionSubprovider, FilterSubprovider)

// a cheap crack at multiple inheritance
// I don't really care if `instanceof EventEmitter` passes...
Object.assign(SubscriptionSubprovider.prototype, EventEmitter.prototype)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of SubscriptionSubprovider being an EE, we can just make engine an EE and require that to be passed into the constructor -- and lol directly emit events on it /shrug

Copy link
Contributor Author

@benjamincburns benjamincburns Jan 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather leave it like it is - I see no reason why this class needs to be tightly coupled to engine like that. Further, in ganache-core I make the provider an EventEmitter (it must be, per web3 requirements), and I proxy the EE methods for the data event directly to the SubscriptionSubprovider. I like it this way over proxying to the engine because it's more explicit that I'm subscribing to pub/sub events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I've thought about it a bit more, I wonder if there shouldn't be an event middleware stack. As pub/sub grows in scope, I could imagine wanting to write middleware that would squelch or mutate notifications.

Maybe that's just YAGNI, though...


// preserve our constructor, though
SubscriptionSubprovider.prototype.constructor = SubscriptionSubprovider

SubscriptionSubprovider.prototype.eth_subscribe = function(payload, cb) {
const self = this
let createSubscriptionFilter = () => {}
let subscriptionType = payload.params[0]

switch (subscriptionType) {
case 'logs':
let options = payload.params[1]

createSubscriptionFilter = self.newLogFilter.bind(self, options)
break
case 'newPendingTransactions':
createSubscriptionFilter = self.newPendingTransactionFilter.bind(self)
break
case 'newHeads':
createSubscriptionFilter = self.newBlockFilter.bind(self)
break
case 'syncing':
default:
cb(new Error('unsupported subscription type'))
return
}

createSubscriptionFilter(function(err, id) {
if (err) return cb(err)
self.subscriptions[id] = subscriptionType

self.filters[id].on('data', function(results) {
if (!Array.isArray(results)) {
results = [results]
}

var notificationHandler = self._notificationHandler.bind(self, id, subscriptionType)
results.forEach(notificationHandler)
self.filters[id].clearChanges()
})
if (subscriptionType === 'newPendingTransactions') {
self.checkForPendingBlocks()
}
cb(null, id)
})
}

SubscriptionSubprovider.prototype._notificationHandler = function (id, subscriptionType, result) {
const self = this
if (subscriptionType === 'newHeads') {
result = self._notificationResultFromBlock(result)
}

// it seems that web3 doesn't expect there to be a separate error event
// so we must emit null along with the result object
self.emit('data', null, {
jsonrpc: "2.0",
method: "eth_subscription",
params: {
subscription: id,
result: result,
},
})
}

SubscriptionSubprovider.prototype._notificationResultFromBlock = function(block) {
return {
hash: utils.bufferToHex(block.hash),
parentHash: utils.bufferToHex(block.parentHash),
sha3Uncles: utils.bufferToHex(block.sha3Uncles),
miner: utils.bufferToHex(block.miner),
stateRoot: utils.bufferToHex(block.stateRoot),
transactionsRoot: utils.bufferToHex(block.transactionsRoot),
receiptsRoot: utils.bufferToHex(block.receiptsRoot),
logsBloom: utils.bufferToHex(block.logsBloom),
difficulty: from.intToQuantityHex(utils.bufferToInt(block.difficulty)),
number: from.intToQuantityHex(utils.bufferToInt(block.number)),
gasLimit: from.intToQuantityHex(utils.bufferToInt(block.gasLimit)),
gasUsed: from.intToQuantityHex(utils.bufferToInt(block.gasUsed)),
nonce: block.nonce ? utils.bufferToHex(block.nonce): null,
timestamp: from.intToQuantityHex(utils.bufferToInt(block.timestamp)),
extraData: utils.bufferToHex(block.extraData)
}
}

SubscriptionSubprovider.prototype.eth_unsubscribe = function(payload, cb) {
const self = this
let subscriptionId = payload.params[0]
if (!self.subscriptions[subscriptionId]) {
cb(new Error(`Subscription ID ${subscriptionId} not found.`))
} else {
let subscriptionType = self.subscriptions[subscriptionId]
self.uninstallFilter(subscriptionId, function (err, result) {
delete self.subscriptions[subscriptionId]
cb(err, result)
})
}
}

SubscriptionSubprovider.prototype.handleRequest = function(payload, next, end) {
switch(payload.method){
case 'eth_subscribe':
this.eth_subscribe(payload, end)
break
case 'eth_unsubscribe':
this.eth_unsubscribe(payload, end)
break
default:
FilterSubprovider.prototype.handleRequest.apply(this, Array.prototype.slice.call(arguments))
}
}

module.exports = SubscriptionSubprovider
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require('./cache-utils')
require('./cache')
require('./inflight-cache')
require('./filters')
require('./subscriptions')
require('./solc')
require('./wallet')
require('./subproviders/sanitizer')
Expand Down
Loading