Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add smart retries to table.mutate #17

Merged
merged 5 commits into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ unit_tests: &unit_tests
- run:
name: Run unit tests.
command: npm test
- run:
name: Run integration tests.
command: npm run integration-test
- run:
name: Submit coverage data to codecov.
command: node_modules/.bin/codecov
Expand Down Expand Up @@ -99,6 +102,9 @@ jobs:
- run:
name: Run unit tests.
command: npm test
- run:
name: Run integration tests.
command: npm run integration-test
- run:
name: Submit coverage data to codecov.
command: node_modules/.bin/codecov
Expand Down
3 changes: 3 additions & 0 deletions integration-test/.eslintrc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
env:
mocha: true
117 changes: 117 additions & 0 deletions integration-test/mutate-rows-acceptance-test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"tests": [
{
"name": "valid mutation",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 0, 0 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ]
]
}, {
"name": "retries the failed mutations",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 4 ] },
{ "code": 200, "entry_codes": [ 4, 0 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 0 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar", "baz" ],
[ "bar" ],
[ "bar" ]
]
}, {
"name": "has a `PartialFailureError` error when an entry fails after the retries",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 0 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar" ],
[ "bar" ],
[ "bar" ]
],
"errors": [
{ "index_in_mutations_request": 1 }
]
}, {
"name": "does not retry unretryable mutations",
"max_retries": 5,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} },
{ "method": "insert", "key": "qux", "data": {} },
{ "method": "insert", "key": "quux", "data": {} },
{ "method": "insert", "key": "corge", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] },
{ "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] },
{ "code": 200, "entry_codes": [ 1, 4, 4, 0 ] },
{ "code": 200, "entry_codes": [ 0, 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 1 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz", "qux", "quux", "corge" ],
[ "foo", "bar", "baz", "qux", "quux" ],
[ "foo", "bar", "baz", "qux" ],
[ "bar", "baz" ],
[ "baz" ],
[ "baz" ]
],
"errors": [
{ "index_in_mutations_request": 0 },
{ "index_in_mutations_request": 2 },
{ "index_in_mutations_request": 5 }
]
}, {
"name": "considers network errors towards the retry count",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 0 ] },
{ "code": 429 },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar" ],
[ "bar" ],
[ "bar" ]
],
"errors": [
{ "index_in_mutations_request": 1 }
]
}
]
}
136 changes: 136 additions & 0 deletions integration-test/mutate-rows.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
'use strict';

const Bigtable = require('../');

const tests = require('./mutate-rows-acceptance-test.json').tests;

const assert = require('assert');
const grpc = require('grpc');
const sinon = require('sinon');
const through = require('through2');

function dispatch(emitter, response) {
const emits = [];
emits.push({name: 'response', arg: {code: response.code}});
if (response.entry_codes) {
emits.push({name: 'data', arg: entryResponses(response.entry_codes)});
}
emits.push({name: 'end'});
let index = 0;
setImmediate(next);

function next() {
if (index < emits.length) {
const emit = emits[index];
index++;
emitter.emit(emit.name, emit.arg);
setImmediate(next);
}
}
}

function entryResponses(statusCodes) {
return {
entries: statusCodes.map((code, index) => ({
index,
status: {code},
})),
};
}

function getDeltas(array) {
return array.reduce((acc, item, index) => {
return index ? acc.concat(item - array[index - 1]) : [item];
}, []);
}

describe('Bigtable/Table', () => {
const bigtable = new Bigtable();
bigtable.grpcCredentials = grpc.credentials.createInsecure();
const bigtableService = bigtable.getService_({service: 'Bigtable'});

const INSTANCE = bigtable.instance('instance');
const TABLE = INSTANCE.table('table');

describe('mutate()', () => {
let clock;
let mutationBatchesInvoked;
let mutationCallTimes;
let responses;
let stub;

beforeEach(() => {
clock = sinon.useFakeTimers({
toFake: [
'setTimeout',
'clearTimeout',
'setImmediate',
'clearImmediate',
'setInterval',
'clearInterval',
'Date',
'nextTick',
],
});
mutationBatchesInvoked = [];
mutationCallTimes = [];
responses = null;
stub = sinon.stub(bigtableService, 'mutateRows').callsFake(grpcOpts => {
mutationBatchesInvoked.push(
grpcOpts.entries.map(entry => entry.rowKey.asciiSlice())
);
mutationCallTimes.push(new Date().getTime());
const emitter = through.obj();
dispatch(emitter, responses.shift());
return emitter;
});
});

afterEach(() => {
clock.uninstall();
stub.restore();
});

tests.forEach(test => {
it(test.name, done => {
responses = test.responses;
TABLE.maxRetries = test.max_retries;
TABLE.mutate(test.mutations_request, error => {
assert.deepEqual(
mutationBatchesInvoked,
test.mutation_batches_invoked
);
getDeltas(mutationCallTimes).forEach((delta, index) => {
if (index === 0) {
const message = 'First request should happen Immediately';
assert.strictEqual(index, 0, message);
return;
}
const minBackoff = 1000 * Math.pow(2, index);

// Adjust for some flakiness with the fake timers.
const maxBackoff = minBackoff + 1010;
const message =
`Backoff for retry #${index} should be between ` +
`${minBackoff} and ${maxBackoff}, was ${delta}`;
assert(delta > minBackoff, message);
assert(delta < maxBackoff, message);
});
if (test.errors) {
const expectedIndices = test.errors.map(error => {
return error.index_in_mutations_request;
});
const actualIndices = error.errors.map(error => {
return test.mutations_request.indexOf(error.entry);
});
assert.deepEqual(expectedIndices, actualIndices);
} else {
assert.ifError(error);
}
done();
});
clock.runAll();
});
});
});
});
12 changes: 8 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@
"test-no-cover": "repo-tools test run --cmd mocha -- test/*.js --no-timeouts",
"test": "repo-tools test run --cmd npm -- run cover",
"generate-scaffolding": "repo-tools generate all && repo-tools generate lib_samples_readme -l samples/ --config ../.cloud-repo-tools.json",
"lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ test/",
"prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js",
"lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ integration-test/ test/",
"prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js integration-test/*.js integration-test/*/*.js",
"publish-module": "node ../../scripts/publish.js bigtable",
"system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts"
"system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts",
"preintegration-test": "git apply patches/patch-for-v4.patch || git apply patches/patch-for-v6-and-up.patch || true",
"integration-test": "repo-tools test run --cmd mocha -- integration-test/*.js --timeout 10000"
},
"dependencies": {
"@google-cloud/common": "^0.15.1",
"@google-cloud/common-grpc": "^0.4.0",
"@google-cloud/common-grpc": "^0.5.0",
"arrify": "^1.0.0",
"concat-stream": "^1.5.0",
"create-error-class": "^3.0.2",
Expand All @@ -66,6 +68,7 @@
"node-int64": "^0.4.0",
"prop-assign": "^1.0.0",
"pumpify": "^1.3.3",
"retry-request": "^3.3.0",
"safe-buffer": "^5.1.1",
"string-format-obj": "^1.0.0",
"through2": "^2.0.0"
Expand All @@ -78,6 +81,7 @@
"eslint-config-prettier": "^2.6.0",
"eslint-plugin-node": "^5.2.0",
"eslint-plugin-prettier": "^2.3.1",
"grpc": "^1.7.2",

This comment was marked as spam.

This comment was marked as spam.

"ink-docstrap": "^1.3.0",
"intelli-espower-loader": "^1.0.1",
"jsdoc": "^3.5.5",
Expand Down
13 changes: 13 additions & 0 deletions patches/patch-for-v4.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--- a/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js
+++ b/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js
@@ -5,7 +5,9 @@ if (!process.version ||
process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) {
module.exports = nextTick;
} else {
- module.exports = process.nextTick;
+ module.exports = function() {
+ return process.nextTick.apply(this, arguments);
+ };
}

function nextTick(fn, arg1, arg2, arg3) {
13 changes: 13 additions & 0 deletions patches/patch-for-v6-and-up.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--- a/node_modules/process-nextick-args/index.js
+++ b/node_modules/process-nextick-args/index.js
@@ -5,7 +5,9 @@ if (!process.version ||
process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) {
module.exports = nextTick;
} else {
- module.exports = process.nextTick;
+ module.exports = function() {
+ return process.nextTick.apply(this, arguments);
+ };
}

function nextTick(fn, arg1, arg2, arg3) {
Loading