-
-
Notifications
You must be signed in to change notification settings - Fork 327
Add subscription subprovider to handle websocket pub/sub #207
Changes from all commits
1101080
6540f6a
bc8d205
282fd2b
aeb620c
70f2eac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ const inherits = require('util').inherits | |
const ethUtil = require('ethereumjs-util') | ||
const Subprovider = require('./subprovider.js') | ||
const Stoplight = require('../util/stoplight.js') | ||
const EventEmitter = require('events').EventEmitter | ||
|
||
module.exports = FilterSubprovider | ||
|
||
|
@@ -125,7 +126,7 @@ FilterSubprovider.prototype.newLogFilter = function(opts, cb) { | |
var blockHandler = function(block, cb){ | ||
self._logsForBlock(block, function(err, logs){ | ||
if (err) return cb(err) | ||
logs.forEach(newLogHandler) | ||
newLogHandler(logs) | ||
cb() | ||
}) | ||
} | ||
|
@@ -147,7 +148,7 @@ FilterSubprovider.prototype.newPendingTransactionFilter = function(cb) { | |
var blockHandler = function(block, cb){ | ||
self._txHashesForBlock(block, function(err, txs){ | ||
if (err) return cb(err) | ||
txs.forEach(newTxHandler) | ||
newTxHandler(txs) | ||
cb() | ||
}) | ||
} | ||
|
@@ -205,6 +206,8 @@ FilterSubprovider.prototype.uninstallFilter = function(filterId, cb) { | |
return | ||
} | ||
|
||
self.filters[filterId].removeAllListeners() | ||
|
||
var destroyHandler = self.filterDestroyHandlers[filterId] | ||
delete self.filters[filterId] | ||
delete self.asyncBlockHandlers[filterId] | ||
|
@@ -292,9 +295,12 @@ FilterSubprovider.prototype._txHashesForBlock = function(block, cb) { | |
// BlockFilter | ||
// | ||
|
||
inherits(BlockFilter, EventEmitter) | ||
|
||
function BlockFilter(opts) { | ||
// console.log('BlockFilter - new') | ||
const self = this | ||
EventEmitter.apply(self) | ||
self.type = 'block' | ||
self.engine = opts.engine | ||
self.blockNumber = opts.blockNumber | ||
|
@@ -306,6 +312,7 @@ BlockFilter.prototype.update = function(block){ | |
const self = this | ||
var blockHash = bufferToHex(block.hash) | ||
self.updates.push(blockHash) | ||
self.emit('data', block) | ||
} | ||
|
||
BlockFilter.prototype.getChanges = function(){ | ||
|
@@ -325,9 +332,12 @@ BlockFilter.prototype.clearChanges = function(){ | |
// LogFilter | ||
// | ||
|
||
inherits(LogFilter, EventEmitter) | ||
|
||
function LogFilter(opts) { | ||
// console.log('LogFilter - new') | ||
const self = this | ||
EventEmitter.apply(self) | ||
self.type = 'log' | ||
self.fromBlock = opts.fromBlock || 'latest' | ||
self.toBlock = opts.toBlock || 'latest' | ||
|
@@ -376,15 +386,22 @@ LogFilter.prototype.validateLog = function(log){ | |
return topicsMatch | ||
} | ||
|
||
LogFilter.prototype.update = function(log){ | ||
LogFilter.prototype.update = function(logs){ | ||
// console.log('LogFilter - update') | ||
const self = this | ||
// validate filter match | ||
var validated = self.validateLog(log) | ||
if (!validated) return | ||
// add to results | ||
self.updates.push(log) | ||
self.allResults.push(log) | ||
var validLogs = [] | ||
logs.forEach(function(log) { | ||
var validated = self.validateLog(log) | ||
if (!validated) return | ||
// add to results | ||
validLogs.push(log) | ||
self.updates.push(log) | ||
self.allResults.push(log) | ||
}) | ||
if (validLogs.length > 0) { | ||
self.emit('data', validLogs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this also need the null error event? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, the only consumer of the event is the |
||
} | ||
} | ||
|
||
LogFilter.prototype.getChanges = function(){ | ||
|
@@ -411,9 +428,12 @@ LogFilter.prototype.clearChanges = function(){ | |
// PendingTxFilter | ||
// | ||
|
||
inherits(PendingTransactionFilter, EventEmitter) | ||
|
||
function PendingTransactionFilter(){ | ||
// console.log('PendingTransactionFilter - new') | ||
const self = this | ||
EventEmitter.apply(self) | ||
self.type = 'pendingTx' | ||
self.updates = [] | ||
self.allResults = [] | ||
|
@@ -424,15 +444,22 @@ PendingTransactionFilter.prototype.validateUnique = function(tx){ | |
return self.allResults.indexOf(tx) === -1 | ||
} | ||
|
||
PendingTransactionFilter.prototype.update = function(tx){ | ||
PendingTransactionFilter.prototype.update = function(txs){ | ||
// console.log('PendingTransactionFilter - update') | ||
const self = this | ||
// validate filter match | ||
var validated = self.validateUnique(tx) | ||
if (!validated) return | ||
// add to results | ||
self.updates.push(tx) | ||
self.allResults.push(tx) | ||
var validTxs = [] | ||
txs.forEach(function (tx) { | ||
// validate filter match | ||
var validated = self.validateUnique(tx) | ||
if (!validated) return | ||
// add to results | ||
validTxs.push(tx) | ||
self.updates.push(tx) | ||
self.allResults.push(tx) | ||
}) | ||
if (validTxs.length > 0) { | ||
self.emit('data', validTxs) | ||
} | ||
} | ||
|
||
PendingTransactionFilter.prototype.getChanges = function(){ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
const EventEmitter = require('events').EventEmitter | ||
const FilterSubprovider = require('./filters.js') | ||
const from = require('../util/rpc-hex-encoding.js') | ||
const inherits = require('util').inherits | ||
const utils = require('ethereumjs-util') | ||
|
||
function SubscriptionSubprovider(opts) { | ||
const self = this | ||
|
||
opts = opts || {} | ||
|
||
EventEmitter.apply(this, Array.prototype.slice.call(arguments)) | ||
FilterSubprovider.apply(this, [opts]) | ||
|
||
this.subscriptions = {} | ||
} | ||
|
||
inherits(SubscriptionSubprovider, FilterSubprovider) | ||
|
||
// a cheap crack at multiple inheritance | ||
// I don't really care if `instanceof EventEmitter` passes... | ||
Object.assign(SubscriptionSubprovider.prototype, EventEmitter.prototype) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather leave it like it is - I see no reason why this class needs to be tightly coupled to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that I've thought about it a bit more, I wonder if there shouldn't be an event middleware stack. As pub/sub grows in scope, I could imagine wanting to write middleware that would squelch or mutate notifications. Maybe that's just YAGNI, though... |
||
|
||
// preserve our constructor, though | ||
SubscriptionSubprovider.prototype.constructor = SubscriptionSubprovider | ||
|
||
SubscriptionSubprovider.prototype.eth_subscribe = function(payload, cb) { | ||
const self = this | ||
let createSubscriptionFilter = () => {} | ||
let subscriptionType = payload.params[0] | ||
|
||
switch (subscriptionType) { | ||
case 'logs': | ||
let options = payload.params[1] | ||
|
||
createSubscriptionFilter = self.newLogFilter.bind(self, options) | ||
break | ||
case 'newPendingTransactions': | ||
createSubscriptionFilter = self.newPendingTransactionFilter.bind(self) | ||
break | ||
case 'newHeads': | ||
createSubscriptionFilter = self.newBlockFilter.bind(self) | ||
break | ||
case 'syncing': | ||
default: | ||
cb(new Error('unsupported subscription type')) | ||
return | ||
} | ||
|
||
createSubscriptionFilter(function(err, id) { | ||
if (err) return cb(err) | ||
self.subscriptions[id] = subscriptionType | ||
|
||
self.filters[id].on('data', function(results) { | ||
if (!Array.isArray(results)) { | ||
results = [results] | ||
} | ||
|
||
var notificationHandler = self._notificationHandler.bind(self, id, subscriptionType) | ||
results.forEach(notificationHandler) | ||
self.filters[id].clearChanges() | ||
}) | ||
if (subscriptionType === 'newPendingTransactions') { | ||
self.checkForPendingBlocks() | ||
} | ||
cb(null, id) | ||
}) | ||
} | ||
|
||
SubscriptionSubprovider.prototype._notificationHandler = function (id, subscriptionType, result) { | ||
const self = this | ||
if (subscriptionType === 'newHeads') { | ||
result = self._notificationResultFromBlock(result) | ||
} | ||
|
||
// it seems that web3 doesn't expect there to be a separate error event | ||
// so we must emit null along with the result object | ||
self.emit('data', null, { | ||
jsonrpc: "2.0", | ||
method: "eth_subscription", | ||
params: { | ||
subscription: id, | ||
result: result, | ||
}, | ||
}) | ||
} | ||
|
||
SubscriptionSubprovider.prototype._notificationResultFromBlock = function(block) { | ||
return { | ||
hash: utils.bufferToHex(block.hash), | ||
parentHash: utils.bufferToHex(block.parentHash), | ||
sha3Uncles: utils.bufferToHex(block.sha3Uncles), | ||
miner: utils.bufferToHex(block.miner), | ||
stateRoot: utils.bufferToHex(block.stateRoot), | ||
transactionsRoot: utils.bufferToHex(block.transactionsRoot), | ||
receiptsRoot: utils.bufferToHex(block.receiptsRoot), | ||
logsBloom: utils.bufferToHex(block.logsBloom), | ||
difficulty: from.intToQuantityHex(utils.bufferToInt(block.difficulty)), | ||
number: from.intToQuantityHex(utils.bufferToInt(block.number)), | ||
gasLimit: from.intToQuantityHex(utils.bufferToInt(block.gasLimit)), | ||
gasUsed: from.intToQuantityHex(utils.bufferToInt(block.gasUsed)), | ||
nonce: block.nonce ? utils.bufferToHex(block.nonce): null, | ||
timestamp: from.intToQuantityHex(utils.bufferToInt(block.timestamp)), | ||
extraData: utils.bufferToHex(block.extraData) | ||
} | ||
} | ||
|
||
SubscriptionSubprovider.prototype.eth_unsubscribe = function(payload, cb) { | ||
const self = this | ||
let subscriptionId = payload.params[0] | ||
if (!self.subscriptions[subscriptionId]) { | ||
cb(new Error(`Subscription ID ${subscriptionId} not found.`)) | ||
} else { | ||
let subscriptionType = self.subscriptions[subscriptionId] | ||
self.uninstallFilter(subscriptionId, function (err, result) { | ||
delete self.subscriptions[subscriptionId] | ||
cb(err, result) | ||
}) | ||
} | ||
} | ||
|
||
SubscriptionSubprovider.prototype.handleRequest = function(payload, next, end) { | ||
switch(payload.method){ | ||
case 'eth_subscribe': | ||
this.eth_subscribe(payload, end) | ||
break | ||
case 'eth_unsubscribe': | ||
this.eth_unsubscribe(payload, end) | ||
break | ||
default: | ||
FilterSubprovider.prototype.handleRequest.apply(this, Array.prototype.slice.call(arguments)) | ||
} | ||
} | ||
|
||
module.exports = SubscriptionSubprovider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is correct and fine but you can also just do