Skip to content

Commit

Permalink
Merge pull request #1 from voldern/rename-library
Browse files Browse the repository at this point in the history
Rename package to elasticsearch-writable-stream
  • Loading branch information
voldern authored Sep 13, 2016
2 parents 154a474 + 64236ca commit a436147
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 28 deletions.
22 changes: 15 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# elasticsearch-bulk-index-stream
# elasticsearch-writable-stream

A writable stream for bulk doing bulk actions, like indexing, in Elasticsearch.
A writable stream for doing operations in Elasticsearch with support
for bulk actions. Supports virtually all indexing operations including
index, update, update_by_query, and delete.

[![build status](https://travis-ci.org/voldern/elasticsearch-bulk-index-stream.svg)](https://travis-ci.org/voldern/elasticsearch-bulk-index-stream)
[![modules status](https://david-dm.org/voldern/elasticsearch-bulk-index-stream.svg)](https://david-dm.org/voldern/elasticsearch-bulk-index-stream)
This module used to be known as [elasticsearch-bulk-index-stream](https://www.npmjs.com/package/elasticsearch-bulk-index-stream),
but was renamed because the package has added support for non-bulk actions.

[![npm badge](https://nodei.co/npm/elasticsearch-bulk-index-stream.png?downloads=true)](https://nodei.co/npm/elasticsearch-bulk-index-stream)
[![build status](https://travis-ci.org/voldern/elasticsearch-writable-stream.svg)](https://travis-ci.org/voldern/elasticsearch-writable-stream)
[![modules status](https://david-dm.org/voldern/elasticsearch-writable-stream.svg)](https://david-dm.org/voldern/elasticsearch-writable-stream)

[![npm badge](https://nodei.co/npm/elasticsearch-writable-stream.png?downloads=true)](https://nodei.co/npm/elasticsearch-writable-stream)

# Usage

Expand All @@ -32,6 +37,9 @@ will be buffered before doing a bulk operation. The stream will also
write all buffered items if its is closed, before emitting the
`finish` event.

The `update_by_query` action bypasses the buffer and gets executed at
once since its not supported by the bulk API.

## Flushing

Its also possible to send in the option `flushTimeout` to indicate
Expand All @@ -49,9 +57,9 @@ sent in as `options.logger` to the constructor.
# Example

```javascript
var ElasticsearchBulkIndexStream = require('elasticsearch-bulk-index-stream');
var ElasticsearchWritableStream = require('elasticsearch-writable-stream');

var stream = new ElasticsearchBulkIndexStream(elasticsearchClient, {
var stream = new ElasticsearchWritableStream(elasticsearchClient, {
highWaterMark: 256,
flushTimeout: 500
});
Expand Down
14 changes: 7 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('util'),
FlushWritable = require('flushwritable'),
_ = require('lodash');

util.inherits(ElasticsearchBulkIndexWritable, FlushWritable);
util.inherits(ElasticsearchWritable, FlushWritable);

/**
* Transform records into a format required by Elasticsearch bulk API
Expand Down Expand Up @@ -73,7 +73,7 @@ function validateOperation(operation) {
* @param {number} [options.flushTimeout=null] Number of ms to flush records after, if highWaterMark hasn't been reached
* @param {Object} [options.logger] Instance of a logger like bunyan or winston
*/
function ElasticsearchBulkIndexWritable(client, options) {
function ElasticsearchWritable(client, options) {
assert(client, 'client is required');

options = options || {};
Expand All @@ -97,7 +97,7 @@ function ElasticsearchBulkIndexWritable(client, options) {
* @param {array} records
* @param {Function} callback
*/
ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records, callback) {
ElasticsearchWritable.prototype.bulkWrite = function bulkWrite(records, callback) {
this.client.bulk({ body: records }, function bulkCallback(err, data) {
if (err) {
err.records = records;
Expand Down Expand Up @@ -135,7 +135,7 @@ ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records,
* @param {object} operation
* @param {Function} callback
*/
ElasticsearchBulkIndexWritable.prototype.partialUpdate = function partialUpdate(operation, callback) {
ElasticsearchWritable.prototype.partialUpdate = function partialUpdate(operation, callback) {
if (this.logger) {
this.logger.debug('Executing update_by_query in Elasticsearch');
}
Expand Down Expand Up @@ -177,7 +177,7 @@ ElasticsearchBulkIndexWritable.prototype.partialUpdate = function partialUpdate(
* @param {Function} callback
* @return {undefined}
*/
ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) {
ElasticsearchWritable.prototype._flush = function _flush(callback) {
clearTimeout(this.flushTimeoutId);

if (this.queue.length === 0) {
Expand Down Expand Up @@ -221,7 +221,7 @@ ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) {
* @param {Function} callback
* @returns {undefined}
*/
ElasticsearchBulkIndexWritable.prototype._write = function _write(record, enc, callback) {
ElasticsearchWritable.prototype._write = function _write(record, enc, callback) {
if (this.logger) {
this.logger.debug('Adding to Elasticsearch queue', { record: record });
}
Expand Down Expand Up @@ -255,4 +255,4 @@ ElasticsearchBulkIndexWritable.prototype._write = function _write(record, enc, c
callback();
};

module.exports = ElasticsearchBulkIndexWritable;
module.exports = ElasticsearchWritable;
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticsearch-bulk-index-stream",
"version": "1.0.0",
"description": "A writable stream for bulk indexing records in Elasticsearch",
"name": "elasticsearch-writable-stream",
"version": "2.0.0",
"description": "A writable stream for doing operations in Elasticsearch",
"main": "index.js",
"scripts": {
"test": "eslint . && istanbul cover --print both _mocha test",
Expand All @@ -25,13 +25,14 @@
},
"repository": {
"type": "git",
"url": "git@github.com:voldern/elasticsearch-bulk-index-stream.git"
"url": "git@github.com:voldern/elasticsearch-writable-stream.git"
},
"keywords": [
"elasticsearch",
"stream",
"writable",
"write",
"bulk"
"bulk",
"update_by_query"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var _ = require('lodash'),
chai = require('chai'),
sinon = require('sinon'),
sinonChai = require('sinon-chai'),
ElasticsearchBulkIndexWritable = require('../');
ElasticsearchWritable = require('../');

chai.use(sinonChai);

Expand Down Expand Up @@ -38,7 +38,7 @@ function getMissingFieldTest(fieldName, testFixture) {
};
}

describe('ElastisearchBulkIndexWritable', function() {
describe('ElasticsearchWritable', function() {
beforeEach(function() {
this.sinon = sinon.sandbox.create();
});
Expand All @@ -50,20 +50,20 @@ describe('ElastisearchBulkIndexWritable', function() {
describe('constructor', function() {
it('should require client', function() {
expect(function() {
new ElasticsearchBulkIndexWritable();
new ElasticsearchWritable();
}).to.Throw(Error, 'client is required');
});

it('should default highWaterMark to 16', function() {
var stream = new ElasticsearchBulkIndexWritable({});
var stream = new ElasticsearchWritable({});

expect(stream.highWaterMark).to.eq(16);
});
});

describe('queue', function() {
beforeEach(function() {
this.stream = new ElasticsearchBulkIndexWritable({}, { highWaterMark: 10 });
this.stream = new ElasticsearchWritable({}, { highWaterMark: 10 });
});

it('should queue up number of items equal to highWaterMark', function(done) {
Expand Down Expand Up @@ -101,7 +101,7 @@ describe('ElastisearchBulkIndexWritable', function() {
bulk: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
this.stream = new ElasticsearchWritable(this.client, {
highWaterMark: 6
});
});
Expand Down Expand Up @@ -171,7 +171,7 @@ describe('ElastisearchBulkIndexWritable', function() {
bulk: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
this.stream = new ElasticsearchWritable(this.client, {
highWaterMark: 10,
flushTimeout: 1000
});
Expand Down Expand Up @@ -216,7 +216,7 @@ describe('ElastisearchBulkIndexWritable', function() {
bulk: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
this.stream = new ElasticsearchWritable(this.client, {
highWaterMark: 1,
flushTimeout: 10
});
Expand Down Expand Up @@ -256,7 +256,7 @@ describe('ElastisearchBulkIndexWritable', function() {
updateByQuery: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
this.stream = new ElasticsearchWritable(this.client, {
highWaterMark: 1,
flushTimeout: 10
});
Expand Down

0 comments on commit a436147

Please sign in to comment.