Skip to content

Commit

Permalink
fix(multicast): fix a bug that caused multicast to omit messages afte…
Browse files Browse the repository at this point in the history
…r termination (#2021)

ConnectableObservable as the multicast selector function source argument would omit events after

source termination when used with historic Subjects.
  • Loading branch information
trxcllnt authored and jayphelps committed Oct 24, 2016
1 parent f93cb1d commit 44fbc14
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
21 changes: 21 additions & 0 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions, time

const Observable = Rx.Observable;
const Subject = Rx.Subject;
const ReplaySubject = Rx.ReplaySubject;

/** @test {multicast} */
describe('Observable.prototype.multicast', () => {
Expand Down Expand Up @@ -89,6 +90,26 @@ describe('Observable.prototype.multicast', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should accept a multicast selector and respect the subject\'s messaging semantics', () => {
const source = cold('-1-2-3----4-|');
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new ReplaySubject(1),
x => x.concat(x.takeLast(1)));
const expected1 = '-1-2-3----4-(4|)';
const expected2 = ' -1-2-3----4-(4|)';
const expected3 = ' -1-2-3----4-(4|)';
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
const subscriber3 = hot(' c| ').mergeMapTo(multicasted);

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
Expand Down
8 changes: 4 additions & 4 deletions src/operator/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ export class MulticastOperator<T> implements Operator<T, T> {
constructor(private subjectFactory: () => Subject<T>,
private selector: (source: Observable<T>) => Observable<T>) {
}
call(subscriber: Subscriber<T>, self: any): any {
call(subscriber: Subscriber<T>, source: any): any {
const { selector } = this;
const connectable = new ConnectableObservable(self.source, this.subjectFactory);
const subscription = selector(connectable).subscribe(subscriber);
subscription.add(connectable.connect());
const subject = this.subjectFactory();
const subscription = selector(subject).subscribe(subscriber);
subscription.add(source._subscribe(subject));
return subscription;
}
}

0 comments on commit 44fbc14

Please sign in to comment.