Skip to content

Commit

Permalink
refactor(switchScan): use operate and switchMap.
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Oct 2, 2020
1 parent f593fcb commit 5c788b3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 26 deletions.
5 changes: 1 addition & 4 deletions spec/operators/switchScan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ import { concat, defer, Observable, of } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators';

declare function asDiagram(arg: string): Function;

/** @test {switchScan} */
describe('switchScan', () => {
asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)')
('should map-and-flatten each item to an Observable while passing the accumulated value', () => {
it('should map-and-flatten each item to an Observable while passing the accumulated value', () => {
const e1 = hot('--1-----3--5-------|');
const e1subs = '^ !';
const e2 = cold('x-x-x| ', {x: 10});
Expand Down
47 changes: 26 additions & 21 deletions src/internal/operators/switchScan.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { Observable } from '../Observable';
/** @prettier */
import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { switchMap } from './switchMap';
import { tap } from './tap';
import { operate } from '../util/lift';

/**
* Applies an accumulator function over the source Observable where the
Expand All @@ -17,30 +15,37 @@ import { tap } from './tap';
*
* @see {@link scan}
* @see {@link mergeScan}
* @see {@link switchMap}
*
* @param {function(acc: R, value: T, index: number): Observable<R>} accumulator
* @param accumulator
* The accumulator function called on each source value.
* @param {T|R} [seed] The initial accumulation value.
* @return {Observable<R>} An observable of the accumulated values.
* @method switchScan
* @owner Observable
* @param seed The initial accumulation value.
* @return An observable of the accumulated values.
*/
export function switchScan<T, R, O extends ObservableInput<any>>(
accumulator: (acc: R, value: T, index: number) => O,
seed: R
): OperatorFunction<T, ObservedValueOf<O>> {
return (source: Observable<T>) => source.lift(new SwitchScanOperator(accumulator, seed));
}

class SwitchScanOperator<T, R, O extends ObservableInput<any>> implements Operator<T, O> {
constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<O>, private seed: R) { }
return operate((source, subscriber) => {
// The state we will keep up to date to pass into our
// accumulator function at each new value from the source.
let state: any = seed;

call(subscriber: Subscriber<O>, source: any): any {
let seed: R = this.seed;
// Use `switchMap` on our `source` to do the work of creating
// this operator. Note the backwards order here of `switchMap()(source)`
// to avoid needing to use `pipe` unnecessarily
switchMap(
// On each value from the source, call the accumulator with
// our previous state, the value and the index.
(value: T, index) => accumulator(state, value, index),
// Using the deprecated result selector here as a dirty trick
// to update our state with the flattened value.
(_, innerValue) => ((state = innerValue), innerValue)
)(source).subscribe(subscriber);

return source.pipe(
switchMap((value: T, index: number) => this.accumulator(seed, value, index)),
tap((value: R) => seed = value),
).subscribe(subscriber);
}
return () => {
// Release state on teardown
state = null!;
};
});
}
2 changes: 1 addition & 1 deletion src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ export { skipWhile } from '../internal/operators/skipWhile';
export { startWith } from '../internal/operators/startWith';
export { subscribeOn } from '../internal/operators/subscribeOn';
export { switchAll } from '../internal/operators/switchAll';
export { switchScan } from '../internal/operators/switchScan';
export { switchMap } from '../internal/operators/switchMap';
export { switchMapTo } from '../internal/operators/switchMapTo';
export { switchScan } from '../internal/operators/switchScan';
export { take } from '../internal/operators/take';
export { takeLast } from '../internal/operators/takeLast';
export { takeUntil } from '../internal/operators/takeUntil';
Expand Down

0 comments on commit 5c788b3

Please sign in to comment.