diff --git a/dqops/src/main/java/com/dqops/core/synchronization/fileexchange/TargetTableModifiedPartitions.java b/dqops/src/main/java/com/dqops/core/synchronization/fileexchange/TargetTableModifiedPartitions.java index f3242c36db..7665b11d70 100644 --- a/dqops/src/main/java/com/dqops/core/synchronization/fileexchange/TargetTableModifiedPartitions.java +++ b/dqops/src/main/java/com/dqops/core/synchronization/fileexchange/TargetTableModifiedPartitions.java @@ -15,6 +15,7 @@ */ package com.dqops.core.synchronization.fileexchange; +import com.dqops.cloud.rest.model.RefreshedPartitionModel; import com.dqops.core.filesystem.metadata.FileDifference; import com.dqops.core.synchronization.contract.DqoRoot; import com.dqops.data.storage.HivePartitionPathUtility; @@ -33,6 +34,7 @@ public class TargetTableModifiedPartitions { private Set affectedConnections = new LinkedHashSet<>(); private Set affectedTables = new LinkedHashSet<>(); private Set affectedMonths = new LinkedHashSet<>(); + private Set affectedPartitions = new LinkedHashSet<>(); /** * Creates an object that identifies the target table and a list of modified months, table folders and month folders. @@ -77,20 +79,40 @@ public Set getAffectedMonths() { return affectedMonths; } + /** + * Returns a set of affected partitions, that identify exactly the connection, table and a monthly partition. + * @return Set of unique partitions that were affected. + */ + public Set getAffectedPartitions() { + return affectedPartitions; + } + + /** + * Sets a reference to a set of affected partitions. + * @param affectedPartitions Collection of affected partitions. + */ + public void setAffectedPartitions(Set affectedPartitions) { + this.affectedPartitions = affectedPartitions; + } + /** * Checks if there are any changes that must be refreshed. If at least one connection, table or month was modified, we will refresh it. * @return True when there are any changes, false if no files (parquet files for a table partition) were modified. */ public boolean hasAnyChanges() { - if (this.affectedConnections.size() > 0) { + if (!this.affectedConnections.isEmpty()) { + return true; + } + + if (!this.affectedTables.isEmpty()) { return true; } - if (this.affectedTables.size() > 0) { + if (!this.affectedMonths.isEmpty()) { return true; } - if (this.affectedMonths.size() > 0) { + if (!this.affectedPartitions.isEmpty()) { return true; } @@ -106,18 +128,24 @@ public void addModifications(Collection localChanges) { Path relativePath = fileDifference.getRelativePath(); for (int nameIndex = 0; nameIndex < relativePath.getNameCount() - 1; nameIndex++) { String folderName = relativePath.getName(nameIndex).toString(); + RefreshedPartitionModel affectedPartition = new RefreshedPartitionModel(); if (HivePartitionPathUtility.validHivePartitionConnectionFolderName(folderName)) { String connectionName = HivePartitionPathUtility.connectionFromHivePartitionFolderName(folderName); this.affectedConnections.add(connectionName); + affectedPartition.setConnection(connectionName); } else if (HivePartitionPathUtility.validHivePartitionTableFolderName(folderName)) { String tableName = HivePartitionPathUtility.tableFromHivePartitionFolderName(folderName).toBaseFileName(); this.affectedTables.add(tableName); + affectedPartition.setSchemaTableName(tableName); } else if (HivePartitionPathUtility.validHivePartitionMonthFolderName(folderName)) { LocalDate monthDate = HivePartitionPathUtility.monthFromHivePartitionFolderName(folderName); this.affectedMonths.add(monthDate); + affectedPartition.setMonth(monthDate); } + + this.affectedPartitions.add(affectedPartition); } } } diff --git a/dqops/src/main/java/com/dqops/core/synchronization/service/DqoCloudWarehouseServiceImpl.java b/dqops/src/main/java/com/dqops/core/synchronization/service/DqoCloudWarehouseServiceImpl.java index b1c9687a4d..c34b8a86bb 100644 --- a/dqops/src/main/java/com/dqops/core/synchronization/service/DqoCloudWarehouseServiceImpl.java +++ b/dqops/src/main/java/com/dqops/core/synchronization/service/DqoCloudWarehouseServiceImpl.java @@ -69,18 +69,22 @@ public void refreshNativeTable(TargetTableModifiedPartitions targetTableModified RefreshTableRequest refreshTableRequest = new RefreshTableRequest(); refreshTableRequest.setTable(targetTableParameter); - if (targetTableModifiedPartitions.getAffectedConnections().size() > 0) { + if (!targetTableModifiedPartitions.getAffectedConnections().isEmpty()) { refreshTableRequest.setConnections(new ArrayList<>(targetTableModifiedPartitions.getAffectedConnections())); } - if (targetTableModifiedPartitions.getAffectedTables().size() > 0) { + if (!targetTableModifiedPartitions.getAffectedTables().isEmpty()) { refreshTableRequest.setTables(new ArrayList<>(targetTableModifiedPartitions.getAffectedTables())); } - if (targetTableModifiedPartitions.getAffectedMonths().size() > 0) { + if (!targetTableModifiedPartitions.getAffectedMonths().isEmpty()) { refreshTableRequest.setMonths(new ArrayList<>(targetTableModifiedPartitions.getAffectedMonths())); } + if (!targetTableModifiedPartitions.getAffectedPartitions().isEmpty()) { + refreshTableRequest.setPartitions(new ArrayList<>(targetTableModifiedPartitions.getAffectedPartitions())); + } + tenantDataWarehouseApi.refreshNativeTable(refreshTableRequest); } }