[HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible
vinothchandar committed Jan 8, 2021
1 parent 17df517 commit 19c9247
Showing 5 changed files with 127 additions and 23 deletions.
@@ -26,6 +26,7 @@
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -49,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {

@@ -72,7 +74,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);

if (partitionsToClean.isEmpty()) {
LOG.info("Nothing to clean here. It is already clean");
LOG.info("Nothing to clean here.");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");

Map<String, List<HoodieCleanFileInfo>> cleanOps = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
.stream()
// Compute the file paths to be cleaned in each valid file group
Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism).stream();

// Compute the file paths to be cleaned in replaced file groups
List<Pair<String, List<String>>> partitionToReplacedFileIds = planner.getReplacedFileIdsToClean(earliestInstant).entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue()))
.collect(Collectors.toList());
Stream<Pair<String, List<CleanFileInfo>>> replacedCleanInfos = context.map(partitionToReplacedFileIds, partitionFileIds -> {
String partitionPath = partitionFileIds.getKey();
return Pair.of(partitionPath, planner.getDeletePathsForReplacedFileGroups(partitionPath, partitionFileIds.getRight()));
}, cleanerParallelism).stream();

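// Merge both sets of clean info into a single map of partition path to files to be cleaned.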
Map<String, List<HoodieCleanFileInfo>> cleanOps = Stream.concat(cleanInfos, replacedCleanInfos)
.collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));

return new HoodieCleanerPlan(earliestInstant
@@ -49,6 +49,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -111,14 +112,14 @@ public Stream<String> getSavepointedDataFiles(String savepointTime) {
/**
* Returns list of partitions where clean operations need to be performed.
*
* @param newInstantToRetain New instant to be retained after this cleanup operation
* @param earliestRetainedInstant New instant to be retained after this cleanup operation
* @return list of partitions to scan for cleaning
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
switch (config.getCleanerPolicy()) {
case KEEP_LATEST_COMMITS:
return getPartitionPathsForCleanByCommits(newInstantToRetain);
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
case KEEP_LATEST_FILE_VERSIONS:
return getPartitionPathsForFullCleaning();
default:
@@ -158,8 +159,7 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
* @param newInstantToRetain New instant to be retained after this cleanup operation
* @return list of partitions to scan for cleaning
*/
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
Option<HoodieInstant> newInstantToRetain) {
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, Option<HoodieInstant> newInstantToRetain) {
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
@@ -226,18 +226,7 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitio
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
if (nextSlice.getBaseFile().isPresent()) {
HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
}
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
}
return deletePaths;
@@ -339,6 +328,23 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
return null;
}

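/**
* Collects the clean targets for a single file slice: the base file, its bootstrap base file (if configured)
* and, for MERGE_ON_READ tables, the log files.
*/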
private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
List<CleanFileInfo> cleanPaths = new ArrayList<>();
if (nextSlice.getBaseFile().isPresent()) {
HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
}
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
return cleanPaths;
}

/**
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
@@ -370,6 +376,59 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
return earliestCommitToRetain;
}

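/**
* Returns, for each partition, the file ids of file groups replaced by completed replacecommits that are
* eligible for cleaning under the configured cleaning policy.
*/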
public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();

// Determine which replace commits can be cleaned.
Stream<HoodieInstant> cleanableReplaceCommits;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
if (!earliestInstantToRetain.isPresent()) {
LOG.info("Not enough instants to start cleaning replace commits");
return Collections.emptyMap();
}
// All replace commits before the earliest instant we want to retain are eligible for deleting the
// replaced file groups.
cleanableReplaceCommits = replaceTimeline
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
earliestInstantToRetain.get().getTimestamp()))
.getInstants();
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
// In this scenario, we assume that once replaced, a file group automatically becomes eligible for cleaning completely.
// In other words, the file versions only apply to the active file groups.
cleanableReplaceCommits = replaceTimeline.getInstants();
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}

// merge everything and make a map full of file ids to be cleaned.
return cleanableReplaceCommits.map(instant -> {
try {
return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
} catch (IOException e) {
throw new HoodieIOException("Unable to deserialize " + instant, e);
}
}).reduce((leftMap, rightMap) -> {
rightMap.forEach((partition, fileIds) -> {
if (!leftMap.containsKey(partition)) {
leftMap.put(partition, fileIds);
} else {
// duplicates should not be possible, since replace of a file group should happen only once
leftMap.get(partition).addAll(fileIds);
}
});
return leftMap;
}).orElse(new HashMap<>());
}

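/**
* Returns the clean targets for all file slices of the given replaced file groups in the partition.
*/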
public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
.filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
.flatMap(HoodieFileGroup::getAllFileSlices)
.flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
.collect(Collectors.toList());
}

/**
* Determine if file slice needs to be preserved for pending compaction.
*
@@ -33,6 +33,7 @@
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -158,10 +159,14 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[]
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}

public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
}

public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
}

public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
@@ -86,7 +86,7 @@ public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPla
LOG.warn("No content found in requested file for instant " + pendingReplaceInstant);
return Option.empty();
}
HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get());
if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
}
@@ -23,8 +23,11 @@
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -92,13 +95,36 @@ public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTable
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
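// Index the files newly written by a replacecommit, treating them like a regular commit.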
case HoodieTimeline.REPLACE_COMMIT_ACTION:
HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
timeline.getInstantDetails(instant).get());
records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
break;
default:
throw new HoodieException("Unknown type of action " + instant.getAction());
}

return records;
}

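/**
* Converts a replacecommit's metadata into metadata table records by mapping its newly written files onto an
* equivalent commit metadata.
*/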
public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {
// Treat the newly written files as if they were a commit action.
HoodieCommitMetadata addedFilesMetadata = new HoodieCommitMetadata();
addedFilesMetadata.setOperationType(WriteOperationType.valueOf(replaceCommitMetadata.getOperationType()));
replaceCommitMetadata.getExtraMetadata().forEach(addedFilesMetadata::addMetadata);
replaceCommitMetadata.getPartitionToWriteStats().forEach((k,v) -> v.forEach(s -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
// only set the few fields that are actually needed.
writeStat.setFileSizeInBytes(s.getFileSizeInBytes());
writeStat.setTotalWriteBytes(s.getTotalWriteBytes());
writeStat.setPath(s.getPath());
writeStat.setNumDeletes(s.getNumDeletes());
addedFilesMetadata.addWriteStat(k, writeStat);
}
));
return convertMetadataToRecords(addedFilesMetadata, instantTime);
}

/**
* Finds all new files/partitions created as part of commit and creates metadata table records for them.
*