diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index e8976b3c7d..af65d104c4 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -1,8 +1,8 @@ -import { Observable } from '../Observable'; -import { ReplaySubject } from '../ReplaySubject'; -import { Subscription } from '../Subscription'; -import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; -import { Subscriber } from '../Subscriber'; +import { Observable } from "../Observable"; +import { ReplaySubject } from "../ReplaySubject"; +import { Subscription } from "../Subscription"; +import { MonoTypeOperatorFunction, SchedulerLike } from "../types"; +import { Subscriber } from "../Subscriber"; export interface ShareReplayConfig { bufferSize?: number; @@ -56,22 +56,28 @@ export interface ShareReplayConfig { * @method shareReplay * @owner Observable */ -export function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; -export function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function shareReplay( + config: ShareReplayConfig +): MonoTypeOperatorFunction; +export function shareReplay( + bufferSize?: number, + windowTime?: number, + scheduler?: SchedulerLike +): MonoTypeOperatorFunction; export function shareReplay( configOrBufferSize?: ShareReplayConfig | number, windowTime?: number, scheduler?: SchedulerLike ): MonoTypeOperatorFunction { let config: ShareReplayConfig; - if (configOrBufferSize && typeof configOrBufferSize === 'object') { + if (configOrBufferSize && typeof configOrBufferSize === "object") { config = configOrBufferSize as ShareReplayConfig; } else { config = { bufferSize: configOrBufferSize as number | undefined, windowTime, refCount: false, - scheduler + scheduler, }; } return (source: Observable) => source.lift(shareReplayOperator(config)); @@ -81,7 +87,7 @@ function shareReplayOperator({ bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, refCount: useRefCount, - scheduler + scheduler, }: ShareReplayConfig) { let subject: ReplaySubject | undefined; let refCount = 0; @@ -89,7 +95,10 @@ function shareReplayOperator({ let hasError = false; let isComplete = false; - return function shareReplayOperation(this: Subscriber, source: Observable) { + return function shareReplayOperation( + this: Subscriber, + source: Observable + ) { refCount++; let innerSub: Subscription; if (!subject || hasError) { @@ -97,7 +106,9 @@ function shareReplayOperator({ subject = new ReplaySubject(bufferSize, windowTime, scheduler); innerSub = subject.subscribe(this); subscription = source.subscribe({ - next(value) { subject.next(value); }, + next(value) { + subject.next(value); + }, error(err) { hasError = true; subject.error(err); @@ -108,6 +119,14 @@ function shareReplayOperator({ subject.complete(); }, }); + + // Here we need to check to see if the source synchronously completed. Although + // we're setting `subscription = undefined` in the completion handler, if the source + // is synchronous, that will happen *before* subscription is set by the return of + // the `subscribe` call. + if (isComplete) { + subscription = undefined; + } } else { innerSub = subject.subscribe(this); } @@ -115,6 +134,7 @@ function shareReplayOperator({ this.add(() => { refCount--; innerSub.unsubscribe(); + innerSub = undefined; if (subscription && !isComplete && useRefCount && refCount === 0) { subscription.unsubscribe(); subscription = undefined;