diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
index 18e638e49bdb5..1011f68d0d7a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -49,6 +50,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 public abstract class BaseCleanActionExecutor extends BaseActionExecutor {

@@ -72,7 +74,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
     if (partitionsToClean.isEmpty()) {
-      LOG.info("Nothing to clean here. It is already clean");
+      LOG.info("Nothing to clean here.");
       return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
     }
     LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");

-    Map<String, List<HoodieCleanFileInfo>> cleanOps = context
-        .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
-        .stream()
+    // Compute the file paths to be cleaned in each valid file group
+    Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
+        partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
+        cleanerParallelism).stream();
+
+    // Compute the file paths to be cleaned in replaced file groups
+    List<Pair<String, List<String>>> partitionToReplacedFileIds = planner.getReplacedFileIdsToClean(earliestInstant).entrySet().stream()
+        .map(e -> Pair.of(e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+    Stream<Pair<String, List<CleanFileInfo>>> replacedCleanInfos = context.map(partitionToReplacedFileIds, partitionFileIds -> {
+      String partitionPath = partitionFileIds.getKey();
+      return Pair.of(partitionPath, planner.getDeletePathsForReplacedFileGroups(partitionPath, partitionFileIds.getRight()));
+    }, cleanerParallelism).stream();
+
+    Map<String, List<HoodieCleanFileInfo>> cleanOps = Stream.concat(cleanInfos, replacedCleanInfos)
         .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));

     return new HoodieCleanerPlan(earliestInstant
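The change to requestClean above assembles the cleaner plan from two sources: clean ops produced by the existing per-partition policy, and clean ops for file groups that were replaced, concatenated into one partition-keyed map. The following is a minimal, self-contained sketch of just that merge pattern, using only JDK types (Map.Entry stands in for Hudi's Pair; the partition paths and file names are invented for illustration and are not Hudi code):

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergeCleanOpsSketch {
  public static void main(String[] args) {
    // Clean ops selected by the regular per-partition cleaning policy.
    Stream<Map.Entry<String, List<String>>> cleanInfos =
        Stream.of(new SimpleEntry<>("2020/01/01", Arrays.asList("base_1.parquet")));
    // Clean ops for file groups that were replaced.
    Stream<Map.Entry<String, List<String>>> replacedCleanInfos =
        Stream.of(new SimpleEntry<>("2020/01/02", Arrays.asList("base_2.parquet", ".log_2")));

    // Concatenate both sources into one map keyed by partition path.
    // Like the diff's Collectors.toMap (no merge function), this assumes a partition path
    // appears in at most one of the two streams.
    Map<String, List<String>> cleanOps = Stream.concat(cleanInfos, replacedCleanInfos)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    System.out.println(cleanOps);
  }
}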
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 31d433d0fe386..5cdb46cc7d144 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -49,6 +49,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -111,14 +112,14 @@ public Stream getSavepointedDataFiles(String savepointTime) {
   /**
    * Returns list of partitions where clean operations needs to be performed.
    *
-   * @param newInstantToRetain New instant to be retained after this cleanup operation
+   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
    * @return list of partitions to scan for cleaning
    * @throws IOException when underlying file-system throws this exception
    */
-  public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
+  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
     switch (config.getCleanerPolicy()) {
       case KEEP_LATEST_COMMITS:
-        return getPartitionPathsForCleanByCommits(newInstantToRetain);
+        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
       case KEEP_LATEST_FILE_VERSIONS:
         return getPartitionPathsForFullCleaning();
       default:
@@ -158,8 +159,7 @@ private List getPartitionPathsForCleanByCommits(Option in
    * @param newInstantToRetain
    * @return
    */
-  private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
-                                                               Option<HoodieInstant> newInstantToRetain) {
+  private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, Option<HoodieInstant> newInstantToRetain) {
     LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
         + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
         + ". New Instant to retain : " + newInstantToRetain);
@@ -226,18 +226,7 @@ private List getFilesToCleanKeepingLatestVersions(String partitio
       // Delete the remaining files
       while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
-        if (nextSlice.getBaseFile().isPresent()) {
-          HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
-          if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-            deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
-          }
-        }
-        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-          // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-              .collect(Collectors.toList()));
-        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
     }
     return deletePaths;
@@ -339,6 +328,23 @@ private String getLatestVersionBeforeCommit(List fileSliceList, Hoodi
     return null;
   }

+  private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
+    List<CleanFileInfo> cleanPaths = new ArrayList<>();
+    if (nextSlice.getBaseFile().isPresent()) {
+      HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
+      cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
+      if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+        cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+      }
+    }
+    if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+      // If merge on read, then clean the log files for the commits as well
+      cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+          .collect(Collectors.toList()));
+    }
+    return cleanPaths;
+  }
+
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
@@ -370,6 +376,59 @@ public Option getEarliestCommitToRetain() {
     return earliestCommitToRetain;
   }

+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // All replace commits before the earliest instant we want to retain should be eligible for deleting the
+      // replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we assume that once replaced, a file group automatically becomes eligible for cleaning completely.
+      // In other words, the file versions only apply to the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+    }
+
+    // Merge everything and make a map of file ids to be cleaned, keyed by partition.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // Duplicates should not be possible, since a file group should be replaced only once
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
+        .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
+        .flatMap(HoodieFileGroup::getAllFileSlices)
+        .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
+        .collect(Collectors.toList());
+  }
+
   /**
    * Determine if file slice needed to be preserved for pending compaction.
    *
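getReplacedFileIdsToClean gathers the partitionToReplaceFileIds map of every cleanable replace commit and folds them into one map with a stream reduce; getDeletePathsForReplacedFileGroups then expands each eligible file id into concrete delete paths by walking every file slice of the matching file groups in the file system view. Below is a small, self-contained sketch of just the merge step (JDK types only; the commits, partitions, and file-group ids are made up for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class MergeReplacedFileIdsSketch {
  public static void main(String[] args) {
    // One map per replace commit: partition path -> replaced file group ids.
    Map<String, List<String>> replaceCommit1 = new HashMap<>();
    replaceCommit1.put("2020/01/01", new ArrayList<>(Arrays.asList("fg-1")));
    Map<String, List<String>> replaceCommit2 = new HashMap<>();
    replaceCommit2.put("2020/01/01", new ArrayList<>(Arrays.asList("fg-2")));
    replaceCommit2.put("2020/01/02", new ArrayList<>(Arrays.asList("fg-3")));

    // Same reduce as in the diff: fold every later map into the first one,
    // concatenating the file id lists when a partition shows up more than once.
    Map<String, List<String>> merged = Stream.of(replaceCommit1, replaceCommit2)
        .reduce((leftMap, rightMap) -> {
          rightMap.forEach((partition, fileIds) -> {
            if (!leftMap.containsKey(partition)) {
              leftMap.put(partition, fileIds);
            } else {
              leftMap.get(partition).addAll(fileIds);
            }
          });
          return leftMap;
        }).orElse(new HashMap<>());

    // Prints the merged view, e.g. 2020/01/01 -> [fg-1, fg-2] and 2020/01/02 -> [fg-3].
    merged.forEach((partition, fileIds) -> System.out.println(partition + " -> " + fileIds));
  }
}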
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 32e60c30bd868..962d69dc3bb15 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -33,6 +33,7 @@
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -158,10 +159,14 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[]
     return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
   }

-  public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
+  public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
     return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
   }

+  public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+  }
+
   public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz) throws IOException {
     DatumReader<T> reader = new SpecificDatumReader<>(clazz);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fcc3274031b4d..70dfa2a2af1f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -86,7 +86,7 @@ public static Option> getClusteringPla
       LOG.warn("No content found in requested file for instant " + pendingReplaceInstant);
       return Option.empty();
     }
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get());
     if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
       return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
     }
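The new deserializeHoodieReplaceMetadata helper gives the cleaner (and the metadata table code below) access to the partition-to-replaced-file-ids mapping stored in completed replacecommit instants. A hedged caller sketch, pieced together from calls that appear elsewhere in this diff; it is a fragment rather than a compilable class, and the metaClient variable and the println are illustrative only:

HoodieTimeline completedReplaceTimeline = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
completedReplaceTimeline.getInstants().forEach(instant -> {
  try {
    HoodieReplaceCommitMetadata metadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
        metaClient.getActiveTimeline().getInstantDetails(instant).get());
    metadata.getPartitionToReplaceFileIds().forEach((partition, fileIds) ->
        System.out.println(partition + " replaced file ids: " + fileIds));
  } catch (IOException e) {
    throw new HoodieIOException("Unable to deserialize " + instant, e);
  }
});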
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 115001a02a3a7..22d68c9e7585a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -23,8 +23,11 @@
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -92,6 +95,11 @@ public static Option> convertInstantToMetaRecords(HoodieTable
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+        break;
       default:
         throw new HoodieException("Unknown type of action " + instant.getAction());
     }
@@ -99,6 +107,24 @@ public static Option> convertInstantToMetaRecords(HoodieTable
     return records;
   }

+  public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {
+    // Treat the newly written files as if they were a commit action.
+    HoodieCommitMetadata addedFilesMetadata = new HoodieCommitMetadata();
+    addedFilesMetadata.setOperationType(WriteOperationType.valueOf(replaceCommitMetadata.getOperationType()));
+    replaceCommitMetadata.getExtraMetadata().forEach(addedFilesMetadata::addMetadata);
+    replaceCommitMetadata.getPartitionToWriteStats().forEach((k, v) -> v.forEach(s -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      // Only set the few fields that are actually needed.
+      writeStat.setFileSizeInBytes(s.getFileSizeInBytes());
+      writeStat.setTotalWriteBytes(s.getTotalWriteBytes());
+      writeStat.setPath(s.getPath());
+      writeStat.setNumDeletes(s.getNumDeletes());
+      addedFilesMetadata.addWriteStat(k, writeStat);
+    }
+    ));
+    return convertMetadataToRecords(addedFilesMetadata, instantTime);
+  }
+
   /**
    * Finds all new files/partitions created as part of commit and creates metadata table records for them.
    *
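With the new REPLACE_COMMIT_ACTION branch, a replace commit is folded into the metadata table by mapping its write stats onto a plain HoodieCommitMetadata and reusing the existing commit conversion. A hedged usage fragment (it assumes a timeline and a completed replacecommit instant are already in scope; the variable names are illustrative):

HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
    timeline.getInstantDetails(instant).get());
List<HoodieRecord> metadataRecords = HoodieTableMetadataUtil.convertMetadataToRecords(replaceMetadata, instant.getTimestamp());
// Only the files added by the replace commit are recorded here, matching the
// "treat the newly written files as if they were a commit action" comment in the new method.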