diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index ed0e5d41a..4055ae5ce 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -82,14 +82,14 @@ export interface BulkStats { aborted: boolean; } -interface IndexAction { +interface IndexActionOperation { index: { _index: string; [key: string]: any; }; } -interface CreateAction { +interface CreateActionOperation { create: { _index: string; [key: string]: any; @@ -110,6 +110,8 @@ interface DeleteAction { }; } +type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]; +type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]; type UpdateAction = [UpdateActionOperation, Record]; type Action = IndexAction | CreateAction | UpdateAction | DeleteAction; type Omit = Pick>; diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index c03ae5532..ff5b14d92 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -684,6 +684,56 @@ test('bulk index', (t) => { }); }); + t.test('Should use payload returned by `onDocument`', async (t) => { + let count = 0; + const updatedAt = '1970-01-01T12:00:00.000Z'; + const MockConnection = connection.buildMockConnection({ + onRequest(params) { + t.equal(params.path, '/_bulk'); + t.match(params.headers, { + 'content-type': 'application/x-ndjson', + }); + const [action, payload] = params.body.split('\n'); + t.same(JSON.parse(action), { index: { _index: 'test' } }); + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }); + return { body: { errors: false, items: [{}] } }; + }, + }); + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + }); + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument(doc) { + return [ + { + index: { + _index: 'test', + }, + }, + { ...doc, updatedAt }, + ]; + }, + onDrop(doc) { + t.fail('This should never be called'); + }, + }); + + t.type(result.time, 'number'); + t.type(result.bytes, 'number'); + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false, + }); + }); + t.end(); }); @@ -844,6 +894,57 @@ test('bulk create', (t) => { aborted: false, }); }); + + t.test('Should perform a bulk request', async (t) => { + let count = 0; + const updatedAt = '1970-01-01T12:00:00.000Z'; + const MockConnection = connection.buildMockConnection({ + onRequest(params) { + t.equal(params.path, '/_bulk'); + t.match(params.headers, { 'content-type': 'application/x-ndjson' }); + const [action, payload] = params.body.split('\n'); + t.same(JSON.parse(action), { create: { _index: 'test', _id: count } }); + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }); + return { body: { errors: false, items: [{}] } }; + }, + }); + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + }); + let id = 0; + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument(doc) { + return [ + { + create: { + _index: 'test', + _id: id++, + }, + }, + { ...doc, updatedAt }, + ]; + }, + onDrop(doc) { + t.fail('This should never be called'); + }, + }); + + t.type(result.time, 'number'); + t.type(result.bytes, 'number'); + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false, + }); + }); + t.end(); });