diff --git a/src/app/core/services/data-set.service.ts b/src/app/core/services/data-set.service.ts index dd532056..bb24f27a 100644 --- a/src/app/core/services/data-set.service.ts +++ b/src/app/core/services/data-set.service.ts @@ -1,5 +1,5 @@ import { Injectable } from '@angular/core'; -import { Subscription, Observable, sampleTime,ReplaySubject } from 'rxjs'; +import { Subscription, Observable, ReplaySubject, MonoTypeOperatorFunction, interval, withLatestFrom } from 'rxjs'; import { AppSettingsService } from './app-settings.service'; import { SignalKService, pathRegistrationValue } from './signalk.service'; import { UUID } from'../../utils/uuid' @@ -112,14 +112,13 @@ private setupServiceRegistry(uuid: string): void { * Starts the recording process of a Data Source. It firsts reads the _historicalDataset configuration, * then starts building the _historicalDataset values, and pushes them to the Subject. * - * This method handles the process that takes SK data and feed the Subject. _historicalDataset "clients", - * ie. widgets, will use the getDatasetObservable() method to receive data from the Subject. + * This method handles the process that takes SK data and feeds the Subject. Clients/Observers, + * (widgets mostly), will use the getDatasetObservable() method to receive data from the Subject. * * Concept: SK_path_values -> datasource -> (ReplaySubject) <- Widget observers * - * Once a datasource is started, ReplaySubject subscribers - * (widgets) will receive _historicalDataset data updates. - * . + * Once a datasource is started, subscribers will receive historical data (equal to the + * length of the dataset)pushed to the Subject, as as future data. * * @private * @param {string} uuid The UUID of the DataSource to start @@ -135,27 +134,34 @@ private setupServiceRegistry(uuid: string): void { // Get _historicalDataset data setup this.setDatasetConfigurationOptions(configuration); + let dataSource: IDatasetServiceDataSource = null; - // Cleanup existing _historicalDataset if present. - const dsIndex = this._svcDataSource. findIndex(dataSub => dataSub.uuid == uuid); + // Check if dataSource is already present + const dsIndex = this._svcDataSource.findIndex(dataSub => dataSub.uuid == uuid); if (dsIndex >= 0) { - this.stop(uuid); + dataSource = this._svcDataSource[dsIndex]; + } else { + // Add a new DataSource + dataSource = this._svcDataSource[ + this._svcDataSource.push({ + uuid: uuid, + _pathObserverSubscription: null, + _historicalDataset: [] + }) - 1 + ]; } - // Add a fresh _historicalDataset - const dataSource: IDatasetServiceDataSource = this._svcDataSource[ - this._svcDataSource.push({ - uuid: uuid, - _pathObserverSubscription: null, - _historicalDataset: [] - }) - 1 - ]; - console.log(`[Dataset Service] Starting Dataset recording process: ${configuration.uuid}`); console.log(`[Dataset Service] Path: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Datapoints: ${configuration.maxDataPoints}, Period: ${configuration.period}`); + // Emit at a regular interval using the last value. We use this and not sampleTime() to make sure that if there is no new data, we still send the last know value. This is to prevent dataset blanks that look ugly on the chart + function sampleInterval(period: number): MonoTypeOperatorFunction { + return (source) => interval(period).pipe(withLatestFrom(source, (_, value) => value)); + }; + + // Subscribe to path data and update _historicalDataset upon reception - dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleTime(configuration.sampleTime)).subscribe( + dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleInterval(configuration.sampleTime)).subscribe( (newValue: pathRegistrationValue) => { if (newValue.value === null) return; // we don't need null values @@ -186,6 +192,7 @@ private setupServiceRegistry(uuid: string): void { const dataSource = this._svcDataSource.find(d => d.uuid == uuid); console.log(`[Dataset Service] Stopping Dataset ${uuid} data capture`); dataSource._pathObserverSubscription.unsubscribe(); + dataSource._historicalDataset = []; } /**