diff --git a/packages/core/src/asset-modules/sitewise/requestProcessor.spec.ts b/packages/core/src/asset-modules/sitewise/requestProcessor.spec.ts index 5de7cffd4..130206ba0 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessor.spec.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessor.spec.ts @@ -1,9 +1,160 @@ import { RequestProcessor } from './requestProcessor'; -import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise'; import { SiteWiseAssetCache } from './cache'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; +import { + AssetSummary, + AssetPropertyValue, + DescribeAssetCommandInput, + DescribeAssetCommandOutput, + DescribeAssetModelResponse, + DescribeAssetModelCommandInput, + DescribeAssetModelCommandOutput, + GetAssetPropertyValueCommandInput, + GetAssetPropertyValueCommandOutput, + ListAssetsCommandInput, + ListAssetsCommandOutput, + ListAssociatedAssetsCommandInput, + ListAssociatedAssetsCommandOutput, +} from '@aws-sdk/client-iotsitewise'; +import { Observable } from 'rxjs'; +import { sampleAssetModel, sampleAssetSummary, samplePropertyValue } from '../mocks.spec'; +import { HIERARCHY_ROOT_ID, HierarchyAssetSummaryList, LoadingStateEnum } from './types'; it('initializes', () => { - expect( - () => new RequestProcessor(new IoTSiteWiseClient({ region: 'us-east' }), new SiteWiseAssetCache()) - ).not.toThrowError(); + expect(() => { + new RequestProcessor({} as SiteWiseAssetDataSource, new SiteWiseAssetCache()); + }).not.toThrowError(); +}); + +const createMockSiteWiseAssetDataSource = (): SiteWiseAssetDataSource => { + return { + describeAsset: (input: DescribeAssetCommandInput): Promise => { + throw 'No Calls Expected'; + }, + + getPropertyValue: (input: GetAssetPropertyValueCommandInput): Promise => { + throw 'No Calls Expected'; + }, + + describeAssetModel: (input: DescribeAssetModelCommandInput): Promise => { + throw 'No Calls Expected'; + }, + + listAssets: (input: ListAssetsCommandInput): Promise => { + throw 'No Calls Expected'; + }, + + listAssociatedAssets: (input: ListAssociatedAssetsCommandInput): Promise => { + throw 'No Calls Expected'; + }, + }; +}; + +describe('Request an AssetSummary', () => { + const mockDataSource = createMockSiteWiseAssetDataSource(); + let mockDescribeAsset = jest.fn(); + mockDescribeAsset.mockReturnValue(Promise.resolve(sampleAssetSummary)); + mockDataSource.describeAsset = mockDescribeAsset; + + const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache()); + const observable: Observable = new Observable((observer) => { + requestProcessor.getAssetSummary({ assetId: sampleAssetSummary.id as string }, observer); + }); + + it('waits for the AssetSummary', (done) => { + observable.subscribe((result) => { + expect(result).not.toBeUndefined(); + expect(result).toEqual(sampleAssetSummary); + done(); + }); + }); +}); + +describe('Request an Asset Model', () => { + const mockDataSource = createMockSiteWiseAssetDataSource(); + let mockDescribeAssetModel = jest.fn(); + mockDescribeAssetModel.mockReturnValue(Promise.resolve(sampleAssetModel)); + mockDataSource.describeAssetModel = mockDescribeAssetModel; + + const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache()); + const observable: Observable = new Observable((observer) => { + requestProcessor.getAssetModel({ assetModelId: sampleAssetModel.assetModelId as string }, observer); + }); + + it('waits for the Asset Model', (done) => { + observable.subscribe((result) => expect(sampleAssetModel).toEqual(result)); + done(); + }); +}); + +describe('Request an Asset Property Value', () => { + const mockDataSource = createMockSiteWiseAssetDataSource(); + let mockGetPropertyValue = jest.fn(); + mockGetPropertyValue.mockReturnValue(Promise.resolve(samplePropertyValue)); + mockDataSource.getPropertyValue = mockGetPropertyValue; + + const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache()); + const observable: Observable = new Observable((observer) => { + requestProcessor.getAssetPropertyValue( + { + assetId: sampleAssetSummary.id as string, + propertyId: 'doesnt matter', + }, + observer + ); + }); + + it('waits for the Asset Property Value', (done) => { + observable.subscribe((result) => expect(samplePropertyValue).toEqual(result)); + done(); + }); +}); + +describe('Request an Asset Hierarchy of a parent Asset', () => { + const mockDataSource = createMockSiteWiseAssetDataSource(); + const mockListAssociatedAssets = jest.fn(); + const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary] }; + mockListAssociatedAssets.mockReturnValue(Promise.resolve(result)); + mockDataSource.listAssociatedAssets = mockListAssociatedAssets; + const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache()); + const observable: Observable = new Observable((observer) => { + requestProcessor.getAssetHierarchy({ assetId: 'parentAssetId', assetHierarchyId: 'hierarchyId' }, observer); + }); + + it('waits for the Asset Hierarchy to become loaded', (done) => { + observable.subscribe((result) => { + // the worker returns the completed list of assets: + expect({ + assetHierarchyId: 'hierarchyId', + assets: [sampleAssetSummary], + loadingState: LoadingStateEnum.LOADED, + }).toEqual(result); + done(); + }); + }); +}); + +describe('Request the root assets', () => { + const mockDataSource = createMockSiteWiseAssetDataSource(); + const mockListAssets = jest.fn(); + const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary] }; + mockListAssets.mockReturnValue(Promise.resolve(result)); + mockDataSource.listAssets = mockListAssets; + const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache()); + + const observable: Observable = new Observable((observer) => { + requestProcessor.getAssetHierarchy({ assetHierarchyId: HIERARCHY_ROOT_ID }, observer); + }); + + it('waits for the root assets to become loaded', (done) => { + observable.subscribe((result) => { + // the worker returns the completed list of assets: + expect({ + assetHierarchyId: HIERARCHY_ROOT_ID, + assets: [sampleAssetSummary], + loadingState: LoadingStateEnum.LOADED, + }).toEqual(result); + done(); + }); + }); }); diff --git a/packages/core/src/asset-modules/sitewise/requestProcessor.ts b/packages/core/src/asset-modules/sitewise/requestProcessor.ts index 07b5353f9..b3ad74a72 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessor.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessor.ts @@ -13,15 +13,9 @@ import { EMPTY, Observable, Subscriber } from 'rxjs'; import { AssetPropertyValue, AssetSummary, - DescribeAssetCommand, - DescribeAssetModelCommand, DescribeAssetModelResponse, - GetAssetPropertyValueCommand, - IoTSiteWiseClient, - ListAssetsCommand, ListAssetsCommandOutput, ListAssetsFilter, - ListAssociatedAssetsCommand, ListAssociatedAssetsCommandOutput, TraversalDirection, } from '@aws-sdk/client-iotsitewise'; @@ -29,89 +23,131 @@ import { SiteWiseAssetCache } from './cache'; import { SiteWiseAssetSession } from './session'; import { RequestProcessorWorkerGroup } from './requestProcessorWorkerGroup'; import { expand, map } from 'rxjs/operators'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; export class RequestProcessor { - private readonly api: IoTSiteWiseClient; + private readonly api: SiteWiseAssetDataSource; private readonly cache: SiteWiseAssetCache; private readonly MAX_RESULTS: number = 250; + + private readonly assetSummaryWorkers: RequestProcessorWorkerGroup< + AssetSummaryQuery, + AssetSummary + > = new RequestProcessorWorkerGroup( + (query) => this.assetSummaryWorkerFactory(query), + (query) => query.assetId + ); + + private readonly assetModelWorkers: RequestProcessorWorkerGroup< + AssetModelQuery, + DescribeAssetModelResponse + > = new RequestProcessorWorkerGroup( + (query) => this.assetModelWorkerFactory(query), + (query) => query.assetModelId + ); + + private readonly assetPropertyValueWorkers: RequestProcessorWorkerGroup< + AssetPropertyValueQuery, + AssetPropertyValue + > = new RequestProcessorWorkerGroup( + (query) => this.assetPropertyValueWorkerFactory(query), + (query) => query.assetId + ':' + query.propertyId + ); + private readonly hierarchyWorkers: RequestProcessorWorkerGroup< AssetHierarchyQuery, HierarchyAssetSummaryList > = new RequestProcessorWorkerGroup( (query) => this.loadHierarchyWorkerFactory(query), - (query) => assetHierarchyQueryKey(query), - (query) => this.hierarchyFromCache(query) + (query) => assetHierarchyQueryKey(query) ); - constructor(api: IoTSiteWiseClient, cache: SiteWiseAssetCache) { + constructor(api: SiteWiseAssetDataSource, cache: SiteWiseAssetCache) { this.api = api; this.cache = cache; } - getAssetSummary(assetSummaryRequest: AssetSummaryQuery, observer: Subscriber) { - let assetSummary = this.cache.getAssetSummary(assetSummaryRequest.assetId); - if (assetSummary != undefined) { - observer.next(assetSummary); - observer.complete(); - return; - } + private assetSummaryWorkerFactory(assetSummaryQuery: AssetSummaryQuery): Observable { + return new Observable((observer) => { + let assetSummary = this.cache.getAssetSummary(assetSummaryQuery.assetId); + if (assetSummary != undefined) { + observer.next(assetSummary); + observer.complete(); + return; + } - this.api.send(new DescribeAssetCommand({ assetId: assetSummaryRequest.assetId })).then((assetSummary) => { - this.cache.storeAssetSummary(assetSummary); - observer.next(this.cache.getAssetSummary(assetSummaryRequest.assetId)); - observer.complete(); + this.api.describeAsset({ assetId: assetSummaryQuery.assetId }).then((assetSummary) => { + this.cache.storeAssetSummary(assetSummary); + observer.next(this.cache.getAssetSummary(assetSummaryQuery.assetId)); + observer.complete(); + }); }); } - getAssetPropertyValue(assetPropertyValueRequest: AssetPropertyValueQuery, observer: Subscriber) { - let propertyValue = this.cache.getPropertyValue( - assetPropertyValueRequest.assetId, - assetPropertyValueRequest.propertyId - ); - if (propertyValue != undefined) { - observer.next(propertyValue); - observer.complete(); - return; - } + getAssetSummary(assetSummaryRequest: AssetSummaryQuery, observer: Subscriber) { + this.assetSummaryWorkers.subscribe(assetSummaryRequest, observer); + } - this.api - .send( - new GetAssetPropertyValueCommand({ - assetId: assetPropertyValueRequest.assetId, - propertyId: assetPropertyValueRequest.propertyId, + private assetPropertyValueWorkerFactory( + assetPropertyValueQuery: AssetPropertyValueQuery + ): Observable { + return new Observable((observer) => { + let propertyValue = this.cache.getPropertyValue( + assetPropertyValueQuery.assetId, + assetPropertyValueQuery.propertyId + ); + if (propertyValue != undefined) { + observer.next(propertyValue); + observer.complete(); + return; + } + + this.api + .getPropertyValue({ + assetId: assetPropertyValueQuery.assetId, + propertyId: assetPropertyValueQuery.propertyId, }) - ) - .then((propertyValue) => { - if (propertyValue.propertyValue != undefined) { - this.cache.storePropertyValue( - assetPropertyValueRequest.assetId, - assetPropertyValueRequest.propertyId, - propertyValue.propertyValue - ); - observer.next( - this.cache.getPropertyValue(assetPropertyValueRequest.assetId, assetPropertyValueRequest.propertyId) - ); - observer.complete(); - } - // TODO: if it is undefined, perform error handling - }); + .then((propertyValue) => { + if (propertyValue.propertyValue != undefined) { + this.cache.storePropertyValue( + assetPropertyValueQuery.assetId, + assetPropertyValueQuery.propertyId, + propertyValue.propertyValue + ); + observer.next( + this.cache.getPropertyValue(assetPropertyValueQuery.assetId, assetPropertyValueQuery.propertyId) + ); + observer.complete(); + } + }); + }); } - getAssetModel(assetModelRequest: AssetModelQuery, observer: Subscriber) { - let model = this.cache.getAssetModel(assetModelRequest.assetModelId); - if (model != undefined) { - observer.next(model); - observer.complete(); - return; - } + getAssetPropertyValue(assetPropertyValueQuery: AssetPropertyValueQuery, observer: Subscriber) { + this.assetPropertyValueWorkers.subscribe(assetPropertyValueQuery, observer); + } + + private assetModelWorkerFactory(assetModelQuery: AssetModelQuery): Observable { + return new Observable((observer) => { + let model = this.cache.getAssetModel(assetModelQuery.assetModelId); + if (model != undefined) { + observer.next(model); + observer.complete(); + return; + } - this.api.send(new DescribeAssetModelCommand({ assetModelId: assetModelRequest.assetModelId })).then((model) => { - this.cache.storeAssetModel(model); - observer.next(this.cache.getAssetModel(assetModelRequest.assetModelId)); - observer.complete(); + this.api.describeAssetModel({ assetModelId: assetModelQuery.assetModelId }).then((model) => { + this.cache.storeAssetModel(model); + observer.next(this.cache.getAssetModel(assetModelQuery.assetModelId)); + observer.complete(); + }); }); } + getAssetModel(assetModelRequest: AssetModelQuery, observer: Subscriber) { + this.assetModelWorkers.subscribe(assetModelRequest, observer); + } + private buildAssetSummaryList(hierarchyId: string, cachedValue: CachedAssetSummaryBlock): HierarchyAssetSummaryList { return { assetHierarchyId: hierarchyId, @@ -127,21 +163,18 @@ export class RequestProcessor { this.cache.setHierarchyLoadingState(assetHierarchyQueryKey(hierarchyRequest), LoadingStateEnum.NOT_LOADED); cachedValue = this.cache.getHierarchy(assetHierarchyQueryKey(hierarchyRequest)) as CachedAssetSummaryBlock; } - return this.buildAssetSummaryList(hierarchyRequest.assetHierarchyId, cachedValue); } private hierarchyRootRequest(paginationToken: string | undefined): Observable { return new Observable((observer) => { this.api - .send( - new ListAssetsCommand({ - filter: ListAssetsFilter.TOP_LEVEL, - maxResults: this.MAX_RESULTS, - nextToken: paginationToken, - assetModelId: undefined, - }) - ) + .listAssets({ + filter: ListAssetsFilter.TOP_LEVEL, + maxResults: this.MAX_RESULTS, + nextToken: paginationToken, + assetModelId: undefined, + }) .then((result) => observer.next(result)); }); } @@ -152,16 +185,16 @@ export class RequestProcessor { ): Observable { return new Observable((observer) => { this.api - .send( - new ListAssociatedAssetsCommand({ - hierarchyId: query.assetHierarchyId, - maxResults: this.MAX_RESULTS, - traversalDirection: TraversalDirection.CHILD, - assetId: query.assetId, - nextToken: paginationToken, - }) - ) - .then((result) => observer.next(result)); + .listAssociatedAssets({ + hierarchyId: query.assetHierarchyId, + maxResults: this.MAX_RESULTS, + traversalDirection: TraversalDirection.CHILD, + assetId: query.assetId, + nextToken: paginationToken, + }) + .then((result) => { + observer.next(result); + }); }); } diff --git a/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts b/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts index 94434f0d9..98c6818c9 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessorWorker.ts @@ -1,4 +1,4 @@ -import { BehaviorSubject, Observable, Subscription } from 'rxjs'; +import { Observable, Observer, ReplaySubject, Subscriber, Subscription } from 'rxjs'; import { finalize } from 'rxjs/operators'; /** @@ -8,12 +8,13 @@ import { finalize } from 'rxjs/operators'; * * When the last subscriber unsubscribed the 'onTeardown' callback is invoked * * When 'onTeardown' is invoked all subscribers are automatically completed. */ -export class RequestProcessorWorker extends BehaviorSubject { +export class RequestProcessorWorker extends ReplaySubject { private readonly producer: Observable; private readonly broadcastSubscription: Subscription; + private readonly subscribers: Subscriber[] = []; - constructor(initialValue: T, producer: Observable, finalizer: () => void) { - super(initialValue); + constructor(producer: Observable, finalizer: () => void) { + super(1); // when the Observable calls complete(), call finalizer() this.producer = producer.pipe(finalize(finalizer)); // connect the single producer to all consumers @@ -21,4 +22,42 @@ export class RequestProcessorWorker extends BehaviorSubject { // when the last observer unsubscribes, call finalizer() this.broadcastSubscription.add(finalizer); } + + private removeSubscriber(subscriber: Subscriber) { + const index = this.subscribers.indexOf(subscriber); + if (index > -1) { + this.subscribers.splice(index, 1); + } + } + + public addSubscriber(subscriber: Subscriber): Subscription { + this.subscribers.push(subscriber as Subscriber); + subscriber.add(() => { + this.removeSubscriber(subscriber); + if (this.subscribers.length === 0) { + this.broadcastSubscription.unsubscribe(); + } + }); + + return super.subscribe(subscriber); + } + + /** @deprecated */ + subscribe(observer?: Partial>): Subscription; + /** @deprecated */ + subscribe(next: (value: T) => void): Subscription; + /** @deprecated */ + subscribe( + next?: ((value: T) => void) | null, + error?: ((error: any) => void) | null, + complete?: (() => void) | null + ): Subscription; + /** @deprecated */ + subscribe( + observerOrNext?: Partial> | ((value: T) => void) | null, + error?: ((error: any) => void) | null, + complete?: (() => void) | null + ): Subscription { + throw 'deprecated, use addSubscriber'; + } } diff --git a/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts new file mode 100644 index 000000000..524d95aa0 --- /dev/null +++ b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.spec.ts @@ -0,0 +1,243 @@ +import { RequestProcessorWorkerGroup } from './requestProcessorWorkerGroup'; +import { Observable, Subscriber } from 'rxjs'; +import { finalize } from 'rxjs/operators'; + +it('test constructor', () => { + expect(() => { + new RequestProcessorWorkerGroup( + () => { + return new Observable(); + }, + (request) => request + ); + }).not.toThrowError(); +}); + +class SubscriberRecord { + public query: Q; + public subscriber: Subscriber; + constructor(query: Q, subscriber: Subscriber) { + this.query = query; + this.subscriber = subscriber; + } +} + +// A test fixture that records the calls to the worker factory method and exposes the subscriber for testing +class WorkerRecorder { + private lastSubscriberStack: SubscriberRecord[] = []; + private counter: number = 0; + + createWorker(query: Q): Observable { + const observable: Observable = new Observable((subscriber) => { + this.counter++; + this.lastSubscriberStack.push(new SubscriberRecord(query, subscriber)); + }); + return observable; + } + + workerFactory() { + return (query: Q) => this.createWorker(query); + } + + popLastSubscription(): SubscriberRecord | undefined { + return this.lastSubscriberStack.pop(); + } + + getWorkerCount(): number { + return this.counter; + } +} + +// A test fixture that records all values sent to a Subscriber for later inspection +class SubscriberRecorder extends Subscriber { + private recorded: [T?] = []; + next(value?: T) { + super.next(value); + this.recorded.push(value); + } + + length(): number { + return this.recorded.length; + } + + peek(): T | undefined { + return this.length() > 0 ? this.recorded[this.length() - 1] : undefined; + } +} + +const identity = (i: any) => i; + +it('test single consumer, single response', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const recorder: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('test', recorder); + expect(workerRecorder.getWorkerCount()).toEqual(1); + expect(recorder.length()).toEqual(0); + + let subRecord = workerRecorder.popLastSubscription(); + subRecord?.subscriber.next(12345); + expect(recorder.length()).toEqual(1); + expect(recorder.peek()).toEqual(12345); +}); + +it('test multiple consumer, single query', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const firstConsumer: SubscriberRecorder = new SubscriberRecorder(); + const secondConsumer: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('test', firstConsumer); + expect(workerRecorder.getWorkerCount()).toEqual(1); + expect(firstConsumer.length()).toEqual(0); + + workerGroup.subscribe('test', secondConsumer); + // this is the key: we have 2 subscriber but only 1 worker for the same query + expect(workerRecorder.getWorkerCount()).toEqual(1); + expect(firstConsumer.length()).toEqual(0); + expect(secondConsumer.length()).toEqual(0); + + let worker = workerRecorder.popLastSubscription(); + worker?.subscriber.next(12345); + + expect(firstConsumer.length()).toEqual(1); + expect(firstConsumer.peek()).toEqual(12345); + + expect(secondConsumer.length()).toEqual(1); + expect(secondConsumer.peek()).toEqual(12345); +}); + +it('late joining consumers immediately get the latest value', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const firstConsumer: SubscriberRecorder = new SubscriberRecorder(); + const secondConsumer: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('test', firstConsumer); + let worker = workerRecorder.popLastSubscription(); + worker?.subscriber.next(1); + worker?.subscriber.next(2); + worker?.subscriber.next(3); + worker?.subscriber.next(4); + worker?.subscriber.next(12345); + + expect(firstConsumer.length()).toEqual(5); + expect(firstConsumer.peek()).toEqual(12345); + + workerGroup.subscribe('test', secondConsumer); + expect(secondConsumer.length()).toEqual(1); + expect(secondConsumer.peek()).toEqual(12345); +}); + +it('test multiple consumers with different queries', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const firstConsumer: SubscriberRecorder = new SubscriberRecorder(); + const secondConsumer: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('First Query', firstConsumer); + expect(workerRecorder.getWorkerCount()).toEqual(1); + let firstWorker = workerRecorder.popLastSubscription(); + expect(firstWorker?.query).toEqual('First Query'); + + workerGroup.subscribe('Second Query', secondConsumer); + // this is the key: we have 2 consumers asking for different queries, so 2 workers have been created + expect(workerRecorder.getWorkerCount()).toEqual(2); + let secondWorker = workerRecorder.popLastSubscription(); + expect(secondWorker?.query).toEqual('Second Query'); + + firstWorker?.subscriber.next(12345); + secondWorker?.subscriber.next(678910); + + expect(firstConsumer.length()).toEqual(1); + expect(firstConsumer.peek()).toEqual(12345); + + expect(secondConsumer.length()).toEqual(1); + expect(secondConsumer.peek()).toEqual(678910); +}); + +it('test finalizer deletes completed queries', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const recorder: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('test', recorder); + expect(workerRecorder.getWorkerCount()).toEqual(1); + expect(workerGroup.size()).toEqual(1); + let subRecord = workerRecorder.popLastSubscription(); + subRecord?.subscriber.next(12345); + subRecord?.subscriber.complete(); + + expect(recorder.length()).toEqual(1); + expect(recorder.peek()).toEqual(12345); + + // expect no workers to remain because finalizer ran to delete the worker + expect(workerGroup.size()).toEqual(0); +}); + +it('test finalizer deletes queries with no subscribers', () => { + const workerRecorder: WorkerRecorder = new WorkerRecorder(); + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + workerRecorder.workerFactory(), + identity + ); + const recorder: SubscriberRecorder = new SubscriberRecorder(); + + workerGroup.subscribe('test', recorder); + expect(workerRecorder.getWorkerCount()).toEqual(1); + expect(workerGroup.size()).toEqual(1); + recorder.unsubscribe(); + + // expect no workers to remain because finalizer ran to delete the worker + expect(workerGroup.size()).toEqual(0); +}); + +it('producers can run a finalizer when the last subscriber unsubscribes', (done) => { + const workerGroup: RequestProcessorWorkerGroup = new RequestProcessorWorkerGroup( + (query) => { + let timeoutID: any; + let counter = 0; + return new Observable((subscriber) => { + timeoutID = setInterval(function incrementer() { + counter++; + subscriber.next(counter); + }, 5); + }).pipe( + finalize(() => { + clearInterval(timeoutID); + // the test actually ends here when the timeout is cleared + // if you remove this call to done() the test will hang, timeout and fail + done(); + }) + ); + }, + identity + ); + + const recorder: SubscriberRecorder = new SubscriberRecorder(); + workerGroup.subscribe('test', recorder); + expect(workerGroup.size()).toEqual(1); + + setTimeout(() => { + expect(recorder.peek()).toBeGreaterThan(1); + recorder.unsubscribe(); + // expect no workers to remain because finalizer ran to delete the worker + expect(workerGroup.size()).toEqual(0); + }, 25); +}); diff --git a/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.ts b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.ts index 2166ccda9..019aa3491 100644 --- a/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.ts +++ b/packages/core/src/asset-modules/sitewise/requestProcessorWorkerGroup.ts @@ -1,32 +1,33 @@ import { AssetQuery } from './types'; -import { Observable, Subscriber } from 'rxjs'; +import { Observable, Subscriber, Subscription } from 'rxjs'; import { RequestProcessorWorker } from './requestProcessorWorker'; export class RequestProcessorWorkerGroup { - private readonly activeQueries: Record> = {}; + private readonly activeQueries: Map> = new Map(); private readonly workerFactory: (query: TQuery) => Observable; private readonly queryToKey: (query: TQuery) => string; - private readonly startValue: (query: TQuery) => TResult; - constructor( - workerFactory: (query: TQuery) => Observable, - queryToKey: (query: TQuery) => string, - startValue: (query: TQuery) => TResult - ) { + constructor(workerFactory: (query: TQuery) => Observable, queryToKey: (query: TQuery) => string) { this.workerFactory = workerFactory; this.queryToKey = queryToKey; - this.startValue = startValue; } public subscribe(query: TQuery, observer: Subscriber) { const key: string = this.queryToKey(query); - if (!this.activeQueries[key]) { - this.activeQueries[key] = new RequestProcessorWorker(this.startValue(query), this.workerFactory(query), () => { - delete this.activeQueries[key]; - }); + if (!this.activeQueries.get(key)) { + this.activeQueries.set( + key, + new RequestProcessorWorker(this.workerFactory(query), () => { + this.activeQueries.delete(key); + }) + ); } - this.activeQueries[key].subscribe(observer); + this.activeQueries.get(key)?.addSubscriber(observer); + } + + public size(): number { + return this.activeQueries.size; } } diff --git a/packages/core/src/asset-modules/sitewise/session.spec.ts b/packages/core/src/asset-modules/sitewise/session.spec.ts index c9082ea28..01f6f72da 100644 --- a/packages/core/src/asset-modules/sitewise/session.spec.ts +++ b/packages/core/src/asset-modules/sitewise/session.spec.ts @@ -1,13 +1,10 @@ import { SiteWiseAssetSession } from './session'; import { RequestProcessor } from './requestProcessor'; -import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise'; import { SiteWiseAssetCache } from './cache'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; it('initializes', () => { expect( - () => - new SiteWiseAssetSession( - new RequestProcessor(new IoTSiteWiseClient({ region: 'us-east' }), new SiteWiseAssetCache()) - ) + () => new SiteWiseAssetSession(new RequestProcessor({} as SiteWiseAssetDataSource, new SiteWiseAssetCache())) ).not.toThrowError(); }); diff --git a/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.spec.ts b/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.spec.ts index 083d5fd91..e5d48af78 100644 --- a/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.spec.ts +++ b/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.spec.ts @@ -1,12 +1,12 @@ -import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise'; import { SiteWiseAssetModule } from './siteWiseAssetModule'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; it('initializes', () => { - expect(() => new SiteWiseAssetModule(new IoTSiteWiseClient({ region: 'us-east' }))).not.toThrowError(); + expect(() => new SiteWiseAssetModule({} as SiteWiseAssetDataSource)).not.toThrowError(); }); describe('startSession', () => { - const module = new SiteWiseAssetModule(new IoTSiteWiseClient({ region: 'us-east' })); + const module = new SiteWiseAssetModule({} as SiteWiseAssetDataSource); it('getSession', () => { expect(() => module.startSession()).not.toBeUndefined(); }); diff --git a/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.ts b/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.ts index d31d49dbd..558e3964a 100644 --- a/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.ts +++ b/packages/core/src/asset-modules/sitewise/siteWiseAssetModule.ts @@ -3,13 +3,14 @@ import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise'; import { SiteWiseAssetCache } from './cache'; import { RequestProcessor } from './requestProcessor'; import { SiteWiseAssetModuleInterface } from './types'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; export class SiteWiseAssetModule implements SiteWiseAssetModuleInterface { - private readonly api: IoTSiteWiseClient; + private readonly api: SiteWiseAssetDataSource; private readonly assetCache: SiteWiseAssetCache; private readonly requestProcessor: RequestProcessor; - constructor(api: IoTSiteWiseClient) { + constructor(api: SiteWiseAssetDataSource) { this.api = api; this.assetCache = new SiteWiseAssetCache(); this.requestProcessor = new RequestProcessor(this.api, this.assetCache); diff --git a/packages/core/src/asset-modules/sitewise/types.ts b/packages/core/src/asset-modules/sitewise/types.ts index fe1063f09..916975e14 100644 --- a/packages/core/src/asset-modules/sitewise/types.ts +++ b/packages/core/src/asset-modules/sitewise/types.ts @@ -2,7 +2,6 @@ * These are the types of high level queries that you can make to the SiteWise asset module */ import { AssetPropertyValue, AssetSummary, DescribeAssetModelResponse } from '@aws-sdk/client-iotsitewise'; -import { SiteWiseAssetSession } from './session'; import { Subscription } from 'rxjs'; export type AssetQuery = {}; diff --git a/packages/core/src/data-module/index.ts b/packages/core/src/data-module/index.ts index 90c621770..59bc7ef42 100644 --- a/packages/core/src/data-module/index.ts +++ b/packages/core/src/data-module/index.ts @@ -1,9 +1,16 @@ -import { DataModule, RegisterDataSource, SubscribeToDataStreams, SubscribeToDataStreamsFrom } from './types.d'; +import { + DataModule, + RegisterDataSource, + SiteWiseAssetDataSource, + SubscribeToDataStreams, + SubscribeToDataStreamsFrom, +} from './types.d'; import { createDataSource } from '../data-sources/site-wise/data-source'; import { sitewiseSdk } from '../data-sources/site-wise/sitewise-sdk'; import { Credentials, Provider } from '@aws-sdk/types'; import { SiteWiseAssetModule } from '../asset-modules'; import { IotAppKitDataModule } from './IotAppKitDataModule'; +import { createSiteWiseAssetDataSource } from '../data-sources/site-wise/asset-data-source'; let siteWiseAssetModule: SiteWiseAssetModule | undefined = undefined; @@ -36,9 +43,9 @@ export const initialize = ({ if (registerDataSources && awsCredentials != null) { /** Automatically registered data sources */ const siteWiseSdk = sitewiseSdk(awsCredentials, awsRegion); - siteWiseAssetModule = new SiteWiseAssetModule(siteWiseSdk); - - registerDataSource(dataModule, createDataSource(sitewiseSdk(awsCredentials, awsRegion))); + const assetDataSource: SiteWiseAssetDataSource = createSiteWiseAssetDataSource(siteWiseSdk); + siteWiseAssetModule = new SiteWiseAssetModule(assetDataSource); + registerDataSource(dataModule, createDataSource(siteWiseSdk)); } else if (registerDataSources && awsCredentials == null) { console.warn( 'site-wise data-source failed to register. Must provide field `awsCredentials` for the site-wise data-source to register.' diff --git a/packages/core/src/data-module/types.d.ts b/packages/core/src/data-module/types.d.ts index 6e93db363..062412651 100644 --- a/packages/core/src/data-module/types.d.ts +++ b/packages/core/src/data-module/types.d.ts @@ -1,5 +1,18 @@ import { DataStream, DataStreamId, Resolution } from '@synchro-charts/core'; import { TimeSeriesDataRequest } from './data-cache/requestTypes'; +import { Request } from './data-cache/requestTypes'; +import { + DescribeAssetCommandInput, + DescribeAssetCommandOutput, + DescribeAssetModelCommandInput, + DescribeAssetModelCommandOutput, + GetAssetPropertyValueCommandInput, + GetAssetPropertyValueCommandOutput, + ListAssetsCommandInput, + ListAssetsCommandOutput, + ListAssociatedAssetsCommandInput, + ListAssociatedAssetsCommandOutput, +} from '@aws-sdk/client-iotsitewise'; export type RequestInformation = { id: DataStreamId; resolution: Resolution }; export type RequestInformationAndRange = RequestInformation & { start: Date; end: Date }; @@ -89,6 +102,21 @@ export type SubscribeToDataStreamsFrom = ( unsubscribe: () => void; }; +type SubscribeToDataStreamsFromPrivate = ( + source: string, + emit: DataStreamCallback +) => { + unsubscribe: () => void; +}; + +export type SiteWiseAssetDataSource = { + describeAsset: (input: DescribeAssetCommandInput) => Promise; + getPropertyValue: (input: GetAssetPropertyValueCommandInput) => Promise; + describeAssetModel: (input: DescribeAssetModelCommandInput) => Promise; + listAssets: (input: ListAssetsCommandInput) => Promise; + listAssociatedAssets: (input: ListAssociatedAssetsCommandInput) => Promise; +}; + /** * Register custom data source to the data module. */ diff --git a/packages/core/src/data-sources/site-wise/asset-data-source.ts b/packages/core/src/data-sources/site-wise/asset-data-source.ts new file mode 100644 index 000000000..e8b2bcb98 --- /dev/null +++ b/packages/core/src/data-sources/site-wise/asset-data-source.ts @@ -0,0 +1,43 @@ +import { + DescribeAssetCommand, + DescribeAssetCommandInput, + DescribeAssetCommandOutput, + GetAssetPropertyValueCommand, + GetAssetPropertyValueCommandOutput, + GetAssetPropertyValueCommandInput, + IoTSiteWiseClient, + DescribeAssetModelCommand, + ListAssetsCommand, + ListAssociatedAssetsCommand, + DescribeAssetModelCommandInput, + DescribeAssetModelCommandOutput, + ListAssetsCommandInput, + ListAssetsCommandOutput, + ListAssociatedAssetsCommandInput, + ListAssociatedAssetsCommandOutput, +} from '@aws-sdk/client-iotsitewise'; +import { SiteWiseAssetDataSource } from '../../data-module/types'; + +export const createSiteWiseAssetDataSource = (api: IoTSiteWiseClient): SiteWiseAssetDataSource => { + return { + describeAsset: (input: DescribeAssetCommandInput): Promise => { + return api.send(new DescribeAssetCommand(input)); + }, + + getPropertyValue: (input: GetAssetPropertyValueCommandInput): Promise => { + return api.send(new GetAssetPropertyValueCommand(input)); + }, + + describeAssetModel: (input: DescribeAssetModelCommandInput): Promise => { + return api.send(new DescribeAssetModelCommand(input)); + }, + + listAssets: (input: ListAssetsCommandInput): Promise => { + return api.send(new ListAssetsCommand(input)); + }, + + listAssociatedAssets: (input: ListAssociatedAssetsCommandInput): Promise => { + return api.send(new ListAssociatedAssetsCommand(input)); + }, + }; +}; diff --git a/packages/core/stencil.config.ts b/packages/core/stencil.config.ts index b91f276f3..e3f132d82 100755 --- a/packages/core/stencil.config.ts +++ b/packages/core/stencil.config.ts @@ -24,10 +24,10 @@ export const config: Config = { modulePathIgnorePatterns: ['cypress'], coverageThreshold: { global: { - statements: 75, - branches: 60, - functions: 70, - lines: 75, + statements: 80, + branches: 70, + functions: 80, + lines: 80, }, }, },