Skip to content

Commit

Permalink
Exact data partitions list is added when refreshing the cloud DW.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Nov 25, 2023
1 parent 685fe06 commit 05c0f2e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.dqops.core.synchronization.fileexchange;

import com.dqops.cloud.rest.model.RefreshedPartitionModel;
import com.dqops.core.filesystem.metadata.FileDifference;
import com.dqops.core.synchronization.contract.DqoRoot;
import com.dqops.data.storage.HivePartitionPathUtility;
Expand All @@ -33,6 +34,7 @@ public class TargetTableModifiedPartitions {
private Set<String> affectedConnections = new LinkedHashSet<>();
private Set<String> affectedTables = new LinkedHashSet<>();
private Set<LocalDate> affectedMonths = new LinkedHashSet<>();
private Set<RefreshedPartitionModel> affectedPartitions = new LinkedHashSet<>();

/**
* Creates an object that identifies the target table and a list of modified months, table folders and month folders.
Expand Down Expand Up @@ -77,20 +79,40 @@ public Set<LocalDate> getAffectedMonths() {
return affectedMonths;
}

/**
* Returns a set of affected partitions, that identify exactly the connection, table and a monthly partition.
* @return Set of unique partitions that were affected.
*/
public Set<RefreshedPartitionModel> getAffectedPartitions() {
return affectedPartitions;
}

/**
* Sets a reference to a set of affected partitions.
* @param affectedPartitions Collection of affected partitions.
*/
public void setAffectedPartitions(Set<RefreshedPartitionModel> affectedPartitions) {
this.affectedPartitions = affectedPartitions;
}

/**
* Checks if there are any changes that must be refreshed. If at least one connection, table or month was modified, we will refresh it.
* @return True when there are any changes, false if no files (parquet files for a table partition) were modified.
*/
public boolean hasAnyChanges() {
if (this.affectedConnections.size() > 0) {
if (!this.affectedConnections.isEmpty()) {
return true;
}

if (!this.affectedTables.isEmpty()) {
return true;
}

if (this.affectedTables.size() > 0) {
if (!this.affectedMonths.isEmpty()) {
return true;
}

if (this.affectedMonths.size() > 0) {
if (!this.affectedPartitions.isEmpty()) {
return true;
}

Expand All @@ -106,18 +128,24 @@ public void addModifications(Collection<FileDifference> localChanges) {
Path relativePath = fileDifference.getRelativePath();
for (int nameIndex = 0; nameIndex < relativePath.getNameCount() - 1; nameIndex++) {
String folderName = relativePath.getName(nameIndex).toString();
RefreshedPartitionModel affectedPartition = new RefreshedPartitionModel();

if (HivePartitionPathUtility.validHivePartitionConnectionFolderName(folderName)) {
String connectionName = HivePartitionPathUtility.connectionFromHivePartitionFolderName(folderName);
this.affectedConnections.add(connectionName);
affectedPartition.setConnection(connectionName);
}
else if (HivePartitionPathUtility.validHivePartitionTableFolderName(folderName)) {
String tableName = HivePartitionPathUtility.tableFromHivePartitionFolderName(folderName).toBaseFileName();
this.affectedTables.add(tableName);
affectedPartition.setSchemaTableName(tableName);
} else if (HivePartitionPathUtility.validHivePartitionMonthFolderName(folderName)) {
LocalDate monthDate = HivePartitionPathUtility.monthFromHivePartitionFolderName(folderName);
this.affectedMonths.add(monthDate);
affectedPartition.setMonth(monthDate);
}

this.affectedPartitions.add(affectedPartition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,22 @@ public void refreshNativeTable(TargetTableModifiedPartitions targetTableModified
RefreshTableRequest refreshTableRequest = new RefreshTableRequest();
refreshTableRequest.setTable(targetTableParameter);

if (targetTableModifiedPartitions.getAffectedConnections().size() > 0) {
if (!targetTableModifiedPartitions.getAffectedConnections().isEmpty()) {
refreshTableRequest.setConnections(new ArrayList<>(targetTableModifiedPartitions.getAffectedConnections()));
}

if (targetTableModifiedPartitions.getAffectedTables().size() > 0) {
if (!targetTableModifiedPartitions.getAffectedTables().isEmpty()) {
refreshTableRequest.setTables(new ArrayList<>(targetTableModifiedPartitions.getAffectedTables()));
}

if (targetTableModifiedPartitions.getAffectedMonths().size() > 0) {
if (!targetTableModifiedPartitions.getAffectedMonths().isEmpty()) {
refreshTableRequest.setMonths(new ArrayList<>(targetTableModifiedPartitions.getAffectedMonths()));
}

if (!targetTableModifiedPartitions.getAffectedPartitions().isEmpty()) {
refreshTableRequest.setPartitions(new ArrayList<>(targetTableModifiedPartitions.getAffectedPartitions()));
}

tenantDataWarehouseApi.refreshNativeTable(refreshTableRequest);
}
}

0 comments on commit 05c0f2e

Please sign in to comment.