From 41be320c8d257b97c17687ceeca2b4eab38ccf4e Mon Sep 17 00:00:00 2001 From: Moshe Kolodny Date: Fri, 8 Dec 2017 11:05:02 -0500 Subject: [PATCH 1/5] Add smart retries to table.mutate --- .circleci/config.yml | 6 + integration-test/.eslintrc.yml | 3 + .../mutate-rows-acceptance-test.json | 117 +++++++++++++++ integration-test/mutate-rows.js | 136 ++++++++++++++++++ package.json | 12 +- patches/patch-for-v4.patch | 13 ++ patches/patch-for-v6-and-up.patch | 13 ++ src/table.js | 101 ++++++++----- test/table.js | 7 + 9 files changed, 371 insertions(+), 37 deletions(-) create mode 100644 integration-test/.eslintrc.yml create mode 100644 integration-test/mutate-rows-acceptance-test.json create mode 100644 integration-test/mutate-rows.js create mode 100644 patches/patch-for-v4.patch create mode 100644 patches/patch-for-v6-and-up.patch diff --git a/.circleci/config.yml b/.circleci/config.yml index 1c91aadd1..f756ace2f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -9,6 +9,9 @@ unit_tests: &unit_tests - run: name: Run unit tests. command: npm test + - run: + name: Run integration tests. + command: npm run integration-test - run: name: Submit coverage data to codecov. command: node_modules/.bin/codecov @@ -99,6 +102,9 @@ jobs: - run: name: Run unit tests. command: npm test + - run: + name: Run integration tests. + command: npm run integration-test - run: name: Submit coverage data to codecov. command: node_modules/.bin/codecov diff --git a/integration-test/.eslintrc.yml b/integration-test/.eslintrc.yml new file mode 100644 index 000000000..6db2a46c5 --- /dev/null +++ b/integration-test/.eslintrc.yml @@ -0,0 +1,3 @@ +--- +env: + mocha: true diff --git a/integration-test/mutate-rows-acceptance-test.json b/integration-test/mutate-rows-acceptance-test.json new file mode 100644 index 000000000..7ecc1b01d --- /dev/null +++ b/integration-test/mutate-rows-acceptance-test.json @@ -0,0 +1,117 @@ +{ + "tests": [ + { + "name": "valid mutation", + "max_retries": 3, + "mutations_request": [ + { "method": "insert", "key": "foo", "data": {} }, + { "method": "insert", "key": "bar", "data": {} }, + { "method": "insert", "key": "baz", "data": {} } + ], + "responses": [ + { "code": 200, "entry_codes": [ 0, 0, 0 ] } + ], + "mutation_batches_invoked": [ + [ "foo", "bar", "baz" ] + ] + }, { + "name": "retries the failed mutations", + "max_retries": 3, + "mutations_request": [ + { "method": "insert", "key": "foo", "data": {} }, + { "method": "insert", "key": "bar", "data": {} }, + { "method": "insert", "key": "baz", "data": {} } + ], + "responses": [ + { "code": 200, "entry_codes": [ 0, 4, 4 ] }, + { "code": 200, "entry_codes": [ 4, 0 ] }, + { "code": 200, "entry_codes": [ 4 ] }, + { "code": 200, "entry_codes": [ 0 ] } + ], + "mutation_batches_invoked": [ + [ "foo", "bar", "baz" ], + [ "bar", "baz" ], + [ "bar" ], + [ "bar" ] + ] + }, { + "name": "has a `PartialFailureError` error when an entry fails after the retries", + "max_retries": 3, + "mutations_request": [ + { "method": "insert", "key": "foo", "data": {} }, + { "method": "insert", "key": "bar", "data": {} }, + { "method": "insert", "key": "baz", "data": {} } + ], + "responses": [ + { "code": 200, "entry_codes": [ 0, 4, 0 ] }, + { "code": 200, "entry_codes": [ 4 ] }, + { "code": 200, "entry_codes": [ 4 ] }, + { "code": 200, "entry_codes": [ 4 ] } + ], + "mutation_batches_invoked": [ + [ "foo", "bar", "baz" ], + [ "bar" ], + [ "bar" ], + [ "bar" ] + ], + "errors": [ + { "index_in_mutations_request": 1 } + ] + }, { + "name": "does not retry unretryable mutations", + "max_retries": 5, + "mutations_request": [ + { "method": "insert", "key": "foo", "data": {} }, + { "method": "insert", "key": "bar", "data": {} }, + { "method": "insert", "key": "baz", "data": {} }, + { "method": "insert", "key": "qux", "data": {} }, + { "method": "insert", "key": "quux", "data": {} }, + { "method": "insert", "key": "corge", "data": {} } + ], + "responses": [ + { "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] }, + { "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] }, + { "code": 200, "entry_codes": [ 1, 4, 4, 0 ] }, + { "code": 200, "entry_codes": [ 0, 4 ] }, + { "code": 200, "entry_codes": [ 4 ] }, + { "code": 200, "entry_codes": [ 1 ] } + ], + "mutation_batches_invoked": [ + [ "foo", "bar", "baz", "qux", "quux", "corge" ], + [ "foo", "bar", "baz", "qux", "quux" ], + [ "foo", "bar", "baz", "qux" ], + [ "bar", "baz" ], + [ "baz" ], + [ "baz" ] + ], + "errors": [ + { "index_in_mutations_request": 0 }, + { "index_in_mutations_request": 2 }, + { "index_in_mutations_request": 5 } + ] + }, { + "name": "considers network errors towards the retry count", + "max_retries": 3, + "mutations_request": [ + { "method": "insert", "key": "foo", "data": {} }, + { "method": "insert", "key": "bar", "data": {} }, + { "method": "insert", "key": "baz", "data": {} } + ], + "responses": [ + { "code": 200, "entry_codes": [ 0, 4, 0 ] }, + { "code": 429 }, + { "code": 200, "entry_codes": [ 4 ] }, + { "code": 200, "entry_codes": [ 4 ] } + ], + "mutation_batches_invoked": [ + [ "foo", "bar", "baz" ], + [ "bar" ], + [ "bar" ], + [ "bar" ] + ], + "errors": [ + { "index_in_mutations_request": 1 } + ] + } + ] +} diff --git a/integration-test/mutate-rows.js b/integration-test/mutate-rows.js new file mode 100644 index 000000000..b5eb76bf0 --- /dev/null +++ b/integration-test/mutate-rows.js @@ -0,0 +1,136 @@ +'use strict'; + +const Bigtable = require('../'); + +const tests = require('./mutate-rows-acceptance-test.json').tests; + +const assert = require('assert'); +const grpc = require('grpc'); +const sinon = require('sinon'); +const through = require('through2'); + +function dispatch(emitter, response) { + const emits = []; + emits.push({name: 'response', arg: {code: response.code}}); + if (response.entry_codes) { + emits.push({name: 'data', arg: entryResponses(response.entry_codes)}); + } + emits.push({name: 'end'}); + let index = 0; + setImmediate(next); + + function next() { + if (index < emits.length) { + const emit = emits[index]; + index++; + emitter.emit(emit.name, emit.arg); + setImmediate(next); + } + } +} + +function entryResponses(statusCodes) { + return { + entries: statusCodes.map((code, index) => ({ + index, + status: {code}, + })), + }; +} + +function getDeltas(array) { + return array.reduce((acc, item, index) => { + return index ? acc.concat(item - array[index - 1]) : [item]; + }, []); +} + +describe('Bigtable/Table', () => { + const bigtable = new Bigtable(); + bigtable.grpcCredentials = grpc.credentials.createInsecure(); + const bigtableService = bigtable.getService_({service: 'Bigtable'}); + + const INSTANCE = bigtable.instance('instance'); + const TABLE = INSTANCE.table('table'); + + describe('mutate()', () => { + let clock; + let mutationBatchesInvoked; + let mutationCallTimes; + let responses; + let stub; + + beforeEach(() => { + clock = sinon.useFakeTimers({ + toFake: [ + 'setTimeout', + 'clearTimeout', + 'setImmediate', + 'clearImmediate', + 'setInterval', + 'clearInterval', + 'Date', + 'nextTick', + ], + }); + mutationBatchesInvoked = []; + mutationCallTimes = []; + responses = null; + stub = sinon.stub(bigtableService, 'mutateRows').callsFake(grpcOpts => { + mutationBatchesInvoked.push( + grpcOpts.entries.map(entry => entry.rowKey.asciiSlice()) + ); + mutationCallTimes.push(new Date().getTime()); + const emitter = through.obj(); + dispatch(emitter, responses.shift()); + return emitter; + }); + }); + + afterEach(() => { + clock.uninstall(); + stub.restore(); + }); + + tests.forEach(test => { + it(test.name, done => { + responses = test.responses; + TABLE.maxRetries = test.max_retries; + TABLE.mutate(test.mutations_request, error => { + assert.deepEqual( + mutationBatchesInvoked, + test.mutation_batches_invoked + ); + getDeltas(mutationCallTimes).forEach((delta, index) => { + if (index === 0) { + const message = 'First request should happen Immediately'; + assert.strictEqual(index, 0, message); + return; + } + const minBackoff = 1000 * Math.pow(2, index); + + // Adjust for some flakiness with the fake timers. + const maxBackoff = minBackoff + 1010; + const message = + `Backoff for retry #${index} should be between ` + + `${minBackoff} and ${maxBackoff}, was ${delta}`; + assert(delta > minBackoff, message); + assert(delta < maxBackoff, message); + }); + if (test.errors) { + const expectedIndices = test.errors.map(error => { + return error.index_in_mutations_request; + }); + const actualIndices = error.errors.map(error => { + return test.mutations_request.indexOf(error.entry); + }); + assert.deepEqual(expectedIndices, actualIndices); + } else { + assert.ifError(error); + } + done(); + }); + clock.runAll(); + }); + }); + }); +}); diff --git a/package.json b/package.json index 0e7de4685..04f10c731 100644 --- a/package.json +++ b/package.json @@ -48,14 +48,16 @@ "test-no-cover": "repo-tools test run --cmd mocha -- test/*.js --no-timeouts", "test": "repo-tools test run --cmd npm -- run cover", "generate-scaffolding": "repo-tools generate all && repo-tools generate lib_samples_readme -l samples/ --config ../.cloud-repo-tools.json", - "lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ test/", - "prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js", + "lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ integration-test/ test/", + "prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js integration-test/*.js integration-test/*/*.js", "publish-module": "node ../../scripts/publish.js bigtable", - "system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts" + "system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts", + "preintegration-test": "git apply patches/patch-for-v4.patch || git apply patches/patch-for-v6-and-up.patch || true", + "integration-test": "repo-tools test run --cmd mocha -- integration-test/*.js --timeout 10000" }, "dependencies": { "@google-cloud/common": "^0.15.1", - "@google-cloud/common-grpc": "^0.4.0", + "@google-cloud/common-grpc": "^0.5.0", "arrify": "^1.0.0", "concat-stream": "^1.5.0", "create-error-class": "^3.0.2", @@ -66,6 +68,7 @@ "node-int64": "^0.4.0", "prop-assign": "^1.0.0", "pumpify": "^1.3.3", + "retry-request": "^3.3.0", "safe-buffer": "^5.1.1", "string-format-obj": "^1.0.0", "through2": "^2.0.0" @@ -78,6 +81,7 @@ "eslint-config-prettier": "^2.6.0", "eslint-plugin-node": "^5.2.0", "eslint-plugin-prettier": "^2.3.1", + "grpc": "^1.7.2", "ink-docstrap": "^1.3.0", "intelli-espower-loader": "^1.0.1", "jsdoc": "^3.5.5", diff --git a/patches/patch-for-v4.patch b/patches/patch-for-v4.patch new file mode 100644 index 000000000..4587e7541 --- /dev/null +++ b/patches/patch-for-v4.patch @@ -0,0 +1,13 @@ +--- a/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js ++++ b/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js +@@ -5,7 +5,9 @@ if (!process.version || + process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) { + module.exports = nextTick; + } else { +- module.exports = process.nextTick; ++ module.exports = function() { ++ return process.nextTick.apply(this, arguments); ++ }; + } + + function nextTick(fn, arg1, arg2, arg3) { diff --git a/patches/patch-for-v6-and-up.patch b/patches/patch-for-v6-and-up.patch new file mode 100644 index 000000000..5955f190c --- /dev/null +++ b/patches/patch-for-v6-and-up.patch @@ -0,0 +1,13 @@ +--- a/node_modules/process-nextick-args/index.js ++++ b/node_modules/process-nextick-args/index.js +@@ -5,7 +5,9 @@ if (!process.version || + process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) { + module.exports = nextTick; + } else { +- module.exports = process.nextTick; ++ module.exports = function() { ++ return process.nextTick.apply(this, arguments); ++ }; + } + + function nextTick(fn, arg1, arg2, arg3) { diff --git a/src/table.js b/src/table.js index 476521a02..5eef1402f 100644 --- a/src/table.js +++ b/src/table.js @@ -24,6 +24,7 @@ var flatten = require('lodash.flatten'); var is = require('is'); var propAssign = require('prop-assign'); var pumpify = require('pumpify'); +var retryRequest = require('retry-request'); var through = require('through2'); var util = require('util'); @@ -32,6 +33,10 @@ var Filter = require('./filter.js'); var Mutation = require('./mutation.js'); var Row = require('./row.js'); +// See protos/google/rpc/code.proto +// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) +const retryCodes = new Set([4, 10, 14]); + /** * Create a Table object to interact with a Cloud Bigtable table. * @@ -927,45 +932,75 @@ Table.prototype.insert = function(entries, callback) { Table.prototype.mutate = function(entries, callback) { entries = flatten(arrify(entries)); - var grpcOpts = { - service: 'Bigtable', - method: 'mutateRows', - }; + var requestsMade = 0; - var reqOpts = { - objectMode: true, - tableName: this.id, - entries: entries.map(Mutation.parse), - }; - - var mutationErrors = []; - - this.requestStream(grpcOpts, reqOpts) - .on('error', callback) - .on('data', function(obj) { - obj.entries.forEach(function(entry) { - // Mutation was successful. - if (entry.status.code === 0) { - return; - } + var maxRetries = typeof this.maxRetries === 'number' ? this.maxRetries : 3; + var pendingEntryIndices = new Set(entries.map((entry, index) => index)); + var entryToIndex = new Map(entries.map((entry, index) => [entry, index])); + var mutationErrorsByEntryIndex = new Map(); - var status = commonGrpc.Service.decorateStatus_(entry.status); - status.entry = entries[entry.index]; - - mutationErrors.push(status); + var onBatchResponse = error => { + if (pendingEntryIndices.size !== 0 && requestsMade <= maxRetries) { + setTimeout( + makeNextRequestBatch, + retryRequest.getNextRetryDelay(requestsMade) + ); + return; + } + var err = error; + if (mutationErrorsByEntryIndex.size !== 0) { + var mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); + err = new common.util.PartialFailureError({ + errors: mutationErrors, }); - }) - .on('end', function() { - var err = null; + } + callback(err); + }; - if (mutationErrors.length > 0) { - err = new common.util.PartialFailureError({ - errors: mutationErrors, + var makeNextRequestBatch = () => { + var grpcOpts = { + service: 'Bigtable', + method: 'mutateRows', + retryOpts: { + currentRetryAttempt: requestsMade, + }, + }; + var entryBatch = entries.filter((entry, index) => + pendingEntryIndices.has(index) + ); + var reqOpts = { + objectMode: true, + tableName: this.id, + entries: entryBatch.map(Mutation.parse), + }; + this.requestStream(grpcOpts, reqOpts) + .on('request', () => requestsMade++) + .on('data', function(obj) { + obj.entries.forEach(function(entry) { + var originalEntry = entryBatch[entry.index]; + var originalEntriesIndex = entryToIndex.get(originalEntry); + + // Mutation was successful. + if (entry.status.code === 0) { + pendingEntryIndices.delete(originalEntriesIndex); + mutationErrorsByEntryIndex.delete(originalEntriesIndex); + return; + } + + if (!retryCodes.has(entry.status.code)) { + pendingEntryIndices.delete(originalEntriesIndex); + } + + var status = commonGrpc.Service.decorateStatus_(entry.status); + status.entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, status); }); - } + }) + .on('end', onBatchResponse) + .on('error', onBatchResponse); + }; - callback(err); - }); + makeNextRequestBatch(); }; /** diff --git a/test/table.js b/test/table.js index 79e63f616..f3481200f 100644 --- a/test/table.js +++ b/test/table.js @@ -976,6 +976,9 @@ describe('Bigtable/Table', function() { assert.deepEqual(grpcOpts, { service: 'Bigtable', method: 'mutateRows', + retryOpts: { + currentRetryAttempt: 0, + }, }); assert.strictEqual(reqOpts.tableName, TABLE_NAME); @@ -1004,6 +1007,7 @@ describe('Bigtable/Table', function() { }); setImmediate(function() { + stream.emit('request'); stream.emit('error', error); }); @@ -1012,6 +1016,7 @@ describe('Bigtable/Table', function() { }); it('should return the error to the callback', function(done) { + table.maxRetries = 0; table.mutate(entries, function(err) { assert.strictEqual(err, error); done(); @@ -1104,6 +1109,7 @@ describe('Bigtable/Table', function() { }); setImmediate(function() { + stream.emit('request'); stream.end({entries: fakeStatuses}); }); @@ -1112,6 +1118,7 @@ describe('Bigtable/Table', function() { }); it('should execute callback', function(done) { + table.maxRetries = 0; table.mutate(entries, done); }); }); From 921d1ef7a77d1b109d76338557eae1d98f9d4336 Mon Sep 17 00:00:00 2001 From: Moshe Kolodny Date: Mon, 11 Dec 2017 17:47:31 -0500 Subject: [PATCH 2/5] moved integration-test to system tests addressed PR comments. --- .circleci/config.yml | 6 ------ integration-test/.eslintrc.yml | 3 --- package.json | 10 ++++------ .../mutate-rows-retry-test.json | 0 {integration-test => system-test}/mutate-rows.js | 4 ++-- 5 files changed, 6 insertions(+), 17 deletions(-) delete mode 100644 integration-test/.eslintrc.yml rename integration-test/mutate-rows-acceptance-test.json => system-test/mutate-rows-retry-test.json (100%) rename {integration-test => system-test}/mutate-rows.js (96%) diff --git a/.circleci/config.yml b/.circleci/config.yml index f756ace2f..1c91aadd1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -9,9 +9,6 @@ unit_tests: &unit_tests - run: name: Run unit tests. command: npm test - - run: - name: Run integration tests. - command: npm run integration-test - run: name: Submit coverage data to codecov. command: node_modules/.bin/codecov @@ -102,9 +99,6 @@ jobs: - run: name: Run unit tests. command: npm test - - run: - name: Run integration tests. - command: npm run integration-test - run: name: Submit coverage data to codecov. command: node_modules/.bin/codecov diff --git a/integration-test/.eslintrc.yml b/integration-test/.eslintrc.yml deleted file mode 100644 index 6db2a46c5..000000000 --- a/integration-test/.eslintrc.yml +++ /dev/null @@ -1,3 +0,0 @@ ---- -env: - mocha: true diff --git a/package.json b/package.json index 04f10c731..42beea2d4 100644 --- a/package.json +++ b/package.json @@ -48,12 +48,11 @@ "test-no-cover": "repo-tools test run --cmd mocha -- test/*.js --no-timeouts", "test": "repo-tools test run --cmd npm -- run cover", "generate-scaffolding": "repo-tools generate all && repo-tools generate lib_samples_readme -l samples/ --config ../.cloud-repo-tools.json", - "lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ integration-test/ test/", - "prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js integration-test/*.js integration-test/*/*.js", + "lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ test/", + "prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js", "publish-module": "node ../../scripts/publish.js bigtable", - "system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts", - "preintegration-test": "git apply patches/patch-for-v4.patch || git apply patches/patch-for-v6-and-up.patch || true", - "integration-test": "repo-tools test run --cmd mocha -- integration-test/*.js --timeout 10000" + "presystem-test": "git apply patches/patch-for-v4.patch || git apply patches/patch-for-v6-and-up.patch || true", + "system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts" }, "dependencies": { "@google-cloud/common": "^0.15.1", @@ -81,7 +80,6 @@ "eslint-config-prettier": "^2.6.0", "eslint-plugin-node": "^5.2.0", "eslint-plugin-prettier": "^2.3.1", - "grpc": "^1.7.2", "ink-docstrap": "^1.3.0", "intelli-espower-loader": "^1.0.1", "jsdoc": "^3.5.5", diff --git a/integration-test/mutate-rows-acceptance-test.json b/system-test/mutate-rows-retry-test.json similarity index 100% rename from integration-test/mutate-rows-acceptance-test.json rename to system-test/mutate-rows-retry-test.json diff --git a/integration-test/mutate-rows.js b/system-test/mutate-rows.js similarity index 96% rename from integration-test/mutate-rows.js rename to system-test/mutate-rows.js index b5eb76bf0..f473680e0 100644 --- a/integration-test/mutate-rows.js +++ b/system-test/mutate-rows.js @@ -2,10 +2,10 @@ const Bigtable = require('../'); -const tests = require('./mutate-rows-acceptance-test.json').tests; +const tests = require('./mutate-rows-retry-test.json').tests; const assert = require('assert'); -const grpc = require('grpc'); +const grpc = require('@google-cloud/common-grpc').grpc; const sinon = require('sinon'); const through = require('through2'); From 5fa4aa6878ec68e47029d958223f38ca6cd886b8 Mon Sep 17 00:00:00 2001 From: Moshe Kolodny Date: Tue, 12 Dec 2017 13:42:11 -0500 Subject: [PATCH 3/5] Use implicit setTimeout for retries from retry-request. --- package.json | 3 +-- src/table.js | 6 +----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 42beea2d4..435bcbab3 100644 --- a/package.json +++ b/package.json @@ -56,7 +56,7 @@ }, "dependencies": { "@google-cloud/common": "^0.15.1", - "@google-cloud/common-grpc": "^0.5.0", + "@google-cloud/common-grpc": "^0.5.1", "arrify": "^1.0.0", "concat-stream": "^1.5.0", "create-error-class": "^3.0.2", @@ -67,7 +67,6 @@ "node-int64": "^0.4.0", "prop-assign": "^1.0.0", "pumpify": "^1.3.3", - "retry-request": "^3.3.0", "safe-buffer": "^5.1.1", "string-format-obj": "^1.0.0", "through2": "^2.0.0" diff --git a/src/table.js b/src/table.js index 5eef1402f..a5cefce36 100644 --- a/src/table.js +++ b/src/table.js @@ -24,7 +24,6 @@ var flatten = require('lodash.flatten'); var is = require('is'); var propAssign = require('prop-assign'); var pumpify = require('pumpify'); -var retryRequest = require('retry-request'); var through = require('through2'); var util = require('util'); @@ -941,10 +940,7 @@ Table.prototype.mutate = function(entries, callback) { var onBatchResponse = error => { if (pendingEntryIndices.size !== 0 && requestsMade <= maxRetries) { - setTimeout( - makeNextRequestBatch, - retryRequest.getNextRetryDelay(requestsMade) - ); + makeNextRequestBatch(); return; } var err = error; From e622bcd19c0a03c9646a4ad9512e6284b299c4cc Mon Sep 17 00:00:00 2001 From: Moshe Kolodny Date: Tue, 12 Dec 2017 20:15:05 -0500 Subject: [PATCH 4/5] Add unit tests for retries --- test/table.js | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/test/table.js b/test/table.js index f3481200f..6b70b3121 100644 --- a/test/table.js +++ b/test/table.js @@ -1122,6 +1122,70 @@ describe('Bigtable/Table', function() { table.mutate(entries, done); }); }); + + describe('retries', function() { + var fakeStatuses; + var entryRequests; + + beforeEach(function() { + entryRequests = []; + fakeStatuses = [ + [ + { + index: 0, + status: { + code: 0, + }, + }, + { + index: 1, + status: { + code: 4, + }, + }, + ], + [ + { + index: 0, + status: { + code: 0, + }, + }, + ], + ]; + FakeGrpcService.decorateStatus_ = function() { + return {}; + }; + table.requestStream = function(_, reqOpts) { + entryRequests.push(reqOpts.entries); + var stream = new Stream({ + objectMode: true, + }); + + setImmediate(function() { + stream.emit('request'); + stream.end({entries: fakeStatuses.shift()}); + }); + + return stream; + }; + }); + + it('should succeed after a retry', function(done) { + table.maxRetries = 1; + table.mutate(entries, done); + }); + + it('should retry the same failed entry', function(done) { + table.maxRetries = 1; + table.mutate(entries, function() { + assert.strictEqual(entryRequests[0].length, 2); + assert.strictEqual(entryRequests[1].length, 1); + assert.strictEqual(entryRequests[0][1], entryRequests[1][0]); + done(); + }); + }); + }); }); describe('row', function() { From e4c03faf4009dfc247468bbaa98a8188fc080dfd Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Tue, 12 Dec 2017 21:44:26 -0500 Subject: [PATCH 5/5] Small code style / convention changes. --- src/table.js | 50 +++++++++++-------- .../{ => data}/mutate-rows-retry-test.json | 0 system-test/mutate-rows.js | 2 +- 3 files changed, 30 insertions(+), 22 deletions(-) rename system-test/{ => data}/mutate-rows-retry-test.json (100%) diff --git a/src/table.js b/src/table.js index a5cefce36..4b106f22d 100644 --- a/src/table.js +++ b/src/table.js @@ -34,7 +34,7 @@ var Row = require('./row.js'); // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) -const retryCodes = new Set([4, 10, 14]); +const RETRY_STATUS_CODES = new Set([4, 10, 14]); /** * Create a Table object to interact with a Cloud Bigtable table. @@ -929,48 +929,56 @@ Table.prototype.insert = function(entries, callback) { * }); */ Table.prototype.mutate = function(entries, callback) { + var self = this; + entries = flatten(arrify(entries)); - var requestsMade = 0; + var numRequestsMade = 0; - var maxRetries = typeof this.maxRetries === 'number' ? this.maxRetries : 3; + var maxRetries = is.number(this.maxRetries) ? this.maxRetries : 3; var pendingEntryIndices = new Set(entries.map((entry, index) => index)); var entryToIndex = new Map(entries.map((entry, index) => [entry, index])); var mutationErrorsByEntryIndex = new Map(); - var onBatchResponse = error => { - if (pendingEntryIndices.size !== 0 && requestsMade <= maxRetries) { - makeNextRequestBatch(); + function onBatchResponse(err) { + if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) { + makeNextBatchRequest(); return; } - var err = error; + if (mutationErrorsByEntryIndex.size !== 0) { var mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); err = new common.util.PartialFailureError({ errors: mutationErrors, }); } + callback(err); - }; + } - var makeNextRequestBatch = () => { + function makeNextBatchRequest() { var grpcOpts = { service: 'Bigtable', method: 'mutateRows', retryOpts: { - currentRetryAttempt: requestsMade, + currentRetryAttempt: numRequestsMade, }, }; - var entryBatch = entries.filter((entry, index) => - pendingEntryIndices.has(index) - ); + + var entryBatch = entries.filter((entry, index) => { + return pendingEntryIndices.has(index); + }); + var reqOpts = { objectMode: true, - tableName: this.id, + tableName: self.id, entries: entryBatch.map(Mutation.parse), }; - this.requestStream(grpcOpts, reqOpts) - .on('request', () => requestsMade++) + + self + .requestStream(grpcOpts, reqOpts) + .on('error', onBatchResponse) + .on('request', () => numRequestsMade++) .on('data', function(obj) { obj.entries.forEach(function(entry) { var originalEntry = entryBatch[entry.index]; @@ -983,20 +991,20 @@ Table.prototype.mutate = function(entries, callback) { return; } - if (!retryCodes.has(entry.status.code)) { + if (!RETRY_STATUS_CODES.has(entry.status.code)) { pendingEntryIndices.delete(originalEntriesIndex); } var status = commonGrpc.Service.decorateStatus_(entry.status); status.entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, status); }); }) - .on('end', onBatchResponse) - .on('error', onBatchResponse); - }; + .on('end', onBatchResponse); + } - makeNextRequestBatch(); + makeNextBatchRequest(); }; /** diff --git a/system-test/mutate-rows-retry-test.json b/system-test/data/mutate-rows-retry-test.json similarity index 100% rename from system-test/mutate-rows-retry-test.json rename to system-test/data/mutate-rows-retry-test.json diff --git a/system-test/mutate-rows.js b/system-test/mutate-rows.js index f473680e0..de5a891bf 100644 --- a/system-test/mutate-rows.js +++ b/system-test/mutate-rows.js @@ -2,7 +2,7 @@ const Bigtable = require('../'); -const tests = require('./mutate-rows-retry-test.json').tests; +const tests = require('./data/mutate-rows-retry-test.json').tests; const assert = require('assert'); const grpc = require('@google-cloud/common-grpc').grpc;