From b5bd2dfb684cb4a4e6732a507f134c1b5357513b Mon Sep 17 00:00:00 2001 From: godind Date: Wed, 20 Mar 2024 21:31:39 -0400 Subject: [PATCH 1/3] fix edit registration --- src/app/core/services/data-set.service.ts | 30 ++++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/app/core/services/data-set.service.ts b/src/app/core/services/data-set.service.ts index dd532056..f98d360a 100644 --- a/src/app/core/services/data-set.service.ts +++ b/src/app/core/services/data-set.service.ts @@ -136,27 +136,32 @@ private setupServiceRegistry(uuid: string): void { // Get _historicalDataset data setup this.setDatasetConfigurationOptions(configuration); - // Cleanup existing _historicalDataset if present. - const dsIndex = this._svcDataSource. findIndex(dataSub => dataSub.uuid == uuid); + + let dataSource: IDatasetServiceDataSource = null; + + // Check if dataSource is already present + const dsIndex = this._svcDataSource.findIndex(dataSub => dataSub.uuid == uuid); if (dsIndex >= 0) { - this.stop(uuid); + dataSource = this._svcDataSource[dsIndex]; + } else { + // Add a new DataSource + dataSource = this._svcDataSource[ + this._svcDataSource.push({ + uuid: uuid, + _pathObserverSubscription: null, + _historicalDataset: [] + }) - 1 + ]; } - // Add a fresh _historicalDataset - const dataSource: IDatasetServiceDataSource = this._svcDataSource[ - this._svcDataSource.push({ - uuid: uuid, - _pathObserverSubscription: null, - _historicalDataset: [] - }) - 1 - ]; - console.log(`[Dataset Service] Starting Dataset recording process: ${configuration.uuid}`); console.log(`[Dataset Service] Path: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Datapoints: ${configuration.maxDataPoints}, Period: ${configuration.period}`); // Subscribe to path data and update _historicalDataset upon reception dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleTime(configuration.sampleTime)).subscribe( (newValue: pathRegistrationValue) => { + let d = new Date(); + console.warn(`value = ${newValue.value} - ${d.getSeconds()}' ${d.getMilliseconds()}"`) if (newValue.value === null) return; // we don't need null values // Keep the array to specified size before adding new value @@ -186,6 +191,7 @@ private setupServiceRegistry(uuid: string): void { const dataSource = this._svcDataSource.find(d => d.uuid == uuid); console.log(`[Dataset Service] Stopping Dataset ${uuid} data capture`); dataSource._pathObserverSubscription.unsubscribe(); + dataSource._historicalDataset = []; } /** From 37d8bf67b14de116e63aa7006d627c2d3209c26a Mon Sep 17 00:00:00 2001 From: godind Date: Thu, 21 Mar 2024 14:36:31 -0400 Subject: [PATCH 2/3] With interval working --- src/app/core/services/data-set.service.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/app/core/services/data-set.service.ts b/src/app/core/services/data-set.service.ts index f98d360a..037380db 100644 --- a/src/app/core/services/data-set.service.ts +++ b/src/app/core/services/data-set.service.ts @@ -1,5 +1,5 @@ import { Injectable } from '@angular/core'; -import { Subscription, Observable, sampleTime,ReplaySubject } from 'rxjs'; +import { Subscription, Observable, sampleTime,ReplaySubject, MonoTypeOperatorFunction, interval, map, switchMap, pipe, withLatestFrom, tap } from 'rxjs'; import { AppSettingsService } from './app-settings.service'; import { SignalKService, pathRegistrationValue } from './signalk.service'; import { UUID } from'../../utils/uuid' @@ -157,11 +157,18 @@ private setupServiceRegistry(uuid: string): void { console.log(`[Dataset Service] Starting Dataset recording process: ${configuration.uuid}`); console.log(`[Dataset Service] Path: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Datapoints: ${configuration.maxDataPoints}, Period: ${configuration.period}`); + function sampleInterval(period: number): MonoTypeOperatorFunction { + // return switchMap((value) => { + // console.log(source); + return (source) => interval(period).pipe(withLatestFrom(source, (_, value) => value)); + }; + + // Subscribe to path data and update _historicalDataset upon reception - dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleTime(configuration.sampleTime)).subscribe( + dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleInterval(configuration.sampleTime)).subscribe( (newValue: pathRegistrationValue) => { let d = new Date(); - console.warn(`value = ${newValue.value} - ${d.getSeconds()}' ${d.getMilliseconds()}"`) + console.log(`value = ${newValue.value} - ${d.getSeconds()}' ${d.getMilliseconds()}"`) if (newValue.value === null) return; // we don't need null values // Keep the array to specified size before adding new value From 07f8f4b307df5c4889df98ac5d10dfa32c52d881 Mon Sep 17 00:00:00 2001 From: godind Date: Thu, 21 Mar 2024 15:30:29 -0400 Subject: [PATCH 3/3] Final --- src/app/core/services/data-set.service.ts | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/app/core/services/data-set.service.ts b/src/app/core/services/data-set.service.ts index 037380db..bb24f27a 100644 --- a/src/app/core/services/data-set.service.ts +++ b/src/app/core/services/data-set.service.ts @@ -1,5 +1,5 @@ import { Injectable } from '@angular/core'; -import { Subscription, Observable, sampleTime,ReplaySubject, MonoTypeOperatorFunction, interval, map, switchMap, pipe, withLatestFrom, tap } from 'rxjs'; +import { Subscription, Observable, ReplaySubject, MonoTypeOperatorFunction, interval, withLatestFrom } from 'rxjs'; import { AppSettingsService } from './app-settings.service'; import { SignalKService, pathRegistrationValue } from './signalk.service'; import { UUID } from'../../utils/uuid' @@ -112,14 +112,13 @@ private setupServiceRegistry(uuid: string): void { * Starts the recording process of a Data Source. It firsts reads the _historicalDataset configuration, * then starts building the _historicalDataset values, and pushes them to the Subject. * - * This method handles the process that takes SK data and feed the Subject. _historicalDataset "clients", - * ie. widgets, will use the getDatasetObservable() method to receive data from the Subject. + * This method handles the process that takes SK data and feeds the Subject. Clients/Observers, + * (widgets mostly), will use the getDatasetObservable() method to receive data from the Subject. * * Concept: SK_path_values -> datasource -> (ReplaySubject) <- Widget observers * - * Once a datasource is started, ReplaySubject subscribers - * (widgets) will receive _historicalDataset data updates. - * . + * Once a datasource is started, subscribers will receive historical data (equal to the + * length of the dataset)pushed to the Subject, as as future data. * * @private * @param {string} uuid The UUID of the DataSource to start @@ -135,8 +134,6 @@ private setupServiceRegistry(uuid: string): void { // Get _historicalDataset data setup this.setDatasetConfigurationOptions(configuration); - - let dataSource: IDatasetServiceDataSource = null; // Check if dataSource is already present @@ -157,9 +154,8 @@ private setupServiceRegistry(uuid: string): void { console.log(`[Dataset Service] Starting Dataset recording process: ${configuration.uuid}`); console.log(`[Dataset Service] Path: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Datapoints: ${configuration.maxDataPoints}, Period: ${configuration.period}`); + // Emit at a regular interval using the last value. We use this and not sampleTime() to make sure that if there is no new data, we still send the last know value. This is to prevent dataset blanks that look ugly on the chart function sampleInterval(period: number): MonoTypeOperatorFunction { - // return switchMap((value) => { - // console.log(source); return (source) => interval(period).pipe(withLatestFrom(source, (_, value) => value)); }; @@ -167,8 +163,6 @@ private setupServiceRegistry(uuid: string): void { // Subscribe to path data and update _historicalDataset upon reception dataSource._pathObserverSubscription = this.signalk.subscribePath(configuration.uuid, configuration.path, configuration.pathSource).pipe(sampleInterval(configuration.sampleTime)).subscribe( (newValue: pathRegistrationValue) => { - let d = new Date(); - console.log(`value = ${newValue.value} - ${d.getSeconds()}' ${d.getMilliseconds()}"`) if (newValue.value === null) return; // we don't need null values // Keep the array to specified size before adding new value