diff --git a/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts b/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts index a93da5d94..45d808311 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts @@ -29,8 +29,6 @@ export class RequestProcessorWorker extends ReplaySubject { } public addSubscriber(subscriber: Subscriber): Subscription { - const superSub: Subscription = super.subscribe(subscriber); - this.subscribers.push(subscriber as Subscriber); subscriber.add(() => { this.removeSubscriber(subscriber); @@ -39,7 +37,7 @@ export class RequestProcessorWorker extends ReplaySubject { } }); - return superSub; + return super.subscribe(subscriber); } /** @deprecated */ @@ -54,6 +52,6 @@ export class RequestProcessorWorker extends ReplaySubject { error?: ((error: any) => void) | null, complete?: (() => void) | null ): Subscription { - throw "deprecated, use addObserver"; + throw "deprecated, use addSubscriber"; } } diff --git a/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts index 5b1b9966f..b511e2e8e 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts @@ -1,5 +1,6 @@ import { RequestProcessorWorkerGroup } from './requestProcessorWorkerGroup'; import { Observable, Subscriber } from 'rxjs'; +import { finalize } from 'rxjs/operators'; it('test constructor', () => { expect( @@ -193,3 +194,33 @@ it('test finalizer deletes queries with no subscribers', () => { expect(workerGroup.size()).toEqual(0); }); +it('producers can run a finalizer when the last subscriber unsubscribes', (done) => { + const workerGroup: RequestProcessorWorkerGroup = + new RequestProcessorWorkerGroup((query) => { + let timeoutID: any; + let counter = 0; + return new Observable(subscriber => { + timeoutID = setTimeout(function incramenter() { + counter++; + subscriber.next(counter); + timeoutID = setTimeout(incramenter, 5); + },5); + }).pipe(finalize(() => { + clearTimeout(timeoutID); + // the test actually ends here when the timeout is cleared + // if you remove this call to done the test will hang, timeout and fail + done(); + })); + }, identity); + + const recorder:SubscriberRecorder = new SubscriberRecorder(); + workerGroup.subscribe("test", recorder); + expect(workerGroup.size()).toEqual(1); + + setTimeout(() => { + expect(recorder.peek()).toBeGreaterThan(1); + recorder.unsubscribe(); + // expect no workers to remain because finalizer ran to delete the worker + expect(workerGroup.size()).toEqual(0); + }, 25) +});