From 1a91a0967975968d0bb7ea391864024f84e2af5b Mon Sep 17 00:00:00 2001 From: Piotr Czarnas Date: Thu, 17 Oct 2024 22:21:47 +0200 Subject: [PATCH] Memory optimizations when loading historic data for anomaly detection. --- .../factory/SensorReadoutsColumnNames.java | 13 +++ .../snapshot/SensorReadoutsSnapshot.java | 32 +----- .../snapshot/SensorReadoutsTimeSeriesMap.java | 108 +++++++++++++----- .../TableCheckExecutionServiceImpl.java | 39 ++++--- 4 files changed, 115 insertions(+), 77 deletions(-) diff --git a/dqops/src/main/java/com/dqops/data/readouts/factory/SensorReadoutsColumnNames.java b/dqops/src/main/java/com/dqops/data/readouts/factory/SensorReadoutsColumnNames.java index 45cb7f4233..783bf2f82b 100644 --- a/dqops/src/main/java/com/dqops/data/readouts/factory/SensorReadoutsColumnNames.java +++ b/dqops/src/main/java/com/dqops/data/readouts/factory/SensorReadoutsColumnNames.java @@ -227,6 +227,7 @@ public class SensorReadoutsColumnNames { COLUMN_NAME_COLUMN_NAME, DATA_GROUP_NAME_COLUMN_NAME, + DATA_GROUP_HASH_COLUMN_NAME, TABLE_COMPARISON_NAME_COLUMN_NAME, DURATION_MS_COLUMN_NAME, @@ -259,4 +260,16 @@ public class SensorReadoutsColumnNames { DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "8", DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "9" }; + + /** + * A list of a minimum set of columns from sensor readouts that are used to feed time series sensors with historical data. + */ + public static final String[] SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA = new String[] { + TIME_PERIOD_COLUMN_NAME, + TIME_PERIOD_UTC_COLUMN_NAME, + CHECK_HASH_COLUMN_NAME, + DATA_GROUP_HASH_COLUMN_NAME, + ACTUAL_VALUE_COLUMN_NAME, + EXPECTED_VALUE_COLUMN_NAME + }; } diff --git a/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsSnapshot.java b/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsSnapshot.java index 5cae6eab3a..9e5725c08a 100644 --- a/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsSnapshot.java +++ b/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsSnapshot.java @@ -19,10 +19,7 @@ import com.dqops.core.principal.UserDomainIdentity; import com.dqops.core.synchronization.contract.DqoRoot; import com.dqops.data.readouts.factory.SensorReadoutsColumnNames; -import com.dqops.data.storage.FileStorageSettings; -import com.dqops.data.storage.ParquetPartitionStorageService; -import com.dqops.data.storage.TableDataSnapshot; -import com.dqops.data.storage.TablePartitioningPattern; +import com.dqops.data.storage.*; import com.dqops.metadata.sources.PhysicalTableName; import com.dqops.utils.reflection.ObjectMemorySizeUtility; import com.dqops.utils.tables.TableColumnUtility; @@ -39,7 +36,6 @@ */ public class SensorReadoutsSnapshot extends TableDataSnapshot { public static String PARQUET_FILE_NAME = "sensor_readout.0.parquet"; - public static boolean ENABLE_PRE_FILLING_TIME_SERIES_CACHE = false; private SensorReadoutsTimeSeriesMap timeSeriesMap; /** @@ -101,30 +97,8 @@ public SensorReadoutsTimeSeriesMap getHistoricReadoutsTimeSeries() { return this.timeSeriesMap; } - Table allLoadedData = this.getAllData(); - this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(), allLoadedData); - - if (allLoadedData != null && ENABLE_PRE_FILLING_TIME_SERIES_CACHE) { - // THIS SECTION is disabled for the moment in favor of using an index and searching for time series on demand - - TableSliceGroup tableSlices = allLoadedData.splitOn(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME, - 
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME); - - for (TableSlice tableSlice : tableSlices) { - Table timeSeriesTable = tableSlice.asTable(); - LongColumn checkHashColumn = (LongColumn) timeSeriesTable.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME); - LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(timeSeriesTable, - SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME); - long checkHashId = checkHashColumn.get(0); // the first row has the value - long dataStreamHash = dataStreamHashColumn.isMissing(0) ? 0L : dataStreamHashColumn.get(0); - - SensorReadoutTimeSeriesKey timeSeriesKey = new SensorReadoutTimeSeriesKey(checkHashId, dataStreamHash); - Table sortedTimeSeriesTable = timeSeriesTable.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME); - SensorReadoutsTimeSeriesData timeSeriesData = new SensorReadoutsTimeSeriesData(timeSeriesKey, sortedTimeSeriesTable); - this.timeSeriesMap.add(timeSeriesData); - } - } - + this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(), + this.getLoadedMonthlyPartitions()); return this.timeSeriesMap; } } diff --git a/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsTimeSeriesMap.java b/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsTimeSeriesMap.java index 9e7361d79d..6a615c1b67 100644 --- a/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsTimeSeriesMap.java +++ b/dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsTimeSeriesMap.java @@ -16,46 +16,58 @@ package com.dqops.data.readouts.snapshot; import com.dqops.data.readouts.factory.SensorReadoutsColumnNames; +import com.dqops.data.storage.LoadedMonthlyPartition; +import com.dqops.data.storage.ParquetPartitionId; import com.dqops.utils.tables.TableColumnUtility; +import lombok.Data; import tech.tablesaw.api.LongColumn; import tech.tablesaw.api.Table; import tech.tablesaw.index.LongIndex; import tech.tablesaw.selection.Selection; +import java.lang.ref.WeakReference; import java.time.LocalDate; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.TreeMap; /** * Dictionary of identified time series in the historic sensor readout results. */ public class SensorReadoutsTimeSeriesMap { - private final Map entries = new LinkedHashMap<>(); + private final Map> entries = new LinkedHashMap<>(); + private final Map partitionMap; + private final Map partitionIndexes = new TreeMap<>(); private LocalDate firstLoadedMonth; private LocalDate lastLoadedMonth; - private Table allLoadedData; - private LongColumn checkHashColumn; - private LongColumn dataStreamHashColumn; - private LongIndex checkHashIndex; - private LongIndex dataStreamHashIndex; /** * Create a time series map. * @param firstLoadedMonth The date of the first loaded month. * @param lastLoadedMonth The date of the last loaded month. + * @param partitionMap Dictionary of loaded partitions. 
*/ - public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth, Table allLoadedData) { + public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth, + Map partitionMap) { this.firstLoadedMonth = firstLoadedMonth; this.lastLoadedMonth = lastLoadedMonth; - this.allLoadedData = allLoadedData; - - if (allLoadedData != null) { - this.checkHashColumn = (LongColumn) allLoadedData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME); - this.dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(allLoadedData, - SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME); - this.checkHashIndex = new LongIndex(this.checkHashColumn); - this.dataStreamHashIndex = new LongIndex(this.dataStreamHashColumn); + this.partitionMap = partitionMap; + if (partitionMap != null) { + for (Map.Entry partitionKeyValue : partitionMap.entrySet()) { + Table partitionData = partitionKeyValue.getValue().getData(); + if (partitionData == null) { + return; + } + + LongColumn checkHashColumn = (LongColumn) partitionData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME); + LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(partitionData, + SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME); + LongIndex checkHashIndex = new LongIndex(checkHashColumn); + LongIndex dataStreamHashIndex = new LongIndex(dataStreamHashColumn); + + PartitionIndexes partitionIndexesEntry = new PartitionIndexes(checkHashIndex, dataStreamHashIndex, partitionKeyValue.getValue()); + this.partitionIndexes.put(partitionKeyValue.getKey(), partitionIndexesEntry); + } } } @@ -83,30 +95,66 @@ public LocalDate getLastLoadedMonth() { */ public SensorReadoutsTimeSeriesData findTimeSeriesData(long checkHashId, long dimensionId) { SensorReadoutTimeSeriesKey key = new SensorReadoutTimeSeriesKey(checkHashId, dimensionId); - SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = this.entries.get(key); + WeakReference sensorReadoutsTimeSeriesDataRef = this.entries.get(key); + SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = sensorReadoutsTimeSeriesDataRef != null ? 
+ sensorReadoutsTimeSeriesDataRef.get() : null; + if (sensorReadoutsTimeSeriesData != null) { return sensorReadoutsTimeSeriesData; } - if (this.checkHashIndex == null) { - return null; - } + Table allTimeSeriesData = null; + + for (Map.Entry partitionIndexesKeyValue : this.partitionIndexes.entrySet()) { + PartitionIndexes partitionIndexesEntry = partitionIndexesKeyValue.getValue(); + Selection checkHashRows = partitionIndexesEntry.checkHashIndex.get(checkHashId); + Selection groupHashRows = partitionIndexesEntry.dataStreamHashIndex.get(dimensionId); + + Table partitionDataTable = partitionIndexesEntry.partitionData.getData(); + if (partitionDataTable == null) { + continue; + } - Selection checkHashRows = this.checkHashIndex.get(checkHashId); - Selection groupHashRows = this.dataStreamHashIndex.get(dimensionId); + Table filteredPartitionRows = partitionDataTable.where(checkHashRows.and(groupHashRows)); + Table sortedTimeSeriesTable = filteredPartitionRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME); - Table filteredRows = this.allLoadedData.where(checkHashRows.and(groupHashRows)); - Table sortedTimeSeriesTable = filteredRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME); + if (allTimeSeriesData == null) { + allTimeSeriesData = sortedTimeSeriesTable; + } else { + allTimeSeriesData.append(sortedTimeSeriesTable); + } + } + + SensorReadoutsTimeSeriesData timeSeriesDataSlice = new SensorReadoutsTimeSeriesData(key, allTimeSeriesData); - SensorReadoutsTimeSeriesData newSubset = new SensorReadoutsTimeSeriesData(key, sortedTimeSeriesTable); - return newSubset; + // TODO: we could store it in the cache.. but not for the moment, maybe for a different use case + return timeSeriesDataSlice; } /** - * Adds a time series object to the dictionary. - * @param timeSeries Time series object. + * Partition indexes container. */ - public void add(SensorReadoutsTimeSeriesData timeSeries) { - this.entries.put(timeSeries.getKey(), timeSeries); + @Data + public static class PartitionIndexes { + /** + * Check hash index. + */ + private final LongIndex checkHashIndex; + + /** + * Data stream (data group) hash index. + */ + private final LongIndex dataStreamHashIndex; + + /** + * The partition data. 
+ */ + private final LoadedMonthlyPartition partitionData; + + public PartitionIndexes(LongIndex checkHashIndex, LongIndex dataStreamHashIndex, LoadedMonthlyPartition monthlyPartition) { + this.checkHashIndex = checkHashIndex; + this.dataStreamHashIndex = dataStreamHashIndex; + this.partitionData = monthlyPartition; + } } } diff --git a/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java b/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java index a242f961ac..00c2854dab 100644 --- a/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java +++ b/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java @@ -224,8 +224,11 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont PhysicalTableName physicalTableName = tableSpec.getPhysicalTableName(); UserDomainIdentity userDomainIdentity = userHome.getUserIdentity(); - SensorReadoutsSnapshot sensorReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createSnapshot(connectionName, physicalTableName, userDomainIdentity); - Table allNormalizedSensorResultsTable = sensorReadoutsSnapshot.getTableDataChanges().getNewOrChangedRows(); + SensorReadoutsSnapshot outputSensorReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createSnapshot(connectionName, physicalTableName, userDomainIdentity); + SensorReadoutsSnapshot historicSensorReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createReadOnlySnapshot(connectionName, physicalTableName, + SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA, userDomainIdentity); + + Table allNormalizedSensorResultsTable = outputSensorReadoutsSnapshot.getTableDataChanges().getNewOrChangedRows(); IntColumn severityColumnTemporary = IntColumn.create(CheckResultsColumnNames.SEVERITY_COLUMN_NAME); allNormalizedSensorResultsTable.addColumns(severityColumnTemporary); // temporary column to allow importing custom severity from custom SQL checks, bypassing rule evaluation, this column is removed before saving jobCancellationToken.throwIfCancelled(); @@ -243,19 +246,19 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont List> singleTableChecks = checks.stream().filter(c -> !c.isTableComparisonCheck()) .collect(Collectors.toList()); executeSingleTableChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken, - checkExecutionSummary, singleTableChecks, tableSpec, sensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, + checkExecutionSummary, singleTableChecks, tableSpec, historicSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, checksForErrorSampling, collectErrorSamples); List> tableComparisonChecks = checks.stream().filter(c -> c.isTableComparisonCheck()) .collect(Collectors.toList()); executeTableComparisonChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken, - checkExecutionSummary, tableComparisonChecks, tableSpec, sensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, + checkExecutionSummary, tableComparisonChecks, tableSpec, historicSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, allRuleEvaluationResultsTable, allErrorsTable, executionStatistics); - if 
(sensorReadoutsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) { + if (outputSensorReadoutsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) { allNormalizedSensorResultsTable.removeColumns(severityColumnTemporary); // removed, it was temporary - progressListener.onSavingSensorResults(new SavingSensorResultsEvent(tableSpec, sensorReadoutsSnapshot)); - sensorReadoutsSnapshot.save(); + progressListener.onSavingSensorResults(new SavingSensorResultsEvent(tableSpec, outputSensorReadoutsSnapshot)); + outputSensorReadoutsSnapshot.save(); } if (checkResultsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) { @@ -314,8 +317,8 @@ public void executeSingleTableChecks( CheckExecutionSummary checkExecutionSummary, Collection> checks, TableSpec tableSpec, - SensorReadoutsSnapshot sensorReadoutsSnapshot, - Table allNormalizedSensorResultsTable, + SensorReadoutsSnapshot historicSensorReadoutsSnapshot, + Table allNormalizedSensorResultsOutputTable, CheckResultsSnapshot checkResultsSnapshot, Table allRuleEvaluationResultsTable, Table allErrorsTable, @@ -358,8 +361,8 @@ public void executeSingleTableChecks( LocalDateTime timePeriodEnd = effectiveTimeWindowFilter.calculateTimePeriodEnd(sensorRunParameters.getCheckType(), sensorRunParameters.getTimePeriodGradient(), defaultTimeZoneId); long checkHash = sensorRunParameters.getCheck().getHierarchyId().hashCode64(); - sensorReadoutsSnapshot.ensureMonthsAreLoaded(timePeriodStart.toLocalDate(), timePeriodEnd.toLocalDate()); - Table allOldSensorReadouts = sensorReadoutsSnapshot.getAllData(); + historicSensorReadoutsSnapshot.ensureMonthsAreLoaded(timePeriodStart.toLocalDate(), timePeriodEnd.toLocalDate()); + Table allOldSensorReadouts = historicSensorReadoutsSnapshot.getAllData(); Table sensorReadoutColumns = TableCopyUtility.extractColumns(allOldSensorReadouts, SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_RETURNED_BY_SENSORS); LongColumn checkHashColumn = sensorReadoutColumns.longColumn(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME); @@ -389,7 +392,7 @@ public void executeSingleTableChecks( tableSpec, sensorRunParameters, sensorExecutionResult, normalizedSensorResults)); if (executionTarget != RunChecksTarget.only_rules) { - allNormalizedSensorResultsTable.append(normalizedSensorResults.getTable()); + allNormalizedSensorResultsOutputTable.append(normalizedSensorResults.getTable()); } if (executionTarget == RunChecksTarget.only_sensors) { @@ -403,7 +406,7 @@ public void executeSingleTableChecks( if (ruleDefinitionName == null) { // no rule to run, just the sensor... 
- sensorReadoutsSnapshot.ensureMonthsAreLoaded(minTimePeriod.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic results for merging + historicSensorReadoutsSnapshot.ensureMonthsAreLoaded(minTimePeriod.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic results for merging } else { RuleDefinitionFindResult ruleDefinitionFindResult = this.ruleDefinitionFindService.findRule(executionContext, ruleDefinitionName); @@ -417,14 +420,14 @@ public void executeSingleTableChecks( LocalDateTimePeriodUtility.calculateLocalDateTimeMinusTimePeriods( minTimePeriod, ruleTimeWindowSettings.getPredictionTimeWindow(), timeGradientForRuleScope); - sensorReadoutsSnapshot.ensureMonthsAreLoaded(earliestRequiredReadout.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic sensor readouts + historicSensorReadoutsSnapshot.ensureMonthsAreLoaded(earliestRequiredReadout.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic sensor readouts checkResultsSnapshot.ensureMonthsAreLoaded(earliestRequiredReadout.toLocalDate(), maxTimePeriod.toLocalDate()); // will be used for notifications } try { RuleEvaluationResult ruleEvaluationResult = this.ruleEvaluationService.evaluateRules( executionContext, sensorExecutionResult.getSensorRunParameters().getCheck(), sensorRunParameters, - normalizedSensorResults, sensorReadoutsSnapshot, progressListener); + normalizedSensorResults, historicSensorReadoutsSnapshot, progressListener); progressListener.onRuleExecuted(new RuleExecutedEvent(tableSpec, sensorRunParameters, normalizedSensorResults, ruleEvaluationResult)); allRuleEvaluationResultsTable.append(ruleEvaluationResult.getRuleResultsTable()); @@ -477,7 +480,7 @@ public void executeTableComparisonChecks( CheckExecutionSummary checkExecutionSummary, Collection> checks, TableSpec tableSpec, - SensorReadoutsSnapshot sensorReadoutsSnapshot, + SensorReadoutsSnapshot historicSensorReadoutsSnapshot, Table allNormalizedSensorResultsTable, CheckResultsSnapshot checkResultsSnapshot, Table allRuleEvaluationResultsTable, @@ -579,14 +582,14 @@ public void executeTableComparisonChecks( if (ruleDefinitionName == null) { // no rule to run, just the sensor... - sensorReadoutsSnapshot.ensureMonthsAreLoaded(minTimePeriod.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic results for merging + historicSensorReadoutsSnapshot.ensureMonthsAreLoaded(minTimePeriod.toLocalDate(), maxTimePeriod.toLocalDate()); // preload required historic results for merging checkResultsSnapshot.ensureMonthsAreLoaded(minTimePeriod.toLocalDate(), maxTimePeriod.toLocalDate()); // will be used for notifications } else { try { RuleEvaluationResult ruleEvaluationResult = this.ruleEvaluationService.evaluateRules( executionContext, sensorExecutionResultComparedTable.getSensorRunParameters().getCheck(), sensorRunParametersComparedTable, - normalizedSensorResultsComparedTable, sensorReadoutsSnapshot, progressListener); + normalizedSensorResultsComparedTable, historicSensorReadoutsSnapshot, progressListener); progressListener.onRuleExecuted(new RuleExecutedEvent(tableSpec, sensorRunParametersComparedTable, normalizedSensorResultsComparedTable, ruleEvaluationResult)); allRuleEvaluationResultsTable.append(ruleEvaluationResult.getRuleResultsTable());
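
The heart of the change in SensorReadoutsTimeSeriesMap: instead of splitting one concatenated readout table into per-(check hash, data group hash) slices ahead of time, the map now keeps a pair of LongIndex objects per loaded monthly partition and assembles a single time series only when a rule asks for it, collecting the matching rows of each partition and sorting them by time_period. Below is a minimal sketch of that on-demand lookup using the same tablesaw primitives; everything outside the tablesaw API (the Partition holder, the toy column set, the single sort at the end instead of the patch's per-slice sort plus TreeMap month order) is illustrative, not the DQOps classes.

    import tech.tablesaw.api.DateTimeColumn;
    import tech.tablesaw.api.DoubleColumn;
    import tech.tablesaw.api.LongColumn;
    import tech.tablesaw.api.Table;
    import tech.tablesaw.index.LongIndex;
    import tech.tablesaw.selection.Selection;

    import java.time.LocalDateTime;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionIndexLookupSketch {

        /** Stand-in for one loaded monthly partition: the data plus its two hash indexes. */
        static class Partition {
            final Table data;
            final LongIndex checkHashIndex;
            final LongIndex dataGroupHashIndex;

            Partition(Table data) {
                this.data = data;
                this.checkHashIndex = new LongIndex((LongColumn) data.column("check_hash"));
                this.dataGroupHashIndex = new LongIndex((LongColumn) data.column("data_group_hash"));
            }
        }

        /** Collects the rows of one (checkHash, dataGroupHash) series from every partition, sorted by time. */
        static Table findTimeSeries(Map<String, Partition> partitions, long checkHash, long dataGroupHash) {
            Table result = null;
            for (Partition partition : partitions.values()) {
                Selection matchingRows = partition.checkHashIndex.get(checkHash)
                        .and(partition.dataGroupHashIndex.get(dataGroupHash));
                Table slice = partition.data.where(matchingRows);
                result = (result == null) ? slice : result.append(slice);
            }
            return result == null ? null : result.sortOn("time_period");
        }

        public static void main(String[] args) {
            DateTimeColumn timePeriod = DateTimeColumn.create("time_period");
            timePeriod.append(LocalDateTime.of(2024, 9, 2, 0, 0));
            timePeriod.append(LocalDateTime.of(2024, 9, 1, 0, 0));

            Table september = Table.create("m2024.09",
                    timePeriod,
                    LongColumn.create("check_hash", 42L, 42L),
                    LongColumn.create("data_group_hash", 0L, 0L),
                    DoubleColumn.create("actual_value", 11.5, 10.0));

            Map<String, Partition> partitions = new LinkedHashMap<>();
            partitions.put("m2024.09", new Partition(september));

            System.out.println(findTimeSeries(partitions, 42L, 0L).print());
        }
    }

Keeping only the two LongIndex objects per month means the per-month tables stay unsliced until a specific series is actually requested, which is where the memory saving comes from.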
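
The entries map now stores WeakReference values, so any cached per-series slice can be reclaimed by the garbage collector under memory pressure and simply rebuilt from the partition indexes on the next request. In the patch only the read side of that cache is wired up (re-populating it is left as a TODO in findTimeSeriesData), but the full weak-value get-or-rebuild pattern it is designed around looks roughly like the sketch below; the TimeSeriesCache name and the loader function are illustrative, not DQOps API.

    import java.lang.ref.WeakReference;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class TimeSeriesCache<K, V> {
        private final Map<K, WeakReference<V>> entries = new HashMap<>();
        private final Function<K, V> loader;

        public TimeSeriesCache(Function<K, V> loader) {
            this.loader = loader;
        }

        public V get(K key) {
            WeakReference<V> ref = this.entries.get(key);
            V value = ref != null ? ref.get() : null;    // null when never cached or already collected
            if (value == null) {
                value = this.loader.apply(key);          // rebuild, e.g. from the partition indexes
                this.entries.put(key, new WeakReference<>(value));
            }
            return value;
        }

        public static void main(String[] args) {
            TimeSeriesCache<Long, String> cache = new TimeSeriesCache<>(hash -> "series-for-" + hash);
            System.out.println(cache.get(42L));   // built on first access
            System.out.println(cache.get(42L));   // served from the weak cache while the value survives
        }
    }

Unlike a SoftReference, a WeakReference becomes collectible as soon as no strong reference remains, which favors a small steady-state footprint over cache hit rate; that trade-off matches the goal of this patch.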
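
On the executor side the snapshot is split in two: outputSensorReadoutsSnapshot keeps the full schema for the new rows that will be saved, while historic rows are loaded through a read-only snapshot restricted to SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA (time period, check and data group hashes, actual and expected values), which is all that rule evaluation reads. The toy example below only illustrates the effect of that column projection with tablesaw; the table is synthetic and much narrower than the real readout schema, and the real snapshot factory presumably applies the projection while loading the parquet partitions rather than trimming an already loaded table.

    import tech.tablesaw.api.DoubleColumn;
    import tech.tablesaw.api.LongColumn;
    import tech.tablesaw.api.StringColumn;
    import tech.tablesaw.api.Table;

    public class HistoricColumnProjectionSketch {

        static final String[] HISTORIC_COLUMNS = {
                "time_period", "time_period_utc", "check_hash",
                "data_group_hash", "actual_value", "expected_value"
        };

        public static void main(String[] args) {
            // A stand-in for one month of full sensor readouts (the real schema is much wider).
            Table fullReadouts = Table.create("sensor_readouts",
                    LongColumn.create("check_hash", 42L),
                    LongColumn.create("data_group_hash", 0L),
                    StringColumn.create("time_period", "2024-09-01T00:00"),
                    StringColumn.create("time_period_utc", "2024-09-01T00:00Z"),
                    DoubleColumn.create("actual_value", 10.0),
                    DoubleColumn.create("expected_value", 9.5),
                    StringColumn.create("data_group_name", "all data"),
                    StringColumn.create("table_comparison_name", ""),
                    LongColumn.create("duration_ms", 125L));

            int fullWidth = fullReadouts.columnCount();
            // Keep only the columns that time-series rules actually consume.
            Table historicView = fullReadouts.retainColumns(HISTORIC_COLUMNS);
            System.out.println("columns kept: " + historicView.columnCount() + " of " + fullWidth);
        }
    }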