Skip to content

Commit

Permalink
[Search] Add cancel function to pollSearch (#85787) (#88414)
Browse files Browse the repository at this point in the history
* Add cancel functionality to pollSearch
Makes sure that DELETE requests are properly sent even if consumer unsubscribes.

* Update poll_search.test.ts

* cancel on abort

* fix jest

* ADded jest test

* Cancel to be called once

* Only cancel internally on abort

* ts + addd defer

* ts

* make cancel code prettier

* Cancel even after unsubscribe

* Throw abort error if already aborted

* Only delete once

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

Co-authored-by: Liza Katz <lizka.k@gmail.com>
  • Loading branch information
lukasolson and lizozom authored Jan 15, 2021
1 parent eba5539 commit 83de2af
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 32 deletions.
119 changes: 119 additions & 0 deletions x-pack/plugins/data_enhanced/common/search/poll_search.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { pollSearch } from './poll_search';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';

describe('pollSearch', () => {
function getMockedSearch$(resolveOnI = 1, finishWithError = false) {
let counter = 0;
return jest.fn().mockImplementation(() => {
counter++;
const lastCall = counter === resolveOnI;
return new Promise((resolve) => {
if (lastCall) {
resolve({
isRunning: false,
isPartial: finishWithError,
});
} else {
resolve({
isRunning: true,
isPartial: true,
});
}
});
});
}

test('Defers execution', async () => {
const searchFn = getMockedSearch$(1);
const cancelFn = jest.fn();
pollSearch(searchFn, cancelFn);
expect(searchFn).toBeCalledTimes(0);
expect(cancelFn).toBeCalledTimes(0);
});

test('Resolves immediatelly', async () => {
const searchFn = getMockedSearch$(1);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Resolves when complete', async () => {
const searchFn = getMockedSearch$(3);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(3);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws Error on ES error response', async () => {
const searchFn = getMockedSearch$(2, true);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(Error);
expect(searchFn).toBeCalledTimes(2);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError on empty response', async () => {
const searchFn = jest.fn().mockResolvedValue(undefined);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(AbortError);
expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError and cancels on abort', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const abortController = new AbortController();
const poll = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).toPromise();

await new Promise((resolve) => setTimeout(resolve, 500));
abortController.abort();

await expect(poll).rejects.toThrow(AbortError);

await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});

test("Stops, but doesn't cancel on unsubscribe", async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {});

await new Promise((resolve) => setTimeout(resolve, 500));
subscription.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Calls cancel even when consumer unsubscribes', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const abortController = new AbortController();
const subscription = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).subscribe(() => {});
subscription.unsubscribe();
abortController.abort();

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});
});
46 changes: 30 additions & 16 deletions x-pack/plugins/data_enhanced/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,42 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { from, NEVER, Observable, timer } from 'rxjs';
import { expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import { from, Observable, timer, defer, fromEvent, EMPTY } from 'rxjs';
import { expand, map, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import type { IKibanaSearchResponse } from '../../../../../src/plugins/data/common';
import { isErrorResponse, isPartialResponse } from '../../../../../src/plugins/data/common';
import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/common';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
import type { IAsyncSearchOptions } from './types';

export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
cancel?: () => void,
{ pollInterval = 1000, abortSignal }: IAsyncSearchOptions = {}
): Observable<Response> => {
const aborted = options?.abortSignal
? abortSignalToPromise(options?.abortSignal)
: { promise: NEVER, cleanup: () => {} };
return defer(() => {
if (abortSignal?.aborted) {
throw new AbortError();
}

return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) throw new AbortError();
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(from(aborted.promise)),
finalize(aborted.cleanup)
);
if (cancel) {
abortSignal?.addEventListener('abort', cancel, { once: true });
}

const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
map(() => {
throw new AbortError();
})
);

return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) {
throw response ? new Error('Received partial response') : new AbortError();
}
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(aborted$)
);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe('EnhancedSearchInterceptor', () => {
await timeTravel(10);

expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
});

test('should abort on user abort', async () => {
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should not DELETE a running async search on async timeout prior to first response', async () => {
Expand Down Expand Up @@ -326,7 +326,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should DELETE a running async search on async timeout on error from fetch', async () => {
Expand All @@ -343,8 +343,6 @@ describe('EnhancedSearchInterceptor', () => {
time: 10,
value: {
error: 'oh no',
isPartial: false,
isRunning: false,
id: 1,
},
isError: true,
Expand All @@ -368,7 +366,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBe(responses[1].value);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should NOT DELETE a running SAVED async search on abort', async () => {
Expand Down
12 changes: 8 additions & 4 deletions x-pack/plugins/data_enhanced/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { once } from 'lodash';
import { throwError, Subscription } from 'rxjs';
import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators';
import {
Expand All @@ -14,7 +15,6 @@ import {
IKibanaSearchRequest,
SearchSessionState,
} from '../../../../../src/plugins/data/public';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';

export class EnhancedSearchInterceptor extends SearchInterceptor {
Expand Down Expand Up @@ -88,10 +88,14 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
isSavedToBackground = true;
});

return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe(
const cancel = once(() => {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
});

return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
tap((response) => (id = response.id)),
catchError((e: AbortError) => {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
catchError((e: Error) => {
cancel();
return throwError(this.handleSearchError(e, timeoutSignal, options));
}),
finalize(() => {
Expand Down
16 changes: 13 additions & 3 deletions x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { tap } from 'rxjs/operators';
import type { Logger } from 'kibana/server';
import type { IScopedClusterClient, Logger } from 'kibana/server';
import type { ISearchStrategy } from '../../../../../src/plugins/data/server';
import type {
EqlSearchStrategyRequest,
Expand All @@ -21,10 +21,14 @@ import { EqlSearchResponse } from './types';
export const eqlSearchStrategyProvider = (
logger: Logger
): ISearchStrategy<EqlSearchStrategyRequest, EqlSearchStrategyResponse> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
await esClient.asCurrentUser.asyncSearch.delete({ id });
}

return {
cancel: async (id, options, { esClient }) => {
logger.debug(`_eql/delete ${id}`);
await esClient.asCurrentUser.eql.delete({ id });
await cancelAsyncSearch(id, esClient);
},

search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => {
Expand Down Expand Up @@ -54,7 +58,13 @@ export const eqlSearchStrategyProvider = (
return toEqlKibanaSearchResponse(response);
};

return pollSearch(search, options).pipe(tap((response) => (id = response.id)));
const cancel = async () => {
if (id) {
await cancelAsyncSearch(id, esClient);
}
};

return pollSearch(search, cancel, options).pipe(tap((response) => (id = response.id)));
},

extend: async (id, keepAlive, options, { esClient }) => {
Expand Down
16 changes: 13 additions & 3 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import type { Observable } from 'rxjs';
import type { Logger, SharedGlobalConfig } from 'kibana/server';
import type { IScopedClusterClient, Logger, SharedGlobalConfig } from 'kibana/server';
import { first, tap } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { from } from 'rxjs';
Expand Down Expand Up @@ -40,6 +40,10 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage
): ISearchStrategy<IEsSearchRequest> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
await esClient.asCurrentUser.asyncSearch.delete({ id });
}

function asyncSearch(
{ id, ...request }: IEsSearchRequest,
options: IAsyncSearchOptions,
Expand All @@ -58,7 +62,13 @@ export const enhancedEsSearchStrategyProvider = (
return toAsyncKibanaSearchResponse(body);
};

return pollSearch(search, options).pipe(
const cancel = async () => {
if (id) {
await cancelAsyncSearch(id, esClient);
}
};

return pollSearch(search, cancel, options).pipe(
tap((response) => (id = response.id)),
tap(searchUsageObserver(logger, usage))
);
Expand Down Expand Up @@ -109,7 +119,7 @@ export const enhancedEsSearchStrategyProvider = (
},
cancel: async (id, options, { esClient }) => {
logger.debug(`cancel ${id}`);
await esClient.asCurrentUser.asyncSearch.delete({ id });
await cancelAsyncSearch(id, esClient);
},
extend: async (id, keepAlive, options, { esClient }) => {
logger.debug(`extend ${id} by ${keepAlive}`);
Expand Down

0 comments on commit 83de2af

Please sign in to comment.