Skip to content

Commit

Permalink
fix(cache): get correct caching behavior (#1765)
Browse files Browse the repository at this point in the history
This is an initial pass at fixing the cache operator. There is still a lot to do. Ideally, the cache operator would use the lift mechanism. Also this is not well optimized as it is introducing a lot of closures. But it works, and that's the point for now.

fixes #1628
  • Loading branch information
benlesh authored Jun 14, 2016
1 parent 0a6c4e8 commit cb0b806
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
34 changes: 30 additions & 4 deletions spec/operators/cache-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ describe('Observable.prototype.cache', () => {
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), time(sub2));
});

it('should replay values and error', () => {
const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---# ';
const expected2 = ' (abc#)';
it('should not replay values after error with a hot observable', () => {
const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---# ';
const expected2 = ' -';
const t = time( '------------------|');

expectObservable(s1).toBe(expected1);
Expand All @@ -48,6 +48,19 @@ describe('Observable.prototype.cache', () => {
}, t);
});

it('should be resubscribable after error with a cold observable', () => {
const s1 = cold( '----a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---# ';
const expected2 = ' ----a---b---c---#';
const t = time( '------------------| ');

expectObservable(s1).toBe(expected1);

rxTestScheduler.schedule(() => {
expectObservable(s1).toBe(expected2);
}, t);
});

it('should replay values and and share', () => {
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c------------d--e--f-|';
Expand Down Expand Up @@ -193,4 +206,17 @@ describe('Observable.prototype.cache', () => {
expectObservable(s1).toBe(e3);
}, t2);
});

it('should be retryable', () => {
const source = cold('--1-2-3-#');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const expected = '--1-2-3---1-2-3---1-2-3-#';

const result = source.cache(undefined, undefined, rxTestScheduler).retry(2);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
47 changes: 44 additions & 3 deletions src/operator/cache.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {Observable} from '../Observable';
import {publishReplay} from './publishReplay';
import {Scheduler} from '../Scheduler';
import {ConnectableObservable} from '../observable/ConnectableObservable';
import {ReplaySubject} from '../ReplaySubject';
import {Observer} from '../Observer';
import {Subscription} from '../Subscription';

/**
* @param bufferSize
Expand All @@ -14,7 +15,47 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
export function cache<T>(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: Scheduler): Observable<T> {
return (<ConnectableObservable<any>>publishReplay.call(this, bufferSize, windowTime, scheduler)).refCount();
let subject: ReplaySubject<T>;
let source = this;
let refs = 0;
let outerSub: Subscription;

const getSubject = () => {
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
return subject;
};

return new Observable<T>((observer: Observer<T>) => {
if (!subject) {
subject = getSubject();
outerSub = source.subscribe(
(value: T) => subject.next(value),
(err: any) => {
let s = subject;
subject = null;
s.error(err);
},
() => subject.complete()
);
}

refs++;

if (!subject) {
subject = getSubject();
}
let innerSub = subject.subscribe(observer);

return () => {
refs--;
if (innerSub) {
innerSub.unsubscribe();
}
if (refs === 0) {
outerSub.unsubscribe();
}
};
});
}

export interface CacheSignature<T> {
Expand Down

0 comments on commit cb0b806

Please sign in to comment.