Skip to content

Commit

Permalink
feat(debounceTime): add higher-order lettable version of debounceTime
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jul 25, 2017
1 parent cb8ce46 commit df0d439
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 66 deletions.
69 changes: 3 additions & 66 deletions src/operator/debounceTime.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Operator } from '../Operator';

import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { IScheduler } from '../Scheduler';
import { Subscription, TeardownLogic } from '../Subscription';
import { async } from '../scheduler/async';
import { debounceTime as higherOrder } from '../operators';

/**
* Emits a value from the source Observable only after a particular time span
Expand Down Expand Up @@ -52,67 +51,5 @@ import { async } from '../scheduler/async';
* @owner Observable
*/
export function debounceTime<T>(this: Observable<T>, dueTime: number, scheduler: IScheduler = async): Observable<T> {
return this.lift(new DebounceTimeOperator(dueTime, scheduler));
}

class DebounceTimeOperator<T> implements Operator<T, T> {
constructor(private dueTime: number, private scheduler: IScheduler) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DebounceTimeSubscriber<T> extends Subscriber<T> {
private debouncedSubscription: Subscription = null;
private lastValue: T = null;
private hasValue: boolean = false;

constructor(destination: Subscriber<T>,
private dueTime: number,
private scheduler: IScheduler) {
super(destination);
}

protected _next(value: T) {
this.clearDebounce();
this.lastValue = value;
this.hasValue = true;
this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this));
}

protected _complete() {
this.debouncedNext();
this.destination.complete();
}

debouncedNext(): void {
this.clearDebounce();

if (this.hasValue) {
this.destination.next(this.lastValue);
this.lastValue = null;
this.hasValue = false;
}
}

private clearDebounce(): void {
const debouncedSubscription = this.debouncedSubscription;

if (debouncedSubscription !== null) {
this.remove(debouncedSubscription);
debouncedSubscription.unsubscribe();
this.debouncedSubscription = null;
}
}
}

function dispatchNext(subscriber: DebounceTimeSubscriber<any>) {
subscriber.debouncedNext();
return higherOrder(dueTime, scheduler)(this);
}
119 changes: 119 additions & 0 deletions src/operators/debounceTime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { IScheduler } from '../Scheduler';
import { Subscription, TeardownLogic } from '../Subscription';
import { async } from '../scheduler/async';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
* Emits a value from the source Observable only after a particular time span
* has passed without another source emission.
*
* <span class="informal">It's like {@link delay}, but passes only the most
* recent value from each burst of emissions.</span>
*
* <img src="./img/debounceTime.png" width="100%">
*
* `debounceTime` delays values emitted by the source Observable, but drops
* previous pending delayed emissions if a new value arrives on the source
* Observable. This operator keeps track of the most recent value from the
* source Observable, and emits that only when `dueTime` enough time has passed
* without any other value appearing on the source Observable. If a new value
* appears before `dueTime` silence occurs, the previous value will be dropped
* and will not be emitted on the output Observable.
*
* This is a rate-limiting operator, because it is impossible for more than one
* value to be emitted in any time window of duration `dueTime`, but it is also
* a delay-like operator since output emissions do not occur at the same time as
* they did on the source Observable. Optionally takes a {@link IScheduler} for
* managing timers.
*
* @example <caption>Emit the most recent click after a burst of clicks</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var result = clicks.debounceTime(1000);
* result.subscribe(x => console.log(x));
*
* @see {@link auditTime}
* @see {@link debounce}
* @see {@link delay}
* @see {@link sampleTime}
* @see {@link throttleTime}
*
* @param {number} dueTime The timeout duration in milliseconds (or the time
* unit determined internally by the optional `scheduler`) for the window of
* time required to wait for emission silence before emitting the most recent
* source value.
* @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for
* managing the timers that handle the timeout for each value.
* @return {Observable} An Observable that delays the emissions of the source
* Observable by the specified `dueTime`, and may drop some values if they occur
* too frequently.
* @method debounceTime
* @owner Observable
*/
export function debounceTime<T>(dueTime: number, scheduler: IScheduler = async): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(new DebounceTimeOperator(dueTime, scheduler));
}

class DebounceTimeOperator<T> implements Operator<T, T> {
constructor(private dueTime: number, private scheduler: IScheduler) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DebounceTimeSubscriber<T> extends Subscriber<T> {
private debouncedSubscription: Subscription = null;
private lastValue: T = null;
private hasValue: boolean = false;

constructor(destination: Subscriber<T>,
private dueTime: number,
private scheduler: IScheduler) {
super(destination);
}

protected _next(value: T) {
this.clearDebounce();
this.lastValue = value;
this.hasValue = true;
this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this));
}

protected _complete() {
this.debouncedNext();
this.destination.complete();
}

debouncedNext(): void {
this.clearDebounce();

if (this.hasValue) {
this.destination.next(this.lastValue);
this.lastValue = null;
this.hasValue = false;
}
}

private clearDebounce(): void {
const debouncedSubscription = this.debouncedSubscription;

if (debouncedSubscription !== null) {
this.remove(debouncedSubscription);
debouncedSubscription.unsubscribe();
this.debouncedSubscription = null;
}
}
}

function dispatchNext(subscriber: DebounceTimeSubscriber<any>) {
subscriber.debouncedNext();
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export { concatMap } from './concatMap';
export { concatMapTo } from './concatMapTo';
export { count } from './count';
export { debounce } from './debounce';
export { debounceTime } from './debounceTime';
export { defaultIfEmpty } from './defaultIfEmpty';
export { dematerialize } from './dematerialize';
export { filter } from './filter';
Expand Down

0 comments on commit df0d439

Please sign in to comment.