From eb6c821a0e6a7a4d96c4cc89db698c46fa0cb71a Mon Sep 17 00:00:00 2001 From: Bowei Han Date: Tue, 15 Mar 2022 13:20:26 -0400 Subject: [PATCH] feat: support fetchMostRecentBeforeStart --- .../iot-connector.spec.component.ts | 45 +- .../iot-line-chart.spec.component.ts | 24 +- .../iot-status-grid.spec.component.ts | 3 +- .../iot-status-timeline.spec.component.ts | 27 +- packages/core/src/__mocks__/data-source.ts | 41 +- .../data-module/IotAppKitDataModule.spec.ts | 44 +- .../src/data-module/IotAppKitDataModule.ts | 34 +- .../data-cache/caching/caching.spec.ts | 511 ++++++++++++++++++ .../data-module/data-cache/caching/caching.ts | 98 +++- .../src/data-module/data-cache/dataActions.ts | 23 +- .../data-cache/dataCacheWrapped.spec.ts | 23 +- .../data-cache/dataCacheWrapped.ts | 35 +- .../data-cache/dataReducer.spec.ts | 326 ++++++++--- .../src/data-module/data-cache/dataReducer.ts | 50 +- packages/core/src/data-module/types.ts | 9 +- .../time-series-data/client/client.spec.ts | 30 +- .../src/time-series-data/client/client.ts | 28 +- .../client/getAggregatedPropertyDataPoints.ts | 35 +- .../client/getHistoricalPropertyDataPoints.ts | 30 +- .../client/getLatestPropertyDataPoint.ts | 24 +- .../src/time-series-data/data-source.spec.ts | 229 +++++++- .../src/time-series-data/data-source.ts | 35 +- 22 files changed, 1398 insertions(+), 306 deletions(-) diff --git a/packages/components/src/integration/iot-connector/iot-connector.spec.component.ts b/packages/components/src/integration/iot-connector/iot-connector.spec.component.ts index f0bfae3e4..ebba42c3f 100644 --- a/packages/components/src/integration/iot-connector/iot-connector.spec.component.ts +++ b/packages/components/src/integration/iot-connector/iot-connector.spec.component.ts @@ -14,22 +14,41 @@ describe('handles gestures', () => { before(() => { cy.intercept('/properties/history?*', (req) => { - req.reply( - mockGetAggregatedOrRawResponse({ - startDate: new Date(req.query.startDate), - endDate: new Date(req.query.endDate), - }) - ); + if (new Date(req.query.startDate).getUTCFullYear() === 1899) { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(new Date(req.query.endDate).getTime() - SECOND_IN_MS), + endDate: new Date(req.query.endDate), + }) + ); + } else { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(req.query.startDate), + endDate: new Date(req.query.endDate), + }) + ); + } }); cy.intercept('/properties/aggregates?*', (req) => { - req.reply( - mockGetAggregatedOrRawResponse({ - startDate: new Date(req.query.startDate), - endDate: new Date(req.query.endDate), - resolution: req.query.resolution as string, - }) - ); + if (new Date(req.query.startDate).getUTCFullYear() === 1899) { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(new Date(req.query.endDate).getTime() - 60 * SECOND_IN_MS), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } else { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(req.query.startDate), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } }); cy.intercept(`/assets/${assetId}`, (req) => { diff --git a/packages/components/src/integration/iot-line-chart/iot-line-chart.spec.component.ts b/packages/components/src/integration/iot-line-chart/iot-line-chart.spec.component.ts index c1339cffb..4ca44c163 100644 --- a/packages/components/src/integration/iot-line-chart/iot-line-chart.spec.component.ts +++ b/packages/components/src/integration/iot-line-chart/iot-line-chart.spec.component.ts @@ -14,13 +14,23 @@ describe('line chart', () => { before(() => { cy.intercept('/properties/aggregates?*', (req) => { - req.reply( - mockGetAggregatedOrRawResponse({ - startDate: new Date(req.query.startDate), - endDate: new Date(req.query.endDate), - resolution: req.query.resolution as string, - }) - ); + if (new Date(req.query.startDate).getUTCFullYear() === 1899) { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(new Date(req.query.endDate).getTime() - 60 * SECOND_IN_MS), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } else { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(req.query.startDate), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } }); cy.intercept(`/assets/${assetId}`, (req) => { diff --git a/packages/components/src/integration/iot-status-grid/iot-status-grid.spec.component.ts b/packages/components/src/integration/iot-status-grid/iot-status-grid.spec.component.ts index 8400f5e1e..a3cece237 100644 --- a/packages/components/src/integration/iot-status-grid/iot-status-grid.spec.component.ts +++ b/packages/components/src/integration/iot-status-grid/iot-status-grid.spec.component.ts @@ -1,6 +1,7 @@ import { renderChart } from '../../testing/renderChart'; import { mockLatestValueResponse } from '../../testing/mocks/mockGetAggregatedOrRawResponse'; import { mockGetAssetSummary } from '../../testing/mocks/mockGetAssetSummaries'; +import { COMPARISON_OPERATOR } from '@synchro-charts/core'; const SECOND_IN_MS = 1000; @@ -27,7 +28,7 @@ describe('status grid', () => { chartType: 'iot-status-grid', settings: { resolution: '0' }, viewport: { duration: '1m' }, - annotations: { y: [{ color: '#FF0000', comparisonOperator: 'GT', value: 25 }] }, + annotations: { y: [{ color: '#FF0000', comparisonOperator: COMPARISON_OPERATOR.GREATER_THAN, value: 25 }] }, }); cy.wait(SECOND_IN_MS * 2); diff --git a/packages/components/src/integration/iot-status-timeline/iot-status-timeline.spec.component.ts b/packages/components/src/integration/iot-status-timeline/iot-status-timeline.spec.component.ts index 637abe2fd..395b5d1c0 100644 --- a/packages/components/src/integration/iot-status-timeline/iot-status-timeline.spec.component.ts +++ b/packages/components/src/integration/iot-status-timeline/iot-status-timeline.spec.component.ts @@ -1,6 +1,7 @@ import { renderChart } from '../../testing/renderChart'; import { mockGetAggregatedOrRawResponse } from '../../testing/mocks/mockGetAggregatedOrRawResponse'; import { mockGetAssetSummary } from '../../testing/mocks/mockGetAssetSummaries'; +import { COMPARISON_OPERATOR } from '@synchro-charts/core'; const SECOND_IN_MS = 1000; @@ -14,13 +15,23 @@ describe('status timeline', () => { before(() => { cy.intercept('/properties/history?*', (req) => { - req.reply( - mockGetAggregatedOrRawResponse({ - startDate: new Date(req.query.startDate), - endDate: new Date(req.query.endDate), - resolution: req.query.resolution as string, - }) - ); + if (new Date(req.query.startDate).getUTCFullYear() === 1899) { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(new Date(req.query.endDate).getTime() - SECOND_IN_MS), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } else { + req.reply( + mockGetAggregatedOrRawResponse({ + startDate: new Date(req.query.startDate), + endDate: new Date(req.query.endDate), + resolution: req.query.resolution as string, + }) + ); + } }); cy.intercept(`/assets/${assetId}`, (req) => { @@ -32,7 +43,7 @@ describe('status timeline', () => { renderChart({ chartType: 'iot-status-timeline', settings: { resolution: '0' }, - annotations: { y: [{ color: '#FF0000', comparisonOperator: 'GT', value: 26 }] }, + annotations: { y: [{ color: '#FF0000', comparisonOperator: COMPARISON_OPERATOR.GREATER_THAN, value: 26 }] }, }); cy.wait(SECOND_IN_MS * 2); diff --git a/packages/core/src/__mocks__/data-source.ts b/packages/core/src/__mocks__/data-source.ts index c59e4b019..356132a9f 100644 --- a/packages/core/src/__mocks__/data-source.ts +++ b/packages/core/src/__mocks__/data-source.ts @@ -3,10 +3,9 @@ import { DataSourceRequest, DataStream, RequestInformationAndRange, - SiteWiseDataStreamQuery -} from "../data-module/types"; -import { toDataStreamId } from "../common/dataStreamId"; -import { toId } from "@iot-app-kit/source-iotsitewise"; + SiteWiseDataStreamQuery, +} from '../data-module/types'; +import { toDataStreamId } from '../common/dataStreamId'; // A simple mock data source, which will always immediately return a successful response of your choosing. export const createMockSiteWiseDataSource = ( @@ -19,24 +18,34 @@ export const createMockSiteWiseDataSource = ( } = { dataStreams: [], onRequestData: () => {} } ): DataSource => ({ name: 'site-wise', - initiateRequest: jest.fn(({ query, request, onSuccess = () => {} }: DataSourceRequest, requestInformations: RequestInformationAndRange[]) => { - query.assets.forEach(({ assetId, properties }) => - properties.forEach(({ propertyId }) => { - const correspondingRequestInfo = requestInformations.find(({ id }) => `${ assetId }---${propertyId}` === id); - if (correspondingRequestInfo) { - onRequestData({ assetId, propertyId, request }); - onSuccess(dataStreams, 'fetchFromStartToEnd', correspondingRequestInfo.start, correspondingRequestInfo.end); - } - }) - ); - }), + initiateRequest: jest.fn( + ( + { query, request, onSuccess = () => {} }: DataSourceRequest, + requestInformations: RequestInformationAndRange[] + ) => { + query.assets.forEach(({ assetId, properties }) => + properties.forEach(({ propertyId }) => { + const correspondingRequestInfo = requestInformations.find(({ id }) => `${assetId}---${propertyId}` === id); + if (correspondingRequestInfo) { + onRequestData({ assetId, propertyId, request }); + onSuccess( + dataStreams, + correspondingRequestInfo, + correspondingRequestInfo.start, + correspondingRequestInfo.end + ); + } + }) + ); + } + ), getRequestsFromQuery: ({ query }) => query.assets .map(({ assetId, properties }) => properties.map(({ propertyId, refId }) => ({ id: toDataStreamId({ assetId, propertyId }), refId, - resolution: '0' + resolution: '0', })) ) .flat(), diff --git a/packages/core/src/data-module/IotAppKitDataModule.spec.ts b/packages/core/src/data-module/IotAppKitDataModule.spec.ts index c8835fb16..c68aa0fd7 100644 --- a/packages/core/src/data-module/IotAppKitDataModule.spec.ts +++ b/packages/core/src/data-module/IotAppKitDataModule.spec.ts @@ -226,7 +226,14 @@ describe('initial request', () => { query: DATA_STREAM_QUERY, request: { viewport: { start: START, end: END }, settings: { fetchFromStartToEnd: true } }, }), - [{ id: DATA_STREAM.id, resolution: DATA_STREAM.resolution.toString(), start: START, end: END }] + [ + expect.objectContaining({ + id: DATA_STREAM.id, + resolution: DATA_STREAM.resolution.toString(), + start: START, + end: END, + }), + ] ); }); }); @@ -395,6 +402,7 @@ it('subscribes to multiple queries on the same data source', () => { const request: TimeSeriesDataRequest = { viewport: { start: START, end: END }, + settings: { fetchFromStartToEnd: true }, }; const dataModule = new IotAppKitDataModule(); @@ -530,6 +538,7 @@ it('subscribes to multiple data streams on multiple data sources', () => { const request: TimeSeriesDataRequest = { viewport: { start: START, end: END }, + settings: { fetchFromStartToEnd: true }, }; const dataModule = new IotAppKitDataModule(); @@ -810,12 +819,12 @@ describe('caching', () => { request: { viewport: { start: START_1, end: END_1 }, settings: { fetchFromStartToEnd: true } }, }), [ - { + expect.objectContaining({ id: DATA_STREAM.id, resolution: DATA_STREAM.resolution.toString(), start: START_1, end: END_1, - }, + }), ] ); @@ -833,18 +842,18 @@ describe('caching', () => { request: { viewport: { start: START_2, end: END_2 }, settings: { fetchFromStartToEnd: true } }, }), [ - { + expect.objectContaining({ id: DATA_STREAM.id, resolution: DATA_STREAM.resolution.toString(), start: START_2, end: START_1, - }, - { + }), + expect.objectContaining({ id: DATA_STREAM.id, resolution: DATA_STREAM.resolution.toString(), start: END_1, end: END_2, - }, + }), ] ); }); @@ -874,21 +883,21 @@ describe('caching', () => { (dataSource.initiateRequest as Mock).mockClear(); update({ - request: { viewport: { start: START_2, end: END_2 } }, + request: { viewport: { start: START_2, end: END_2 }, settings: { fetchFromStartToEnd: true } }, }); expect(dataSource.initiateRequest).toBeCalledWith( expect.objectContaining({ query: DATA_STREAM_QUERY, - request: { viewport: { start: START_2, end: END_2 } }, + request: { viewport: { start: START_2, end: END_2 }, settings: { fetchFromStartToEnd: true } }, }), [ - { + expect.objectContaining({ id: DATA_STREAM_INFO.id, resolution: DATA_STREAM_INFO.resolution.toString(), start: START_2, end: END_2, - }, + }), ] ); }); @@ -927,13 +936,13 @@ describe('caching', () => { }, }), [ - { + expect.objectContaining({ id: DATA_STREAM_INFO.id, resolution: DATA_STREAM_INFO.resolution.toString(), // 1 minute time advancement invalidates 3 minutes of cache by default, which is 2 minutes from END_1 start: new Date(END.getTime() - 2 * MINUTE_IN_MS), end: END, - }, + }), ] ); }); @@ -1101,7 +1110,7 @@ describe('request scheduler', () => { queries: [DATA_STREAM_QUERY], request: { viewport: { start: START, end: END }, - settings: { refreshRate: SECOND_IN_MS * 0.1 }, + settings: { fetchFromStartToEnd: true, refreshRate: SECOND_IN_MS * 0.1 }, }, }, timeSeriesCallback @@ -1276,7 +1285,7 @@ describe('request scheduler', () => { const { update } = dataModule.subscribeToDataStreams( { queries: [DATA_STREAM_QUERY], - request: { viewport: { duration: SECOND_IN_MS } }, + request: { viewport: { duration: SECOND_IN_MS }, settings: { fetchFromStartToEnd: true } }, }, timeSeriesCallback ); @@ -1287,7 +1296,7 @@ describe('request scheduler', () => { update({ request: { viewport: { start: START, end: END }, - settings: { refreshRate: SECOND_IN_MS * 0.1 }, + settings: { refreshRate: SECOND_IN_MS * 0.1, fetchFromStartToEnd: true }, }, }); timeSeriesCallback.mockClear(); @@ -1313,7 +1322,7 @@ it('when data is requested from the viewport start to end with a buffer, include const { unsubscribe } = dataModule.subscribeToDataStreams( { queries: [DATA_STREAM_QUERY], - request: { viewport: { start, end }, settings: { requestBuffer } }, + request: { viewport: { start, end }, settings: { requestBuffer, fetchFromStartToEnd: true } }, }, timeSeriesCallback ); @@ -1323,6 +1332,7 @@ it('when data is requested from the viewport start to end with a buffer, include request: { settings: { requestBuffer, + fetchFromStartToEnd: true, }, viewport: { start, diff --git a/packages/core/src/data-module/IotAppKitDataModule.ts b/packages/core/src/data-module/IotAppKitDataModule.ts index 3bfb09356..dc001f552 100644 --- a/packages/core/src/data-module/IotAppKitDataModule.ts +++ b/packages/core/src/data-module/IotAppKitDataModule.ts @@ -16,7 +16,7 @@ import DataSourceStore from './data-source-store/dataSourceStore'; import { DataCache } from './data-cache/dataCacheWrapped'; import { TimeSeriesDataRequest } from './data-cache/requestTypes'; import { requestRange } from './data-cache/requestRange'; -import { getDateRangesToRequest } from './data-cache/caching/caching'; +import { getRequestInformations } from './data-cache/caching/caching'; import { viewportEndDate, viewportStartDate } from '../common/viewport'; import { MINUTE_IN_MS, parseDuration, SECOND_IN_MS } from '../common/time'; @@ -87,34 +87,20 @@ export class IotAppKitDataModule implements DataModule { const requiredStreams = requestedStreams.filter(isRequestedDataStream); const requests = requiredStreams - .map(({ resolution, id, cacheSettings }) => { - const dateRanges = getDateRangesToRequest({ + .map(({ resolution, id, cacheSettings }) => + getRequestInformations({ + request, store: this.dataCache.getState(), start: viewportStartDate(viewport), end: viewportEndDate(viewport), - resolution: parseDuration(resolution), + resolution, dataStreamId: id, cacheSettings: { ...this.cacheSettings, ...cacheSettings }, - }); - - return { - dateRanges, - request: { id, resolution }, - }; - }) - .flatMap(({ dateRanges, request }) => - dateRanges.map(([rangeStart, rangeEnd]) => ({ start: rangeStart, end: rangeEnd, ...request })) - ); - - requests.forEach(({ start: reqStart, end: reqEnd, id, resolution }) => { - this.dataCache.onRequest({ - id, - resolution: parseDuration(resolution), - first: reqStart, - last: reqEnd, - request, - }); - }); + }) + ) + .flat(); + + requests.forEach(this.dataCache.onRequest); if (requests.length > 0) { this.registerRequest({ queries, request }, requests); diff --git a/packages/core/src/data-module/data-cache/caching/caching.spec.ts b/packages/core/src/data-module/data-cache/caching/caching.spec.ts index 5b43fb2a6..d197b4144 100755 --- a/packages/core/src/data-module/data-cache/caching/caching.spec.ts +++ b/packages/core/src/data-module/data-cache/caching/caching.spec.ts @@ -7,6 +7,7 @@ import { getDateRangesToRequest, unexpiredCacheIntervals, maxCacheDuration, + getRequestInformations, } from './caching'; import { DEFAULT_CACHE_SETTINGS } from '../../IotAppKitDataModule'; import { HOUR_IN_MS, MINUTE_IN_MS, SECOND_IN_MS } from '../../../common/time'; @@ -369,6 +370,516 @@ describe('getDateRangesToRequest', () => { }); }); +describe('getRequestInformations', () => { + it('returns an empty array if there are no date ranges to request', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([]); + }); + + it('returns an empty array if there are date ranges to request but fetchFromStartToEnd is not set to true', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(1999, 3, 0); + const CACHE_END_DATE = new Date(1999, 4, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: {}, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: CACHE_END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([]); + }); + + it('returns fetchMostRecentBeforeEnd request information if no date ranges, in settings, and not cached', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeEnd: true, fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchMostRecentBeforeEnd: true, + }, + ]); + }); + + it('does not return fetchMostRecentBeforeEnd request information if no date ranges, not cached, and not in settings', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeEnd: false, fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([]); + }); + + it('does not return fetchMostRecentBeforeEnd request information if no date ranges, in settings, and cached', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeEnd: true, fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: { + intervals: [[END_DATE.getTime() - 2000, END_DATE.getTime()]], + items: [ + [ + { + x: END_DATE.getTime() - 1000, + y: 16, + }, + ], + ], + }, + requestCache: createDataPointCache({ + start: START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([]); + }); + + it('converts date ranges to request informations, setting fetchMostRecentBeforeEnd', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(2000, 3, 0); + const CACHE_END_DATE = new Date(2000, 4, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchFromStartToEnd: true, fetchMostRecentBeforeEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: CACHE_END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: CACHE_START_DATE, + id: STREAM_ID, + resolution: '0', + fetchFromStartToEnd: true, + }, + { + start: CACHE_END_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchFromStartToEnd: true, + }, + { + start: START_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchMostRecentBeforeEnd: true, + }, + ]); + }); + + it('converts date ranges to request informations, does not set fetchMostRecentBeforeEnd if not in settings', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(2000, 3, 0); + const CACHE_END_DATE = new Date(2000, 4, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: CACHE_END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: CACHE_START_DATE, + id: STREAM_ID, + resolution: '0', + fetchFromStartToEnd: true, + }, + { + start: CACHE_END_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchFromStartToEnd: true, + }, + ]); + }); + + it('converts date ranges to request informations, does not fetchMostRecentBeforeEnd if data point is already cached', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(2000, 3, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchFromStartToEnd: true, fetchMostRecentBeforeEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: { + intervals: [[END_DATE.getTime() - 2000, END_DATE.getTime()]], + items: [ + [ + { + x: END_DATE.getTime() - 1000, + y: 16, + }, + ], + ], + }, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: CACHE_START_DATE, + id: STREAM_ID, + resolution: '0', + fetchFromStartToEnd: true, + }, + ]); + }); + + it('appends fetchMostRecentBeforeStart request information', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(2000, 3, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeStart: true, fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + { + start: START_DATE, + end: CACHE_START_DATE, + id: STREAM_ID, + fetchFromStartToEnd: true, + resolution: '0', + }, + ]); + }); + + it('appends fetchMostRecentBeforeStart request information if no date ranges, in settings, and not cached', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeStart: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: EMPTY_CACHE, + requestCache: createDataPointCache({ + start: START_DATE, + end: END_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: END_DATE, + id: STREAM_ID, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + ]); + }); + + it('does not fetchMostRecentBeforeStart request information if there is already a recent data point', () => { + const START_DATE = new Date(2000, 0, 0); + const END_DATE = new Date(2001, 0, 0); + const CACHE_START_DATE = new Date(1999, 0, 0); + + expect( + getRequestInformations({ + request: { + viewport: { start: START_DATE, end: END_DATE }, + settings: { fetchMostRecentBeforeStart: true, fetchFromStartToEnd: true }, + }, + store: { + [STREAM_ID]: { + 0: { + id: STREAM_ID, + resolution: 0, + requestHistory: [], + isLoading: false, + isRefreshing: false, + dataCache: { + intervals: [[START_DATE.getTime() - 2000, START_DATE.getTime()]], + items: [ + [ + { + x: START_DATE.getTime() - 1000, + y: 16, + }, + ], + ], + }, + requestCache: createDataPointCache({ + start: CACHE_START_DATE, + end: START_DATE, + }), + }, + }, + }, + resolution: '0', + dataStreamId: STREAM_ID, + start: START_DATE, + end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, + }) + ).toEqual([ + { + start: START_DATE, + end: END_DATE, + id: STREAM_ID, + fetchFromStartToEnd: true, + resolution: '0', + }, + ]); + }); +}); + describe('retrieve unexpired cached time intervals', () => { const DATE_NOW = new Date(2000, 0, 0).getTime(); beforeEach(() => { diff --git a/packages/core/src/data-module/data-cache/caching/caching.ts b/packages/core/src/data-module/data-cache/caching/caching.ts index 27d15ff29..db137d1bf 100755 --- a/packages/core/src/data-module/data-cache/caching/caching.ts +++ b/packages/core/src/data-module/data-cache/caching/caching.ts @@ -1,5 +1,5 @@ import { DataPoint, Primitive } from '@synchro-charts/core'; -import { MINUTE_IN_MS, SECOND_IN_MS } from '../../../common/time'; +import { MINUTE_IN_MS, SECOND_IN_MS, parseDuration } from '../../../common/time'; import { getDataStreamStore } from '../getDataStreamStore'; import { addInterval, @@ -13,6 +13,8 @@ import { CacheSettings, DataStreamsStore, DataStreamStore, TTLDurationMapping } import { getExpiredCacheIntervals } from './expiredCacheIntervals'; import { TimeSeriesDataRequestSettings } from '../requestTypes'; import { pointBisector } from '../../../common/dataFilters'; +import { RequestInformationAndRange } from '../../types'; +import { TimeSeriesDataRequest } from '../../../../dist'; export const unexpiredCacheIntervals = ( streamStore: DataStreamStore, @@ -104,6 +106,100 @@ export const getDateRangesToRequest = ({ .map(([startMS, endMS]) => [new Date(startMS), new Date(endMS)] as [Date, Date]); }; +/** + * Returns all the request information required + * Returns empty list if there are no date ranges needed to be requested. + * + * This takes into account what date intervals for a given stream id and resolution exist, + * allowing us to only request what is needed. + * + * It also includes all the request behaviour pertaining to each date range. + */ +export const getRequestInformations = ({ + request, + store, + dataStreamId, + start, + end, + resolution, + cacheSettings, +}: { + request: TimeSeriesDataRequest; + store: DataStreamsStore; + dataStreamId: string; + start: Date; + end: Date; + resolution: string; + cacheSettings: CacheSettings; +}): RequestInformationAndRange[] => { + // get sorted date ranges that need to be requested + const dateRanges = getDateRangesToRequest({ + store, + dataStreamId, + start, + end, + resolution: parseDuration(resolution), + cacheSettings, + }); + + let requestInformations: RequestInformationAndRange[] = []; + + let fetchFromStartToEnd = request.settings?.fetchFromStartToEnd; + + // convert date ranges to request information + if (fetchFromStartToEnd) { + requestInformations = dateRanges.map(([rangeStart, rangeEnd]) => ({ + start: rangeStart, + end: rangeEnd, + id: dataStreamId, + resolution, + fetchFromStartToEnd, + })); + } + + // fetchMostRecentBeforeEnd if a recent point doesn't exist in the cache, even with no request informations + if ( + request.settings?.fetchMostRecentBeforeEnd && + !checkCacheForRecentPoint({ + store, + dataStreamId, + resolution: parseDuration(resolution), + start: end, + cacheSettings, + }) + ) { + requestInformations.push({ + start, + end, + id: dataStreamId, + resolution, + fetchMostRecentBeforeEnd: true, + }); + } + + // fetch a leading point if needed and there is no recent point in cache before the start date + if ( + request.settings?.fetchMostRecentBeforeStart && + !checkCacheForRecentPoint({ + store, + dataStreamId, + resolution: parseDuration(resolution), + start, + cacheSettings, + }) + ) { + requestInformations.unshift({ + start, + end, + id: dataStreamId, + resolution, + fetchMostRecentBeforeStart: true, + }); + } + + return requestInformations; +}; + const dataPointCompare = (a: DataPoint, b: DataPoint) => { const aTime = a.x; const bTime = b.x; diff --git a/packages/core/src/data-module/data-cache/dataActions.ts b/packages/core/src/data-module/data-cache/dataActions.ts index 490137a3c..334109e49 100755 --- a/packages/core/src/data-module/data-cache/dataActions.ts +++ b/packages/core/src/data-module/data-cache/dataActions.ts @@ -1,8 +1,7 @@ import { Action, Dispatch } from 'redux'; import { DataStreamId, Resolution } from '@synchro-charts/core'; -import { DataStream, TypeOfRequest } from '../types'; +import { DataStream, RequestInformationAndRange } from '../types'; import { ErrorDetails } from '../../common/types'; -import { TimeSeriesDataRequest } from './requestTypes'; /** * @@ -19,15 +18,7 @@ import { TimeSeriesDataRequest } from './requestTypes'; export const REQUEST = 'REQUEST'; export interface RequestData extends Action<'REQUEST'> { type: typeof REQUEST; - payload: { - id: DataStreamId; - resolution: Resolution; - request: TimeSeriesDataRequest; - // the first date of data to fetch, exclusive - first: Date; - // the most recent date of data to fetch, inclusive - last: Date; - }; + payload: RequestInformationAndRange; } export type OnRequest = (payload: RequestData['payload']) => [Date, Date][]; @@ -80,7 +71,7 @@ export interface SuccessResponse extends Action<'SUCCESS'> { data: DataStream; first: Date; last: Date; - typeOfRequest: TypeOfRequest; + requestInformation: RequestInformationAndRange; }; } export const onSuccessAction = ( @@ -88,7 +79,7 @@ export const onSuccessAction = ( data: DataStream, first: Date, last: Date, - typeOfRequest: TypeOfRequest + requestInformation: RequestInformationAndRange ): SuccessResponse => ({ type: SUCCESS, payload: { @@ -96,14 +87,14 @@ export const onSuccessAction = ( data, first, last, - typeOfRequest, + requestInformation, }, }); export const onSuccess = - (id: DataStreamId, data: DataStream, first: Date, last: Date, typeOfRequest: TypeOfRequest) => + (id: DataStreamId, data: DataStream, first: Date, last: Date, requestInformation: RequestInformationAndRange) => (dispatch: Dispatch) => { - dispatch(onSuccessAction(id, data, first, last, typeOfRequest)); + dispatch(onSuccessAction(id, data, first, last, requestInformation)); }; export type AsyncActions = RequestData | ErrorResponse | SuccessResponse; diff --git a/packages/core/src/data-module/data-cache/dataCacheWrapped.spec.ts b/packages/core/src/data-module/data-cache/dataCacheWrapped.spec.ts index d8c693bdb..a5cac09e2 100644 --- a/packages/core/src/data-module/data-cache/dataCacheWrapped.spec.ts +++ b/packages/core/src/data-module/data-cache/dataCacheWrapped.spec.ts @@ -86,10 +86,9 @@ describe('actions', () => { dataCache.onRequest({ id: ID, - resolution: RESOLUTION, - first: new Date(), - last: new Date(), - request: { viewport: { duration: '1m' } }, + resolution: '1s', + start: new Date(), + end: new Date(), }); const state = dataCache.getState() as any; @@ -129,7 +128,21 @@ describe('actions', () => { }; const dataCache = new DataCache(); - dataCache.onSuccess([DATA_STREAM], 'fetchFromStartToEnd', new Date(2000, 0, 0), new Date(2000, 1, 1)); + const start = new Date(2000, 0, 0); + const end = new Date(2000, 1, 1); + + dataCache.onSuccess( + [DATA_STREAM], + { + id: 'some-id', + resolution: '0', + fetchFromStartToEnd: true, + start, + end, + }, + start, + end + ); const state = dataCache.getState() as any; expect(state[DATA_STREAM.id][DATA_STREAM.resolution]).toBeDefined(); diff --git a/packages/core/src/data-module/data-cache/dataCacheWrapped.ts b/packages/core/src/data-module/data-cache/dataCacheWrapped.ts index 49f68e667..43a12a57e 100644 --- a/packages/core/src/data-module/data-cache/dataCacheWrapped.ts +++ b/packages/core/src/data-module/data-cache/dataCacheWrapped.ts @@ -2,13 +2,11 @@ import { Store } from 'redux'; import { Resolution } from '@synchro-charts/core'; import { DataStreamsStore } from './types'; import { configureStore } from './createStore'; -import { TimeSeriesDataRequest } from './requestTypes'; import { onErrorAction, onRequestAction, onSuccessAction } from './dataActions'; -import { viewportEndDate, viewportStartDate } from '../../common/viewport'; import { getDataStreamStore } from './getDataStreamStore'; import { Observable, map, startWith, pairwise, from } from 'rxjs'; import { filter } from 'rxjs/operators'; -import { RequestInformation, DataStream, TypeOfRequest } from '../types'; +import { RequestInformation, DataStream, RequestInformationAndRange } from '../types'; import { toDataStreams } from './toDataStreams'; import { ErrorDetails } from '../../common/types'; @@ -31,14 +29,14 @@ const hasRequestedInformationChanged = ( const getLatestDate = ({ stream, start, - typeOfRequest, + requestInformation, }: { stream: DataStream; end: Date; start: Date; - typeOfRequest: TypeOfRequest; + requestInformation: RequestInformationAndRange; }): Date => { - if (typeOfRequest === 'fetchFromStartToEnd') { + if (requestInformation.fetchFromStartToEnd) { return start; } const lastPoint = stream.data[stream.data.length - 1]?.x; @@ -112,13 +110,18 @@ export class DataCache { * coordinating the dispatching of the action throughout the file. */ - public onSuccess = (dataStreams: DataStream[], typeOfRequest: TypeOfRequest, start: Date, end: Date): void => { + public onSuccess = ( + dataStreams: DataStream[], + requestInformation: RequestInformationAndRange, + start: Date, + end: Date + ): void => { // TODO: `duration` is not an accurate way to determine what _was_ requested. // Need to change then code to utilize the actual start and end date, as utilized by the data source which initiated the request. // For example, if we have queried data for the last day, but it took 1 minute for the query to resolve, we would have the start and the end date // incorrectly offset by one minute with the correct logic. dataStreams.forEach((stream) => { - this.dataCache.dispatch(onSuccessAction(stream.id, stream, start, end, typeOfRequest)); + this.dataCache.dispatch(onSuccessAction(stream.id, stream, start, end, requestInformation)); }); }; @@ -126,19 +129,7 @@ export class DataCache { this.dataCache.dispatch(onErrorAction(id, resolution, error)); }; - public onRequest = ({ - id, - resolution, - first, - last, - request, - }: { - id: string; - resolution: Resolution; - first: Date; - last: Date; - request: TimeSeriesDataRequest; - }): void => { - this.dataCache.dispatch(onRequestAction({ id, resolution, first, last, request })); + public onRequest = (requestInformation: RequestInformationAndRange): void => { + this.dataCache.dispatch(onRequestAction(requestInformation)); }; } diff --git a/packages/core/src/data-module/data-cache/dataReducer.spec.ts b/packages/core/src/data-module/data-cache/dataReducer.spec.ts index 6c2fe43e3..b22abca94 100755 --- a/packages/core/src/data-module/data-cache/dataReducer.spec.ts +++ b/packages/core/src/data-module/data-cache/dataReducer.spec.ts @@ -11,6 +11,7 @@ const FIRST_DATE = new Date(2000, 0, 0); const LAST_DATE = new Date(2001, 0, 0); const DATE_NOW = new Date(2001, 0, 2); +const DATE_BEFORE = new Date(2000, 11, 0); beforeEach(() => { // @ts-ignore @@ -26,10 +27,10 @@ describe('loading status', () => { {}, // Empty original state onRequestAction({ id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, }) ); @@ -37,10 +38,10 @@ describe('loading status', () => { requestState, onRequestAction({ id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, }) ) as any; @@ -59,16 +60,20 @@ describe('loading status', () => { {}, // Empty original state onRequestAction({ id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, }) ); const errorState = dataReducer( requestState, - onErrorAction(ID, RESOLUTION, { msg: 'some-error', type: 'ResourceNotFoundException', status: '404' }) + onErrorAction(ID, RESOLUTION, { + msg: 'some-error', + type: 'ResourceNotFoundException', + status: '404', + }) ) as any; expect(errorState[ID][RESOLUTION]).toEqual( @@ -86,10 +91,10 @@ describe('loading status', () => { {}, // Empty original state onRequestAction({ id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, }) ) as any; @@ -104,15 +109,17 @@ describe('loading status', () => { const ID = 'some-id'; const RESOLUTION = SECOND_IN_MS; + const requestInformation = { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, + }; + const state1 = dataReducer( {}, // Empty original state - onRequestAction({ - id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '10m' }, settings: { fetchFromStartToEnd: true } }, - }) + onRequestAction(requestInformation) ); const state2 = dataReducer( @@ -128,7 +135,7 @@ describe('loading status', () => { }, FIRST_DATE, LAST_DATE, - 'fetchFromStartToEnd' + requestInformation ) ); @@ -136,10 +143,10 @@ describe('loading status', () => { state2, onRequestAction({ id: ID, - resolution: RESOLUTION, - first: new Date(LAST_DATE.getTime() + DAY_IN_MS), - last: new Date(LAST_DATE.getTime() + 2 * DAY_IN_MS), - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: new Date(LAST_DATE.getTime() + DAY_IN_MS), + end: new Date(LAST_DATE.getTime() + 2 * DAY_IN_MS), + fetchFromStartToEnd: true, }) ) as any; @@ -155,15 +162,17 @@ describe('loading status', () => { const ID = 'some-id'; const RESOLUTION = SECOND_IN_MS; + const requestInformation = { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, + }; + const state1 = dataReducer( {}, // Empty original state - onRequestAction({ - id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1m' }, settings: { fetchFromStartToEnd: true } }, - }) + onRequestAction(requestInformation) ); const successState = dataReducer( @@ -179,7 +188,7 @@ describe('loading status', () => { }, FIRST_DATE, LAST_DATE, - 'fetchFromStartToEnd' + requestInformation ) ) as any; @@ -217,10 +226,10 @@ describe('on request', () => { INITIAL_STATE, onRequestAction({ id: ID, - resolution: RESOLUTION, - first: FIRST_DATE, - last: LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchFromStartToEnd: true, }) ); @@ -315,7 +324,12 @@ it('sets the data when a success action occurs with aggregated data', () => { }; const newState = dataReducer( INITIAL_STATE, - onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, 'fetchMostRecentBeforeEnd') + onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + }) ) as any; expect(newState[ID][RESOLUTION]).toEqual( expect.objectContaining({ @@ -374,7 +388,12 @@ it('sets the data when a success action occurs', () => { }; const newState = dataReducer( INITIAL_STATE, - onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, 'fetchMostRecentBeforeStart') + onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + }) ) as any; expect(newState[ID][RESOLUTION]).toEqual( expect.objectContaining({ @@ -400,6 +419,129 @@ it('sets the data when a success action occurs', () => { ); }); +it('sets the data with the correct cache intervals when a success action occurs with fetchMostRecentBeforeStart', () => { + const ID = 'my-id'; + const RESOLUTION = SECOND_IN_MS; + + const INITIAL_STATE = { + [ID]: { + [RESOLUTION]: { + id: ID, + resolution: RESOLUTION, + isLoading: true, + isRefreshing: true, + requestHistory: [], + dataCache: EMPTY_CACHE, + requestCache: EMPTY_CACHE, + }, + }, + }; + + const newDataPoints = [{ x: DATE_BEFORE.getTime(), y: 100 }]; + + const DATA: DataStream = { + id: ID, + name: 'some name', + resolution: RESOLUTION, + aggregates: { + [RESOLUTION]: newDataPoints, + }, + data: [], + dataType: DataType.NUMBER, + }; + const newState = dataReducer( + INITIAL_STATE, + onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchMostRecentBeforeStart: true, + }) + ) as any; + expect(newState[ID][RESOLUTION]).toEqual( + expect.objectContaining({ + id: ID, + resolution: RESOLUTION, + error: undefined, + isLoading: false, + requestHistory: [ + expect.objectContaining({ + end: expect.any(Date), + requestedAt: expect.any(Date), + start: expect.any(Date), + }), + ], + dataCache: { + intervals: [[DATE_BEFORE.getTime(), LAST_DATE.getTime()]], + items: [newDataPoints], + }, + requestCache: expect.objectContaining({ + intervals: [[DATE_BEFORE.getTime(), LAST_DATE.getTime()]], + }), + }) + ); +}); + +it('sets the data with the correct cache intervals when a success action occurs with fetchMostRecentBeforeStart if no data is returned', () => { + const ID = 'my-id'; + const RESOLUTION = SECOND_IN_MS; + + const INITIAL_STATE = { + [ID]: { + [RESOLUTION]: { + id: ID, + resolution: RESOLUTION, + isLoading: true, + isRefreshing: true, + requestHistory: [], + dataCache: EMPTY_CACHE, + requestCache: EMPTY_CACHE, + }, + }, + }; + + const DATA: DataStream = { + id: ID, + name: 'some name', + resolution: RESOLUTION, + data: [], + dataType: DataType.NUMBER, + }; + const newState = dataReducer( + INITIAL_STATE, + onSuccessAction(ID, DATA, FIRST_DATE, LAST_DATE, { + id: ID, + resolution: '1s', + start: FIRST_DATE, + end: LAST_DATE, + fetchMostRecentBeforeStart: true, + }) + ) as any; + expect(newState[ID][RESOLUTION]).toEqual( + expect.objectContaining({ + id: ID, + resolution: RESOLUTION, + error: undefined, + isLoading: false, + requestHistory: [ + expect.objectContaining({ + end: expect.any(Date), + requestedAt: expect.any(Date), + start: expect.any(Date), + }), + ], + dataCache: { + intervals: [[FIRST_DATE.getTime(), LAST_DATE.getTime()]], + items: [[]], + }, + requestCache: expect.objectContaining({ + intervals: [[FIRST_DATE.getTime(), LAST_DATE.getTime()]], + }), + }) + ); +}); + it('merges data into existing data cache', () => { const ID = 'my-id'; @@ -462,10 +604,21 @@ it('merges data into existing data cache', () => { resolution: SECOND_IN_MS, dataType: DataType.NUMBER, }; + + const START_DATE_1 = new Date(2000, 8, 0); + const END_DATE_1 = new Date(DATE_THREE); + const successState = dataReducer( INITIAL_STATE, - onSuccessAction(ID, dataStream, new Date(2000, 8, 0), new Date(DATE_THREE), 'fetchMostRecentBeforeStart') + onSuccessAction(ID, dataStream, START_DATE_1, END_DATE_1, { + id: ID, + resolution: '1s', + start: START_DATE_1, + end: END_DATE_1, + fetchMostRecentBeforeEnd: true, + }) ); + expect(getDataStreamStore(ID, SECOND_IN_MS, successState)).toEqual({ ...getDataStreamStore(ID, SECOND_IN_MS, INITIAL_STATE), isLoading: false, @@ -481,6 +634,51 @@ it('merges data into existing data cache', () => { }), requestHistory: expect.any(Array), }); + + const BEFORE_START_DATA_POINT = { x: new Date(1990, 11, 0).getTime(), y: 500 }; + + const beforeStartDataStream = { + name: 'some name', + id: ID, + aggregates: { + [SECOND_IN_MS]: [BEFORE_START_DATA_POINT], + }, + data: [], + resolution: SECOND_IN_MS, + dataType: DataType.NUMBER, + }; + + const START_DATE_2 = new Date(1999, 0, 0); + const END_DATE_2 = new Date(DATE_ONE); + + const beforeStartSuccessState = dataReducer( + successState, + onSuccessAction(ID, beforeStartDataStream, START_DATE_2, END_DATE_2, { + id: ID, + resolution: '1s', + start: START_DATE_2, + end: END_DATE_2, + fetchMostRecentBeforeStart: true, + }) + ); + + expect(getDataStreamStore(ID, SECOND_IN_MS, beforeStartSuccessState)).toEqual({ + ...getDataStreamStore(ID, SECOND_IN_MS, successState), + isLoading: false, + isRefreshing: false, + id: ID, + error: undefined, + dataCache: { + intervals: [[BEFORE_START_DATA_POINT.x, DATE_FOUR]], + items: [ + [BEFORE_START_DATA_POINT, ...DATA_POINTS_ONE, OLDER_DATA_POINT_2, NEWER_DATA_POINT_1, ...DATA_POINTS_TWO], + ], + }, + requestCache: expect.objectContaining({ + intervals: [[BEFORE_START_DATA_POINT.x, DATE_FOUR]], + }), + requestHistory: expect.any(Array), + }); }); describe('requests to different resolutions', () => { @@ -514,19 +712,19 @@ describe('requests to different resolutions', () => { data: [], dataType: DataType.NUMBER, }; - const requestState = dataReducer( - INITIAL_STATE, - onRequestAction({ - id: ID, - resolution: SECOND_IN_MS / 2, - first: NEW_FIRST_DATE, - last: NEW_LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, - }) - ); + + const requestInformation = { + id: ID, + resolution: '500ms', + start: NEW_FIRST_DATE, + end: NEW_LAST_DATE, + fetchFromStartToEnd: true, + }; + + const requestState = dataReducer(INITIAL_STATE, onRequestAction(requestInformation)); const newState = dataReducer( requestState, - onSuccessAction(ID, DATA, NEW_FIRST_DATE, NEW_LAST_DATE, 'fetchFromStartToEnd') + onSuccessAction(ID, DATA, NEW_FIRST_DATE, NEW_LAST_DATE, requestInformation) ); expect(newState).toEqual({ [ID]: { @@ -576,16 +774,14 @@ describe('requests to different resolutions', () => { const NEW_LAST_DATE = new Date(2001, 0, 0); const RESOLUTION = SECOND_IN_MS / 2; - const requestState = dataReducer( - INITIAL_STATE, - onRequestAction({ - id: ID, - resolution: RESOLUTION, - first: NEW_FIRST_DATE, - last: NEW_LAST_DATE, - request: { viewport: { duration: '1d' }, settings: { fetchFromStartToEnd: true } }, - }) - ); + const requestInformation = { + id: ID, + resolution: '500ms', + start: NEW_FIRST_DATE, + end: NEW_LAST_DATE, + fetchFromStartToEnd: true, + }; + const requestState = dataReducer(INITIAL_STATE, onRequestAction(requestInformation)); const ERROR = { msg: 'error!', type: 'ResourceNotFoundException', status: '404' }; const newState = dataReducer(requestState, onErrorAction(ID, RESOLUTION, ERROR)) as any; diff --git a/packages/core/src/data-module/data-cache/dataReducer.ts b/packages/core/src/data-module/data-cache/dataReducer.ts index 6d2b563ec..5920177fd 100755 --- a/packages/core/src/data-module/data-cache/dataReducer.ts +++ b/packages/core/src/data-module/data-cache/dataReducer.ts @@ -7,6 +7,7 @@ import { addToDataPointCache, EMPTY_CACHE } from './caching/caching'; import { DataStreamsStore } from './types'; import { mergeHistoricalRequests } from './mergeHistoricalRequests'; import { getDataPoints } from '../../common/getDataPoints'; +import { parseDuration } from '../../common/time'; /** * Data Reducer @@ -20,7 +21,7 @@ export const dataReducer: Reducer = ( ): DataStreamsStore => { switch (action.type) { case REQUEST: { - const { id, resolution, first, last, request } = action.payload; + const { id, resolution, start, end, fetchFromStartToEnd } = action.payload; const streamStore = getDataStreamStore(id, resolution, state); const dataCache = streamStore != null ? streamStore.dataCache : EMPTY_CACHE; const requestCache = streamStore != null ? streamStore.requestCache : EMPTY_CACHE; @@ -29,26 +30,28 @@ export const dataReducer: Reducer = ( // We only consider it loading if data has not been requested before, or if it's already loading. const isLoading = streamStore == null || streamStore.isLoading; + const numericResolution = parseDuration(resolution); + return { ...state, [id]: { ...state[id], - [resolution]: { + [numericResolution]: { ...streamStore, - resolution, - requestHistory: request.settings?.fetchFromStartToEnd + resolution: numericResolution, + requestHistory: fetchFromStartToEnd ? mergeHistoricalRequests(existingRequestHistory, { - start: first, - end: last, + start, + end, requestedAt: new Date(Date.now()), // Date.now utilized in this funny way to assist mocking in the unit tests }) : existingRequestHistory, dataCache, - requestCache: request.settings?.fetchFromStartToEnd + requestCache: fetchFromStartToEnd ? addToDataPointCache({ cache: requestCache, - start: first, - end: last, + start, + end, }) : requestCache, id, @@ -60,7 +63,7 @@ export const dataReducer: Reducer = ( } case SUCCESS: { - const { id, data, first, last, typeOfRequest } = action.payload; + const { id, data, first, last, requestInformation } = action.payload; const streamStore = getDataStreamStore(id, data.resolution, state); // Updating request cache is a hack to deal with latest value update // TODO: clean this to one single source of truth cache @@ -68,15 +71,23 @@ export const dataReducer: Reducer = ( // We always want data in ascending order in the cache const sortedData = getDataPoints(data, data.resolution).sort((a, b) => a.x - b.x); + /** * Based on the type of request, determine the actual range requested. * * For instance, when we fetch latest value, we stop looking for data when we find the first point, and potentially seek beyond the start of the viewport. * This must be taken into account. */ + let intervalStart = first; + + // start the interval from the returned data point to avoid over-caching + // if there is no data point it's fine to cache the entire interval + if (requestInformation.fetchMostRecentBeforeStart && sortedData.length > 0) { + intervalStart = new Date(sortedData[0].x); + } const updatedDataCache = addToDataPointCache({ - start: first, + start: intervalStart, end: last, data: sortedData, cache: (streamStore && streamStore.dataCache) || EMPTY_CACHE, @@ -93,18 +104,17 @@ export const dataReducer: Reducer = ( resolution: data.resolution, id, requestHistory: mergeHistoricalRequests(existingRequestHistory, { - start: first, + start: intervalStart, end: last, requestedAt: new Date(Date.now()), // Date.now utilized in this funny way to assist mocking in the unit tests }), - requestCache: - typeOfRequest !== 'fetchFromStartToEnd' - ? addToDataPointCache({ - cache: requestCache, - start: first, - end: last, - }) - : requestCache, + requestCache: !requestInformation.fetchFromStartToEnd + ? addToDataPointCache({ + cache: requestCache, + start: intervalStart, + end: last, + }) + : requestCache, dataCache: updatedDataCache, isLoading: false, isRefreshing: false, diff --git a/packages/core/src/data-module/types.ts b/packages/core/src/data-module/types.ts index edc5b6155..fe3767d54 100644 --- a/packages/core/src/data-module/types.ts +++ b/packages/core/src/data-module/types.ts @@ -17,6 +17,9 @@ export type RequestInformation = { resolution: string; refId?: RefId; cacheSettings?: CacheSettings; + fetchMostRecentBeforeStart?: boolean; + fetchMostRecentBeforeEnd?: boolean; + fetchFromStartToEnd?: boolean; }; export type RequestInformationAndRange = RequestInformation & { start: Date; end: Date }; @@ -53,16 +56,14 @@ export type DataSource = { getRequestsFromQuery: ({ query, request }: { query: Query; request: TimeSeriesDataRequest }) => RequestInformation[]; }; -export type DataStreamCallback = (dataStreams: DataStream[], typeOfRequest: TypeOfRequest) => void; +export type DataStreamCallback = (dataStreams: DataStream[], requestInformation: RequestInformationAndRange) => void; export type OnSuccessCallback = ( dataStreams: DataStream[], - typeOfRequest: TypeOfRequest, + requestInformation: RequestInformationAndRange, start: Date, end: Date ) => void; -export type TypeOfRequest = 'fetchMostRecentBeforeStart' | 'fetchMostRecentBeforeEnd' | 'fetchFromStartToEnd'; - export type QuerySubscription = { queries: Query[]; request: TimeSeriesDataRequest; diff --git a/packages/source-iotsitewise/src/time-series-data/client/client.spec.ts b/packages/source-iotsitewise/src/time-series-data/client/client.spec.ts index d347b2624..3de0b8441 100644 --- a/packages/source-iotsitewise/src/time-series-data/client/client.spec.ts +++ b/packages/source-iotsitewise/src/time-series-data/client/client.spec.ts @@ -42,6 +42,7 @@ describe('getHistoricalPropertyDataPoints', () => { start: startDate, end: endDate, resolution: '0', + fetchFromStartToEnd: true, }, ]; @@ -77,6 +78,7 @@ describe('getHistoricalPropertyDataPoints', () => { start: startDate, end: endDate, resolution: '0', + fetchFromStartToEnd: true, }, ]; @@ -104,7 +106,13 @@ describe('getHistoricalPropertyDataPoints', () => { ], }), ], - 'fetchFromStartToEnd', + expect.objectContaining({ + id: toId({ assetId, propertyId }), + start: startDate, + end: endDate, + resolution: '0', + fetchFromStartToEnd: true, + }), startDate, endDate ); @@ -129,6 +137,7 @@ describe('getLatestPropertyDataPoint', () => { start, end, resolution: '0', + fetchMostRecentBeforeEnd: true, }, ]; @@ -151,7 +160,13 @@ describe('getLatestPropertyDataPoint', () => { ], }), ], - 'fetchMostRecentBeforeEnd', + expect.objectContaining({ + id: toId({ assetId, propertyId }), + start, + end, + resolution: '0', + fetchMostRecentBeforeEnd: true, + }), start, end ); @@ -178,6 +193,7 @@ describe('getLatestPropertyDataPoint', () => { start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, ]; @@ -211,6 +227,7 @@ describe('getAggregatedPropertyDataPoints', () => { start: startDate, end: endDate, resolution, + fetchFromStartToEnd: true, }, ]; @@ -245,6 +262,7 @@ describe('getAggregatedPropertyDataPoints', () => { start: startDate, end: endDate, resolution, + fetchFromStartToEnd: true, }, ]; @@ -284,7 +302,13 @@ describe('getAggregatedPropertyDataPoints', () => { }, }), ], - 'fetchFromStartToEnd', + expect.objectContaining({ + id: toId({ assetId, propertyId }), + start: startDate, + end: endDate, + resolution, + fetchFromStartToEnd: true, + }), startDate, endDate ); diff --git a/packages/source-iotsitewise/src/time-series-data/client/client.ts b/packages/source-iotsitewise/src/time-series-data/client/client.ts index 0d4ff0364..a4a2c178c 100644 --- a/packages/source-iotsitewise/src/time-series-data/client/client.ts +++ b/packages/source-iotsitewise/src/time-series-data/client/client.ts @@ -19,36 +19,28 @@ export class SiteWiseClient { return getLatestPropertyDataPoint({ client: this.siteWiseSdk, ...options }); } - async getHistoricalPropertyDataPoints(options: { + getHistoricalPropertyDataPoints(options: { requestInformations: RequestInformationAndRange[]; maxResults?: number; onError: ErrorCallback; onSuccess: OnSuccessCallback; }): Promise { - return getHistoricalPropertyDataPoints({ client: this.siteWiseSdk, ...options }); + return getHistoricalPropertyDataPoints({ + client: this.siteWiseSdk, + ...options, + }); } - async getMostRecentPropertyDataPointBeforeDate(options: { - requestInformations: RequestInformationAndRange[]; - date: Date; - onError: ErrorCallback; - onSuccess: OnSuccessCallback; - }): Promise { - const requestInformations = options.requestInformations.map((info) => ({ - ...info, - start: new Date(0, 0, 0), - end: options.date, - })); - return getHistoricalPropertyDataPoints({ client: this.siteWiseSdk, ...options, requestInformations }); - } - - async getAggregatedPropertyDataPoints(options: { + getAggregatedPropertyDataPoints(options: { requestInformations: RequestInformationAndRange[]; aggregateTypes: AggregateType[]; maxResults?: number; onError: ErrorCallback; onSuccess: OnSuccessCallback; }): Promise { - return getAggregatedPropertyDataPoints({ client: this.siteWiseSdk, ...options }); + return getAggregatedPropertyDataPoints({ + client: this.siteWiseSdk, + ...options, + }); } } diff --git a/packages/source-iotsitewise/src/time-series-data/client/getAggregatedPropertyDataPoints.ts b/packages/source-iotsitewise/src/time-series-data/client/getAggregatedPropertyDataPoints.ts index 306a76643..50095b91f 100644 --- a/packages/source-iotsitewise/src/time-series-data/client/getAggregatedPropertyDataPoints.ts +++ b/packages/source-iotsitewise/src/time-series-data/client/getAggregatedPropertyDataPoints.ts @@ -13,11 +13,9 @@ import { isDefined } from '../../common/predicates'; import { dataStreamFromSiteWise } from '../dataStreamFromSiteWise'; const getAggregatedPropertyDataPointsForProperty = ({ + requestInformation, assetId, propertyId, - start, - end, - resolution, aggregateTypes, maxResults, onSuccess, @@ -25,11 +23,9 @@ const getAggregatedPropertyDataPointsForProperty = ({ nextToken: prevToken, client, }: { + requestInformation: RequestInformationAndRange; assetId: AssetId; propertyId: AssetPropertyId; - start: Date; - end: Date; - resolution: string; aggregateTypes: AggregateType[]; maxResults?: number; onError: ErrorCallback; @@ -37,6 +33,15 @@ const getAggregatedPropertyDataPointsForProperty = ({ client: IoTSiteWiseClient; nextToken?: string; }) => { + let { start, end } = requestInformation; + const { resolution } = requestInformation; + + // fetch leading point without mutating requestInformation + if (requestInformation.fetchMostRecentBeforeStart) { + end = start; + start = new Date(0, 0, 0); + } + return client .send( new GetAssetPropertyAggregatesCommand({ @@ -46,7 +51,7 @@ const getAggregatedPropertyDataPointsForProperty = ({ endDate: end, resolution, aggregateTypes, - maxResults, + maxResults: requestInformation.fetchMostRecentBeforeStart ? 1 : maxResults, timeOrdering: TimeOrdering.DESCENDING, nextToken: prevToken, }) @@ -67,19 +72,17 @@ const getAggregatedPropertyDataPointsForProperty = ({ resolution: RESOLUTION_TO_MS_MAPPING[resolution], }), ], - 'fetchFromStartToEnd', + requestInformation, start, end ); } - if (nextToken) { + if (nextToken && !requestInformation.fetchMostRecentBeforeStart) { getAggregatedPropertyDataPointsForProperty({ + requestInformation, assetId, propertyId, - start, - end, - resolution, aggregateTypes, maxResults, onError, @@ -117,16 +120,14 @@ export const getAggregatedPropertyDataPoints = async ({ const requests = requestInformations .filter(({ resolution }) => resolution !== '0') .sort((a, b) => b.start.getTime() - a.start.getTime()) - .map(({ id, start, end, resolution }) => { - const { assetId, propertyId } = toSiteWiseAssetProperty(id); + .map((requestInformation) => { + const { assetId, propertyId } = toSiteWiseAssetProperty(requestInformation.id); return getAggregatedPropertyDataPointsForProperty({ + requestInformation, client, assetId, propertyId, - start, - end, - resolution, aggregateTypes, maxResults, onSuccess, diff --git a/packages/source-iotsitewise/src/time-series-data/client/getHistoricalPropertyDataPoints.ts b/packages/source-iotsitewise/src/time-series-data/client/getHistoricalPropertyDataPoints.ts index b18e2c00a..57dbac67b 100644 --- a/packages/source-iotsitewise/src/time-series-data/client/getHistoricalPropertyDataPoints.ts +++ b/packages/source-iotsitewise/src/time-series-data/client/getHistoricalPropertyDataPoints.ts @@ -7,26 +7,32 @@ import { toId, toSiteWiseAssetProperty } from '../util/dataStreamId'; import { isDefined } from '../../common/predicates'; const getHistoricalPropertyDataPointsForProperty = ({ + requestInformation, assetId, propertyId, - start, - end, maxResults, onSuccess, onError, nextToken: prevToken, client, }: { + requestInformation: RequestInformationAndRange; assetId: AssetId; propertyId: AssetPropertyId; - start: Date; - end: Date; maxResults?: number; onError: ErrorCallback; onSuccess: OnSuccessCallback; client: IoTSiteWiseClient; nextToken?: string; }) => { + let { start, end } = requestInformation; + + // fetch leading point without mutating requestInformation + if (requestInformation.fetchMostRecentBeforeStart) { + end = start; + start = new Date(0, 0, 0); + } + return client .send( new GetAssetPropertyValueHistoryCommand({ @@ -34,7 +40,7 @@ const getHistoricalPropertyDataPointsForProperty = ({ propertyId, startDate: start, endDate: end, - maxResults, + maxResults: requestInformation.fetchMostRecentBeforeStart ? 1 : maxResults, timeOrdering: TimeOrdering.DESCENDING, nextToken: prevToken, }) @@ -48,15 +54,14 @@ const getHistoricalPropertyDataPointsForProperty = ({ .map((assetPropertyValue) => toDataPoint(assetPropertyValue)) .filter(isDefined); - onSuccess([dataStreamFromSiteWise({ assetId, propertyId, dataPoints })], 'fetchFromStartToEnd', start, end); + onSuccess([dataStreamFromSiteWise({ assetId, propertyId, dataPoints })], requestInformation, start, end); } - if (nextToken) { + if (nextToken && !requestInformation.fetchMostRecentBeforeStart) { getHistoricalPropertyDataPointsForProperty({ + requestInformation, assetId, propertyId, - start, - end, maxResults, onError, onSuccess, @@ -92,15 +97,14 @@ export const getHistoricalPropertyDataPoints = async ({ const requests = requestInformations .filter(({ resolution }) => resolution === '0') .sort((a, b) => b.start.getTime() - a.start.getTime()) - .map(({ id, start, end }) => { - const { assetId, propertyId } = toSiteWiseAssetProperty(id); + .map((requestInformation) => { + const { assetId, propertyId } = toSiteWiseAssetProperty(requestInformation.id); return getHistoricalPropertyDataPointsForProperty({ + requestInformation, client, assetId, propertyId, - start, - end, maxResults, onSuccess, onError, diff --git a/packages/source-iotsitewise/src/time-series-data/client/getLatestPropertyDataPoint.ts b/packages/source-iotsitewise/src/time-series-data/client/getLatestPropertyDataPoint.ts index a24e41c1b..0e10bc3d9 100644 --- a/packages/source-iotsitewise/src/time-series-data/client/getLatestPropertyDataPoint.ts +++ b/packages/source-iotsitewise/src/time-series-data/client/getLatestPropertyDataPoint.ts @@ -18,17 +18,20 @@ export const getLatestPropertyDataPoint = async ({ }): Promise => { const end = new Date(); const requests = requestInformations - .filter(({ resolution }) => resolution === '0') + .filter(({ resolution, fetchMostRecentBeforeEnd }) => resolution === '0' && fetchMostRecentBeforeEnd) .sort((a, b) => b.start.getTime() - a.start.getTime()) - .map(({ id, start, end }) => { - const { assetId, propertyId } = toSiteWiseAssetProperty(id); + .map((requestInformation) => { + const { assetId, propertyId } = toSiteWiseAssetProperty(requestInformation.id); return client .send(new GetAssetPropertyValueCommand({ assetId, propertyId })) .then((res) => ({ - dataPoints: [toDataPoint(res.propertyValue)].filter(isDefined), - assetId, - propertyId, + siteWiseData: { + dataPoints: [toDataPoint(res.propertyValue)].filter(isDefined), + assetId, + propertyId, + }, + requestInformation, })) .catch((err) => { const dataStreamId = toId({ assetId, propertyId }); @@ -45,11 +48,14 @@ export const getLatestPropertyDataPoint = async ({ await Promise.all(requests).then((results) => { results .filter(isDefined) - .map(dataStreamFromSiteWise) - .forEach((dataStream) => { + .map(({ siteWiseData, requestInformation }) => ({ + dataStream: dataStreamFromSiteWise(siteWiseData), + requestInformation, + })) + .forEach(({ dataStream, requestInformation }) => { const lastDataPoint = dataStream.data.slice(-1)[0]; const start = lastDataPoint ? new Date(lastDataPoint.x) : new Date(0, 0, 0); - onSuccess([dataStream], 'fetchMostRecentBeforeEnd', start, end); + onSuccess([dataStream], requestInformation, start, end); }); }); } catch { diff --git a/packages/source-iotsitewise/src/time-series-data/data-source.spec.ts b/packages/source-iotsitewise/src/time-series-data/data-source.spec.ts index cdf8fb082..5b34eb9b7 100644 --- a/packages/source-iotsitewise/src/time-series-data/data-source.spec.ts +++ b/packages/source-iotsitewise/src/time-series-data/data-source.spec.ts @@ -27,10 +27,23 @@ const LAST_MINUTE_REQUEST: TimeSeriesDataRequest = { }, }; +const historicalRequestStart = new Date(2010, 0, 0); +const historicalRequestEnd = new Date(2011, 0, 0); + +const HISTORICAL_REQUEST_BEFORE_START: TimeSeriesDataRequest = { + viewport: { + start: historicalRequestStart, + end: historicalRequestEnd, + }, + settings: { + fetchMostRecentBeforeStart: true, + }, +}; + const HISTORICAL_REQUEST: TimeSeriesDataRequest = { viewport: { - start: new Date(2010, 0, 0), - end: new Date(2011, 0, 0), + start: historicalRequestStart, + end: historicalRequestEnd, }, settings: { fetchFromStartToEnd: true, @@ -112,6 +125,7 @@ describe('initiateRequest', () => { start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, ] ); @@ -167,6 +181,7 @@ describe('initiateRequest', () => { start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, ] ); @@ -224,12 +239,14 @@ describe('initiateRequest', () => { start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, { id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_2 }), start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, ] ); @@ -280,12 +297,14 @@ describe('initiateRequest', () => { start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, { id: toId({ assetId: ASSET_2, propertyId: PROPERTY_2 }), start: new Date(), end: new Date(), resolution: '0', + fetchMostRecentBeforeEnd: true, }, ] ); @@ -303,6 +322,198 @@ describe('initiateRequest', () => { }); }); }); + + describe('fetch latest before start', () => { + it('gets latest value before start for multiple properties', () => { + const getAssetPropertyValueHistory = jest.fn().mockResolvedValue(ASSET_PROPERTY_DOUBLE_VALUE); + + const mockSDK = createMockSiteWiseSDK({ getAssetPropertyValueHistory }); + + const dataSource = createDataSource(mockSDK); + + const ASSET_ID = 'some-asset-id'; + const PROPERTY_1 = 'prop-1'; + const PROPERTY_2 = 'prop-2'; + + const query: SiteWiseDataStreamQuery = { + source: SITEWISE_DATA_SOURCE, + assets: [{ assetId: ASSET_ID, properties: [{ propertyId: PROPERTY_1 }, { propertyId: PROPERTY_2 }] }], + }; + + dataSource.initiateRequest( + { + onError: noop, + onSuccess: noop, + query, + request: HISTORICAL_REQUEST_BEFORE_START, + }, + [ + { + id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_1 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + { + id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_2 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + ] + ); + + expect(getAssetPropertyValueHistory).toBeCalledTimes(2); + + expect(getAssetPropertyValueHistory).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_ID, + propertyId: PROPERTY_1, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + + expect(getAssetPropertyValueHistory).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_ID, + propertyId: PROPERTY_2, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + }); + + it('gets latest value before start for multiple assets', () => { + const getAssetPropertyValueHistory = jest.fn().mockResolvedValue(ASSET_PROPERTY_DOUBLE_VALUE); + + const mockSDK = createMockSiteWiseSDK({ getAssetPropertyValueHistory }); + + const dataSource = createDataSource(mockSDK); + + const ASSET_1 = 'asset-1'; + const ASSET_2 = 'asset-2'; + const PROPERTY_1 = 'prop-1'; + const PROPERTY_2 = 'prop-2'; + + const query: SiteWiseDataStreamQuery = { + source: SITEWISE_DATA_SOURCE, + assets: [ + { assetId: ASSET_1, properties: [{ propertyId: PROPERTY_1 }] }, + { assetId: ASSET_2, properties: [{ propertyId: PROPERTY_2 }] }, + ], + }; + + dataSource.initiateRequest( + { + onError: noop, + onSuccess: noop, + query, + request: HISTORICAL_REQUEST_BEFORE_START, + }, + [ + { + id: toId({ assetId: ASSET_1, propertyId: PROPERTY_1 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + { + id: toId({ assetId: ASSET_2, propertyId: PROPERTY_2 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '0', + fetchMostRecentBeforeStart: true, + }, + ] + ); + + expect(getAssetPropertyValueHistory).toBeCalledTimes(2); + + expect(getAssetPropertyValueHistory).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_1, + propertyId: PROPERTY_1, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + + expect(getAssetPropertyValueHistory).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_2, + propertyId: PROPERTY_2, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + }); + + it('gets latest value before start for aggregates', () => { + const getAssetPropertyAggregates = jest.fn().mockResolvedValue(ASSET_PROPERTY_DOUBLE_VALUE); + + const mockSDK = createMockSiteWiseSDK({ getAssetPropertyAggregates }); + + const dataSource = createDataSource(mockSDK); + + const ASSET_ID = 'some-asset-id'; + const PROPERTY_1 = 'prop-1'; + const PROPERTY_2 = 'prop-2'; + + const query: SiteWiseDataStreamQuery = { + source: SITEWISE_DATA_SOURCE, + assets: [{ assetId: ASSET_ID, properties: [{ propertyId: PROPERTY_1 }, { propertyId: PROPERTY_2 }] }], + }; + + dataSource.initiateRequest( + { + onError: noop, + onSuccess: noop, + query, + request: HISTORICAL_REQUEST_BEFORE_START, + }, + [ + { + id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_1 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '1h', + fetchMostRecentBeforeStart: true, + }, + { + id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_2 }), + start: historicalRequestStart, + end: historicalRequestEnd, + resolution: '1h', + fetchMostRecentBeforeStart: true, + }, + ] + ); + + expect(getAssetPropertyAggregates).toBeCalledTimes(2); + + expect(getAssetPropertyAggregates).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_ID, + propertyId: PROPERTY_1, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + + expect(getAssetPropertyAggregates).toBeCalledWith( + expect.objectContaining({ + assetId: ASSET_ID, + propertyId: PROPERTY_2, + startDate: new Date(0, 0, 0), + endDate: historicalRequestStart, + }) + ); + }); + }); }); it('requests raw data if specified per asset property', async () => { @@ -357,6 +568,7 @@ it('requests raw data if specified per asset property', async () => { start, end, resolution: '0', + fetchFromStartToEnd: true, }, ] ); @@ -391,7 +603,13 @@ it('requests raw data if specified per asset property', async () => { resolution: 0, }), ], - 'fetchFromStartToEnd', + { + id: toId({ assetId: 'some-asset-id', propertyId: 'some-property-id' }), + start, + end, + resolution: '0', + fetchFromStartToEnd: true, + }, start, end ); @@ -669,6 +887,7 @@ describe.skip('aggregated data', () => { start: new Date(), end: new Date(), resolution: '0', + fetchFromStartToEnd: true, }, ] ); @@ -765,24 +984,28 @@ describe.skip('aggregated data', () => { start: new Date(), end: new Date(), resolution: '0', + fetchFromStartToEnd: true, }, { id: toId({ assetId: 'some-asset-id', propertyId: 'some-property-id2' }), start: new Date(), end: new Date(), resolution: '0', + fetchFromStartToEnd: true, }, { id: toId({ assetId: 'some-asset-id2', propertyId: 'some-property-id' }), start: new Date(), end: new Date(), resolution: '0', + fetchFromStartToEnd: true, }, { id: toId({ assetId: 'some-asset-id2', propertyId: 'some-property-id2' }), start: new Date(), end: new Date(), resolution: '0', + fetchFromStartToEnd: true, }, ] ); diff --git a/packages/source-iotsitewise/src/time-series-data/data-source.ts b/packages/source-iotsitewise/src/time-series-data/data-source.ts index 01a77f294..b5d8719df 100644 --- a/packages/source-iotsitewise/src/time-series-data/data-source.ts +++ b/packages/source-iotsitewise/src/time-series-data/data-source.ts @@ -64,30 +64,17 @@ export const createDataSource = (siteWise: IoTSiteWiseClient): DataSource { - const requests = []; - - if (request.settings?.fetchMostRecentBeforeEnd) { - requests.push(client.getLatestPropertyDataPoint({ onSuccess, onError, requestInformations })); - } - - if (request.settings?.fetchFromStartToEnd) { - const aggregateTypes = [AggregateType.AVERAGE]; - - requests.push( - client.getAggregatedPropertyDataPoints({ - requestInformations, - onSuccess, - onError, - aggregateTypes, - }) - ); - - requests.push(client.getHistoricalPropertyDataPoints({ requestInformations, onSuccess, onError })); - } - - return Promise.all(requests); - }, + initiateRequest: ({ onSuccess, onError }, requestInformations) => + Promise.all([ + client.getLatestPropertyDataPoint({ onSuccess, onError, requestInformations }), + client.getAggregatedPropertyDataPoints({ + requestInformations, + onSuccess, + onError, + aggregateTypes: [AggregateType.AVERAGE], + }), + client.getHistoricalPropertyDataPoints({ requestInformations, onSuccess, onError }), + ]), getRequestsFromQuery: ({ query, request }) => { const resolution = determineResolution({ resolution: request.settings?.resolution,