forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(switchScan): Make switchScan behavior more close to
scan()
Closes ReactiveX#2931
- Loading branch information
Showing
7 changed files
with
122 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import { of } from 'rxjs'; | ||
import { switchScan } from 'rxjs/operators'; | ||
|
||
it('should infer correctly', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)))); // $ExpectType Observable<boolean> | ||
}); | ||
|
||
it('should infer correctly when using a single type', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v))); // $ExpectType Observable<number> | ||
}); | ||
|
||
it('should infer correctly when using a seed', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable<number> | ||
}); | ||
|
||
it('should infer correctly when using seed of a different type', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc: string, v: number) => of(acc + v), '0')); // $ExpectType Observable<string> | ||
}); | ||
|
||
it('should support a projector that takes an index', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index) => of(Boolean(v)))); // $ExpectType Observable<boolean> | ||
}); | ||
|
||
it('should enforce types', () => { | ||
const o = of(1, 2, 3).pipe(switchScan()); // $ExpectError | ||
}); | ||
|
||
it('should enforce the return type to be Observable', () => { | ||
const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError | ||
}); | ||
|
||
it('should enforce seed and the return type from accumulator', () => { | ||
const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectError | ||
}); | ||
|
||
it('should enforce seed and accumulator to have the same type', () => { | ||
const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,61 @@ | ||
import { Observable } from '../Observable'; | ||
import { ObservableInput, OperatorFunction } from '../types'; | ||
import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction } from '../types'; | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { of } from '../observable/of'; | ||
import { switchMap } from './switchMap'; | ||
import { tap } from './tap'; | ||
|
||
export function switchScan<T>(accumulator: (acc: T, value: T, index: number) => ObservableInput<T>, seed?: T): MonoTypeOperatorFunction<T>; | ||
export function switchScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, seed?: R): OperatorFunction<T, R>; | ||
|
||
/** | ||
* Applies an accumulator function over the source Observable where the | ||
* accumulator function itself returns an Observable, emitting values | ||
* only from the most recently returned Observable. | ||
* | ||
* <span class="informal">It's like {@link scan}, but only to most recent | ||
* Observable returned by the accumulator is merged into the outer Observable.</span> | ||
* | ||
* @see {@link scan} | ||
* @see {@link mergeScan} | ||
* | ||
* @param {function(acc: R, value: T, index: number): Observable<R>} accumulator | ||
* The accumulator function called on each source value. | ||
* @param {T|R} [seed] The initial accumulation value. | ||
* @return {Observable<R>} An observable of the accumulated values. | ||
* @method switchScan | ||
* @owner Observable | ||
*/ | ||
export function switchScan<T, R>( | ||
project: (acc: R, value: T, index: number) => ObservableInput<R>, | ||
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, | ||
seed?: R | ||
): OperatorFunction<T, R> { | ||
return (source: Observable<T>) => source.lift(new SwitchScanOperator(project, seed)); | ||
let hasSeed = false; | ||
// The same reason as described in `scan` https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/scan.ts#L54-L58 | ||
if (arguments.length >= 2) { | ||
hasSeed = true; | ||
} | ||
|
||
return (source: Observable<T>) => source.lift(new SwitchScanOperator(accumulator, seed, hasSeed)); | ||
} | ||
|
||
class SwitchScanOperator<T, R> implements Operator<T, R> { | ||
private acc: R; | ||
|
||
constructor(private project: (acc: R, value: T, index: number) => ObservableInput<R>, | ||
seed: R | ||
) { | ||
this.acc = seed; | ||
} | ||
constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, private seed: R, private hasSeed: boolean = false) { } | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
const wrappedProject = (value: T, index: number): ObservableInput<R> => | ||
this.project(this.acc, value, index); | ||
|
||
return source.pipe( | ||
switchMap(wrappedProject), | ||
tap((value: R) => this.acc = value), | ||
switchMap((value: T, index: number): ObservableInput<R> => { | ||
if (this.hasSeed) { | ||
return this.accumulator(this.seed, value, index); | ||
} else { | ||
return of(value); | ||
} | ||
}), | ||
tap((value: R) => { | ||
this.seed = value; | ||
this.hasSeed = true; | ||
}), | ||
).subscribe(subscriber); | ||
} | ||
} |