Skip to content

Commit

Permalink
Add test with proof that finalizers run on the worker process
Browse files Browse the repository at this point in the history
  • Loading branch information
gareth-amazon committed Jan 7, 2022
1 parent c5094e4 commit 6c4eede
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ export class RequestProcessorWorker<T> extends ReplaySubject<T> {
}

public addSubscriber(subscriber: Subscriber<T>): Subscription {
const superSub: Subscription = super.subscribe(subscriber);

this.subscribers.push(subscriber as Subscriber<T>);
subscriber.add(() => {
this.removeSubscriber(subscriber);
Expand All @@ -39,7 +37,7 @@ export class RequestProcessorWorker<T> extends ReplaySubject<T> {
}
});

return superSub;
return super.subscribe(subscriber);
}

/** @deprecated */
Expand All @@ -54,6 +52,6 @@ export class RequestProcessorWorker<T> extends ReplaySubject<T> {
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
throw "deprecated, use addObserver";
throw "deprecated, use addSubscriber";
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { RequestProcessorWorkerGroup } from './requestProcessorWorkerGroup';
import { Observable, Subscriber } from 'rxjs';
import { finalize } from 'rxjs/operators';

it('test constructor', () => {
expect(
Expand Down Expand Up @@ -193,3 +194,33 @@ it('test finalizer deletes queries with no subscribers', () => {
expect(workerGroup.size()).toEqual(0);
});

it('producers can run a finalizer when the last subscriber unsubscribes', (done) => {
const workerGroup: RequestProcessorWorkerGroup<string, number> =
new RequestProcessorWorkerGroup<string, number>((query) => {
let timeoutID: any;
let counter = 0;
return new Observable<number>(subscriber => {
timeoutID = setTimeout(function incramenter() {
counter++;
subscriber.next(counter);
timeoutID = setTimeout(incramenter, 5);
},5);
}).pipe(finalize(() => {
clearTimeout(timeoutID);
// the test actually ends here when the timeout is cleared
// if you remove this call to done the test will hang, timeout and fail
done();
}));
}, identity);

const recorder:SubscriberRecorder<number> = new SubscriberRecorder<number>();
workerGroup.subscribe("test", recorder);
expect(workerGroup.size()).toEqual(1);

setTimeout(() => {
expect(recorder.peek()).toBeGreaterThan(1);
recorder.unsubscribe();
// expect no workers to remain because finalizer ran to delete the worker
expect(workerGroup.size()).toEqual(0);
}, 25)
});

0 comments on commit 6c4eede

Please sign in to comment.