diff --git a/index.js b/index.js index f45b0130..8314134e 100644 --- a/index.js +++ b/index.js @@ -22,7 +22,7 @@ function Web3ProviderEngine(opts) { // block polling const skipInitLockProvider = { sendAsync: self._handleAsync.bind(self) } const blockTrackerProvider = opts.blockTrackerProvider || skipInitLockProvider - self._blockTracker = new EthBlockTracker({ + self._blockTracker = opts.blockTracker || new EthBlockTracker({ provider: blockTrackerProvider, pollingInterval: opts.pollingInterval || 4000, }) diff --git a/subproviders/filters.js b/subproviders/filters.js index 3ad1adf2..0ce01da6 100644 --- a/subproviders/filters.js +++ b/subproviders/filters.js @@ -3,6 +3,7 @@ const inherits = require('util').inherits const ethUtil = require('ethereumjs-util') const Subprovider = require('./subprovider.js') const Stoplight = require('../util/stoplight.js') +const EventEmitter = require('events').EventEmitter module.exports = FilterSubprovider @@ -125,7 +126,7 @@ FilterSubprovider.prototype.newLogFilter = function(opts, cb) { var blockHandler = function(block, cb){ self._logsForBlock(block, function(err, logs){ if (err) return cb(err) - logs.forEach(newLogHandler) + newLogHandler(logs) cb() }) } @@ -147,7 +148,7 @@ FilterSubprovider.prototype.newPendingTransactionFilter = function(cb) { var blockHandler = function(block, cb){ self._txHashesForBlock(block, function(err, txs){ if (err) return cb(err) - txs.forEach(newTxHandler) + newTxHandler(txs) cb() }) } @@ -205,6 +206,8 @@ FilterSubprovider.prototype.uninstallFilter = function(filterId, cb) { return } + self.filters[filterId].removeAllListeners() + var destroyHandler = self.filterDestroyHandlers[filterId] delete self.filters[filterId] delete self.asyncBlockHandlers[filterId] @@ -292,9 +295,12 @@ FilterSubprovider.prototype._txHashesForBlock = function(block, cb) { // BlockFilter // +inherits(BlockFilter, EventEmitter) + function BlockFilter(opts) { // console.log('BlockFilter - new') const self = this + EventEmitter.apply(self) self.type = 'block' self.engine = opts.engine self.blockNumber = opts.blockNumber @@ -306,6 +312,7 @@ BlockFilter.prototype.update = function(block){ const self = this var blockHash = bufferToHex(block.hash) self.updates.push(blockHash) + self.emit('data', block) } BlockFilter.prototype.getChanges = function(){ @@ -325,9 +332,12 @@ BlockFilter.prototype.clearChanges = function(){ // LogFilter // +inherits(LogFilter, EventEmitter) + function LogFilter(opts) { // console.log('LogFilter - new') const self = this + EventEmitter.apply(self) self.type = 'log' self.fromBlock = opts.fromBlock || 'latest' self.toBlock = opts.toBlock || 'latest' @@ -376,15 +386,22 @@ LogFilter.prototype.validateLog = function(log){ return topicsMatch } -LogFilter.prototype.update = function(log){ +LogFilter.prototype.update = function(logs){ // console.log('LogFilter - update') const self = this // validate filter match - var validated = self.validateLog(log) - if (!validated) return - // add to results - self.updates.push(log) - self.allResults.push(log) + var validLogs = [] + logs.forEach(function(log) { + var validated = self.validateLog(log) + if (!validated) return + // add to results + validLogs.push(log) + self.updates.push(log) + self.allResults.push(log) + }) + if (validLogs.length > 0) { + self.emit('data', validLogs) + } } LogFilter.prototype.getChanges = function(){ @@ -411,9 +428,12 @@ LogFilter.prototype.clearChanges = function(){ // PendingTxFilter // +inherits(PendingTransactionFilter, EventEmitter) + function PendingTransactionFilter(){ // console.log('PendingTransactionFilter - new') const self = this + EventEmitter.apply(self) self.type = 'pendingTx' self.updates = [] self.allResults = [] @@ -424,15 +444,22 @@ PendingTransactionFilter.prototype.validateUnique = function(tx){ return self.allResults.indexOf(tx) === -1 } -PendingTransactionFilter.prototype.update = function(tx){ +PendingTransactionFilter.prototype.update = function(txs){ // console.log('PendingTransactionFilter - update') const self = this - // validate filter match - var validated = self.validateUnique(tx) - if (!validated) return - // add to results - self.updates.push(tx) - self.allResults.push(tx) + var validTxs = [] + txs.forEach(function (tx) { + // validate filter match + var validated = self.validateUnique(tx) + if (!validated) return + // add to results + validTxs.push(tx) + self.updates.push(tx) + self.allResults.push(tx) + }) + if (validTxs.length > 0) { + self.emit('data', validTxs) + } } PendingTransactionFilter.prototype.getChanges = function(){ diff --git a/subproviders/subscriptions.js b/subproviders/subscriptions.js new file mode 100644 index 00000000..cced01f7 --- /dev/null +++ b/subproviders/subscriptions.js @@ -0,0 +1,135 @@ +const EventEmitter = require('events').EventEmitter +const FilterSubprovider = require('./filters.js') +const from = require('../util/rpc-hex-encoding.js') +const inherits = require('util').inherits +const utils = require('ethereumjs-util') + +function SubscriptionSubprovider(opts) { + const self = this + + opts = opts || {} + + EventEmitter.apply(this, Array.prototype.slice.call(arguments)) + FilterSubprovider.apply(this, [opts]) + + this.subscriptions = {} +} + +inherits(SubscriptionSubprovider, FilterSubprovider) + +// a cheap crack at multiple inheritance +// I don't really care if `instanceof EventEmitter` passes... +Object.assign(SubscriptionSubprovider.prototype, EventEmitter.prototype) + +// preserve our constructor, though +SubscriptionSubprovider.prototype.constructor = SubscriptionSubprovider + +SubscriptionSubprovider.prototype.eth_subscribe = function(payload, cb) { + const self = this + let createSubscriptionFilter = () => {} + let subscriptionType = payload.params[0] + + switch (subscriptionType) { + case 'logs': + let options = payload.params[1] + + createSubscriptionFilter = self.newLogFilter.bind(self, options) + break + case 'newPendingTransactions': + createSubscriptionFilter = self.newPendingTransactionFilter.bind(self) + break + case 'newHeads': + createSubscriptionFilter = self.newBlockFilter.bind(self) + break + case 'syncing': + default: + cb(new Error('unsupported subscription type')) + return + } + + createSubscriptionFilter(function(err, id) { + if (err) return cb(err) + self.subscriptions[id] = subscriptionType + + self.filters[id].on('data', function(results) { + if (!Array.isArray(results)) { + results = [results] + } + + var notificationHandler = self._notificationHandler.bind(self, id, subscriptionType) + results.forEach(notificationHandler) + self.filters[id].clearChanges() + }) + if (subscriptionType === 'newPendingTransactions') { + self.checkForPendingBlocks() + } + cb(null, id) + }) +} + +SubscriptionSubprovider.prototype._notificationHandler = function (id, subscriptionType, result) { + const self = this + if (subscriptionType === 'newHeads') { + result = self._notificationResultFromBlock(result) + } + + // it seems that web3 doesn't expect there to be a separate error event + // so we must emit null along with the result object + self.emit('data', null, { + jsonrpc: "2.0", + method: "eth_subscription", + params: { + subscription: id, + result: result, + }, + }) +} + +SubscriptionSubprovider.prototype._notificationResultFromBlock = function(block) { + return { + hash: utils.bufferToHex(block.hash), + parentHash: utils.bufferToHex(block.parentHash), + sha3Uncles: utils.bufferToHex(block.sha3Uncles), + miner: utils.bufferToHex(block.miner), + stateRoot: utils.bufferToHex(block.stateRoot), + transactionsRoot: utils.bufferToHex(block.transactionsRoot), + receiptsRoot: utils.bufferToHex(block.receiptsRoot), + logsBloom: utils.bufferToHex(block.logsBloom), + difficulty: from.intToQuantityHex(utils.bufferToInt(block.difficulty)), + number: from.intToQuantityHex(utils.bufferToInt(block.number)), + gasLimit: from.intToQuantityHex(utils.bufferToInt(block.gasLimit)), + gasUsed: from.intToQuantityHex(utils.bufferToInt(block.gasUsed)), + nonce: block.nonce ? utils.bufferToHex(block.nonce): null, + timestamp: from.intToQuantityHex(utils.bufferToInt(block.timestamp)), + extraData: utils.bufferToHex(block.extraData) + } +} + +SubscriptionSubprovider.prototype.eth_unsubscribe = function(payload, cb) { + const self = this + let subscriptionId = payload.params[0] + if (!self.subscriptions[subscriptionId]) { + cb(new Error(`Subscription ID ${subscriptionId} not found.`)) + } else { + let subscriptionType = self.subscriptions[subscriptionId] + self.uninstallFilter(subscriptionId, function (err, result) { + delete self.subscriptions[subscriptionId] + cb(err, result) + }) + } +} + +SubscriptionSubprovider.prototype.handleRequest = function(payload, next, end) { + switch(payload.method){ + case 'eth_subscribe': + this.eth_subscribe(payload, end) + break + case 'eth_unsubscribe': + this.eth_unsubscribe(payload, end) + break + default: + FilterSubprovider.prototype.handleRequest.apply(this, Array.prototype.slice.call(arguments)) + } +} + +module.exports = SubscriptionSubprovider diff --git a/test/index.js b/test/index.js index 06bb0d78..c0d6c7bf 100644 --- a/test/index.js +++ b/test/index.js @@ -3,6 +3,7 @@ require('./cache-utils') require('./cache') require('./inflight-cache') require('./filters') +require('./subscriptions') require('./solc') require('./wallet') require('./subproviders/sanitizer') diff --git a/test/subscriptions.js b/test/subscriptions.js new file mode 100644 index 00000000..699ed283 --- /dev/null +++ b/test/subscriptions.js @@ -0,0 +1,237 @@ +const test = require('tape') +const ProviderEngine = require('../index.js') +const SubscriptionSubprovider = require('../subproviders/subscriptions.js') +const TestBlockProvider = require('./util/block.js') +const createPayload = require('../util/create-payload.js') +const injectMetrics = require('./util/inject-metrics') + +subscriptionTest('basic block subscription', { + method: 'eth_subscribe', + params: ['newHeads'] + }, + function afterInstall(t, testMeta, response, cb){ + // nothing to do here, we just need a new block, which subscriptionTest does for us + cb() + }, + function subscriptionChanges(t, testMeta, response, cb){ + let returnedBlockHash = response.params.result.hash + t.equal(returnedBlockHash, testMeta.block.hash, 'correct result') + cb() + } +) + +subscriptionTest('log subscription - basic', { + method: 'eth_subscribe', + params: ['logs', { + topics: ['0x00000000000000000000000000000000000000000000000000deadbeefcafe01'] + }], + }, + function afterInstall(t, testMeta, response, cb){ + testMeta.tx = testMeta.blockProvider.addTx({ + topics: ['0x00000000000000000000000000000000000000000000000000deadbeefcafe01'] + }) + testMeta.badTx = testMeta.blockProvider.addTx({ + topics: ['0x00000000000000000000000000000000000000000000000000deadbeefcafe02'] + }) + cb() + }, + function subscriptionChanges(t, testMeta, response, cb){ + var matchedTx = response.params.result + t.equal(matchedTx, testMeta.tx, 'correct result') + cb() + } +) + +subscriptionTest('log subscription - and logic', { + method: 'eth_subscribe', + params: ['logs', { + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }], + }, + function afterInstall(t, testMeta, response, cb){ + testMeta.tx = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }) + testMeta.badTx = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + ], + }) + cb() + }, + function subscriptionChangesOne(t, testMeta, response, cb){ + var matchedTx = response.params.result + t.equal(matchedTx, testMeta.tx, 'correct result') + cb() + } +) + +subscriptionTest('log subscription - or logic', { + method: 'eth_subscribe', + params: ['logs', { + topics: [ + [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + ], + }], + }, + function afterInstall(t, testMeta, response, cb){ + testMeta.tx1 = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + ], + }) + cb() + }, + function subscriptionChangesOne(t, testMeta, response, cb){ + var matchedTx1 = response.params.result + t.equal(matchedTx1, testMeta.tx1, 'correct result') + + testMeta.tx2 = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }) + cb() + }, + function subscriptionChangesTwo(t, testMeta, response, cb){ + var matchedTx2 = response.params.result + t.equal(matchedTx2, testMeta.tx2, 'correct result') + cb() + } +) + +subscriptionTest('log subscription - wildcard logic', { + method: 'eth_subscribe', + params: ['logs', { + topics: [ + null, + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }], + }, + function afterInstall(t, testMeta, response, cb){ + testMeta.tx1 = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe01', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }) + cb() + }, + function subscriptionChangesOne(t, testMeta, response, cb){ + var matchedTx1 = response.params.result + t.equal(matchedTx1, testMeta.tx1, 'correct result') + testMeta.tx2 = testMeta.blockProvider.addTx({ + topics: [ + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + '0x00000000000000000000000000000000000000000000000000deadbeefcafe02', + ], + }) + cb() + }, + function subscriptionChangesTwo(t, testMeta, response, cb){ + var matchedTx2 = response.params.result + t.equal(matchedTx2, testMeta.tx2, 'correct result') + cb() + } +) + +function subscriptionTest(label, subscriptionPayload, afterInstall, subscriptionChangesOne, subscriptionChangesTwo) { + let testMeta = {} + let t = test('subscriptions - '+label, function(t) { + // subscribe + // new block + // check for notification + + + // handle "test_rpc" + let subscriptionSubprovider = testMeta.subscriptionSubprovider = injectMetrics(new SubscriptionSubprovider()) + // handle block requests + let blockProvider = testMeta.blockProvider = injectMetrics(new TestBlockProvider()) + + let engine = testMeta.engine = new ProviderEngine({ + pollingInterval: 20, + pollingShouldUnref: false, + }) + engine.addProvider(subscriptionSubprovider) + engine.addProvider(blockProvider) + engine.once('block', startTest) + + setTimeout(() => { + engine.start() + }, 1) + + function startTest(){ + // register subscription + engine.sendAsync(createPayload(subscriptionPayload), function(err, response){ + t.ifError(err, 'did not error') + t.ok(response, 'has response') + + let method = subscriptionPayload.method + + t.equal(subscriptionSubprovider.getWitnessed(method).length, 1, 'subscriptionSubprovider did see "'+method+'"') + t.equal(subscriptionSubprovider.getHandled(method).length, 1, 'subscriptionSubprovider did handle "'+method+'"') + + let subscriptionId = testMeta.subscriptionId = response.result + + // manipulates next block to trigger a notification + afterInstall(t, testMeta, response, function(err){ + t.ifError(err, 'did not error') + subscriptionSubprovider.once('data', continueTest) + // create next block so that notification is sent + testMeta.block = testMeta.blockProvider.nextBlock() + }) + }) + } + + // handle first notification + function continueTest(err, notification){ + let subscriptionId = testMeta.subscriptionId + // after subscription check one + t.ifError(err, 'did not error') + t.ok(notification, 'has notification') + t.equal(notification.params.subscription, subscriptionId, 'notification has correct subscription id') + + // test-specific checks, and make changes to next block to trigger next notification + subscriptionChangesOne(t, testMeta, notification, function(err){ + t.ifError(err, 'did not error') + + if (subscriptionChangesTwo) { + subscriptionSubprovider.once('data', function (err, notification) { + t.ifError(err, 'did not error') + t.ok(notification, 'has notification') + + // final checks + subscriptionChangesTwo(t, testMeta, notification, function (err) { + t.ifError(err, 'did not error') + end() + }) + }) + + // trigger a new block so that the above handler runs + testMeta.block = testMeta.blockProvider.nextBlock() + } else { + end() + } + }) + } + + function end() { + engine.sendAsync(createPayload({ method: 'eth_unsubscribe', params: [testMeta.subscriptionId] }), function (err, response) { + testMeta.engine.stop() + t.ifError(err, 'did not error') + t.end() + }) + } + }) +}