Skip to content

Commit

Permalink
fix(typings): fixed some cases where multicast and publish would not …
Browse files Browse the repository at this point in the history
…return a ConnectableObservable (#3320)
  • Loading branch information
david-driscoll authored and benlesh committed Feb 20, 2018
1 parent f7e4c02 commit ddffecc
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 138 deletions.
97 changes: 49 additions & 48 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import { expect } from 'chai';
import * as Rx from '../../src/Rx';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { hot, cold, expectObservable, expectSubscriptions, time } from '../helpers/marble-testing';

declare const type: Function;
declare const asDiagram: Function;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const ReplaySubject = Rx.ReplaySubject;

/** @test {multicast} */
describe('Observable.prototype.multicast', () => {
asDiagram('multicast(() => new Subject())')('should mirror a simple source Observable', () => {
asDiagram('multicast(() => new Subject<string>())')('should mirror a simple source Observable', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = '^ !';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const expected = '--1-2---3-4--5-|';

expectObservable(multicasted).toBe(expected);
Expand All @@ -23,12 +24,12 @@ describe('Observable.prototype.multicast', () => {
multicasted.connect();
});

it('should accept Subjects', (done: MochaDone) => {
it('should accept Subjects', (done) => {
const expected = [1, 2, 3, 4];

const connectable = Observable.of(1, 2, 3, 4).multicast(new Subject<number>());

connectable.subscribe((x: number) => { expect(x).to.equal(expected.shift()); },
connectable.subscribe((x) => { expect(x).to.equal(expected.shift()); },
(x) => {
done(new Error('should not be called'));
}, () => {
Expand All @@ -38,7 +39,7 @@ describe('Observable.prototype.multicast', () => {
connectable.connect();
});

it('should multicast a ConnectableObservable', (done: MochaDone) => {
it('should multicast a ConnectableObservable', (done) => {
const expected = [1, 2, 3, 4];

const source = new Subject<number>();
Expand All @@ -55,7 +56,7 @@ describe('Observable.prototype.multicast', () => {
source.complete();

replayed.do({
next(x: number) {
next(x) {
expect(x).to.equal(expected.shift());
},
complete() {
Expand All @@ -65,12 +66,12 @@ describe('Observable.prototype.multicast', () => {
.subscribe(null, done, done);
});

it('should accept Subject factory functions', (done: MochaDone) => {
it('should accept Subject factory functions', (done) => {
const expected = [1, 2, 3, 4];

const connectable = Observable.of(1, 2, 3, 4).multicast(() => new Subject<number>());

connectable.subscribe((x: number) => { expect(x).to.equal(expected.shift()); },
connectable.subscribe((x) => { expect(x).to.equal(expected.shift()); },
(x) => {
done(new Error('should not be called'));
}, () => {
Expand All @@ -85,8 +86,8 @@ describe('Observable.prototype.multicast', () => {
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a: string, b: string) => (parseInt(a) + parseInt(b)).toString()));
const multicasted = source.multicast(() => new Subject<string>(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-2-4-6----8-|';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
Expand All @@ -105,8 +106,8 @@ describe('Observable.prototype.multicast', () => {
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a: string, b: string) => (parseInt(a) + parseInt(b)).toString()));
const multicasted = source.multicast(() => new Subject<string>(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const expected1 = '-2-4-6----8-|';
const expected2 = ' -2-4-6----8-|';
const expected3 = ' -2-4-6----8-|';
Expand All @@ -125,7 +126,7 @@ describe('Observable.prototype.multicast', () => {
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new ReplaySubject(1),
const multicasted = source.multicast(() => new ReplaySubject<string>(1),
x => x.concat(x.takeLast(1)));
const expected1 = '-1-2-3----4-(4|)';
const expected2 = ' -1-2-3----4-(4|)';
Expand All @@ -142,8 +143,8 @@ describe('Observable.prototype.multicast', () => {

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
const multicasted = source.multicast(() => new Subject());
const sourceSubs: string[] = [];
const multicasted = source.multicast(() => new Subject<string>());
const expected = '-';

expectObservable(multicasted).toBe(expected);
Expand All @@ -153,7 +154,7 @@ describe('Observable.prototype.multicast', () => {
it('should multicast the same values to multiple observers', () => {
const source = cold('-1-2-3----4-|');
const sourceSubs = '^ !';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-1-2-3----4-|';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
Expand All @@ -172,7 +173,7 @@ describe('Observable.prototype.multicast', () => {
it('should multicast an error from the source to multiple observers', () => {
const source = cold('-1-2-3----4-#');
const sourceSubs = '^ !';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-1-2-3----4-#';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
Expand All @@ -192,7 +193,7 @@ describe('Observable.prototype.multicast', () => {
'but is unsubscribed explicitly and early', () => {
const source = cold('-1-2-3----4-|');
const sourceSubs = '^ ! ';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const unsub = ' u ';
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-1-2-3---- ';
Expand All @@ -207,7 +208,7 @@ describe('Observable.prototype.multicast', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

// Set up unsubscription action
let connection;
let connection: Rx.Subscription;
expectObservable(hot(unsub).do(() => {
connection.unsubscribe();
})).toBe(unsub);
Expand All @@ -219,7 +220,7 @@ describe('Observable.prototype.multicast', () => {
const source = cold('-1-2-3----4-|');
const sourceSubs = '^ ! ';
const multicasted = source
.mergeMap((x: string) => Observable.of(x))
.mergeMap((x) => Observable.of(x))
.multicast(() => new Subject<string>());
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-1-2-3---- ';
Expand All @@ -235,7 +236,7 @@ describe('Observable.prototype.multicast', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

// Set up unsubscription action
let connection;
let connection: Rx.Subscription;
expectObservable(hot(unsub).do(() => {
connection.unsubscribe();
})).toBe(unsub);
Expand All @@ -246,7 +247,7 @@ describe('Observable.prototype.multicast', () => {
it('should multicast an empty source', () => {
const source = cold('|');
const sourceSubs = '(^!)';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const expected = '|';

expectObservable(multicasted).toBe(expected);
Expand All @@ -258,7 +259,7 @@ describe('Observable.prototype.multicast', () => {
it('should multicast a never source', () => {
const source = cold('-');
const sourceSubs = '^';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const expected = '-';

expectObservable(multicasted).toBe(expected);
Expand All @@ -270,7 +271,7 @@ describe('Observable.prototype.multicast', () => {
it('should multicast a throw source', () => {
const source = cold('#');
const sourceSubs = '(^!)';
const multicasted = source.multicast(() => new Subject());
const multicasted = source.multicast(() => new Subject<string>());
const expected = '#';

expectObservable(multicasted).toBe(expected);
Expand All @@ -283,7 +284,7 @@ describe('Observable.prototype.multicast', () => {
it('should connect when first subscriber subscribes', () => {
const source = cold( '-1-2-3----4-|');
const sourceSubs = ' ^ !';
const multicasted = source.multicast(() => new Subject()).refCount();
const multicasted = source.multicast(() => new Subject<string>()).refCount();
const subscriber1 = hot(' a| ').mergeMapTo(multicasted);
const expected1 = ' -1-2-3----4-|';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
Expand All @@ -300,7 +301,7 @@ describe('Observable.prototype.multicast', () => {
it('should disconnect when last subscriber unsubscribes', () => {
const source = cold( '-1-2-3----4-|');
const sourceSubs = ' ^ ! ';
const multicasted = source.multicast(() => new Subject()).refCount();
const multicasted = source.multicast(() => new Subject<string>()).refCount();
const subscriber1 = hot(' a| ').mergeMapTo(multicasted);
const unsub1 = ' ! ';
const expected1 = ' -1-2-3-- ';
Expand All @@ -314,7 +315,7 @@ describe('Observable.prototype.multicast', () => {
});

it('should be retryable when cold source is synchronous', () => {
function subjectFactory() { return new Subject(); }
function subjectFactory() { return new Subject<string>(); }
const source = cold('(123#)');
const multicasted = source.multicast(subjectFactory).refCount();
const subscribe1 = 's ';
Expand Down Expand Up @@ -370,7 +371,7 @@ describe('Observable.prototype.multicast', () => {
});

it('should be repeatable when cold source is synchronous', () => {
function subjectFactory() { return new Subject(); }
function subjectFactory() { return new Subject<string>(); }
const source = cold('(123|)');
const multicasted = source.multicast(subjectFactory).refCount();
const subscribe1 = 's ';
Expand Down Expand Up @@ -430,7 +431,7 @@ describe('Observable.prototype.multicast', () => {
});

it('should be retryable', () => {
function subjectFactory() { return new Subject(); }
function subjectFactory() { return new Subject<string>(); }
const source = cold('-1-2-3----4-# ');
const sourceSubs = ['^ ! ',
' ^ ! ',
Expand Down Expand Up @@ -472,7 +473,7 @@ describe('Observable.prototype.multicast', () => {
});

it('should be repeatable', () => {
function subjectFactory() { return new Subject(); }
function subjectFactory() { return new Subject<string>(); }
const source = cold('-1-2-3----4-| ');
const sourceSubs = ['^ ! ',
' ^ ! ',
Expand Down Expand Up @@ -518,12 +519,12 @@ describe('Observable.prototype.multicast', () => {
});
});

it('should multicast one observable to multiple observers', (done: MochaDone) => {
const results1 = [];
const results2 = [];
it('should multicast one observable to multiple observers', (done) => {
const results1: string[] = [];
const results2: string[] = [];
let subscriptions = 0;

const source = new Observable((observer: Rx.Observer<number>) => {
const source = new Observable<number>((observer) => {
subscriptions++;
observer.next(1);
observer.next(2);
Expand All @@ -533,14 +534,14 @@ describe('Observable.prototype.multicast', () => {
});

const connectable = source.multicast(() => {
return new Subject();
return new Subject<string>();
});

connectable.subscribe((x: number) => {
connectable.subscribe((x) => {
results1.push(x);
});

connectable.subscribe((x: number) => {
connectable.subscribe((x) => {
results2.push(x);
});

Expand All @@ -562,7 +563,7 @@ describe('Observable.prototype.multicast', () => {

const source = Observable.from([1, 2, 3, 4]).multicast(subject);

source.subscribe((x: number) => {
source.subscribe((x) => {
expect(x).to.equal(expected[i++]);
});

Expand All @@ -571,19 +572,19 @@ describe('Observable.prototype.multicast', () => {
});

describe('when given a subject factory', () => {
it('should allow you to reconnect by subscribing again', (done: MochaDone) => {
it('should allow you to reconnect by subscribing again', (done) => {
const expected = [1, 2, 3, 4];
let i = 0;

const source = Observable.of(1, 2, 3, 4).multicast(() => new Subject<number>());

source.subscribe((x: number) => {
source.subscribe((x) => {
expect(x).to.equal(expected[i++]);
}, null,
() => {
i = 0;

source.subscribe((x: number) => {
source.subscribe((x) => {
expect(x).to.equal(expected[i++]);
}, null, done);

Expand All @@ -594,16 +595,16 @@ describe('Observable.prototype.multicast', () => {
});

it('should not throw ObjectUnsubscribedError when used in ' +
'a switchMap', (done: MochaDone) => {
'a switchMap', (done) => {
const source = Observable.of(1, 2, 3)
.multicast(() => new Subject<number>())
.refCount();

const expected = ['a1', 'a2', 'a3', 'b1', 'b2', 'b3', 'c1', 'c2', 'c3'];

Observable.of('a', 'b', 'c')
.switchMap((letter: string) => source.map((n: number) => String(letter + n)))
.subscribe((x: string) => {
.switchMap((letter) => source.map((n) => String(letter + n)))
.subscribe((x) => {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
Expand All @@ -616,16 +617,16 @@ describe('Observable.prototype.multicast', () => {

describe('when given a subject', () => {
it('should not throw ObjectUnsubscribedError when used in ' +
'a switchMap', (done: MochaDone) => {
'a switchMap', (done) => {
const source = Observable.of(1, 2, 3)
.multicast(new Subject<number>())
.refCount();

const expected = ['a1', 'a2', 'a3'];

Observable.of('a', 'b', 'c')
.switchMap((letter: string) => source.map((n: number) => String(letter + n)))
.subscribe((x: string) => {
.switchMap((letter) => source.map((n) => String(letter + n)))
.subscribe((x) => {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
Expand Down
Loading

0 comments on commit ddffecc

Please sign in to comment.