Skip to content

Commit

Permalink
feat(AsyncSubject): add AsyncSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki authored and benlesh committed Dec 2, 2015
1 parent a20325c commit 34c05fe
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ import './operator/zipAll';
import {Subject} from './Subject';
import {Subscription} from './Subscription';
import {Subscriber} from './Subscriber';
import {AsyncSubject} from './subject/AsyncSubject';
import {ReplaySubject} from './subject/ReplaySubject';
import {BehaviorSubject} from './subject/BehaviorSubject';
import {ConnectableObservable} from './observable/ConnectableObservable';
Expand Down Expand Up @@ -156,6 +157,7 @@ export {
Observable,
Subscriber,
Subscription,
AsyncSubject,
ReplaySubject,
BehaviorSubject,
ConnectableObservable,
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import './operator/zipAll';
import {Subject} from './Subject';
import {Subscription} from './Subscription';
import {Subscriber} from './Subscriber';
import {AsyncSubject} from './subject/AsyncSubject';
import {ReplaySubject} from './subject/ReplaySubject';
import {BehaviorSubject} from './subject/BehaviorSubject';
import {ConnectableObservable} from './observable/ConnectableObservable';
Expand All @@ -129,6 +130,7 @@ export {
Observable,
Subscriber,
Subscription,
AsyncSubject,
ReplaySubject,
BehaviorSubject,
ConnectableObservable,
Expand Down
53 changes: 53 additions & 0 deletions src/subjects/AsyncSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {Subject} from '../Subject';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

export class AsyncSubject<T> extends Subject<T> {
_value: T = void 0;
_hasNext: boolean = false;
_isScalar: boolean = false;

constructor () {
super();
}

_subscribe(subscriber: Subscriber<any>): Subscription<T> {
const subscription = super._subscribe(subscriber);
if (!subscription) {
return;
} else if (!subscription.isUnsubscribed && this._hasNext) {
subscriber.next(this._value);
subscriber.complete();
}
return subscription;
}

_next(value: T): void {
this._value = value;
this._hasNext = true;
}

_complete(): void {
let index = -1;
const observers = this.observers;
const len = observers.length;

// optimization -- block next, complete, and unsubscribe while dispatching
this.observers = void 0; // optimization
this.isUnsubscribed = true;

if (this._hasNext) {
while (++index < len) {
let o = observers[index];
o.next(this._value);
o.complete();
}
} else {
while (++index < len) {
observers[index].complete();
}
}

this.isUnsubscribed = false;
}
}

0 comments on commit 34c05fe

Please sign in to comment.