Use filesystemview and json format from metadata. Add tests
satishkotha committed Jan 9, 2021
1 parent 19c9247 commit df5ef96
Showing 15 changed files with 274 additions and 110 deletions.
@@ -290,10 +290,10 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) t
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
// TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
if (!deleteSuccess) {
// throw error and stop archival if deleting replaced file groups failed.
throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
}
try {
deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
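The hunk above relaxes archival: when the replaced file groups behind an instant cannot be deleted, the archiver now logs a warning and continues instead of throwing a HoodieCommitException, because the cleaner (HUDI-1518) is expected to have removed those files already. Below is a minimal, dependency-free sketch of that best-effort step; the method and file names are stand-ins for the archiver code in the diff, not the real Hudi classes.

// Illustrative sketch only: best-effort deletion of replaced file groups during archival.
public class BestEffortReplacedFileDeleteSketch {

  // Stand-in for the archiver's deleteReplacedFileGroups(context, instant).
  static boolean deleteReplacedFileGroups(String instantFileName) {
    // Pretend the files are already gone because the cleaner removed them.
    return false;
  }

  public static void main(String[] args) {
    String instantFileName = "20210109153000.replacecommit"; // hypothetical instant file name
    if (!deleteReplacedFileGroups(instantFileName)) {
      // Before this commit: throw and abort archival.
      // After this commit: warn and continue, since the cleaner may have deleted the files.
      System.out.println("Unable to delete file(s) for " + instantFileName
          + ", replaced files possibly deleted by cleaner");
    }
    // Archival of the instant proceeds from here.
  }
}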
@@ -21,12 +21,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -50,7 +49,6 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {

@@ -83,21 +81,9 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");

// Compute the file paths, to be cleaned in each valid file group
Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism).stream();

// Compute the file paths, to be cleaned in replaced file groups
List<Pair<String, List<String>>> partitionToReplacedFileIds = planner.getReplacedFileIdsToClean(earliestInstant).entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue()))
.collect(Collectors.toList());
Stream<Pair<String, List<CleanFileInfo>>> replacedCleanInfos = context.map(partitionToReplacedFileIds, partitionFileIds -> {
String partitionPath = partitionFileIds.getKey();
return Pair.of(partitionPath, planner.getDeletePathsForReplacedFileGroups(partitionPath, partitionFileIds.getRight()));
}, cleanerParallelism).stream();

Map<String, List<HoodieCleanFileInfo>> cleanOps = Stream.concat(cleanInfos, replacedCleanInfos)
Map<String, List<HoodieCleanFileInfo>> cleanOps = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));

return new HoodieCleanerPlan(earliestInstant
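The rewritten requestClean() builds the cleaner plan from a single parallel pass over the partitions: the planner's getDeletePaths() is now expected to cover replaced file groups itself (via the file system view), so the separate partitionToReplacedFileIds stream and the Stream.concat merge are gone. The following is a simplified, dependency-free sketch of the resulting shape; getDeletePaths here is a stand-in, not the real CleanPlanner.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: one pass per partition feeds the clean plan.
public class CleanPlanSketch {

  // Stand-in for CleanPlanner#getDeletePaths: files to delete for a partition,
  // including files from replaced file groups.
  static List<String> getDeletePaths(String partition) {
    return Arrays.asList(partition + "/old-base-file.parquet", partition + "/replaced-base-file.parquet");
  }

  public static void main(String[] args) {
    List<String> partitionsToClean = Arrays.asList("2021/01/08", "2021/01/09");

    // One map over partitions replaces the previous two-stream (active + replaced) merge.
    Map<String, List<String>> cleanOps = partitionsToClean.stream()
        .collect(Collectors.toMap(p -> p, CleanPlanSketch::getDeletePaths));

    cleanOps.forEach((partition, files) -> System.out.println(partition + " -> " + files));
  }
}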
@@ -29,6 +29,7 @@
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -49,7 +50,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -159,7 +159,8 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
* @param newInstantToRetain
* @return
*/
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, Option<HoodieInstant> newInstantToRetain) {
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
Option<HoodieInstant> newInstantToRetain) {
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
@@ -168,10 +169,16 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
} else {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
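The branch added above deserializes the replacecommit's timeline metadata (stored as JSON) via HoodieReplaceCommitMetadata.fromBytes and, for incremental cleaning, considers both the partitions whose file groups were replaced and the partitions that received new writes. A dependency-free illustration follows; the two maps are hypothetical stand-ins for the metadata's partitionToReplaceFileIds and partitionToWriteStats fields.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative sketch only: which partitions incremental cleaning looks at for a replacecommit.
public class ReplaceCommitPartitionsSketch {
  public static void main(String[] args) {
    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
    partitionToReplaceFileIds.put("2021/01/08", Arrays.asList("fg-1", "fg-2")); // replaced file groups

    Map<String, List<String>> partitionToWriteStats = new HashMap<>();
    partitionToWriteStats.put("2021/01/09", Arrays.asList("new-base-file"));    // newly written files

    // Same shape as the Stream.concat(...) in the hunk above: the union of both key sets.
    Set<String> partitionsToConsider = Stream
        .concat(partitionToReplaceFileIds.keySet().stream(), partitionToWriteStats.keySet().stream())
        .collect(Collectors.toCollection(TreeSet::new));

    System.out.println(partitionsToConsider); // [2021/01/08, 2021/01/09]
  }
}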
@@ -196,13 +203,17 @@ private List<String> getPartitionPathsForFullCleaning() throws IOException {
private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. ");
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());

// In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));

List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
@@ -258,7 +269,11 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition

// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {
HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
// all replaced file groups before earliestCommitToRetain are eligible to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
// add active files
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
@@ -311,6 +326,20 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
}
return deletePaths;
}

private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
final Stream<HoodieFileGroup> replacedGroups;
if (earliestCommitToRetain.isPresent()) {
replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
} else {
replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
}
return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
// do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
.filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
.flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
.collect(Collectors.toList());
}

/**
* Gets the latest version < instantTime. This version file could still be used by queries.
@@ -376,59 +405,6 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
return earliestCommitToRetain;
}

public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();

// Determine which replace commits can be cleaned.
Stream<HoodieInstant> cleanableReplaceCommits;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
if (!earliestInstantToRetain.isPresent()) {
LOG.info("Not enough instants to start cleaning replace commits");
return Collections.emptyMap();
}
// all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
// replaced file groups.
cleanableReplaceCommits = replaceTimeline
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
earliestInstantToRetain.get().getTimestamp()))
.getInstants();
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
// In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
// In other words, the file versions only apply to the active file groups.
cleanableReplaceCommits = replaceTimeline.getInstants();
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}

// merge everything and make a map full of file ids to be cleaned.
return cleanableReplaceCommits.map(instant -> {
try {
return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
} catch (IOException e) {
throw new HoodieIOException("Unable to deserialize " + instant, e);
}
}).reduce((leftMap, rightMap) -> {
rightMap.forEach((partition, fileIds) -> {
if (!leftMap.containsKey(partition)) {
leftMap.put(partition, fileIds);
} else {
// duplicates should not be possible, since replace of a file group should happen once only
leftMap.get(partition).addAll(fileIds);
}
});
return leftMap;
}).orElse(new HashMap<>());
}

public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
.filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
.flatMap(HoodieFileGroup::getAllFileSlices)
.flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
.collect(Collectors.toList());
}

/**
* Determine if file slice needed to be preserved for pending compaction.
*
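The planner change above is the core of the commit: instead of re-reading every replacecommit's metadata (the removed getReplacedFileIdsToClean and getDeletePathsForReplacedFileGroups), the new getReplacedFilesEligibleToClean() asks the file system view for replaced file groups, using getReplacedFileGroupsBefore(...) under KEEP_LATEST_COMMITS and getAllReplacedFileGroups(...) under KEEP_LATEST_FILE_VERSIONS, and skips savepointed files. The sketch below models that selection rule with plain collections; the map of file names to replacement commit times is a hypothetical stand-in for the HoodieFileGroup/FileSlice objects the view returns.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only: which replaced files become eligible for cleaning.
public class ReplacedFileCleaningSketch {

  static List<String> replacedFilesEligibleToClean(Map<String, String> replacedFileToCommitTime,
                                                   List<String> savepointedFiles,
                                                   Optional<String> earliestCommitToRetain) {
    List<String> eligible = new ArrayList<>();
    for (Map.Entry<String, String> entry : replacedFileToCommitTime.entrySet()) {
      // KEEP_LATEST_COMMITS: only file groups replaced before the earliest retained commit;
      // an empty Optional mimics KEEP_LATEST_FILE_VERSIONS, where every replaced group qualifies.
      boolean replacedEarlyEnough = !earliestCommitToRetain.isPresent()
          || entry.getValue().compareTo(earliestCommitToRetain.get()) < 0;
      // Savepointed files are never deleted; archival keeps the matching replacecommit around.
      boolean savepointed = savepointedFiles.contains(entry.getKey());
      if (replacedEarlyEnough && !savepointed) {
        eligible.add(entry.getKey());
      }
    }
    return eligible;
  }

  public static void main(String[] args) {
    Map<String, String> replaced = new LinkedHashMap<>();
    replaced.put("f1.parquet", "20210105");
    replaced.put("f2.parquet", "20210106");
    replaced.put("f3.parquet", "20210109");

    // f1 is savepointed and f3 was replaced after the retained commit, so only f2 is eligible.
    System.out.println(replacedFilesEligibleToClean(replaced, Arrays.asList("f1.parquet"), Optional.of("20210108")));
    // Empty Optional: every non-savepointed replaced file is eligible.
    System.out.println(replacedFilesEligibleToClean(replaced, Arrays.asList("f1.parquet"), Optional.empty()));
  }
}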
@@ -122,6 +122,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFil
List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.REPLACE_COMMIT_ACTION:
LOG.info("Rolling back commit action.");
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
@@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -498,6 +499,15 @@ public void testSync(HoodieTableType tableType) throws Exception {
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());

// insert overwrite to test replacecommit
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
records = dataGen.generateInserts(newCommitTime, 5);
HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
writeStatuses = replaceResult.getWriteStatuses().collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());
}

// Enable metadata table and ensure it is synced
@@ -800,6 +810,7 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {

// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));

fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));