From 897b4a76f559c19e28c4df85b2def3f6092af527 Mon Sep 17 00:00:00 2001 From: Marshall Main Date: Tue, 15 Jun 2021 00:47:47 -0700 Subject: [PATCH] Add IndexWriter indexManyNow method --- .../server/event_log/elasticsearch/index_writer.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts index 7f83421ec80d85b..04b7129d4c7c7f3 100644 --- a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts @@ -6,6 +6,7 @@ */ import type { PublicMethodsOf } from '@kbn/utility-types'; +import { estypes } from '@elastic/elasticsearch'; import util from 'util'; import { Logger, ElasticsearchClient } from 'src/core/server'; import { BufferedStream } from './utils/buffered_stream'; @@ -40,7 +41,9 @@ export class IndexWriter { this.logger = params.logger.get('IndexWriter'); this.buffer = new BufferedStream({ - flush: (items) => this.bulkIndex(items), + flush: async (items) => { + this.bulkIndex(items); + }, }); } @@ -60,11 +63,16 @@ export class IndexWriter { } } + public async indexManyNow(docs: Document[]): Promise { + const bufferItems = docs.map((doc) => ({ index: this.indexName, doc })); + return this.bulkIndex(bufferItems); + } + public async close(): Promise { await this.buffer.closeAndWaitUntilFlushed(); } - private async bulkIndex(items: BufferItem[]): Promise { + private async bulkIndex(items: BufferItem[]): Promise { this.logger.debug(`Indexing ${items.length} documents`); const bulkBody: Array> = []; @@ -85,10 +93,12 @@ export class IndexWriter { error.stack += '\n' + util.inspect(response.body.items, { depth: null }); this.logger.error(error); } + return response.body; } catch (e) { this.logger.error( `error writing bulk events: "${e.message}"; docs: ${JSON.stringify(bulkBody)}` ); + return undefined; } } }