Skip to content

Commit

Permalink
[event_log] index event docs in bulk instead of individually (redo)
Browse files Browse the repository at this point in the history
resolves #55634
resolves #65746

Buffers event docs being written for a fixed interval / buffer size,
and indexes those docs via a bulk ES call.

Also now flushing those buffers at plugin stop() time, which
we couldn't do before with the single index calls, which were
run via `setImmediate()`.

This is a redo of PR #80941 which
had to be reverted.
  • Loading branch information
pmuellr committed Nov 20, 2020
1 parent ec983ef commit 4ac1078
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import { IClusterClientAdapter } from './cluster_client_adapter';
const createClusterClientMock = () => {
const mock: jest.Mocked<IClusterClientAdapter> = {
indexDocument: jest.fn(),
indexDocuments: jest.fn(),
doesIlmPolicyExist: jest.fn(),
createIlmPolicy: jest.fn(),
doesIndexTemplateExist: jest.fn(),
createIndexTemplate: jest.fn(),
doesAliasExist: jest.fn(),
createIndex: jest.fn(),
queryEventsBySavedObject: jest.fn(),
shutdown: jest.fn(),
};
return mock;
};
Expand Down
166 changes: 154 additions & 12 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { LegacyClusterClient, Logger } from 'src/core/server';
import { LegacyClusterClient } from 'src/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks';
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';
import {
ClusterClientAdapter,
IClusterClientAdapter,
EVENT_BUFFER_LENGTH,
} from './cluster_client_adapter';
import { contextMock } from './context.mock';
import { findOptionsSchema } from '../event_log_client';
import { delay } from '../lib/delay';
import { times } from 'lodash';

type EsClusterClient = Pick<jest.Mocked<LegacyClusterClient>, 'callAsInternalUser' | 'asScoped'>;
type MockedLogger = ReturnType<typeof loggingSystemMock['createLogger']>;

let logger: Logger;
let logger: MockedLogger;
let clusterClient: EsClusterClient;
let clusterClientAdapter: IClusterClientAdapter;

Expand All @@ -21,22 +29,130 @@ beforeEach(() => {
clusterClientAdapter = new ClusterClientAdapter({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
context: contextMock.create(),
});
});

describe('indexDocument', () => {
test('should call cluster client with given doc', async () => {
await clusterClientAdapter.indexDocument({ args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', {
args: true,
test('should call cluster client bulk with given doc', async () => {
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });

await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
});

expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
body: [{ create: { _index: 'event-log' } }, { message: 'foo' }],
});
});

test('should throw error when cluster client throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.indexDocument({ args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
test('should log an error when cluster client throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure'));
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
await retryUntil('cluster client bulk called', () => {
return logger.error.mock.calls.length !== 0;
});

const expectedMessage = `error writing bulk events: "expected failure"; docs: [{"create":{"_index":"event-log"}},{"message":"foo"}]`;
expect(logger.error).toHaveBeenCalledWith(expectedMessage);
});
});

describe('shutdown()', () => {
test('should work if no docs have been written', async () => {
const result = await clusterClientAdapter.shutdown();
expect(result).toBeFalsy();
});

test('should work if some docs have been written', async () => {
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
const resultPromise = clusterClientAdapter.shutdown();

await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
});

const result = await resultPromise;
expect(result).toBeFalsy();
});
});

describe('buffering documents', () => {
test('should write buffered docs after timeout', async () => {
// write EVENT_BUFFER_LENGTH - 1 docs
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
}

await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
});

const expectedBody = [];
for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) {
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
}

expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
body: expectedBody,
});
});

test('should write buffered docs after buffer exceeded', async () => {
// write EVENT_BUFFER_LENGTH + 1 docs
for (let i = 0; i < EVENT_BUFFER_LENGTH + 1; i++) {
clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' });
}

await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length >= 2;
});

const expectedBody = [];
for (let i = 0; i < EVENT_BUFFER_LENGTH; i++) {
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
}

expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', {
body: expectedBody,
});

expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', {
body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }],
});
});

test('should handle lots of docs correctly with a delay in the bulk index', async () => {
// @ts-ignore
clusterClient.callAsInternalUser.mockImplementation = async () => await delay(100);

const docs = times(EVENT_BUFFER_LENGTH * 10, (i) => ({
body: { message: `foo ${i}` },
index: 'event-log',
}));

// write EVENT_BUFFER_LENGTH * 10 docs
for (const doc of docs) {
clusterClientAdapter.indexDocument(doc);
}

await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length >= 10;
});

for (let i = 0; i < 10; i++) {
const expectedBody = [];
for (let j = 0; j < EVENT_BUFFER_LENGTH; j++) {
expectedBody.push(
{ create: { _index: 'event-log' } },
{ message: `foo ${i * EVENT_BUFFER_LENGTH + j}` }
);
}

expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(i + 1, 'bulk', {
body: expectedBody,
});
}
});
});

Expand Down Expand Up @@ -575,3 +691,29 @@ describe('queryEventsBySavedObject', () => {
`);
});
});

type RetryableFunction = () => boolean;

const RETRY_UNTIL_DEFAULT_COUNT = 20;
const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds

async function retryUntil(
label: string,
fn: RetryableFunction,
count: number = RETRY_UNTIL_DEFAULT_COUNT,
wait: number = RETRY_UNTIL_DEFAULT_WAIT
): Promise<boolean> {
while (count > 0) {
count--;

if (fn()) return true;

// eslint-disable-next-line no-console
console.log(`attempt failed waiting for "${label}", attempts left: ${count}`);

if (count === 0) return false;
await delay(wait);
}

return false;
}
72 changes: 68 additions & 4 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,31 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';
import { reject, isUndefined } from 'lodash';
import { SearchResponse, Client } from 'elasticsearch';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger, LegacyClusterClient } from 'src/core/server';

import { IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
import { EsContext } from '.';
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
import { FindOptionsType } from '../event_log_client';

export const EVENT_BUFFER_TIME = 1000; // milliseconds
export const EVENT_BUFFER_LENGTH = 100;

export type EsClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;

export interface Doc {
index: string;
body: IEvent;
}

export interface ConstructorOpts {
logger: Logger;
clusterClientPromise: Promise<EsClusterClient>;
context: EsContext;
}

export interface QueryEventsBySavedObjectResult {
Expand All @@ -30,14 +41,67 @@ export interface QueryEventsBySavedObjectResult {
export class ClusterClientAdapter {
private readonly logger: Logger;
private readonly clusterClientPromise: Promise<EsClusterClient>;
private readonly docBuffer$: Subject<Doc>;
private readonly context: EsContext;
private readonly docsBufferedFlushed: Promise<void>;

constructor(opts: ConstructorOpts) {
this.logger = opts.logger;
this.clusterClientPromise = opts.clusterClientPromise;
this.context = opts.context;
this.docBuffer$ = new Subject<Doc>();

// buffer event log docs for time / buffer length, ignore empty
// buffers, then index the buffered docs; kick things off with a
// promise on the observable, which we'll wait on in shutdown
this.docsBufferedFlushed = this.docBuffer$
.pipe(
bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
filter((docs) => docs.length > 0),
switchMap(async (docs) => await this.indexDocuments(docs))
)
.toPromise();
}

public async indexDocument(doc: unknown): Promise<void> {
await this.callEs<ReturnType<Client['index']>>('index', doc);
// This will be called at plugin stop() time; the assumption is any plugins
// depending on the event_log will already be stopped, and so will not be
// writing more event docs. We complete the docBuffer$ observable,
// and wait for the docsBufffered$ observable to complete via it's promise,
// and so should end up writing all events out that pass through, before
// Kibana shuts down (cleanly).
public async shutdown(): Promise<void> {
this.docBuffer$.complete();
await this.docsBufferedFlushed;
}

public indexDocument(doc: Doc): void {
this.docBuffer$.next(doc);
}

async indexDocuments(docs: Doc[]): Promise<void> {
// If es initialization failed, don't try to index.
// Also, don't log here, we log the failure case in plugin startup
// instead, otherwise we'd be spamming the log (if done here)
if (!(await this.context.waitTillReady())) {
return;
}

const bulkBody: Array<Record<string, unknown>> = [];

for (const doc of docs) {
if (doc.body === undefined) continue;

bulkBody.push({ create: { _index: doc.index } });
bulkBody.push(doc.body);
}

try {
await this.callEs<ReturnType<Client['bulk']>>('bulk', { body: bulkBody });
} catch (err) {
this.logger.error(
`error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}`
);
}
}

public async doesIlmPolicyExist(policyName: string): Promise<boolean> {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/event_log/server/es/context.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const createContextMock = () => {
logger: loggingSystemMock.createLogger(),
esNames: namesMock.create(),
initialize: jest.fn(),
shutdown: jest.fn(),
waitTillReady: jest.fn(async () => true),
esAdapter: clusterClientAdapterMock.create(),
initialized: true,
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/event_log/server/es/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface EsContext {
esNames: EsNames;
esAdapter: IClusterClientAdapter;
initialize(): void;
shutdown(): Promise<void>;
waitTillReady(): Promise<boolean>;
initialized: boolean;
}
Expand Down Expand Up @@ -52,6 +53,7 @@ class EsContextImpl implements EsContext {
this.esAdapter = new ClusterClientAdapter({
logger: params.logger,
clusterClientPromise: params.clusterClientPromise,
context: this,
});
}

Expand All @@ -74,6 +76,10 @@ class EsContextImpl implements EsContext {
});
}

async shutdown() {
await this.esAdapter.shutdown();
}

// waits till the ES initialization is done, returns true if it was successful,
// false if it was not successful
async waitTillReady(): Promise<boolean> {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/event_log/server/event_logger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ describe('EventLogger', () => {
eventLogger.logEvent({});
await waitForLogEvent(systemLogger);
delay(WRITE_LOG_WAIT_MILLIS); // sleep a bit longer since event logging is async
expect(esContext.esAdapter.indexDocument).not.toHaveBeenCalled();
expect(esContext.esAdapter.indexDocument).toHaveBeenCalled();
expect(esContext.esAdapter.indexDocuments).not.toHaveBeenCalled();
});

test('method logEvent() writes expected default values', async () => {
Expand Down
Loading

0 comments on commit 4ac1078

Please sign in to comment.