Skip to content

Commit

Permalink
Upgrade RxJS to 7 (#129087)
Browse files Browse the repository at this point in the history
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
afharo and kibanamachine authored Apr 12, 2022
1 parent 5ee7c3d commit 9d5aca5
Show file tree
Hide file tree
Showing 260 changed files with 47,613 additions and 52,731 deletions.
7 changes: 4 additions & 3 deletions examples/search_examples/public/search/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
EuiTabbedContentTab,
} from '@elastic/eui';

import { lastValueFrom } from 'rxjs';
import { CoreStart } from '../../../../src/core/public';
import { mountReactNode } from '../../../../src/core/public/utils';
import { NavigationPublicPluginStart } from '../../../../src/plugins/navigation/public';
Expand Down Expand Up @@ -306,9 +307,9 @@ export const SearchExamplesApp = ({
const abortController = new AbortController();
setAbortController(abortController);
setIsLoading(true);
const { rawResponse: res } = await searchSource
.fetch$({ abortSignal: abortController.signal })
.toPromise();
const { rawResponse: res } = await lastValueFrom(
searchSource.fetch$({ abortSignal: abortController.signal })
);
setRawResponse(res);

const message = <EuiText>Searched {res.hits.total} documents.</EuiText>;
Expand Down
9 changes: 4 additions & 5 deletions examples/search_examples/public/search_sessions/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
EuiTitle,
} from '@elastic/eui';
import { catchError, map, tap } from 'rxjs/operators';
import { of } from 'rxjs';
import { lastValueFrom, of } from 'rxjs';

import { CoreStart } from '../../../../src/core/public';
import { mountReactNode } from '../../../../src/core/public/utils';
Expand Down Expand Up @@ -693,9 +693,8 @@ function doSearch(
const startTs = performance.now();

// Submit the search request using the `data.search` service.
return data.search
.search(req, { sessionId })
.pipe(
return lastValueFrom(
data.search.search(req, { sessionId }).pipe(
tap((res) => {
if (isCompleteResponse(res)) {
const avgResult: number | undefined = res.rawResponse.aggregations
Expand Down Expand Up @@ -724,7 +723,7 @@ function doSearch(
return of({ request: req, response: e });
})
)
.toPromise();
);
}

function getNumeric(fields?: DataViewField[]) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
"redux-actions": "^2.6.5",
"redux-devtools-extension": "^2.13.8",
"redux-logger": "^3.0.6",
"redux-observable": "^1.2.0",
"redux-observable": "2.0.0",
"redux-saga": "^1.1.3",
"redux-thunk": "^2.3.0",
"redux-thunks": "^1.0.0",
Expand All @@ -394,7 +394,7 @@
"reselect": "^4.0.0",
"resize-observer-polyfill": "^1.5.1",
"rison-node": "1.0.2",
"rxjs": "^6.5.5",
"rxjs": "^7.5.5",
"safe-squel": "^5.12.5",
"seedrandom": "^3.0.5",
"semver": "^7.3.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

// eslint-disable-next-line max-classes-per-file
import type { Observable } from 'rxjs';
import { BehaviorSubject, Subject } from 'rxjs';
import { BehaviorSubject, lastValueFrom, Subject } from 'rxjs';
import type { MockedLogger } from '@kbn/logging-mocks';
import { loggerMock } from '@kbn/logging-mocks';
import { AnalyticsClient } from './analytics_client';
Expand Down Expand Up @@ -113,14 +113,14 @@ describe('AnalyticsClient', () => {
},
});

// eslint-disable-next-line dot-notation
const internalQueuePromise = analyticsClient['internalEventQueue$']
.pipe(take(3), toArray())
.toPromise();
const internalQueuePromise = lastValueFrom(
// eslint-disable-next-line dot-notation
analyticsClient['internalEventQueue$'].pipe(take(3), toArray())
);

const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3), toArray())
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3), toArray())
);

analyticsClient.reportEvent('testEvent', { a_field: 'a' });
analyticsClient.reportEvent('testEvent', { a_field: 'b' });
Expand Down Expand Up @@ -259,7 +259,7 @@ describe('AnalyticsClient', () => {
// Typescript also helps with the config type inference <3
analyticsClient.registerShipper(MockedShipper, { telemetryCounter$: mockTelemetryCounter$ });

const counterEventPromise = analyticsClient.telemetryCounter$.pipe(take(1)).toPromise();
const counterEventPromise = lastValueFrom(analyticsClient.telemetryCounter$.pipe(take(1)));

const counter: TelemetryCounter = {
type: TelemetryCounterType.succeeded,
Expand Down Expand Up @@ -383,7 +383,7 @@ describe('AnalyticsClient', () => {
context$,
});

const globalContextPromise = globalContext$.pipe(take(2), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(2), toArray()));
context$.next({ a_field: true });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
Expand All @@ -392,7 +392,7 @@ describe('AnalyticsClient', () => {
});

test('It does not break if context emits `undefined`', async () => {
const context$ = new Subject<{ a_field: boolean }>();
const context$ = new Subject<{ a_field: boolean } | undefined | void>();
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
Expand All @@ -406,7 +406,7 @@ describe('AnalyticsClient', () => {
context$,
});

const globalContextPromise = globalContext$.pipe(take(3), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(3), toArray()));
context$.next();
context$.next(undefined);
await expect(globalContextPromise).resolves.toEqual([
Expand All @@ -431,7 +431,7 @@ describe('AnalyticsClient', () => {
context$,
});

const globalContextPromise = globalContext$.pipe(take(1), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(1), toArray()));
await expect(globalContextPromise).resolves.toEqual([
{ a_field: true }, // No original empty state
]);
Expand Down Expand Up @@ -473,7 +473,7 @@ describe('AnalyticsClient', () => {
context$: contextB$,
});

const globalContextPromise = globalContext$.pipe(take(6), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
contextA$.next({ a_field: true });
contextB$.next({ b_field: 1 });
contextB$.next({ a_field: false, b_field: 1 });
Expand Down Expand Up @@ -512,7 +512,7 @@ describe('AnalyticsClient', () => {
context$,
});

const globalContextPromise = globalContext$.pipe(take(6), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
context$.next({ b_field: 1 });
context$.next({ a_field: false, b_field: 1 });
context$.next({ a_field: true, b_field: 1 });
Expand Down Expand Up @@ -582,7 +582,7 @@ describe('AnalyticsClient', () => {
context$,
});

const globalContextPromise = globalContext$.pipe(take(4), toArray()).toPromise();
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(4), toArray()));
context$.next({ a_field: true });
// The size of the registry grows on the first emission
expect(contextProvidersRegistry.size).toBe(1);
Expand Down Expand Up @@ -736,11 +736,11 @@ describe('AnalyticsClient', () => {
// eslint-disable-next-line dot-notation
const internalEventQueue$ = analyticsClient['internalEventQueue$'];

const internalQueuePromise = internalEventQueue$.pipe(take(2), toArray()).toPromise();
const internalQueuePromise = lastValueFrom(internalEventQueue$.pipe(take(2), toArray()));

const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(2), toArray())
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(2), toArray())
);

analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
analyticsClient.reportEvent('event-type-b', { b_field: 100 });
Expand Down Expand Up @@ -781,9 +781,9 @@ describe('AnalyticsClient', () => {
});

test('Sends events from the internal queue when there are shippers and an opt-in response is true', async () => {
const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
);

// Send multiple events of 1 type to test the grouping logic as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -862,9 +862,9 @@ describe('AnalyticsClient', () => {
});

test('Discards events from the internal queue when there are shippers and an opt-in response is false', async () => {
const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3), toArray()) // Waiting for 3 enqueued
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3), toArray()) // Waiting for 3 enqueued
);

// Send multiple events of 1 type to test the grouping logic as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -904,9 +904,9 @@ describe('AnalyticsClient', () => {
});

test('Discards only one type of the enqueued events based on event_type config', async () => {
const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3 + 1), toArray()) // Waiting for 3 enqueued + 1 batch-shipped events
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3 + 1), toArray()) // Waiting for 3 enqueued + 1 batch-shipped events
);

// Send multiple events of 1 type to test the grouping logic as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -965,9 +965,9 @@ describe('AnalyticsClient', () => {
});

test('Discards the event at the shipper level (for a specific event)', async () => {
const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
);

// Send multiple events of 1 type to test the grouping logic as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -1061,9 +1061,9 @@ describe('AnalyticsClient', () => {
});

test('Discards all the events at the shipper level (globally disabled)', async () => {
const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3 + 2), toArray()) // Waiting for 3 enqueued + 2 batch-shipped events
);

// Send multiple events of 1 type to test the grouping logic as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -1154,9 +1154,9 @@ describe('AnalyticsClient', () => {
analyticsClient.registerShipper(MockedShipper1, { reportEventsMock });
analyticsClient.optIn({ global: { enabled: false } });

const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3), toArray()) // Waiting for 3 enqueued
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3), toArray()) // Waiting for 3 enqueued
);

// Send multiple events of 1 type to test the non-grouping logic at this stage as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down Expand Up @@ -1197,9 +1197,9 @@ describe('AnalyticsClient', () => {
analyticsClient.registerShipper(MockedShipper1, { reportEventsMock });
analyticsClient.optIn({ global: { enabled: true } });

const telemetryCounterPromise = analyticsClient.telemetryCounter$
.pipe(take(3 * 2), toArray()) // Waiting for 2 events per each reportEvent call: enqueued and sent_to_shipper
.toPromise();
const telemetryCounterPromise = lastValueFrom(
analyticsClient.telemetryCounter$.pipe(take(3 * 2), toArray()) // Waiting for 2 events per each reportEvent call: enqueued and sent_to_shipper
);

// Send multiple events of 1 type to test the non-grouping logic at this stage as well
analyticsClient.reportEvent('event-type-a', { a_field: 'a' });
Expand Down
10 changes: 4 additions & 6 deletions packages/kbn-cli-dev-mode/src/dev_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
* Side Public License, v 1.
*/

import { EventEmitter } from 'events';

import * as Rx from 'rxjs';
import {
map,
Expand Down Expand Up @@ -60,9 +58,9 @@ export class DevServer {
this.script = options.script;
this.argv = options.argv;
this.gracefulTimeout = options.gracefulTimeout;
this.processExit$ = options.processExit$ ?? Rx.fromEvent(process as EventEmitter, 'exit');
this.sigint$ = options.sigint$ ?? Rx.fromEvent(process as EventEmitter, 'SIGINT');
this.sigterm$ = options.sigterm$ ?? Rx.fromEvent(process as EventEmitter, 'SIGTERM');
this.processExit$ = options.processExit$ ?? Rx.fromEvent<void>(process, 'exit');
this.sigint$ = options.sigint$ ?? Rx.fromEvent<void>(process, 'SIGINT');
this.sigterm$ = options.sigterm$ ?? Rx.fromEvent<void>(process, 'SIGTERM');
this.mapLogLine = options.mapLogLine;
}

Expand Down Expand Up @@ -117,7 +115,7 @@ export class DevServer {
*/
run$ = new Rx.Observable<void>((subscriber) => {
// listen for SIGINT and forward to process if it's running, otherwise unsub
const gracefulShutdown$ = new Rx.Subject();
const gracefulShutdown$ = new Rx.Subject<void>();
subscriber.add(
this.sigint$
.pipe(
Expand Down
3 changes: 1 addition & 2 deletions packages/kbn-cli-dev-mode/src/optimizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import * as Rx from 'rxjs';
import { toArray } from 'rxjs/operators';
import { OptimizerUpdate } from '@kbn/optimizer';
import { observeLines, createReplaceSerializer } from '@kbn/dev-utils';
import { firstValueFrom } from '@kbn/std';

import { Optimizer, Options } from './optimizer';

Expand Down Expand Up @@ -130,7 +129,7 @@ it('uses options to create valid OptimizerConfig', () => {

it('is ready when optimizer phase is success or issue and logs in familiar format', async () => {
const writeLogTo = new PassThrough();
const linesPromise = firstValueFrom(observeLines(writeLogTo).pipe(toArray()));
const linesPromise = Rx.firstValueFrom(observeLines(writeLogTo).pipe(toArray()));

const { update$, optimizer } = setup({
...defaultOptions,
Expand Down
3 changes: 1 addition & 2 deletions packages/kbn-cli-dev-mode/src/watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { EventEmitter } from 'events';

import * as Rx from 'rxjs';
import { materialize, toArray } from 'rxjs/operators';
import { firstValueFrom } from '@kbn/std';

import { TestLog } from './log';
import { Watcher, Options } from './watcher';
Expand Down Expand Up @@ -112,7 +111,7 @@ it('closes chokidar watcher when unsubscribed', () => {

it('rethrows chokidar errors', async () => {
const watcher = new Watcher(defaultOptions);
const promise = firstValueFrom(watcher.run$.pipe(materialize(), toArray()));
const promise = Rx.firstValueFrom(watcher.run$.pipe(materialize(), toArray()));

isMock(mockChokidar);
mockChokidar.emit('error', new Error('foo bar'));
Expand Down
14 changes: 7 additions & 7 deletions packages/kbn-config/src/config_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Type } from '@kbn/config-schema';
import { isEqual } from 'lodash';
import { BehaviorSubject, combineLatest, Observable } from 'rxjs';
import { distinctUntilChanged, first, map, shareReplay, take, tap } from 'rxjs/operators';
import { BehaviorSubject, combineLatest, firstValueFrom, Observable } from 'rxjs';
import { distinctUntilChanged, first, map, shareReplay, tap } from 'rxjs/operators';
import { Logger, LoggerFactory } from '@kbn/logging';

import { Config, ConfigPath, Env } from '.';
Expand Down Expand Up @@ -170,7 +170,7 @@ export class ConfigService {
const namespace = pathToString(path);
const hasSchema = this.schemas.has(namespace);

const config = await this.config$.pipe(first()).toPromise();
const config = await firstValueFrom(this.config$);
if (!hasSchema && config.has(path)) {
// Throw if there is no schema, but a config exists at the path.
throw new Error(`No validation schema has been defined for [${namespace}]`);
Expand All @@ -195,13 +195,13 @@ export class ConfigService {
}

public async getUnusedPaths() {
const config = await this.config$.pipe(first()).toPromise();
const config = await firstValueFrom(this.config$);
const handledPaths = [...this.handledPaths.values()].map(pathToString);
return config.getFlattenedPaths().filter((path) => !isPathHandled(path, handledPaths));
}

public async getUsedPaths() {
const config = await this.config$.pipe(first()).toPromise();
const config = await firstValueFrom(this.config$);
const handledPaths = [...this.handledPaths.values()].map(pathToString);
return config.getFlattenedPaths().filter((path) => isPathHandled(path, handledPaths));
}
Expand All @@ -211,8 +211,8 @@ export class ConfigService {
}

private async logDeprecation() {
const rawConfig = await this.rawConfigProvider.getConfig$().pipe(take(1)).toPromise();
const deprecations = await this.deprecations.pipe(take(1)).toPromise();
const rawConfig = await firstValueFrom(this.rawConfigProvider.getConfig$());
const deprecations = await firstValueFrom(this.deprecations);
const deprecationMessages: string[] = [];
const createAddDeprecation = (domainId: string) => (context: DeprecatedConfigDetails) => {
if (!context.silent) {
Expand Down
Loading

0 comments on commit 9d5aca5

Please sign in to comment.