Skip to content

Commit

Permalink
Results of
Browse files Browse the repository at this point in the history
  • Loading branch information
gareth-amazon committed Jan 7, 2022
1 parent 6c4eede commit 1c45760
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 176 deletions.
80 changes: 44 additions & 36 deletions packages/core/src/asset-modules/sitewise/requestProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,34 @@ import { sampleAssetModel, sampleAssetSummary, samplePropertyValue } from '../mo
import { HIERARCHY_ROOT_ID, HierarchyAssetSummaryList, LoadingStateEnum } from './types';

it('initializes', () => {
expect(
() =>
new RequestProcessor({} as SiteWiseAssetDataSource, new SiteWiseAssetCache())
).not.toThrowError();
expect(() => {
new RequestProcessor({} as SiteWiseAssetDataSource, new SiteWiseAssetCache());
}).not.toThrowError();
});

const createMockSiteWiseAssetDataSource = (): SiteWiseAssetDataSource => {
return {
describeAsset: (input: DescribeAssetCommandInput): Promise<DescribeAssetCommandOutput> => {
throw "No Calls Expected";
throw 'No Calls Expected';
},

getPropertyValue: (input: GetAssetPropertyValueCommandInput): Promise<GetAssetPropertyValueCommandOutput> => {
throw "No Calls Expected";
throw 'No Calls Expected';
},

describeAssetModel: (input: DescribeAssetModelCommandInput): Promise<DescribeAssetModelCommandOutput> => {
throw "No Calls Expected";
throw 'No Calls Expected';
},

listAssets: (input: ListAssetsCommandInput): Promise<ListAssetsCommandOutput> => {
throw "No Calls Expected";
throw 'No Calls Expected';
},

listAssociatedAssets: (input: ListAssociatedAssetsCommandInput): Promise<ListAssociatedAssetsCommandOutput> => {
throw "No Calls Expected";
}
}
}
throw 'No Calls Expected';
},
};
};

describe('Request an AssetSummary', () => {
const mockDataSource = createMockSiteWiseAssetDataSource();
Expand All @@ -59,19 +58,18 @@ describe('Request an AssetSummary', () => {

const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache());
const observable: Observable<AssetSummary> = new Observable<AssetSummary>((observer) => {
requestProcessor.getAssetSummary({assetId: sampleAssetSummary.id as string}, observer);
requestProcessor.getAssetSummary({ assetId: sampleAssetSummary.id as string }, observer);
});

it('waits for the AssetSummary', done => {
observable.subscribe(result => {
it('waits for the AssetSummary', (done) => {
observable.subscribe((result) => {
expect(result).not.toBeUndefined();
expect(result).toEqual(sampleAssetSummary);
done();
});
});
});


describe('Request an Asset Model', () => {
const mockDataSource = createMockSiteWiseAssetDataSource();
let mockDescribeAssetModel = jest.fn();
Expand All @@ -80,11 +78,11 @@ describe('Request an Asset Model', () => {

const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache());
const observable: Observable<DescribeAssetModelResponse> = new Observable<DescribeAssetModelResponse>((observer) => {
requestProcessor.getAssetModel({assetModelId: sampleAssetModel.assetModelId as string}, observer);
requestProcessor.getAssetModel({ assetModelId: sampleAssetModel.assetModelId as string }, observer);
});

it('waits for the Asset Model', done => {
observable.subscribe(result => expect(sampleAssetModel).toEqual(result));
it('waits for the Asset Model', (done) => {
observable.subscribe((result) => expect(sampleAssetModel).toEqual(result));
done();
});
});
Expand All @@ -97,32 +95,40 @@ describe('Request an Asset Property Value', () => {

const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache());
const observable: Observable<AssetPropertyValue> = new Observable<AssetPropertyValue>((observer) => {
requestProcessor.getAssetPropertyValue({
assetId: sampleAssetSummary.id as string,
propertyId: 'doesnt matter'}, observer);
requestProcessor.getAssetPropertyValue(
{
assetId: sampleAssetSummary.id as string,
propertyId: 'doesnt matter',
},
observer
);
});

it('waits for the Asset Property Value', done => {
observable.subscribe(result => expect(samplePropertyValue).toEqual(result));
it('waits for the Asset Property Value', (done) => {
observable.subscribe((result) => expect(samplePropertyValue).toEqual(result));
done();
});
});

describe('Request an Asset Hierarchy of a parent Asset', () => {
const mockDataSource = createMockSiteWiseAssetDataSource();
const mockListAssociatedAssets = jest.fn();
const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary]};
const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary] };
mockListAssociatedAssets.mockReturnValue(Promise.resolve<ListAssetsCommandOutput>(result));
mockDataSource.listAssociatedAssets = mockListAssociatedAssets;
const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache());
const observable: Observable<HierarchyAssetSummaryList> = new Observable<HierarchyAssetSummaryList>((observer) => {
requestProcessor.getAssetHierarchy({assetId: "parentAssetId", assetHierarchyId: 'hierarchyId'}, observer);
requestProcessor.getAssetHierarchy({ assetId: 'parentAssetId', assetHierarchyId: 'hierarchyId' }, observer);
});

it('waits for the Asset Hierarchy to become loaded', done => {
observable.subscribe(result => {
it('waits for the Asset Hierarchy to become loaded', (done) => {
observable.subscribe((result) => {
// the worker returns the completed list of assets:
expect({ assetHierarchyId: 'hierarchyId', assets: [sampleAssetSummary], loadingState: LoadingStateEnum.LOADED }).toEqual(result)
expect({
assetHierarchyId: 'hierarchyId',
assets: [sampleAssetSummary],
loadingState: LoadingStateEnum.LOADED,
}).toEqual(result);
done();
});
});
Expand All @@ -131,22 +137,24 @@ describe('Request an Asset Hierarchy of a parent Asset', () => {
describe('Request the root assets', () => {
const mockDataSource = createMockSiteWiseAssetDataSource();
const mockListAssets = jest.fn();
const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary]};
const result: ListAssetsCommandOutput = { $metadata: {}, assetSummaries: [sampleAssetSummary] };
mockListAssets.mockReturnValue(Promise.resolve<ListAssetsCommandOutput>(result));
mockDataSource.listAssets = mockListAssets;
const requestProcessor: RequestProcessor = new RequestProcessor(mockDataSource, new SiteWiseAssetCache());

const observable: Observable<HierarchyAssetSummaryList> = new Observable<HierarchyAssetSummaryList>((observer) => {
requestProcessor.getAssetHierarchy({assetHierarchyId: HIERARCHY_ROOT_ID}, observer);
requestProcessor.getAssetHierarchy({ assetHierarchyId: HIERARCHY_ROOT_ID }, observer);
});

it('waits for the root assets to become loaded', done => {
observable.subscribe(result => {
it('waits for the root assets to become loaded', (done) => {
observable.subscribe((result) => {
// the worker returns the completed list of assets:
expect({ assetHierarchyId: HIERARCHY_ROOT_ID, assets: [sampleAssetSummary], loadingState: LoadingStateEnum.LOADED }).toEqual(result)
expect({
assetHierarchyId: HIERARCHY_ROOT_ID,
assets: [sampleAssetSummary],
loadingState: LoadingStateEnum.LOADED,
}).toEqual(result);
done();
});
});
});

>>>>>>> 7ddb230 (Wrap SiteWise Asset related API calls in a Data Source)
126 changes: 71 additions & 55 deletions packages/core/src/asset-modules/sitewise/requestProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,45 @@ export class RequestProcessor {
private readonly cache: SiteWiseAssetCache;
private readonly MAX_RESULTS: number = 250;

private readonly assetSummaryWorkers: RequestProcessorWorkerGroup<AssetSummaryQuery, AssetSummary> =
new RequestProcessorWorkerGroup<AssetSummaryQuery, AssetSummary>(
query => this.assetSummaryWorkerFactory(query),
query => query.assetId
);
private readonly assetSummaryWorkers: RequestProcessorWorkerGroup<
AssetSummaryQuery,
AssetSummary
> = new RequestProcessorWorkerGroup<AssetSummaryQuery, AssetSummary>(
(query) => this.assetSummaryWorkerFactory(query),
(query) => query.assetId
);

private readonly assetModelWorkers: RequestProcessorWorkerGroup<AssetModelQuery, DescribeAssetModelResponse> =
new RequestProcessorWorkerGroup<AssetModelQuery, DescribeAssetModelResponse>(
query => this.assetModelWorkerFactory(query),
query => query.assetModelId
);
private readonly assetModelWorkers: RequestProcessorWorkerGroup<
AssetModelQuery,
DescribeAssetModelResponse
> = new RequestProcessorWorkerGroup<AssetModelQuery, DescribeAssetModelResponse>(
(query) => this.assetModelWorkerFactory(query),
(query) => query.assetModelId
);

private readonly assetPropertyValueWorkers: RequestProcessorWorkerGroup<AssetPropertyValueQuery, AssetPropertyValue> =
new RequestProcessorWorkerGroup<AssetPropertyValueQuery, AssetPropertyValue>(
query => this.assetPropertyValueWorkerFactory(query),
query => query.assetId + ':' + query.propertyId
);
private readonly assetPropertyValueWorkers: RequestProcessorWorkerGroup<
AssetPropertyValueQuery,
AssetPropertyValue
> = new RequestProcessorWorkerGroup<AssetPropertyValueQuery, AssetPropertyValue>(
(query) => this.assetPropertyValueWorkerFactory(query),
(query) => query.assetId + ':' + query.propertyId
);

private readonly hierarchyWorkers: RequestProcessorWorkerGroup<AssetHierarchyQuery, HierarchyAssetSummaryList> =
new RequestProcessorWorkerGroup<AssetHierarchyQuery, HierarchyAssetSummaryList>(
query => this.loadHierarchyWorkerFactory(query),
query => assetHierarchyQueryKey(query),
);
private readonly hierarchyWorkers: RequestProcessorWorkerGroup<
AssetHierarchyQuery,
HierarchyAssetSummaryList
> = new RequestProcessorWorkerGroup<AssetHierarchyQuery, HierarchyAssetSummaryList>(
(query) => this.loadHierarchyWorkerFactory(query),
(query) => assetHierarchyQueryKey(query)
);

constructor(api: SiteWiseAssetDataSource, cache: SiteWiseAssetCache) {
this.api = api;
this.cache = cache;
}

private assetSummaryWorkerFactory(assetSummaryQuery: AssetSummaryQuery): Observable<AssetSummary> {
return new Observable<AssetSummary>(observer => {
return new Observable<AssetSummary>((observer) => {
let assetSummary = this.cache.getAssetSummary(assetSummaryQuery.assetId);
if (assetSummary != undefined) {
observer.next(assetSummary);
Expand All @@ -69,20 +77,21 @@ export class RequestProcessor {
}

this.api.describeAsset({ assetId: assetSummaryQuery.assetId }).then((assetSummary) => {
this.cache.storeAssetSummary(assetSummary);
observer.next(this.cache.getAssetSummary(assetSummaryQuery.assetId));
observer.complete();
});
}
);
this.cache.storeAssetSummary(assetSummary);
observer.next(this.cache.getAssetSummary(assetSummaryQuery.assetId));
observer.complete();
});
});
}

getAssetSummary(assetSummaryRequest: AssetSummaryQuery, observer: Subscriber<AssetSummary>) {
this.assetSummaryWorkers.subscribe(assetSummaryRequest, observer);
}

private assetPropertyValueWorkerFactory(assetPropertyValueQuery: AssetPropertyValueQuery): Observable<AssetPropertyValue> {
return new Observable<AssetPropertyValue>(observer => {
private assetPropertyValueWorkerFactory(
assetPropertyValueQuery: AssetPropertyValueQuery
): Observable<AssetPropertyValue> {
return new Observable<AssetPropertyValue>((observer) => {
let propertyValue = this.cache.getPropertyValue(
assetPropertyValueQuery.assetId,
assetPropertyValueQuery.propertyId
Expand All @@ -93,10 +102,11 @@ export class RequestProcessor {
return;
}

this.api.getPropertyValue({
assetId: assetPropertyValueQuery.assetId,
propertyId: assetPropertyValueQuery.propertyId,
})
this.api
.getPropertyValue({
assetId: assetPropertyValueQuery.assetId,
propertyId: assetPropertyValueQuery.propertyId,
})
.then((propertyValue) => {
if (propertyValue.propertyValue != undefined) {
this.cache.storePropertyValue(
Expand All @@ -118,7 +128,7 @@ export class RequestProcessor {
}

private assetModelWorkerFactory(assetModelQuery: AssetModelQuery): Observable<DescribeAssetModelResponse> {
return new Observable<DescribeAssetModelResponse>(observer => {
return new Observable<DescribeAssetModelResponse>((observer) => {
let model = this.cache.getAssetModel(assetModelQuery.assetModelId);
if (model != undefined) {
observer.next(model);
Expand All @@ -134,7 +144,7 @@ export class RequestProcessor {
});
}

getAssetModel(assetModelRequest: AssetModelQuery, observer: Subscriber<DescribeAssetModelResponse>){
getAssetModel(assetModelRequest: AssetModelQuery, observer: Subscriber<DescribeAssetModelResponse>) {
this.assetModelWorkers.subscribe(assetModelRequest, observer);
}

Expand All @@ -156,29 +166,35 @@ export class RequestProcessor {
return this.buildAssetSummaryList(hierarchyRequest.assetHierarchyId, cachedValue);
}

private hierarchyRootRequest(paginationToken: string|undefined): Observable<ListAssetsCommandOutput> {
return new Observable<ListAssetsCommandOutput>(observer => {
this.api.listAssets({
filter: ListAssetsFilter.TOP_LEVEL,
maxResults: this.MAX_RESULTS,
nextToken: paginationToken,
assetModelId: undefined,
}).then(result => observer.next(result));
private hierarchyRootRequest(paginationToken: string | undefined): Observable<ListAssetsCommandOutput> {
return new Observable<ListAssetsCommandOutput>((observer) => {
this.api
.listAssets({
filter: ListAssetsFilter.TOP_LEVEL,
maxResults: this.MAX_RESULTS,
nextToken: paginationToken,
assetModelId: undefined,
})
.then((result) => observer.next(result));
});
}

private hierarchyBranchRequest(query: AssetHierarchyQuery,
paginationToken: string | undefined): Observable<ListAssociatedAssetsCommandOutput> {
return new Observable<ListAssociatedAssetsCommandOutput>(observer => {
this.api.listAssociatedAssets({
hierarchyId: query.assetHierarchyId,
maxResults: this.MAX_RESULTS,
traversalDirection: TraversalDirection.CHILD,
assetId: query.assetId,
nextToken: paginationToken,
}).then(result => {
observer.next(result);
});
private hierarchyBranchRequest(
query: AssetHierarchyQuery,
paginationToken: string | undefined
): Observable<ListAssociatedAssetsCommandOutput> {
return new Observable<ListAssociatedAssetsCommandOutput>((observer) => {
this.api
.listAssociatedAssets({
hierarchyId: query.assetHierarchyId,
maxResults: this.MAX_RESULTS,
traversalDirection: TraversalDirection.CHILD,
assetId: query.assetId,
nextToken: paginationToken,
})
.then((result) => {
observer.next(result);
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export class RequestProcessorWorker<T> extends ReplaySubject<T> {

private removeSubscriber(subscriber: Subscriber<T>) {
const index = this.subscribers.indexOf(subscriber);
0 <= index && this.subscribers.splice(index, 1);
if (index > -1) {
this.subscribers.splice(index, 1);
}
}

public addSubscriber(subscriber: Subscriber<T>): Subscription {
Expand All @@ -45,13 +47,17 @@ export class RequestProcessorWorker<T> extends ReplaySubject<T> {
/** @deprecated */
subscribe(next: (value: T) => void): Subscription;
/** @deprecated */
subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;
subscribe(
next?: ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription;
/** @deprecated */
subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
throw "deprecated, use addSubscriber";
throw 'deprecated, use addSubscriber';
}
}
Loading

0 comments on commit 1c45760

Please sign in to comment.