Merge pull request #13 from marshall007/update-by-query
WIP: `_update_by_query` support
voldern authored Sep 11, 2016
2 parents 5ff422f + b4188b2 commit 154a474
Showing 5 changed files with 157 additions and 26 deletions.
83 changes: 77 additions & 6 deletions index.js
@@ -16,11 +16,6 @@ util.inherits(ElasticsearchBulkIndexWritable, FlushWritable);
*/
function transformRecords(records) {
return records.reduce(function(bulkOperations, record) {
assert(record.index, 'index is required');
assert(record.type, 'type is required');

record.action = record.action || 'index';

var operation = {};

operation[record.action] = {
@@ -36,14 +31,38 @@ function transformRecords(records) {
bulkOperations.push(operation);

if (record.action !== 'delete') {
assert(record.body, 'body is required');
bulkOperations.push(record.body);
}

return bulkOperations;
}, []);
}

/**
* Validate incoming operations, ensuring they have all relevant fields defined.
*
* @private
* @param {object} operation
* @return {object}
*/
function validateOperation(operation) {
assert(operation.index, 'index is required');
assert(operation.type, 'type is required');

operation.action = operation.action || 'index';

if (operation.action !== 'delete') {
assert(operation.body, 'body is required');

if (operation.action === 'update_by_query') {
assert(operation.body.script, 'body.script is required');
assert(operation.body.query, 'body.query is required');
}
}

return operation;
}

/**
* A simple wrapper around Elasticsearch for bulk writing items
*
@@ -109,6 +128,48 @@ ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records,
}.bind(this));
};

/**
* Bulk update records in Elasticsearch using UpdateByQuery
*
* @private
* @param {object} operation
* @param {Function} callback
*/
ElasticsearchBulkIndexWritable.prototype.partialUpdate = function partialUpdate(operation, callback) {
if (this.logger) {
this.logger.debug('Executing update_by_query in Elasticsearch');
}

var op = _.cloneDeep(operation);
delete op.action;

this.client.updateByQuery(op, function bulkCallback(err, data) {
if (err) {
err.operation = operation;
return callback(err);
}

if (data.failures.length !== 0) {
if (this.logger) {
data.failures.forEach(this.logger.error.bind(this.logger));
}

var error = new Error('One or more failures occurred during update_by_query.');
error.operation = operation;

return callback(error);
}

if (this.logger) {
this.logger.info('Updated %d records (via update_by_query) in Elasticsearch', data.updated);
}

this.writtenRecords += data.updated;

callback(null, data);
}.bind(this));
};

/**
* Flush method needed by the underlying stream implementation
*
@@ -165,6 +226,16 @@ ElasticsearchBulkIndexWritable.prototype._write = function _write(record, enc, c
this.logger.debug('Adding to Elasticsearch queue', { record: record });
}

try {
validateOperation(record);
} catch (error) {
return callback(error);
}

if (record.action === 'update_by_query') {
return this.partialUpdate(record, callback);
}

this.queue.push(record);

if (this.queue.length >= this.highWaterMark) {
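
For readers skimming the diff, here is a minimal usage sketch of the new write path (not part of this commit). The record shape mirrors test/fixture/record-update-by-query.json below; the module name, client configuration, and stream options are assumptions made for illustration.

    var elasticsearch = require('elasticsearch');
    // The package name below is assumed for illustration.
    var ElasticsearchBulkIndexWritable = require('elasticsearch-bulk-index-stream');

    var stream = new ElasticsearchBulkIndexWritable(
        new elasticsearch.Client({ host: 'localhost:9200' }), // assumed client config
        { highWaterMark: 128 }                                // assumed stream options
    );

    // Records with action 'update_by_query' bypass the bulk queue: _write hands
    // them straight to partialUpdate, which calls client.updateByQuery.
    stream.write({
        index: 'indexName',
        type: 'recordType',
        action: 'update_by_query',
        body: {
            script: { inline: 'ctx._source.counter++' },
            query: { term: { foo: 'bar' } }
        }
    });

All other actions ('index', 'update', 'delete') continue to be queued and flushed through client.bulk as before.
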
1 change: 0 additions & 1 deletion package.json
@@ -11,7 +11,6 @@
"license": "MIT",
"devDependencies": {
"chai": "^3.4.0",
"clone": "^1.0.2",
"eslint": "^1.7.3",
"eslint-config-vgno": "^4.0.1",
"istanbul": "^0.4.0",
70 changes: 51 additions & 19 deletions test/elasticsearch-bulk-stream.js
@@ -1,9 +1,9 @@
'use strict';

var chai = require('chai'),
var _ = require('lodash'),
chai = require('chai'),
sinon = require('sinon'),
sinonChai = require('sinon-chai'),
clone = require('clone'),
ElasticsearchBulkIndexWritable = require('../');

chai.use(sinonChai);
@@ -14,12 +14,30 @@ var recordFixture = require('./fixture/record.json');
var recordDeleteFixture = require('./fixture/record-delete.json');
var recordParentFixture = require('./fixture/record-parent.json');
var recordUpdateFixture = require('./fixture/record-update.json');
var recordUpdateByQueryFixture = require('./fixture/record-update-by-query.json');
var successResponseFixture = require('./fixture/success-response.json');
var successDeleteResponseFixture = require('./fixture/success-delete-response.json');
var successParentResponseFixture = require('./fixture/success-parent-response.json');
var successUpdateResponseFixture = require('./fixture/success-update-response.json');
var successUpdateByQueryResponseFixture = require('./fixture/success-update-by-query-response.json');
var errorResponseFixture = require('./fixture/error-response.json');

function getMissingFieldTest(fieldName, testFixture) {
return function(done) {
this.stream.on('error', function(error) {
expect(error).to.be.instanceOf(Error);
expect(error.message).to.eq(fieldName + ' is required');

done();
});

var fixture = _.cloneDeep(testFixture || recordFixture);
_.unset(fixture, fieldName);

this.stream.end(fixture);
};
}

describe('ElastisearchBulkIndexWritable', function() {
beforeEach(function() {
this.sinon = sinon.sandbox.create();
@@ -78,22 +96,6 @@ describe('ElastisearchBulkIndexWritable', function() {
});

describe('flushing', function() {
function getMissingFieldTest(fieldName) {
return function(done) {
this.stream.on('error', function(error) {
expect(error).to.be.instanceOf(Error);
expect(error.message).to.eq(fieldName + ' is required');

done();
});

var fixture = clone(recordFixture);
delete fixture[fieldName];

this.stream.end(fixture);
};
}

beforeEach(function() {
this.client = {
bulk: this.sinon.stub()
@@ -250,7 +252,8 @@ describe('ElastisearchBulkIndexWritable', function() {
describe('custom action', function() {
beforeEach(function() {
this.client = {
bulk: this.sinon.stub()
bulk: this.sinon.stub(),
updateByQuery: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
@@ -303,5 +306,34 @@
expect(this.client.bulk).to.have.callCount(1);
expect(this.client.bulk).to.have.been.calledWith(expectedArgument);
});

it('should support update_by_query', function() {
this.client.updateByQuery.yields(null, successUpdateByQueryResponseFixture);
this.stream.write(recordUpdateByQueryFixture);

var expectedArgument = {
index: 'indexName',
type: 'recordType',
body: {
script: {
inline: 'ctx._source.counter++'
},
query: {
term: {
foo: 'bar'
}
}
}
};

expect(this.client.updateByQuery).to.have.callCount(1);
expect(this.client.updateByQuery).to.have.been.calledWith(expectedArgument);
});

it('should throw error on body.script missing in update_by_query record',
getMissingFieldTest('body.script', recordUpdateByQueryFixture));

it('should throw error on body.query missing in update_by_query record',
getMissingFieldTest('body.query', recordUpdateByQueryFixture));
});
});
15 changes: 15 additions & 0 deletions test/fixture/record-update-by-query.json
@@ -0,0 +1,15 @@
{
"index": "indexName",
"type": "recordType",
"action": "update_by_query",
"body": {
"script": {
"inline": "ctx._source.counter++"
},
"query": {
"term": {
"foo": "bar"
}
}
}
}
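
For reference, partialUpdate clones a record like this one, drops the action field, and forwards the remaining fields to the client, so the call it ends up making is equivalent to the sketch below (mirroring the expectedArgument assertion in the test above):

    client.updateByQuery({
        index: 'indexName',
        type: 'recordType',
        body: {
            script: { inline: 'ctx._source.counter++' },
            query: { term: { foo: 'bar' } }
        }
    }, function(err, data) { /* handled inside partialUpdate */ });
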
14 changes: 14 additions & 0 deletions test/fixture/success-update-by-query-response.json
@@ -0,0 +1,14 @@
{
"took": 30,
"timed_out": false,
"updated": 120,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": 0,
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
"total": 120,
"failures" : [ ]
}
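
A short sketch of how this response is surfaced (again, not part of the commit): partialUpdate treats a non-empty failures array as an error and attaches the offending operation to it, while a clean response adds data.updated to writtenRecords. Because partialUpdate reports through the _write callback, consumers see failures as a regular stream 'error' event; `stream` here refers to the usage sketch after index.js above.

    stream.on('error', function(err) {
        // Emitted when client.updateByQuery fails outright, or when the response
        // carries one or more entries in `failures`.
        console.error(err.message);   // 'One or more failures occurred during update_by_query.'
        console.error(err.operation); // the original update_by_query record
    });
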
