-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(refCount): add higher-order lettable version of refCount
NOTE: I am a little worried about a circular dependency here between ConnectableObservable and the lettable refCount
- Loading branch information
Showing
3 changed files
with
97 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Subscription, TeardownLogic } from '../Subscription'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
import { ConnectableObservable } from '../observable/ConnectableObservable'; | ||
import { Observable } from '../Observable'; | ||
|
||
export function refCount<T>(): MonoTypeOperatorFunction<T> { | ||
return function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> { | ||
return source.lift(new RefCountOperator(source)); | ||
}; | ||
} | ||
|
||
class RefCountOperator<T> implements Operator<T, T> { | ||
constructor(private connectable: ConnectableObservable<T>) { | ||
} | ||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
|
||
const { connectable } = this; | ||
(<any> connectable)._refCount++; | ||
|
||
const refCounter = new RefCountSubscriber(subscriber, connectable); | ||
const subscription = source.subscribe(refCounter); | ||
|
||
if (!refCounter.closed) { | ||
(<any> refCounter).connection = connectable.connect(); | ||
} | ||
|
||
return subscription; | ||
} | ||
} | ||
|
||
class RefCountSubscriber<T> extends Subscriber<T> { | ||
|
||
private connection: Subscription; | ||
|
||
constructor(destination: Subscriber<T>, | ||
private connectable: ConnectableObservable<T>) { | ||
super(destination); | ||
} | ||
|
||
protected _unsubscribe() { | ||
|
||
const { connectable } = this; | ||
if (!connectable) { | ||
this.connection = null; | ||
return; | ||
} | ||
|
||
this.connectable = null; | ||
const refCount = (<any> connectable)._refCount; | ||
if (refCount <= 0) { | ||
this.connection = null; | ||
return; | ||
} | ||
|
||
(<any> connectable)._refCount = refCount - 1; | ||
if (refCount > 1) { | ||
this.connection = null; | ||
return; | ||
} | ||
|
||
/// | ||
// Compare the local RefCountSubscriber's connection Subscription to the | ||
// connection Subscription on the shared ConnectableObservable. In cases | ||
// where the ConnectableObservable source synchronously emits values, and | ||
// the RefCountSubscriber's downstream Observers synchronously unsubscribe, | ||
// execution continues to here before the RefCountOperator has a chance to | ||
// supply the RefCountSubscriber with the shared connection Subscription. | ||
// For example: | ||
// ``` | ||
// Observable.range(0, 10) | ||
// .publish() | ||
// .refCount() | ||
// .take(5) | ||
// .subscribe(); | ||
// ``` | ||
// In order to account for this case, RefCountSubscriber should only dispose | ||
// the ConnectableObservable's shared connection Subscription if the | ||
// connection Subscription exists, *and* either: | ||
// a. RefCountSubscriber doesn't have a reference to the shared connection | ||
// Subscription yet, or, | ||
// b. RefCountSubscriber's connection Subscription reference is identical | ||
// to the shared connection Subscription | ||
/// | ||
const { connection } = this; | ||
const sharedConnection = (<any> connectable)._connection; | ||
this.connection = null; | ||
|
||
if (sharedConnection && (!connection || sharedConnection === connection)) { | ||
sharedConnection.unsubscribe(); | ||
} | ||
} | ||
} |