Skip to content

Commit

Permalink
Add IndexWriter indexManyNow method
Browse files Browse the repository at this point in the history
  • Loading branch information
marshallmain committed Jun 15, 2021
1 parent ada916f commit 897b4a7
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { estypes } from '@elastic/elasticsearch';
import util from 'util';
import { Logger, ElasticsearchClient } from 'src/core/server';
import { BufferedStream } from './utils/buffered_stream';
Expand Down Expand Up @@ -40,7 +41,9 @@ export class IndexWriter {
this.logger = params.logger.get('IndexWriter');

this.buffer = new BufferedStream<BufferItem>({
flush: (items) => this.bulkIndex(items),
flush: async (items) => {
this.bulkIndex(items);
},
});
}

Expand All @@ -60,11 +63,16 @@ export class IndexWriter {
}
}

public async indexManyNow(docs: Document[]): Promise<estypes.BulkResponse | undefined> {
const bufferItems = docs.map((doc) => ({ index: this.indexName, doc }));
return this.bulkIndex(bufferItems);
}

public async close(): Promise<void> {
await this.buffer.closeAndWaitUntilFlushed();
}

private async bulkIndex(items: BufferItem[]): Promise<void> {
private async bulkIndex(items: BufferItem[]): Promise<estypes.BulkResponse | undefined> {
this.logger.debug(`Indexing ${items.length} documents`);

const bulkBody: Array<Record<string, unknown>> = [];
Expand All @@ -85,10 +93,12 @@ export class IndexWriter {
error.stack += '\n' + util.inspect(response.body.items, { depth: null });
this.logger.error(error);
}
return response.body;
} catch (e) {
this.logger.error(
`error writing bulk events: "${e.message}"; docs: ${JSON.stringify(bulkBody)}`
);
return undefined;
}
}
}

0 comments on commit 897b4a7

Please sign in to comment.