diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 925d19215f..acc199a031 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -4,6 +4,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions, time const Observable = Rx.Observable; const Subject = Rx.Subject; +const ReplaySubject = Rx.ReplaySubject; /** @test {multicast} */ describe('Observable.prototype.multicast', () => { @@ -89,6 +90,26 @@ describe('Observable.prototype.multicast', () => { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); + it('should accept a multicast selector and respect the subject\'s messaging semantics', () => { + const source = cold('-1-2-3----4-|'); + const sourceSubs = ['^ !', + ' ^ !', + ' ^ !']; + const multicasted = source.multicast(() => new ReplaySubject(1), + x => x.concat(x.takeLast(1))); + const expected1 = '-1-2-3----4-(4|)'; + const expected2 = ' -1-2-3----4-(4|)'; + const expected3 = ' -1-2-3----4-(4|)'; + const subscriber1 = hot('a| ').mergeMapTo(multicasted); + const subscriber2 = hot(' b| ').mergeMapTo(multicasted); + const subscriber3 = hot(' c| ').mergeMapTo(multicasted); + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + it('should do nothing if connect is not called, despite subscriptions', () => { const source = cold('--1-2---3-4--5-|'); const sourceSubs = []; diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index f0302f17bd..de06574900 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -56,11 +56,11 @@ export class MulticastOperator implements Operator { constructor(private subjectFactory: () => Subject, private selector: (source: Observable) => Observable) { } - call(subscriber: Subscriber, self: any): any { + call(subscriber: Subscriber, source: any): any { const { selector } = this; - const connectable = new ConnectableObservable(self.source, this.subjectFactory); - const subscription = selector(connectable).subscribe(subscriber); - subscription.add(connectable.connect()); + const subject = this.subjectFactory(); + const subscription = selector(subject).subscribe(subscriber); + subscription.add(source._subscribe(subject)); return subscription; } }