Skip to content

Commit

Permalink
fix(Subject): correct Subject behaviors to be more like Rx4
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Subjects no longer duck-type as Subscriptions
BREAKING CHANGE: Subjects will no longer throw when re-subscribed to if they are not unsubscribed
BREAKING CHANGE: Subjects no longer automatically unsubscribe when completed or errored
BREAKING CAHNGE: Minor scheduling changes to groupBy to ensure proper emission ordering
  • Loading branch information
benlesh committed May 22, 2016
1 parent 08f3d38 commit ba9ef2b
Show file tree
Hide file tree
Showing 21 changed files with 482 additions and 813 deletions.
169 changes: 35 additions & 134 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ describe('Subject', () => {
subject.complete();
});

it('should have the rxSubscriber Symbol', () => {
const subject = new Subject();
expect(subject[Rx.Symbol.rxSubscriber]).to.be.a('function');
});

it('should pump values to multiple subscribers', (done: MochaDone) => {
const subject = new Subject();
const expected = ['foo', 'bar'];
Expand Down Expand Up @@ -276,82 +271,7 @@ describe('Subject', () => {
expect(results3).to.deep.equal([]);
});

it('should allow ad-hoc subscription to be added to itself', () => {
const subject = new Subject();
const results1 = [];
const results2 = [];

const auxSubject = new Subject();

const subscription1 = subject.subscribe(
function (x) { results1.push(x); },
function (e) { results1.push('E'); },
() => { results1.push('C'); }
);
const subscription2 = auxSubject.subscribe(
function (x) { results2.push(x); },
function (e) { results2.push('E'); },
() => { results2.push('C'); }
);

subject.add(subscription2);

subject.next(1);
subject.next(2);
subject.next(3);
auxSubject.next('a');
auxSubject.next('b');

subscription1.unsubscribe();
subject.unsubscribe();

auxSubject.next('c');
auxSubject.next('d');

expect(results1).to.deep.equal([1, 2, 3]);
expect(subscription2.isUnsubscribed).to.be.true;
expect(results2).to.deep.equal(['a', 'b']);
});

it('should allow ad-hoc subscription to be removed from itself', () => {
const subject = new Subject();
const results1 = [];
const results2 = [];

const auxSubject = new Subject();

const subscription1 = subject.subscribe(
function (x) { results1.push(x); },
function (e) { results1.push('E'); },
() => { results1.push('C'); }
);
const subscription2 = auxSubject.subscribe(
function (x) { results2.push(x); },
function (e) { results2.push('E'); },
() => { results2.push('C'); }
);

subject.add(subscription2);

subject.next(1);
subject.next(2);
subject.next(3);
auxSubject.next('a');
auxSubject.next('b');

subject.remove(subscription2);
subscription1.unsubscribe();
subject.unsubscribe();

auxSubject.next('c');
auxSubject.next('d');

expect(results1).to.deep.equal([1, 2, 3]);
expect(subscription2.isUnsubscribed).to.be.false;
expect(results2).to.deep.equal(['a', 'b', 'c', 'd']);
});

it('should not allow values to be nexted after a return', (done: MochaDone) => {
it('should not allow values to be nexted after it is unsubscribed', (done: MochaDone) => {
const subject = new Subject();
const expected = ['foo'];

Expand All @@ -360,7 +280,7 @@ describe('Subject', () => {
});

subject.next('foo');
subject.complete();
subject.unsubscribe();
expect(() => subject.next('bar')).to.throw(Rx.ObjectUnsubscribedError);
done();
});
Expand Down Expand Up @@ -528,38 +448,24 @@ describe('Subject', () => {
}).to.throw(Rx.ObjectUnsubscribedError);
});

it('should throw ObjectUnsubscribedError when emit after completed', () => {
it('should not next after completed', () => {
const subject = new Rx.Subject();
const results = [];
subject.subscribe(x => results.push(x), null, () => results.push('C'));
subject.next('a');
subject.complete();

expect(() => {
subject.next('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.error('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.complete();
}).to.throw(Rx.ObjectUnsubscribedError);
subject.next('b');
expect(results).to.deep.equal(['a', 'C']);
});

it('should throw ObjectUnsubscribedError when emit after error', () => {
it('should not next after error', () => {
const subject = new Rx.Subject();
subject.error('e');

expect(() => {
subject.next('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.error('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.complete();
}).to.throw(Rx.ObjectUnsubscribedError);
const results = [];
subject.subscribe(x => results.push(x), (err) => results.push(err));
subject.next('a');
subject.error(new Error('wut?'));
subject.next('b');
expect(results).to.deep.equal(['a', new Error('wut?')]);
});

describe('asObservable', () => {
Expand Down Expand Up @@ -600,43 +506,38 @@ describe('Subject', () => {
expectObservable(observable).toBe(expected);
});

it('should work with inherited subject', (done: MochaDone) => {
it('should work with inherited subject', () => {
const results = [];
const subject = new Rx.AsyncSubject();

subject.next(42);
subject.complete();

const observable = subject.asObservable();

const expected = [new Rx.Notification('N', 42),
new Rx.Notification('C')];
observable.subscribe(x => results.push(x), null, () => results.push('done'));

observable.materialize().subscribe((x: Rx.Notification<number>) => {
expect(x).to.deep.equal(expected.shift());
}, (err: any) => {
done(err);
}, () => {
expect(expected).to.deep.equal([]);
done();
});
expect(results).to.deep.equal([42, 'done']);
});
});
});

it('should not eager', () => {
let subscribed = false;
describe('AnonymousSubject', () => {
it('should not eager', () => {
let subscribed = false;

const subject = new Rx.Subject(null, new Rx.Observable((observer: Rx.Observer<any>) => {
subscribed = true;
const subscription = Rx.Observable.of('x').subscribe(observer);
return () => {
subscription.unsubscribe();
};
}));
const subject = Rx.Subject.create(null, new Rx.Observable((observer: Rx.Observer<any>) => {
subscribed = true;
const subscription = Rx.Observable.of('x').subscribe(observer);
return () => {
subscription.unsubscribe();
};
}));

const observable = subject.asObservable();
expect(subscribed).to.be.false;
const observable = subject.asObservable();
expect(subscribed).to.be.false;

observable.subscribe();
expect(subscribed).to.be.true;
});
observable.subscribe();
expect(subscribed).to.be.true;
});
});
5 changes: 0 additions & 5 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ const Subscriber = Rx.Subscriber;

/** @test {Subscriber} */
describe('Subscriber', () => {
it('should have the rxSubscriber symbol', () => {
const sub = new Subscriber();
expect(sub[Rx.Symbol.rxSubscriber]()).to.equal(sub);
});

describe('when created through create()', () => {
it('should not call error() if next() handler throws an error', () => {
const errorSpy = sinon.spy();
Expand Down
58 changes: 36 additions & 22 deletions spec/operators/cache-spec.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,45 @@
import * as Rx from '../../dist/cjs/Rx';
import {expect} from 'chai';
declare const {hot, cold, time, expectObservable};

declare const rxTestScheduler: Rx.TestScheduler;

/** @test {cache} */
describe('Observable.prototype.cache', () => {
it('should just work™', () => {
let subs = 0;
const source = Rx.Observable.create(observer => {
subs++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).cache();
let results = [];
source.subscribe(x => results.push(x));
expect(results).to.deep.equal([1, 2, 3]);
expect(subs).to.equal(1);
results = [];
source.subscribe(x => results.push(x));
expect(results).to.deep.equal([1, 2, 3]);
expect(subs).to.equal(1);
});

it('should replay values upon subscription', () => {
const s1 = hot('---^---a---b---c---| ').cache();
const expected1 = '----a---b---c---| ';
const expected2 = ' (abc|)';
const t = time( '----------------|');
const s1 = hot( '----a---b---c---| ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---| ';
const expected2 = ' (abc|)';
const sub2 = '------------------| ';

expectObservable(s1).toBe(expected1);

rxTestScheduler.schedule(() => {
expectObservable(s1).toBe(expected2);
}, t);
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), time(sub2));
});

it('should replay values and error', () => {
const s1 = hot('---^---a---b---c---# ').cache();
const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---# ';
const expected2 = ' (abc#)';
const t = time( '----------------|');
const expected2 = ' (abc#)';
const t = time( '------------------|');

expectObservable(s1).toBe(expected1);

Expand All @@ -32,7 +49,7 @@ describe('Observable.prototype.cache', () => {
});

it('should replay values and and share', () => {
const s1 = hot('---^---a---b---c------------d--e--f-|').cache();
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c------------d--e--f-|';
const expected2 = ' (abc)----d--e--f-|';
const t = time( '----------------|');
Expand All @@ -58,16 +75,13 @@ describe('Observable.prototype.cache', () => {
});

it('should have a bufferCount that limits the replay test 2', () => {
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(2);
const s1 = hot( '----a---b---c------------d--e--f-|').cache(2);
const expected1 = '----a---b---c------------d--e--f-|';
const expected2 = ' (bc)-----d--e--f-|';
const t = time( '----------------|');

expectObservable(s1).toBe(expected1);

rxTestScheduler.schedule(() => {
expectObservable(s1).toBe(expected2);
}, t);
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), t);
});

it('should accept a windowTime that limits the replay', () => {
Expand All @@ -85,7 +99,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle empty', () => {
const s1 = cold('|').cache();
const s1 = cold('|').cache(undefined, undefined, rxTestScheduler);
const expected1 = '|';
const expected2 = ' |';
const t = time( '----------------|');
Expand All @@ -98,7 +112,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle throw', () => {
const s1 = cold('#').cache();
const s1 = cold('#').cache(undefined, undefined, rxTestScheduler);
const expected1 = '#';
const expected2 = ' #';
const t = time( '----------------|');
Expand All @@ -111,7 +125,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle never', () => {
const s1 = cold('-').cache();
const s1 = cold('-').cache(undefined, undefined, rxTestScheduler);
const expected1 = '-';
const expected2 = ' -';
const t = time( '----------------|');
Expand All @@ -124,7 +138,7 @@ describe('Observable.prototype.cache', () => {
});

it('should multicast a completion', () => {
const s1 = hot('--a--^--b------c-----d------e-|').cache();
const s1 = hot('--a--^--b------c-----d------e-|').cache(undefined, undefined, rxTestScheduler);
const t1 = time( '| ');
const e1 = '---b------c-----d------e-|';
const t2 = time( '----------| ');
Expand All @@ -142,7 +156,7 @@ describe('Observable.prototype.cache', () => {
});

it('should multicast an error', () => {
const s1 = hot('--a--^--b------c-----d------e-#').cache();
const s1 = hot('--a--^--b------c-----d------e-#').cache(undefined, undefined, rxTestScheduler);
const t1 = time( '| ');
const e1 = '---b------c-----d------e-#';
const t2 = time( '----------| ');
Expand Down
Loading

0 comments on commit ba9ef2b

Please sign in to comment.