Skip to content

Commit

Permalink
[HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata t…
Browse files Browse the repository at this point in the history
…able (#2342)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
  • Loading branch information
rmpifer and Ryan Pifer authored Dec 28, 2020
1 parent f7f8c33 commit 11661dc
Show file tree
Hide file tree
Showing 9 changed files with 906 additions and 437 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,17 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
Expand All @@ -72,18 +68,14 @@
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;

/**
Expand Down Expand Up @@ -232,7 +224,7 @@ public HoodieWriteConfig getWriteConfig() {
return metadataWriteConfig;
}

public HoodieTableMetadata metadata() {
public HoodieBackedTableMetadata metadata() {
return metadata;
}

Expand Down Expand Up @@ -413,38 +405,12 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);

// Read each instant in order and sync it to metadata table
final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");
ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");

switch (instant.getAction()) {
case HoodieTimeline.CLEAN_ACTION:
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
update(cleanMetadata, instant.getTimestamp());
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION:
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
update(commitMetadata, instant.getTimestamp());
break;
case HoodieTimeline.ROLLBACK_ACTION:
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
timeline.getInstantDetails(instant).get());
update(rollbackMetadata, instant.getTimestamp());
break;
case HoodieTimeline.RESTORE_ACTION:
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
timeline.getInstantDetails(instant).get());
update(restoreMetadata, instant.getTimestamp());
break;
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
default:
throw new HoodieException("Unknown type of action " + instant.getAction());

Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime());
if (records.isPresent()) {
commit(jsc, prepRecords(jsc, records.get(), MetadataPartitionType.FILES.partitionPath()), instant.getTimestamp());
}
}
// re-init the table metadata, for any future writes.
Expand All @@ -466,39 +432,7 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
return;
}

List<HoodieRecord> records = new LinkedList<>();
List<String> allPartitions = new LinkedList<>();
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
allPartitions.add(partition);

Map<String, Long> newFiles = new HashMap<>(writeStats.size());
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
// Empty partition
LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
return;
}

int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
String filename = pathWithPartition.substring(offset);
ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
});

// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
partition, Option.of(newFiles), Option.empty());
records.add(record);
});

// New partitions created
HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
records.add(record);

LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime);
commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
}

Expand All @@ -514,21 +448,7 @@ public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
return;
}

List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
fileDeleteCount[0] += deletedPathInfo.size();

// Files deleted from a partition
List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
.collect(Collectors.toList());
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(deletedFilenames));
records.add(record);
});

LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime);
commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
}

Expand All @@ -544,21 +464,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
return;
}

List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};

cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(new ArrayList<>(deletedFiles)));

records.add(record);
fileDeleteCount[0] += deletedFiles.size();
});

LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime);
commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
}

Expand All @@ -574,12 +480,8 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
return;
}

Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles));
});
commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime());
commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
}

/**
Expand All @@ -594,114 +496,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)
return;
}

Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles);
commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
}

/**
* Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
*
* During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
* function will extract this change file for each partition.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
*/
private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
final String partition = pm.getPartitionPath();

if (!pm.getSuccessDeleteFiles().isEmpty()) {
if (!partitionToDeletedFiles.containsKey(partition)) {
partitionToDeletedFiles.put(partition, new ArrayList<>());
}

// Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList());
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}

if (!pm.getAppendFiles().isEmpty()) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}

// Extract appended file name from the absolute paths saved in getAppendFiles()
pm.getAppendFiles().forEach((path, size) -> {
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
return size + oldSize;
});
});
}
});
}

/**
* Create file delete records and commit.
*
* @param partitionToDeletedFiles {@code Map} of partitions and the deleted files
* @param instantTime Timestamp at which the deletes took place
* @param operation Type of the operation which caused the files to be deleted
*/
private void commitRollback(JavaSparkContext jsc, Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
String operation) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileChangeCount = {0, 0}; // deletes, appends

partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
// Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the
// metadata table. Hence, the deleted filed need to be checked against the metadata.
try {
FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition));
Set<String> currentFiles =
Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet());

int origCount = deletedFiles.size();
deletedFiles.removeIf(f -> !currentFiles.contains(f));
if (deletedFiles.size() != origCount) {
LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the "
+ " metadata for partition " + partition
+ ". To delete = " + origCount + ", found=" + deletedFiles.size());
}

fileChangeCount[0] += deletedFiles.size();

Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partition)) {
filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
}

HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
} catch (IOException e) {
throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e);
}
});

partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
fileChangeCount[1] += appendedFileMap.size();

// Validate that no appended file has been deleted
ValidationUtils.checkState(
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
"Rollback file cannot both be appended and deleted");

// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
Option.empty());
records.add(record);
});

LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime());
commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -70,6 +71,11 @@ public void setUp() throws Exception {
client = new CompactionAdminClient(jsc, basePath);
}

@AfterEach
public void cleanUp() throws Exception {
cleanupResources();
}

@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
Expand Down
Loading

0 comments on commit 11661dc

Please sign in to comment.