Skip to content

Commit

Permalink
feat(window): windowBoundaries should support ObservableInput
Browse files Browse the repository at this point in the history
  • Loading branch information
jakovljevic-mladen committed Oct 17, 2022
1 parent afac3d5 commit 75fdb84
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 11 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ export declare class VirtualTimeScheduler extends AsyncScheduler {
static frameTimeFactor: number;
}

export declare function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>>;
export declare function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>>;

export declare function windowCount<T>(windowSize: number, startWindowEvery?: number): OperatorFunction<T, Observable<T>>;

Expand Down
2 changes: 1 addition & 1 deletion api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ export declare function timestamp<T>(timestampProvider?: TimestampProvider): Ope

export declare function toArray<T>(): OperatorFunction<T, T[]>;

export declare function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>>;
export declare function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>>;

export declare function windowCount<T>(windowSize: number, startWindowEvery?: number): OperatorFunction<T, Observable<T>>;

Expand Down
7 changes: 6 additions & 1 deletion spec-dtslint/operators/window-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@ it('should infer correctly', () => {
});

it('should enforce types', () => {
of(1).pipe(window('')); // $ExpectError
of(1).pipe(window()); // $ExpectError
of(1).pipe(window(6)); // $ExpectError
});

it('should support Promises', () => {
of(1, 2, 3).pipe(window(Promise.resolve('foo'))); // $ExpectType Observable<Observable<number>>
});
44 changes: 42 additions & 2 deletions spec/operators/window-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { window, mergeMap } from 'rxjs/operators';
import { window, mergeMap, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { EMPTY, of, Observable } from 'rxjs';
import { EMPTY, of, Observable, interval } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';
import { expect } from 'chai';

/** @test {window} */
describe('window', () => {
Expand Down Expand Up @@ -280,4 +281,43 @@ describe('window', () => {
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});
});

it('should window when Promise resolves', (done) => {
const e1 = interval(3).pipe(take(5));
let pos = 0;
const result: number[][] = [[], []];
const expected = [
[0, 1],
[2, 3, 4],
];

e1.pipe(window(new Promise<void>((resolve) => setTimeout(() => resolve(), 8)))).subscribe({
next: (x) => {
x.subscribe({
next: (v) => result[pos].push(v),
complete: () => pos++,
});
},
error: () => done(new Error('should not be called')),
complete: () => {
expect(result).to.deep.equal(expected);
done();
},
});
});

it('should raise error when Promise rejects', (done) => {
const e1 = interval(1).pipe(take(5));
const error = new Error('err');

e1.pipe(window(Promise.reject(error))).subscribe({
error: (err) => {
expect(err).to.be.an('error');
done();
},
complete: () => {
done(new Error('should not be called'));
},
});
});
});
14 changes: 8 additions & 6 deletions src/internal/operators/window.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { OperatorFunction, ObservableInput } from '../types';
import { Subject } from '../Subject';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
import { innerFrom } from '../observable/innerFrom';

/**
* Branch out the source Observable values as a nested Observable whenever
Expand All @@ -17,8 +18,9 @@ import { noop } from '../util/noop';
* Returns an Observable that emits windows of items it collects from the source
* Observable. The output Observable emits connected, non-overlapping
* windows. It emits the current window and opens a new one whenever the
* Observable `windowBoundaries` emits an item. Because each window is an
* Observable, the output is a higher-order Observable.
* `windowBoundaries` emits an item. `windowBoundaries` can be any type that
* `ObservableInput` accepts. It internally gets converted to an Observable.
* Because each window is an Observable, the output is a higher-order Observable.
*
* ## Example
*
Expand All @@ -43,12 +45,12 @@ import { noop } from '../util/noop';
* @see {@link windowWhen}
* @see {@link buffer}
*
* @param {Observable<any>} windowBoundaries An Observable that completes the
* @param windowBoundaries An `ObservableInput` that completes the
* previous window and starts a new window.
* @return A function that returns an Observable of windows, which are
* Observables emitting values of the source Observable.
*/
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> {
return operate((source, subscriber) => {
let windowSubject: Subject<T> = new Subject<T>();

Expand All @@ -73,7 +75,7 @@ export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T
);

// Subscribe to the window boundaries.
windowBoundaries.subscribe(
innerFrom(windowBoundaries).subscribe(
createOperatorSubscriber(
subscriber,
() => {
Expand Down

0 comments on commit 75fdb84

Please sign in to comment.