From 7f5aaae5c8038c202ade6b4325d2dd8e9051dd64 Mon Sep 17 00:00:00 2001 From: Jeremy Wells Date: Tue, 25 Oct 2022 16:19:42 -0400 Subject: [PATCH] feat(distinct): distinct's flush supports ObservableInput --- api_guard/dist/types/index.d.ts | 900 ++++++++++++++++++++++ api_guard/dist/types/operators/index.d.ts | 392 ++++++++++ spec-dtslint/operators/distinct-spec.ts | 73 +- src/internal/operators/distinct.ts | 12 +- 4 files changed, 1370 insertions(+), 7 deletions(-) create mode 100644 api_guard/dist/types/index.d.ts create mode 100644 api_guard/dist/types/operators/index.d.ts diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts new file mode 100644 index 0000000000..563c72c5de --- /dev/null +++ b/api_guard/dist/types/index.d.ts @@ -0,0 +1,900 @@ +export declare const animationFrame: AnimationFrameScheduler; + +export declare function animationFrames(timestampProvider?: TimestampProvider): Observable<{ + timestamp: number; + elapsed: number; +}>; + +export declare const animationFrameScheduler: AnimationFrameScheduler; + +export interface ArgumentOutOfRangeError extends Error { +} + +export declare const ArgumentOutOfRangeError: ArgumentOutOfRangeErrorCtor; + +export declare const asap: AsapScheduler; + +export declare const asapScheduler: AsapScheduler; + +export declare const async: AsyncScheduler; + +export declare const asyncScheduler: AsyncScheduler; + +export declare class AsyncSubject extends Subject { + complete(): void; + next(value: T): void; +} + +export declare function audit(durationSelector: (value: T) => ObservableInput): MonoTypeOperatorFunction; + +export declare function auditTime(duration: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface BasicGroupByOptions { + connector?: () => SubjectLike; + duration?: (grouped: GroupedObservable) => ObservableInput; + element?: undefined; +} + +export declare class BehaviorSubject extends Subject { + get value(): T; + constructor(_value: T); + getValue(): T; + next(value: T): void; +} + +export declare function bindCallback(callbackFunc: (...args: any[]) => void, resultSelector: (...args: any[]) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; +export declare function bindCallback(callbackFunc: (...args: [...A, (...res: R) => void]) => void, schedulerLike?: SchedulerLike): (...arg: A) => Observable; + +export declare function bindNodeCallback(callbackFunc: (...args: any[]) => void, resultSelector: (...args: any[]) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; +export declare function bindNodeCallback(callbackFunc: (...args: [...A, (err: any, ...res: R) => void]) => void, schedulerLike?: SchedulerLike): (...arg: A) => Observable; + +export declare function buffer(closingNotifier: Observable): OperatorFunction; + +export declare function bufferCount(bufferSize: number, startBufferEvery?: number | null): OperatorFunction; + +export declare function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; +export declare function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction; +export declare function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction; + +export declare function bufferToggle(openings: ObservableInput, closingSelector: (value: O) => ObservableInput): OperatorFunction; + +export declare function bufferWhen(closingSelector: () => ObservableInput): OperatorFunction; + +export declare function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; + +export declare const combineAll: typeof combineLatestAll; + +export declare function combineLatest(arg: T): Observable; +export declare function combineLatest(sources: []): Observable; +export declare function combineLatest(sources: readonly [...ObservableInputTuple]): Observable; +export declare function combineLatest(sources: readonly [...ObservableInputTuple], resultSelector: (...values: A) => R, scheduler: SchedulerLike): Observable; +export declare function combineLatest(sources: readonly [...ObservableInputTuple], resultSelector: (...values: A) => R): Observable; +export declare function combineLatest(sources: readonly [...ObservableInputTuple], scheduler: SchedulerLike): Observable; +export declare function combineLatest(...sources: [...ObservableInputTuple]): Observable; +export declare function combineLatest(...sourcesAndResultSelectorAndScheduler: [...ObservableInputTuple, (...values: A) => R, SchedulerLike]): Observable; +export declare function combineLatest(...sourcesAndResultSelector: [...ObservableInputTuple, (...values: A) => R]): Observable; +export declare function combineLatest(...sourcesAndScheduler: [...ObservableInputTuple, SchedulerLike]): Observable; +export declare function combineLatest(sourcesObject: { + [K in any]: never; +}): Observable; +export declare function combineLatest>>(sourcesObject: T): Observable<{ + [K in keyof T]: ObservedValueOf; +}>; + +export declare function combineLatestAll(): OperatorFunction, T[]>; +export declare function combineLatestAll(): OperatorFunction; +export declare function combineLatestAll(project: (...values: T[]) => R): OperatorFunction, R>; +export declare function combineLatestAll(project: (...values: Array) => R): OperatorFunction; + +export declare function combineLatestWith(...otherSources: [...ObservableInputTuple]): OperatorFunction>; + +export interface CompleteNotification { + kind: 'C'; +} + +export interface CompletionObserver { + closed?: boolean; + complete: () => void; + error?: (err: any) => void; + next?: (value: T) => void; +} + +export declare function concat(...inputs: [...ObservableInputTuple]): Observable; +export declare function concat(...inputsAndScheduler: [...ObservableInputTuple, SchedulerLike]): Observable; + +export declare function concatAll>(): OperatorFunction>; + +export declare function concatMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function concatMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function concatMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function concatMapTo>(observable: O): OperatorFunction>; +export declare function concatMapTo>(observable: O, resultSelector: undefined): OperatorFunction>; +export declare function concatMapTo>(observable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function concatWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare const config: GlobalConfig; + +export declare function connect>(selector: (shared: Observable) => O, config?: ConnectConfig): OperatorFunction>; + +export declare function connectable(source: ObservableInput, config?: ConnectableConfig): Connectable; + +export interface Connectable extends Observable { + connect(): Subscription; +} + +export declare class ConnectableObservable extends Observable { + protected _connection: Subscription | null; + protected _refCount: number; + protected _subject: Subject | null; + source: Observable; + protected subjectFactory: () => Subject; + constructor(source: Observable, subjectFactory: () => Subject); + protected _teardown(): void; + connect(): Subscription; + protected getSubject(): Subject; + refCount(): Observable; +} + +export interface ConnectConfig { + connector: () => SubjectLike; +} + +export declare type Cons = ((arg: X, ...rest: Y) => any) extends (...args: infer U) => any ? U : never; + +export declare function count(predicate?: (value: T, index: number) => boolean): OperatorFunction; + +export declare function debounce(durationSelector: (value: T) => ObservableInput): MonoTypeOperatorFunction; + +export declare function debounceTime(dueTime: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function defaultIfEmpty(defaultValue: R): OperatorFunction; + +export declare function defer>(observableFactory: () => R): Observable>; + +export declare function delay(due: number | Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function delayWhen(delayDurationSelector: (value: T, index: number) => Observable, subscriptionDelay: Observable): MonoTypeOperatorFunction; +export declare function delayWhen(delayDurationSelector: (value: T, index: number) => Observable): MonoTypeOperatorFunction; + +export declare function dematerialize>(): OperatorFunction>; + +export declare function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction; + +export declare function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; +export declare function distinctUntilChanged(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction; + +export declare function distinctUntilKeyChanged(key: keyof T): MonoTypeOperatorFunction; +export declare function distinctUntilKeyChanged(key: K, compare: (x: T[K], y: T[K]) => boolean): MonoTypeOperatorFunction; + +export declare function elementAt(index: number, defaultValue?: D): OperatorFunction; + +export declare function empty(scheduler?: SchedulerLike): Observable; + +export declare const EMPTY: Observable; + +export interface EmptyError extends Error { +} + +export declare const EmptyError: EmptyErrorCtor; + +export declare function endWith(scheduler: SchedulerLike): MonoTypeOperatorFunction; +export declare function endWith(...valuesAndScheduler: [...A, SchedulerLike]): OperatorFunction>; +export declare function endWith(...values: A): OperatorFunction>; + +export interface ErrorNotification { + error: any; + kind: 'E'; +} + +export interface ErrorObserver { + closed?: boolean; + complete?: () => void; + error: (err: any) => void; + next?: (value: T) => void; +} + +export declare function every(predicate: BooleanConstructor): OperatorFunction extends never ? false : boolean>; +export declare function every(predicate: BooleanConstructor, thisArg: any): OperatorFunction extends never ? false : boolean>; +export declare function every(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function every(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare const exhaust: typeof exhaustAll; + +export declare function exhaustAll>(): OperatorFunction>; + +export declare function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function exhaustMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function exhaustMap(project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function expand>(project: (value: T, index: number) => O, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function expand>(project: (value: T, index: number) => O, concurrent: number | undefined, scheduler: SchedulerLike): OperatorFunction>; + +export declare type FactoryOrValue = T | (() => T); + +export declare type Falsy = null | undefined | false | 0 | -0 | 0n | ''; + +export declare function filter(predicate: (this: A, value: T, index: number) => value is S, thisArg: A): OperatorFunction; +export declare function filter(predicate: (value: T, index: number) => value is S): OperatorFunction; +export declare function filter(predicate: BooleanConstructor): OperatorFunction>; +export declare function filter(predicate: (this: A, value: T, index: number) => boolean, thisArg: A): MonoTypeOperatorFunction; +export declare function filter(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction; + +export declare function finalize(callback: () => void): MonoTypeOperatorFunction; + +export declare function find(predicate: BooleanConstructor): OperatorFunction>; +export declare function find(predicate: (this: A, value: T, index: number, source: Observable) => value is S, thisArg: A): OperatorFunction; +export declare function find(predicate: (value: T, index: number, source: Observable) => value is S): OperatorFunction; +export declare function find(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function find(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare function findIndex(predicate: BooleanConstructor): OperatorFunction; +export declare function findIndex(predicate: BooleanConstructor, thisArg: any): OperatorFunction; +export declare function findIndex(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function findIndex(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare function first(predicate?: null, defaultValue?: D): OperatorFunction; +export declare function first(predicate: BooleanConstructor): OperatorFunction>; +export declare function first(predicate: BooleanConstructor, defaultValue: D): OperatorFunction | D>; +export declare function first(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue?: S): OperatorFunction; +export declare function first(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue: D): OperatorFunction; +export declare function first(predicate: (value: T, index: number, source: Observable) => boolean, defaultValue?: D): OperatorFunction; + +export declare function firstValueFrom(source: Observable, config: FirstValueFromConfig): Promise; +export declare function firstValueFrom(source: Observable): Promise; + +export declare const flatMap: typeof mergeMap; + +export declare function forkJoin(arg: T): Observable; +export declare function forkJoin(scheduler: null | undefined): Observable; +export declare function forkJoin(sources: readonly []): Observable; +export declare function forkJoin(sources: readonly [...ObservableInputTuple]): Observable; +export declare function forkJoin(sources: readonly [...ObservableInputTuple], resultSelector: (...values: A) => R): Observable; +export declare function forkJoin(...sources: [...ObservableInputTuple]): Observable; +export declare function forkJoin(...sourcesAndResultSelector: [...ObservableInputTuple, (...values: A) => R]): Observable; +export declare function forkJoin(sourcesObject: { + [K in any]: never; +}): Observable; +export declare function forkJoin>>(sourcesObject: T): Observable<{ + [K in keyof T]: ObservedValueOf; +}>; + +export declare function from>(input: O): Observable>; +export declare function from>(input: O, scheduler: SchedulerLike | undefined): Observable>; + +export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string): Observable; +export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string, resultSelector: (event: T) => R): Observable; +export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string, options: EventListenerOptions): Observable; +export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string, options: EventListenerOptions, resultSelector: (event: T) => R): Observable; +export declare function fromEvent(target: NodeStyleEventEmitter | ArrayLike, eventName: string): Observable; +export declare function fromEvent(target: NodeStyleEventEmitter | ArrayLike, eventName: string): Observable; +export declare function fromEvent(target: NodeStyleEventEmitter | ArrayLike, eventName: string, resultSelector: (...args: any[]) => R): Observable; +export declare function fromEvent(target: NodeCompatibleEventEmitter | ArrayLike, eventName: string): Observable; +export declare function fromEvent(target: NodeCompatibleEventEmitter | ArrayLike, eventName: string): Observable; +export declare function fromEvent(target: NodeCompatibleEventEmitter | ArrayLike, eventName: string, resultSelector: (...args: any[]) => R): Observable; +export declare function fromEvent(target: JQueryStyleEventEmitter | ArrayLike>, eventName: string): Observable; +export declare function fromEvent(target: JQueryStyleEventEmitter | ArrayLike>, eventName: string, resultSelector: (value: T, ...args: any[]) => R): Observable; + +export declare function fromEventPattern(addHandler: (handler: NodeEventHandler) => any, removeHandler?: (handler: NodeEventHandler, signal?: any) => void): Observable; +export declare function fromEventPattern(addHandler: (handler: NodeEventHandler) => any, removeHandler?: (handler: NodeEventHandler, signal?: any) => void, resultSelector?: (...args: any[]) => T): Observable; + +export declare function generate(initialState: S, condition: ConditionFunc, iterate: IterateFunc, resultSelector: ResultFunc, scheduler?: SchedulerLike): Observable; +export declare function generate(initialState: S, condition: ConditionFunc, iterate: IterateFunc, scheduler?: SchedulerLike): Observable; +export declare function generate(options: GenerateBaseOptions): Observable; +export declare function generate(options: GenerateOptions): Observable; + +export interface GlobalConfig { + Promise?: PromiseConstructorLike; + onStoppedNotification: ((notification: ObservableNotification, subscriber: Subscriber) => void) | null; + onUnhandledError: ((err: any) => void) | null; + useDeprecatedNextContext: boolean; + useDeprecatedSynchronousErrorHandling: boolean; +} + +export declare function groupBy(key: (value: T) => K, options: BasicGroupByOptions): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, options: GroupByOptionsWithElement): OperatorFunction>; +export declare function groupBy(key: (value: T) => value is K): OperatorFunction | GroupedObservable>>; +export declare function groupBy(key: (value: T) => K): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element: void, duration: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable, connector?: () => Subject): OperatorFunction>; + +export interface GroupByOptionsWithElement { + connector?: () => SubjectLike; + duration?: (grouped: GroupedObservable) => ObservableInput; + element: (value: T) => E; +} + +export interface GroupedObservable extends Observable { + readonly key: K; +} + +export declare type Head = ((...args: X) => any) extends (arg: infer U, ...rest: any[]) => any ? U : never; + +export declare function identity(x: T): T; + +export declare function ignoreElements(): OperatorFunction; + +export declare function iif(condition: () => boolean, trueResult: ObservableInput, falseResult: ObservableInput): Observable; + +export interface InteropObservable { + [Symbol.observable]: () => Subscribable; +} + +export declare function interval(period?: number, scheduler?: SchedulerLike): Observable; + +export declare function isEmpty(): OperatorFunction; + +export declare function isObservable(obj: any): obj is Observable; + +export declare function last(predicate: BooleanConstructor): OperatorFunction>; +export declare function last(predicate: BooleanConstructor, defaultValue: D): OperatorFunction | D>; +export declare function last(predicate?: null, defaultValue?: D): OperatorFunction; +export declare function last(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue?: S): OperatorFunction; +export declare function last(predicate: (value: T, index: number, source: Observable) => boolean, defaultValue?: D): OperatorFunction; + +export declare function lastValueFrom(source: Observable, config: LastValueFromConfig): Promise; +export declare function lastValueFrom(source: Observable): Promise; + +export declare function map(project: (value: T, index: number) => R): OperatorFunction; +export declare function map(project: (this: A, value: T, index: number) => R, thisArg: A): OperatorFunction; + +export declare function mapTo(value: R): OperatorFunction; +export declare function mapTo(value: R): OperatorFunction; + +export declare function materialize(): OperatorFunction & ObservableNotification>; + +export declare function max(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction; + +export declare function merge(...sources: [...ObservableInputTuple]): Observable; +export declare function merge(...sourcesAndConcurrency: [...ObservableInputTuple, number?]): Observable; +export declare function merge(...sourcesAndScheduler: [...ObservableInputTuple, SchedulerLike?]): Observable; +export declare function merge(...sourcesAndConcurrencyAndScheduler: [...ObservableInputTuple, number?, SchedulerLike?]): Observable; + +export declare function mergeAll>(concurrent?: number): OperatorFunction>; + +export declare function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; +export declare function mergeMap>(project: (value: T, index: number) => O, resultSelector: undefined, concurrent?: number): OperatorFunction>; +export declare function mergeMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; + +export declare function mergeMapTo>(innerObservable: O, concurrent?: number): OperatorFunction>; +export declare function mergeMapTo>(innerObservable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; + +export declare function mergeScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed: R, concurrent?: number): OperatorFunction; + +export declare function mergeWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function min(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction; + +export interface MonoTypeOperatorFunction extends OperatorFunction { +} + +export declare function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; +export declare function multicast>(subject: Subject, selector: (shared: Observable) => O): OperatorFunction>; +export declare function multicast(subjectFactory: () => Subject): UnaryFunction, ConnectableObservable>; +export declare function multicast>(subjectFactory: () => Subject, selector: (shared: Observable) => O): OperatorFunction>; + +export declare function never(): Observable; + +export declare const NEVER: Observable; + +export interface NextNotification { + kind: 'N'; + value: T; +} + +export interface NextObserver { + closed?: boolean; + complete?: () => void; + error?: (err: any) => void; + next: (value: T) => void; +} + +export declare function noop(): void; + +export interface NotFoundError extends Error { +} + +export declare const NotFoundError: NotFoundErrorCtor; + +export declare class Notification { + readonly error?: any; + readonly hasValue: boolean; + readonly kind: 'N' | 'E' | 'C'; + readonly value?: T | undefined; + constructor(kind: 'C'); + constructor(kind: 'E', value: undefined, error: any); + constructor(kind: 'N', value?: T); + accept(next: (value: T) => void, error: (err: any) => void, complete: () => void): void; + accept(next: (value: T) => void, error: (err: any) => void): void; + accept(next: (value: T) => void): void; + accept(observer: PartialObserver): void; + do(next: (value: T) => void, error: (err: any) => void, complete: () => void): void; + do(next: (value: T) => void, error: (err: any) => void): void; + do(next: (value: T) => void): void; + observe(observer: PartialObserver): void; + toObservable(): Observable; + static createComplete(): Notification & CompleteNotification; + static createError(err?: any): Notification & ErrorNotification; + static createNext(value: T): Notification & NextNotification; +} + +export declare enum NotificationKind { + NEXT = "N", + ERROR = "E", + COMPLETE = "C" +} + +export interface ObjectUnsubscribedError extends Error { +} + +export declare const ObjectUnsubscribedError: ObjectUnsubscribedErrorCtor; + +export declare const observable: string | symbol; + +export declare class Observable implements Subscribable { + operator: Operator | undefined; + source: Observable | undefined; + constructor(subscribe?: (this: Observable, subscriber: Subscriber) => TeardownLogic); + forEach(next: (value: T) => void): Promise; + forEach(next: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise; + lift(operator?: Operator): Observable; + pipe(): Observable; + pipe(op1: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction): Observable; + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction, ...operations: OperatorFunction[]): Observable; + subscribe(observer?: Partial>): Subscription; + subscribe(next: (value: T) => void): Subscription; + subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription; + toPromise(): Promise; + toPromise(PromiseCtor: typeof Promise): Promise; + toPromise(PromiseCtor: PromiseConstructorLike): Promise; + static create: (...args: any[]) => any; +} + +export declare type ObservableInput = Observable | InteropObservable | AsyncIterable | PromiseLike | ArrayLike | Iterable | ReadableStreamLike; + +export declare type ObservableInputTuple = { + [K in keyof T]: ObservableInput; +}; + +export declare type ObservableLike = InteropObservable; + +export declare type ObservableNotification = NextNotification | ErrorNotification | CompleteNotification; + +export declare type ObservedValueOf = O extends ObservableInput ? T : never; + +export declare type ObservedValuesFromArray = ObservedValueUnionFromArray; + +export declare type ObservedValueTupleFromArray = { + [K in keyof X]: ObservedValueOf; +}; + +export declare type ObservedValueUnionFromArray = X extends Array> ? T : never; + +export declare function observeOn(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction; + +export interface Observer { + complete: () => void; + error: (err: any) => void; + next: (value: T) => void; +} + +export declare function of(value: null): Observable; +export declare function of(value: undefined): Observable; +export declare function of(scheduler: SchedulerLike): Observable; +export declare function of(...valuesAndScheduler: [...A, SchedulerLike]): Observable>; +export declare function of(): Observable; +export declare function of(): Observable; +export declare function of(value: T): Observable; +export declare function of(...values: A): Observable>; + +export declare function onErrorResumeNext(sources: [...ObservableInputTuple]): Observable; +export declare function onErrorResumeNext(...sources: [...ObservableInputTuple]): Observable; + +export interface Operator { + call(subscriber: Subscriber, source: any): TeardownLogic; +} + +export interface OperatorFunction extends UnaryFunction, Observable> { +} + +export declare function pairs(arr: readonly T[], scheduler?: SchedulerLike): Observable<[string, T]>; +export declare function pairs>(obj: O, scheduler?: SchedulerLike): Observable<[keyof O, O[keyof O]]>; +export declare function pairs(iterable: Iterable, scheduler?: SchedulerLike): Observable<[string, T]>; +export declare function pairs(n: number | bigint | boolean | ((...args: any[]) => any) | symbol, scheduler?: SchedulerLike): Observable<[never, never]>; + +export declare function pairwise(): OperatorFunction; + +export declare type PartialObserver = NextObserver | ErrorObserver | CompletionObserver; + +export declare function partition(source: ObservableInput, predicate: (this: A, value: T, index: number) => value is U, thisArg: A): [Observable, Observable>]; +export declare function partition(source: ObservableInput, predicate: (value: T, index: number) => value is U): [Observable, Observable>]; +export declare function partition(source: ObservableInput, predicate: (this: A, value: T, index: number) => boolean, thisArg: A): [Observable, Observable]; +export declare function partition(source: ObservableInput, predicate: (value: T, index: number) => boolean): [Observable, Observable]; + +export declare function pipe(): typeof identity; +export declare function pipe(fn1: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction, fn6: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction, fn6: UnaryFunction, fn7: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction, fn6: UnaryFunction, fn7: UnaryFunction, fn8: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction, fn6: UnaryFunction, fn7: UnaryFunction, fn8: UnaryFunction, fn9: UnaryFunction): UnaryFunction; +export declare function pipe(fn1: UnaryFunction, fn2: UnaryFunction, fn3: UnaryFunction, fn4: UnaryFunction, fn5: UnaryFunction, fn6: UnaryFunction, fn7: UnaryFunction, fn8: UnaryFunction, fn9: UnaryFunction, ...fns: UnaryFunction[]): UnaryFunction; + +export declare function pluck(k1: K1): OperatorFunction; +export declare function pluck(k1: K1, k2: K2): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6, ...rest: string[]): OperatorFunction; +export declare function pluck(...properties: string[]): OperatorFunction; + +export declare function publish(): UnaryFunction, ConnectableObservable>; +export declare function publish>(selector: (shared: Observable) => O): OperatorFunction>; + +export declare function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable>; + +export declare function publishLast(): UnaryFunction, ConnectableObservable>; + +export declare function publishReplay(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider): MonoTypeOperatorFunction; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: (shared: Observable) => O, timestampProvider?: TimestampProvider): OperatorFunction>; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: undefined, timestampProvider: TimestampProvider): OperatorFunction>; + +export declare const queue: QueueScheduler; + +export declare const queueScheduler: QueueScheduler; + +export declare function race(inputs: [...ObservableInputTuple]): Observable; +export declare function race(...inputs: [...ObservableInputTuple]): Observable; + +export declare function raceWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function range(start: number, count?: number): Observable; +export declare function range(start: number, count: number | undefined, scheduler: SchedulerLike): Observable; + +export interface ReadableStreamLike { + getReader(): ReadableStreamDefaultReaderLike; +} + +export declare function reduce(accumulator: (acc: A | V, value: V, index: number) => A): OperatorFunction; +export declare function reduce(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export declare function reduce(accumulator: (acc: A | S, value: V, index: number) => A, seed: S): OperatorFunction; + +export declare function refCount(): MonoTypeOperatorFunction; + +export declare function repeat(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction; + +export declare function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction; + +export declare class ReplaySubject extends Subject { + constructor(_bufferSize?: number, _windowTime?: number, _timestampProvider?: TimestampProvider); + next(value: T): void; +} + +export declare function retry(count?: number): MonoTypeOperatorFunction; +export declare function retry(config: RetryConfig): MonoTypeOperatorFunction; + +export interface RetryConfig { + count?: number; + delay?: number | ((error: any, retryCount: number) => ObservableInput); + resetOnSuccess?: boolean; +} + +export declare function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction; + +export declare function sample(notifier: Observable): MonoTypeOperatorFunction; + +export declare function sampleTime(period: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function scan(accumulator: (acc: A | V, value: V, index: number) => A): OperatorFunction; +export declare function scan(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export declare function scan(accumulator: (acc: A | S, value: V, index: number) => A, seed: S): OperatorFunction; + +export declare function scheduled(input: ObservableInput, scheduler: SchedulerLike): Observable; + +export declare class Scheduler implements SchedulerLike { + now: () => number; + constructor(schedulerActionCtor: typeof Action, now?: () => number); + schedule(work: (this: SchedulerAction, state?: T) => void, delay?: number, state?: T): Subscription; + static now: () => number; +} + +export interface SchedulerAction extends Subscription { + schedule(state?: T, delay?: number): Subscription; +} + +export interface SchedulerLike extends TimestampProvider { + schedule(work: (this: SchedulerAction, state: T) => void, delay: number, state: T): Subscription; + schedule(work: (this: SchedulerAction, state?: T) => void, delay: number, state?: T): Subscription; + schedule(work: (this: SchedulerAction, state?: T) => void, delay?: number, state?: T): Subscription; +} + +export declare function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction; + +export interface SequenceError extends Error { +} + +export declare const SequenceError: SequenceErrorCtor; + +export declare function share(): MonoTypeOperatorFunction; +export declare function share(options: ShareConfig): MonoTypeOperatorFunction; + +export interface ShareConfig { + connector?: () => SubjectLike; + resetOnComplete?: boolean | (() => Observable); + resetOnError?: boolean | ((error: any) => Observable); + resetOnRefCountZero?: boolean | (() => Observable); +} + +export declare function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; +export declare function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface ShareReplayConfig { + bufferSize?: number; + refCount: boolean; + scheduler?: SchedulerLike; + windowTime?: number; +} + +export declare function single(predicate: BooleanConstructor): OperatorFunction>; +export declare function single(predicate?: (value: T, index: number, source: Observable) => boolean): MonoTypeOperatorFunction; + +export declare function skip(count: number): MonoTypeOperatorFunction; + +export declare function skipLast(skipCount: number): MonoTypeOperatorFunction; + +export declare function skipUntil(notifier: Observable): MonoTypeOperatorFunction; + +export declare function skipWhile(predicate: BooleanConstructor): OperatorFunction extends never ? never : T>; +export declare function skipWhile(predicate: (value: T, index: number) => true): OperatorFunction; +export declare function skipWhile(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction; + +export declare function startWith(value: null): OperatorFunction; +export declare function startWith(value: undefined): OperatorFunction; +export declare function startWith(...valuesAndScheduler: [...A, SchedulerLike]): OperatorFunction>; +export declare function startWith(...values: A): OperatorFunction>; + +export declare class Subject extends Observable implements SubscriptionLike { + closed: boolean; + hasError: boolean; + isStopped: boolean; + get observed(): boolean; + observers: Observer[]; + thrownError: any; + constructor(); + asObservable(): Observable; + complete(): void; + error(err: any): void; + lift(operator: Operator): Observable; + next(value: T): void; + unsubscribe(): void; + static create: (...args: any[]) => any; +} + +export interface SubjectLike extends Observer, Subscribable { +} + +export interface Subscribable { + subscribe(observer: Partial>): Unsubscribable; +} + +export declare type SubscribableOrPromise = Subscribable | Subscribable | PromiseLike | InteropObservable; + +export declare function subscribeOn(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction; + +export declare class Subscriber extends Subscription implements Observer { + protected destination: Subscriber | Observer; + protected isStopped: boolean; + constructor(destination?: Subscriber | Observer); + protected _complete(): void; + protected _error(err: any): void; + protected _next(value: T): void; + complete(): void; + error(err?: any): void; + next(value?: T): void; + unsubscribe(): void; + static create(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber; +} + +export declare class Subscription implements SubscriptionLike { + closed: boolean; + constructor(initialTeardown?: (() => void) | undefined); + add(teardown: TeardownLogic): void; + remove(teardown: Exclude): void; + unsubscribe(): void; + static EMPTY: Subscription; +} + +export interface SubscriptionLike extends Unsubscribable { + readonly closed: boolean; + unsubscribe(): void; +} + +export declare function switchAll>(): OperatorFunction>; + +export declare function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function switchMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function switchMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function switchMapTo>(observable: O): OperatorFunction>; +export declare function switchMapTo>(observable: O, resultSelector: undefined): OperatorFunction>; +export declare function switchMapTo>(observable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function switchScan>(accumulator: (acc: R, value: T, index: number) => O, seed: R): OperatorFunction>; + +export declare type Tail = ((...args: X) => any) extends (arg: any, ...rest: infer U) => any ? U : never; + +export declare function take(count: number): MonoTypeOperatorFunction; + +export declare function takeLast(count: number): MonoTypeOperatorFunction; + +export declare function takeUntil(notifier: ObservableInput): MonoTypeOperatorFunction; + +export declare function takeWhile(predicate: BooleanConstructor, inclusive: true): MonoTypeOperatorFunction; +export declare function takeWhile(predicate: BooleanConstructor, inclusive: false): OperatorFunction>; +export declare function takeWhile(predicate: BooleanConstructor): OperatorFunction>; +export declare function takeWhile(predicate: (value: T, index: number) => value is S): OperatorFunction; +export declare function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; +export declare function takeWhile(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction; + +export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; +export declare function tap(next: (value: T) => void): MonoTypeOperatorFunction; +export declare function tap(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction; + +export declare type TeardownLogic = Subscription | Unsubscribable | (() => void) | void; + +export declare function throttle(durationSelector: (value: T) => ObservableInput, config?: ThrottleConfig): MonoTypeOperatorFunction; + +export interface ThrottleConfig { + leading?: boolean; + trailing?: boolean; +} + +export declare function throttleTime(duration: number, scheduler?: SchedulerLike, config?: import("./throttle").ThrottleConfig): MonoTypeOperatorFunction; + +export declare function throwError(errorFactory: () => any): Observable; +export declare function throwError(error: any): Observable; +export declare function throwError(errorOrErrorFactory: any, scheduler: SchedulerLike): Observable; + +export declare function throwIfEmpty(errorFactory?: () => any): MonoTypeOperatorFunction; + +export declare function timeInterval(scheduler?: SchedulerLike): OperatorFunction>; + +export interface TimeInterval { + interval: number; + value: T; +} + +export declare function timeout, M = unknown>(config: TimeoutConfig & { + with: (info: TimeoutInfo) => O; +}): OperatorFunction>; +export declare function timeout(config: Omit, 'with'>): OperatorFunction; +export declare function timeout(first: Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export declare function timeout(each: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface TimeoutConfig = ObservableInput, M = unknown> { + each?: number; + first?: number | Date; + meta?: M; + scheduler?: SchedulerLike; + with?: (info: TimeoutInfo) => O; +} + +export interface TimeoutError extends Error { + info: TimeoutInfo | null; +} + +export declare const TimeoutError: TimeoutErrorCtor; + +export interface TimeoutInfo { + readonly lastValue: T | null; + readonly meta: M; + readonly seen: number; +} + +export declare function timeoutWith(dueBy: Date, switchTo: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; +export declare function timeoutWith(waitFor: number, switchTo: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; + +export declare function timer(due: number | Date, scheduler?: SchedulerLike): Observable<0>; +export declare function timer(startDue: number | Date, intervalDuration: number, scheduler?: SchedulerLike): Observable; +export declare function timer(dueTime: number | Date, unused: undefined, scheduler?: SchedulerLike): Observable<0>; + +export declare function timestamp(timestampProvider?: TimestampProvider): OperatorFunction>; + +export interface Timestamp { + timestamp: number; + value: T; +} + +export interface TimestampProvider { + now(): number; +} + +export declare function toArray(): OperatorFunction; + +export declare type TruthyTypesOf = T extends Falsy ? never : T; + +export interface UnaryFunction { + (source: T): R; +} + +export interface Unsubscribable { + unsubscribe(): void; +} + +export interface UnsubscriptionError extends Error { + readonly errors: any[]; +} + +export declare const UnsubscriptionError: UnsubscriptionErrorCtor; + +export declare function using>(resourceFactory: () => Unsubscribable | void, observableFactory: (resource: Unsubscribable | void) => T | void): Observable>; + +export declare type ValueFromArray = A extends Array ? T : never; + +export declare type ValueFromNotification = T extends { + kind: 'N' | 'E' | 'C'; +} ? T extends NextNotification ? T extends { + value: infer V; +} ? V : undefined : never : never; + +export declare class VirtualAction extends AsyncAction { + protected active: boolean; + protected index: number; + protected scheduler: VirtualTimeScheduler; + protected work: (this: SchedulerAction, state?: T) => void; + constructor(scheduler: VirtualTimeScheduler, work: (this: SchedulerAction, state?: T) => void, index?: number); + protected _execute(state: T, delay: number): any; + protected recycleAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay?: number): TimerHandle | undefined; + protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay?: number): TimerHandle; + schedule(state?: T, delay?: number): Subscription; +} + +export declare class VirtualTimeScheduler extends AsyncScheduler { + frame: number; + index: number; + maxFrames: number; + constructor(schedulerActionCtor?: typeof AsyncAction, maxFrames?: number); + flush(): void; + static frameTimeFactor: number; +} + +export declare function window(windowBoundaries: Observable): OperatorFunction>; + +export declare function windowCount(windowSize: number, startWindowEvery?: number): OperatorFunction>; + +export declare function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function windowTime(windowTimeSpan: number, windowCreationInterval: number | null | void, maxWindowSize: number, scheduler?: SchedulerLike): OperatorFunction>; + +export declare function windowToggle(openings: ObservableInput, closingSelector: (openValue: O) => ObservableInput): OperatorFunction>; + +export declare function windowWhen(closingSelector: () => ObservableInput): OperatorFunction>; + +export declare function withLatestFrom(...inputs: [...ObservableInputTuple]): OperatorFunction; +export declare function withLatestFrom(...inputs: [...ObservableInputTuple, (...value: [T, ...O]) => R]): OperatorFunction; + +export declare function zip(sources: [...ObservableInputTuple]): Observable; +export declare function zip(sources: [...ObservableInputTuple], resultSelector: (...values: A) => R): Observable; +export declare function zip(...sources: [...ObservableInputTuple]): Observable; +export declare function zip(...sourcesAndResultSelector: [...ObservableInputTuple, (...values: A) => R]): Observable; + +export declare function zipAll(): OperatorFunction, T[]>; +export declare function zipAll(): OperatorFunction; +export declare function zipAll(project: (...values: T[]) => R): OperatorFunction, R>; +export declare function zipAll(project: (...values: Array) => R): OperatorFunction; + +export declare function zipWith(...otherInputs: [...ObservableInputTuple]): OperatorFunction>; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts new file mode 100644 index 0000000000..6f3200e9a3 --- /dev/null +++ b/api_guard/dist/types/operators/index.d.ts @@ -0,0 +1,392 @@ +export declare function audit(durationSelector: (value: T) => ObservableInput): MonoTypeOperatorFunction; + +export declare function auditTime(duration: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface BasicGroupByOptions { + connector?: () => SubjectLike; + duration?: (grouped: GroupedObservable) => ObservableInput; + element?: undefined; +} + +export declare function buffer(closingNotifier: Observable): OperatorFunction; + +export declare function bufferCount(bufferSize: number, startBufferEvery?: number | null): OperatorFunction; + +export declare function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; +export declare function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction; +export declare function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction; + +export declare function bufferToggle(openings: ObservableInput, closingSelector: (value: O) => ObservableInput): OperatorFunction; + +export declare function bufferWhen(closingSelector: () => ObservableInput): OperatorFunction; + +export declare function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; + +export declare const combineAll: typeof combineLatestAll; + +export declare function combineLatest(sources: [...ObservableInputTuple], project: (...values: [T, ...A]) => R): OperatorFunction; +export declare function combineLatest(sources: [...ObservableInputTuple]): OperatorFunction; +export declare function combineLatest(...sourcesAndProject: [...ObservableInputTuple, (...values: [T, ...A]) => R]): OperatorFunction; +export declare function combineLatest(...sources: [...ObservableInputTuple]): OperatorFunction; + +export declare function combineLatestAll(): OperatorFunction, T[]>; +export declare function combineLatestAll(): OperatorFunction; +export declare function combineLatestAll(project: (...values: T[]) => R): OperatorFunction, R>; +export declare function combineLatestAll(project: (...values: Array) => R): OperatorFunction; + +export declare function combineLatestWith(...otherSources: [...ObservableInputTuple]): OperatorFunction>; + +export declare function concat(...sources: [...ObservableInputTuple]): OperatorFunction; +export declare function concat(...sourcesAndScheduler: [...ObservableInputTuple, SchedulerLike]): OperatorFunction; + +export declare function concatAll>(): OperatorFunction>; + +export declare function concatMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function concatMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function concatMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function concatMapTo>(observable: O): OperatorFunction>; +export declare function concatMapTo>(observable: O, resultSelector: undefined): OperatorFunction>; +export declare function concatMapTo>(observable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function concatWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function connect>(selector: (shared: Observable) => O, config?: ConnectConfig): OperatorFunction>; + +export interface ConnectConfig { + connector: () => SubjectLike; +} + +export declare function count(predicate?: (value: T, index: number) => boolean): OperatorFunction; + +export declare function debounce(durationSelector: (value: T) => ObservableInput): MonoTypeOperatorFunction; + +export declare function debounceTime(dueTime: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function defaultIfEmpty(defaultValue: R): OperatorFunction; + +export declare function delay(due: number | Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function delayWhen(delayDurationSelector: (value: T, index: number) => Observable, subscriptionDelay: Observable): MonoTypeOperatorFunction; +export declare function delayWhen(delayDurationSelector: (value: T, index: number) => Observable): MonoTypeOperatorFunction; + +export declare function dematerialize>(): OperatorFunction>; + +export declare function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction; + +export declare function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; +export declare function distinctUntilChanged(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction; + +export declare function distinctUntilKeyChanged(key: keyof T): MonoTypeOperatorFunction; +export declare function distinctUntilKeyChanged(key: K, compare: (x: T[K], y: T[K]) => boolean): MonoTypeOperatorFunction; + +export declare function elementAt(index: number, defaultValue?: D): OperatorFunction; + +export declare function endWith(scheduler: SchedulerLike): MonoTypeOperatorFunction; +export declare function endWith(...valuesAndScheduler: [...A, SchedulerLike]): OperatorFunction>; +export declare function endWith(...values: A): OperatorFunction>; + +export declare function every(predicate: BooleanConstructor): OperatorFunction extends never ? false : boolean>; +export declare function every(predicate: BooleanConstructor, thisArg: any): OperatorFunction extends never ? false : boolean>; +export declare function every(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function every(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare const exhaust: typeof exhaustAll; + +export declare function exhaustAll>(): OperatorFunction>; + +export declare function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function exhaustMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function exhaustMap(project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function expand>(project: (value: T, index: number) => O, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function expand>(project: (value: T, index: number) => O, concurrent: number | undefined, scheduler: SchedulerLike): OperatorFunction>; + +export declare function filter(predicate: (this: A, value: T, index: number) => value is S, thisArg: A): OperatorFunction; +export declare function filter(predicate: (value: T, index: number) => value is S): OperatorFunction; +export declare function filter(predicate: BooleanConstructor): OperatorFunction>; +export declare function filter(predicate: (this: A, value: T, index: number) => boolean, thisArg: A): MonoTypeOperatorFunction; +export declare function filter(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction; + +export declare function finalize(callback: () => void): MonoTypeOperatorFunction; + +export declare function find(predicate: BooleanConstructor): OperatorFunction>; +export declare function find(predicate: (this: A, value: T, index: number, source: Observable) => value is S, thisArg: A): OperatorFunction; +export declare function find(predicate: (value: T, index: number, source: Observable) => value is S): OperatorFunction; +export declare function find(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function find(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare function findIndex(predicate: BooleanConstructor): OperatorFunction; +export declare function findIndex(predicate: BooleanConstructor, thisArg: any): OperatorFunction; +export declare function findIndex(predicate: (this: A, value: T, index: number, source: Observable) => boolean, thisArg: A): OperatorFunction; +export declare function findIndex(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; + +export declare function first(predicate?: null, defaultValue?: D): OperatorFunction; +export declare function first(predicate: BooleanConstructor): OperatorFunction>; +export declare function first(predicate: BooleanConstructor, defaultValue: D): OperatorFunction | D>; +export declare function first(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue?: S): OperatorFunction; +export declare function first(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue: D): OperatorFunction; +export declare function first(predicate: (value: T, index: number, source: Observable) => boolean, defaultValue?: D): OperatorFunction; + +export declare const flatMap: typeof mergeMap; + +export declare function groupBy(key: (value: T) => K, options: BasicGroupByOptions): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, options: GroupByOptionsWithElement): OperatorFunction>; +export declare function groupBy(key: (value: T) => value is K): OperatorFunction | GroupedObservable>>; +export declare function groupBy(key: (value: T) => K): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element: void, duration: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable, connector?: () => Subject): OperatorFunction>; + +export interface GroupByOptionsWithElement { + connector?: () => SubjectLike; + duration?: (grouped: GroupedObservable) => ObservableInput; + element: (value: T) => E; +} + +export declare function ignoreElements(): OperatorFunction; + +export declare function isEmpty(): OperatorFunction; + +export declare function last(predicate: BooleanConstructor): OperatorFunction>; +export declare function last(predicate: BooleanConstructor, defaultValue: D): OperatorFunction | D>; +export declare function last(predicate?: null, defaultValue?: D): OperatorFunction; +export declare function last(predicate: (value: T, index: number, source: Observable) => value is S, defaultValue?: S): OperatorFunction; +export declare function last(predicate: (value: T, index: number, source: Observable) => boolean, defaultValue?: D): OperatorFunction; + +export declare function map(project: (value: T, index: number) => R): OperatorFunction; +export declare function map(project: (this: A, value: T, index: number) => R, thisArg: A): OperatorFunction; + +export declare function mapTo(value: R): OperatorFunction; +export declare function mapTo(value: R): OperatorFunction; + +export declare function materialize(): OperatorFunction & ObservableNotification>; + +export declare function max(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction; + +export declare function merge(...sources: [...ObservableInputTuple]): OperatorFunction; +export declare function merge(...sourcesAndConcurrency: [...ObservableInputTuple, number]): OperatorFunction; +export declare function merge(...sourcesAndScheduler: [...ObservableInputTuple, SchedulerLike]): OperatorFunction; +export declare function merge(...sourcesAndConcurrencyAndScheduler: [...ObservableInputTuple, number, SchedulerLike]): OperatorFunction; + +export declare function mergeAll>(concurrent?: number): OperatorFunction>; + +export declare function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; +export declare function mergeMap>(project: (value: T, index: number) => O, resultSelector: undefined, concurrent?: number): OperatorFunction>; +export declare function mergeMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; + +export declare function mergeMapTo>(innerObservable: O, concurrent?: number): OperatorFunction>; +export declare function mergeMapTo>(innerObservable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; + +export declare function mergeScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed: R, concurrent?: number): OperatorFunction; + +export declare function mergeWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function min(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction; + +export declare function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; +export declare function multicast>(subject: Subject, selector: (shared: Observable) => O): OperatorFunction>; +export declare function multicast(subjectFactory: () => Subject): UnaryFunction, ConnectableObservable>; +export declare function multicast>(subjectFactory: () => Subject, selector: (shared: Observable) => O): OperatorFunction>; + +export declare function observeOn(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction; + +export declare function onErrorResumeNext(sources: [...ObservableInputTuple]): OperatorFunction; +export declare function onErrorResumeNext(...sources: [...ObservableInputTuple]): OperatorFunction; + +export declare function pairwise(): OperatorFunction; + +export declare function partition(predicate: (value: T, index: number) => boolean, thisArg?: any): UnaryFunction, [Observable, Observable]>; + +export declare function pluck(k1: K1): OperatorFunction; +export declare function pluck(k1: K1, k2: K2): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6): OperatorFunction; +export declare function pluck(k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6, ...rest: string[]): OperatorFunction; +export declare function pluck(...properties: string[]): OperatorFunction; + +export declare function publish(): UnaryFunction, ConnectableObservable>; +export declare function publish>(selector: (shared: Observable) => O): OperatorFunction>; + +export declare function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable>; + +export declare function publishLast(): UnaryFunction, ConnectableObservable>; + +export declare function publishReplay(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider): MonoTypeOperatorFunction; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: (shared: Observable) => O, timestampProvider?: TimestampProvider): OperatorFunction>; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: undefined, timestampProvider: TimestampProvider): OperatorFunction>; + +export declare function race(otherSources: [...ObservableInputTuple]): OperatorFunction; +export declare function race(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function raceWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; + +export declare function reduce(accumulator: (acc: A | V, value: V, index: number) => A): OperatorFunction; +export declare function reduce(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export declare function reduce(accumulator: (acc: A | S, value: V, index: number) => A, seed: S): OperatorFunction; + +export declare function refCount(): MonoTypeOperatorFunction; + +export declare function repeat(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction; + +export declare function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction; + +export declare function retry(count?: number): MonoTypeOperatorFunction; +export declare function retry(config: RetryConfig): MonoTypeOperatorFunction; + +export interface RetryConfig { + count?: number; + delay?: number | ((error: any, retryCount: number) => ObservableInput); + resetOnSuccess?: boolean; +} + +export declare function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction; + +export declare function sample(notifier: Observable): MonoTypeOperatorFunction; + +export declare function sampleTime(period: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export declare function scan(accumulator: (acc: A | V, value: V, index: number) => A): OperatorFunction; +export declare function scan(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export declare function scan(accumulator: (acc: A | S, value: V, index: number) => A, seed: S): OperatorFunction; + +export declare function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction; + +export declare function share(): MonoTypeOperatorFunction; +export declare function share(options: ShareConfig): MonoTypeOperatorFunction; + +export interface ShareConfig { + connector?: () => SubjectLike; + resetOnComplete?: boolean | (() => Observable); + resetOnError?: boolean | ((error: any) => Observable); + resetOnRefCountZero?: boolean | (() => Observable); +} + +export declare function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; +export declare function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface ShareReplayConfig { + bufferSize?: number; + refCount: boolean; + scheduler?: SchedulerLike; + windowTime?: number; +} + +export declare function single(predicate: BooleanConstructor): OperatorFunction>; +export declare function single(predicate?: (value: T, index: number, source: Observable) => boolean): MonoTypeOperatorFunction; + +export declare function skip(count: number): MonoTypeOperatorFunction; + +export declare function skipLast(skipCount: number): MonoTypeOperatorFunction; + +export declare function skipUntil(notifier: Observable): MonoTypeOperatorFunction; + +export declare function skipWhile(predicate: BooleanConstructor): OperatorFunction extends never ? never : T>; +export declare function skipWhile(predicate: (value: T, index: number) => true): OperatorFunction; +export declare function skipWhile(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction; + +export declare function startWith(value: null): OperatorFunction; +export declare function startWith(value: undefined): OperatorFunction; +export declare function startWith(...valuesAndScheduler: [...A, SchedulerLike]): OperatorFunction>; +export declare function startWith(...values: A): OperatorFunction>; + +export declare function subscribeOn(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction; + +export declare function switchAll>(): OperatorFunction>; + +export declare function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; +export declare function switchMap>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction>; +export declare function switchMap>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function switchMapTo>(observable: O): OperatorFunction>; +export declare function switchMapTo>(observable: O, resultSelector: undefined): OperatorFunction>; +export declare function switchMapTo>(observable: O, resultSelector: (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R): OperatorFunction; + +export declare function switchScan>(accumulator: (acc: R, value: T, index: number) => O, seed: R): OperatorFunction>; + +export declare function take(count: number): MonoTypeOperatorFunction; + +export declare function takeLast(count: number): MonoTypeOperatorFunction; + +export declare function takeUntil(notifier: ObservableInput): MonoTypeOperatorFunction; + +export declare function takeWhile(predicate: BooleanConstructor, inclusive: true): MonoTypeOperatorFunction; +export declare function takeWhile(predicate: BooleanConstructor, inclusive: false): OperatorFunction>; +export declare function takeWhile(predicate: BooleanConstructor): OperatorFunction>; +export declare function takeWhile(predicate: (value: T, index: number) => value is S): OperatorFunction; +export declare function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; +export declare function takeWhile(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction; + +export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; +export declare function tap(next: (value: T) => void): MonoTypeOperatorFunction; +export declare function tap(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction; + +export declare function throttle(durationSelector: (value: T) => ObservableInput, config?: ThrottleConfig): MonoTypeOperatorFunction; + +export interface ThrottleConfig { + leading?: boolean; + trailing?: boolean; +} + +export declare function throttleTime(duration: number, scheduler?: SchedulerLike, config?: import("./throttle").ThrottleConfig): MonoTypeOperatorFunction; + +export declare function throwIfEmpty(errorFactory?: () => any): MonoTypeOperatorFunction; + +export declare function timeInterval(scheduler?: SchedulerLike): OperatorFunction>; + +export declare function timeout, M = unknown>(config: TimeoutConfig & { + with: (info: TimeoutInfo) => O; +}): OperatorFunction>; +export declare function timeout(config: Omit, 'with'>): OperatorFunction; +export declare function timeout(first: Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export declare function timeout(each: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; + +export interface TimeoutConfig = ObservableInput, M = unknown> { + each?: number; + first?: number | Date; + meta?: M; + scheduler?: SchedulerLike; + with?: (info: TimeoutInfo) => O; +} + +export interface TimeoutInfo { + readonly lastValue: T | null; + readonly meta: M; + readonly seen: number; +} + +export declare function timeoutWith(dueBy: Date, switchTo: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; +export declare function timeoutWith(waitFor: number, switchTo: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; + +export declare function timestamp(timestampProvider?: TimestampProvider): OperatorFunction>; + +export declare function toArray(): OperatorFunction; + +export declare function window(windowBoundaries: Observable): OperatorFunction>; + +export declare function windowCount(windowSize: number, startWindowEvery?: number): OperatorFunction>; + +export declare function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler?: SchedulerLike): OperatorFunction>; +export declare function windowTime(windowTimeSpan: number, windowCreationInterval: number | null | void, maxWindowSize: number, scheduler?: SchedulerLike): OperatorFunction>; + +export declare function windowToggle(openings: ObservableInput, closingSelector: (openValue: O) => ObservableInput): OperatorFunction>; + +export declare function windowWhen(closingSelector: () => ObservableInput): OperatorFunction>; + +export declare function withLatestFrom(...inputs: [...ObservableInputTuple]): OperatorFunction; +export declare function withLatestFrom(...inputs: [...ObservableInputTuple, (...value: [T, ...O]) => R]): OperatorFunction; + +export declare function zip(otherInputs: [...ObservableInputTuple]): OperatorFunction>; +export declare function zip(otherInputsAndProject: [...ObservableInputTuple], project: (...values: Cons) => R): OperatorFunction; +export declare function zip(...otherInputs: [...ObservableInputTuple]): OperatorFunction>; +export declare function zip(...otherInputsAndProject: [...ObservableInputTuple, (...values: Cons) => R]): OperatorFunction; + +export declare function zipAll(): OperatorFunction, T[]>; +export declare function zipAll(): OperatorFunction; +export declare function zipAll(project: (...values: T[]) => R): OperatorFunction, R>; +export declare function zipAll(project: (...values: Array) => R): OperatorFunction; + +export declare function zipWith(...otherInputs: [...ObservableInputTuple]): OperatorFunction>; diff --git a/spec-dtslint/operators/distinct-spec.ts b/spec-dtslint/operators/distinct-spec.ts index 3386ce2bd0..f6034aac17 100644 --- a/spec-dtslint/operators/distinct-spec.ts +++ b/spec-dtslint/operators/distinct-spec.ts @@ -1,5 +1,7 @@ import { of } from 'rxjs'; +import { asInteropObservable } from '../../spec/helpers/interop-helper'; import { distinct } from 'rxjs/operators'; +import { ReadableStreamLike } from '../../src/internal/types'; it('should infer correctly', () => { const o = of(1, 2, 3).pipe(distinct()); // $ExpectType Observable @@ -10,10 +12,79 @@ it('should accept a keySelector', () => { const o = of({ name: 'Tim' } as Person).pipe(distinct(person => person.name)); // $ExpectType Observable }); -it('should accept flushes', () => { +it('should accept observable flush', () => { const o = of(1, 2, 3).pipe(distinct(n => n, of('t', 'i', 'm'))); // $ExpectType Observable }); +it('should accept interop observable flush', () => { + of(1, 2, 3).pipe(distinct(n => n, asInteropObservable(of('t', 'i', 'm')))); // $ExpectType Observable +}); + +it('should accept array-like flush', () => { + of(1, 2, 3).pipe(distinct(n => n, [1,2,3])); // $ExpectType Observable +}); + +it('should accept promise flush', () => { + of(1, 2, 3).pipe(distinct(n => n, Promise.resolve())); // $ExpectType Observable +}); + +it('should accept async iterable flush', () => { + const asyncRange = { + from: 1, + to: 2, + [Symbol.asyncIterator]() { + return { + current: this.from, + last: this.to, + async next() { + await Promise.resolve(); + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(distinct(n => n, asyncRange)); // $ExpectType Observable +}); + +it('should accept iterable flush', () => { + const syncRange = { + from: 1, + to: 2, + [Symbol.iterator]() { + return { + current: this.from, + last: this.to, + next() { + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(distinct(n => n, syncRange)); // $ExpectType Observable +}); + +it('should accept readable stream flush', () => { + const readable: ReadableStreamLike = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + of(1, 2, 3).pipe(distinct(n => n, readable)); // $ExpectType Observable +}); + +it('should error with unsupported flush', () => { + of(1, 2, 3).pipe(distinct(n => n, {})); // $ExpectError +}); + it('should enforce types', () => { const o = of(1, 2, 3).pipe(distinct('F00D')); // $ExpectError }); diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 4b4f55b56a..70ed2c235a 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -1,8 +1,8 @@ -import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; +import { innerFrom } from '../observable/innerFrom'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -56,12 +56,12 @@ import { noop } from '../util/noop'; * @see {@link distinctUntilChanged} * @see {@link distinctUntilKeyChanged} * - * @param {function} [keySelector] Optional function to select which value you want to check as distinct. - * @param {Observable} [flushes] Optional Observable for flushing the internal HashSet of the operator. + * @param keySelector Optional `function` to select which value you want to check as distinct. + * @param flushes Optional `ObservableInput` for flushing the internal HashSet of the operator. * @return A function that returns an Observable that emits items from the * source Observable with distinct values. */ -export function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction { +export function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { const distinctKeys = new Set(); source.subscribe( @@ -74,6 +74,6 @@ export function distinct(keySelector?: (value: T) => K, flushes?: Observab }) ); - flushes?.subscribe(createOperatorSubscriber(subscriber, () => distinctKeys.clear(), noop)); + flushes && innerFrom(flushes).subscribe(createOperatorSubscriber(subscriber, () => distinctKeys.clear(), noop)); }); }