From 11661dc6aa03a8b8d39f823f9991ed8402a35ef1 Mon Sep 17 00:00:00 2001 From: rmpifer Date: Mon, 28 Dec 2020 09:52:02 -0800 Subject: [PATCH] [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342) Co-authored-by: Ryan Pifer --- .../HoodieBackedTableMetadataWriter.java | 227 +------------ .../client/TestCompactionAdminClient.java | 6 + ...ata.java => TestHoodieBackedMetadata.java} | 158 +++++++-- .../table/upgrade/TestUpgradeDowngrade.java | 6 + .../testutils/HoodieClientTestHarness.java | 7 +- .../metadata/AbstractHoodieTableMetadata.java | 303 +++++++++++++++++ .../metadata/HoodieBackedTableMetadata.java | 205 +----------- ...dieMetadataMergedInstantRecordScanner.java | 115 +++++++ .../metadata/HoodieTableMetadataUtil.java | 316 ++++++++++++++++++ 9 files changed, 906 insertions(+), 437 deletions(-) rename hudi-client/src/test/java/org/apache/hudi/metadata/{TestHoodieFsMetadata.java => TestHoodieBackedMetadata.java} (86%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index f89a198d9abf2..c3ba2a9ec5949 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -42,13 +42,10 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; -import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -56,7 +53,6 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMetricsConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.metrics.DistributedRegistry; @@ -72,18 +68,14 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** @@ -232,7 +224,7 @@ public HoodieWriteConfig getWriteConfig() { return 
metadataWriteConfig; } - public HoodieTableMetadata metadata() { + public HoodieBackedTableMetadata metadata() { return metadata; } @@ -413,38 +405,12 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) { LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync); // Read each instant in order and sync it to metadata table - final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline(); for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); - - switch (instant.getAction()) { - case HoodieTimeline.CLEAN_ACTION: - HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant); - update(cleanMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.COMPACTION_ACTION: - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - update(commitMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.ROLLBACK_ACTION: - HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( - timeline.getInstantDetails(instant).get()); - update(rollbackMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.RESTORE_ACTION: - HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( - timeline.getInstantDetails(instant).get()); - update(restoreMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.SAVEPOINT_ACTION: - // Nothing to be done here - break; - default: - throw new HoodieException("Unknown type of action " + instant.getAction()); + + Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime()); + if (records.isPresent()) { + commit(jsc, prepRecords(jsc, records.get(), MetadataPartitionType.FILES.partitionPath()), instant.getTimestamp()); } } // re-init the table metadata, for any future writes. @@ -466,39 +432,7 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) { return; } - List records = new LinkedList<>(); - List allPartitions = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; - allPartitions.add(partition); - - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } - - int offset = partition.equals(NON_PARTITIONED_NAME) ? 
0 : partition.length() + 1; - String filename = pathWithPartition.substring(offset); - ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); - newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); - }); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( - partition, Option.of(newFiles), Option.empty()); - records.add(record); - }); - - // New partitions created - HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); - records.add(record); - - LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() - + ". #partitions_updated=" + records.size()); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -514,21 +448,7 @@ public void update(HoodieCleanerPlan cleanerPlan, String instantTime) { return; } - List records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { - fileDeleteCount[0] += deletedPathInfo.size(); - - // Files deleted from a partition - List deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) - .collect(Collectors.toList()); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(deletedFilenames)); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -544,21 +464,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { return; } - List records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - - cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { - // Files deleted from a partition - List deletedFiles = partitionMetadata.getSuccessDeleteFiles(); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(new ArrayList<>(deletedFiles))); - - records.add(record); - fileDeleteCount[0] += deletedFiles.size(); - }); - - LOG.info("Updating at " + instantTime + " from Clean. 
#partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -574,12 +480,8 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { return; } - Map> partitionToAppendedFiles = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles)); - }); - commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime()); + commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } /** @@ -594,114 +496,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) return; } - Map> partitionToAppendedFiles = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); - processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles); - commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); - } - - /** - * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. - * - * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This - * function will extract this change file for each partition. - * - * @param rollbackMetadata {@code HoodieRollbackMetadata} - * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. - * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. - */ - private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles) { - rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { - final String partition = pm.getPartitionPath(); - - if (!pm.getSuccessDeleteFiles().isEmpty()) { - if (!partitionToDeletedFiles.containsKey(partition)) { - partitionToDeletedFiles.put(partition, new ArrayList<>()); - } - - // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() - List deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) - .collect(Collectors.toList()); - partitionToDeletedFiles.get(partition).addAll(deletedFiles); - } - - if (!pm.getAppendFiles().isEmpty()) { - if (!partitionToAppendedFiles.containsKey(partition)) { - partitionToAppendedFiles.put(partition, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getAppendFiles() - pm.getAppendFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { - return size + oldSize; - }); - }); - } - }); - } - - /** - * Create file delete records and commit. 
- * - * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files - * @param instantTime Timestamp at which the deletes took place - * @param operation Type of the operation which caused the files to be deleted - */ - private void commitRollback(JavaSparkContext jsc, Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, String instantTime, - String operation) { - List records = new LinkedList<>(); - int[] fileChangeCount = {0, 0}; // deletes, appends - - partitionToDeletedFiles.forEach((partition, deletedFiles) -> { - // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the - // metadata table. Hence, the deleted filed need to be checked against the metadata. - try { - FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition)); - Set currentFiles = - Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet()); - - int origCount = deletedFiles.size(); - deletedFiles.removeIf(f -> !currentFiles.contains(f)); - if (deletedFiles.size() != origCount) { - LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the " - + " metadata for partition " + partition - + ". To delete = " + origCount + ", found=" + deletedFiles.size()); - } - - fileChangeCount[0] += deletedFiles.size(); - - Option> filesAdded = Option.empty(); - if (partitionToAppendedFiles.containsKey(partition)) { - filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); - } - - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, - Option.of(new ArrayList<>(deletedFiles))); - records.add(record); - } catch (IOException e) { - throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e); - } - }); - - partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { - fileChangeCount[1] += appendedFileMap.size(); - - // Validate that no appended file has been deleted - ValidationUtils.checkState( - !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), - "Rollback file cannot both be appended and deleted"); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), - Option.empty()); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from " + operation + ". 
#partitions_updated=" + records.size() - + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime()); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java index 1200f67cc079a..c42110e7ca3ca 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -37,6 +37,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,6 +71,11 @@ public void setUp() throws Exception { client = new CompactionAdminClient(jsc, basePath); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testUnscheduleCompactionPlan() throws Exception { int numEntriesPerInstant = 10; diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java similarity index 86% rename from hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java rename to hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 2823035d719c2..48d07e57cbae1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java +++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -73,10 +73,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; -public class TestHoodieFsMetadata extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class); +public class TestHoodieBackedMetadata extends HoodieClientTestHarness { + private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); @TempDir public java.nio.file.Path tempFolder; @@ -91,7 +92,7 @@ public void init(HoodieTableType tableType) throws IOException { initSparkContexts("TestHoodieMetadata"); initFileSystem(); fs.mkdirs(new Path(basePath)); - initMetaClient(); + initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); } @@ -168,6 +169,7 @@ public void testOnlyValidPartitionsAdded() throws Exception { //@ParameterizedTest //@EnumSource(HoodieTableType.class) //public void testTableOperations(HoodieTableType tableType) throws Exception { + @Test public void testTableOperations() throws Exception { //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed init(HoodieTableType.COPY_ON_WRITE); @@ -257,6 +259,7 @@ public void testTableOperations() throws Exception { //@ParameterizedTest //@EnumSource(HoodieTableType.class) //public void testRollbackOperations(HoodieTableType tableType) throws Exception { + @Test public void testRollbackOperations() throws Exception { //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed init(HoodieTableType.COPY_ON_WRITE); @@ 
-363,6 +366,40 @@ public void testRollbackOperations() throws Exception { } + /** + * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op + * occurs to metadata. + * @throws Exception + */ + @Test + public void testRollbackUnsyncedCommit() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) { + // Initialize table with metadata + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + // Commit with metadata disabled + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + client.rollback(newCommitTime); + } + + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) { + validateMetadata(client); + } + } + /** * Test sync of table operations. */ @@ -472,7 +509,6 @@ public void testSync() throws Exception { validateMetadata(client); assertTrue(metadata(client).isInSync()); } - } /** @@ -619,6 +655,82 @@ public void testMetadataMetrics() throws Exception { } } + /** + * Test when reading from metadata table which is out of sync with dataset that results are still consistent. 
+ */ + // @ParameterizedTest + // @EnumSource(HoodieTableType.class) + @Test + public void testMetadataOutOfSync() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + + HoodieWriteClient unsyncedClient = new HoodieWriteClient<>(jsc, getWriteConfig(true, true)); + + // Enable metadata so table is initialized + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) { + // Perform Bulk Insert + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 20); + client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + } + + // Perform commit operations with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + // Perform Insert + String newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 20); + client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Perform Upsert + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUniqueUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Compaction + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "004"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform clean operation with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + // One more commit needed to trigger clean so upsert and compact + String newCommitTime = "005"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "006"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + + // Clean + newCommitTime = "007"; + client.clean(newCommitTime); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform restore with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + client.restoreToInstant("004"); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + } + /** * Validate the metadata tables contents to ensure it matches what is on the file system. 
* @@ -626,25 +738,18 @@ public void testMetadataMetrics() throws Exception { */ private void validateMetadata(HoodieWriteClient client) throws IOException { HoodieWriteConfig config = client.getConfig(); - HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); - assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + HoodieBackedTableMetadata tableMetadata = metadata(client); + assertNotNull(tableMetadata, "MetadataReader should have been initialized"); if (!config.useFileListingMetadata()) { return; } HoodieTimer timer = new HoodieTimer().startTimer(); - // Validate write config for metadata table - HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); - assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); - assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); - - // Metadata table should be in sync with the dataset - assertTrue(metadata(client).isInSync()); - // Partitions should match List fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath); - List metadataPartitions = metadataWriter.metadata().getAllPartitionPaths(); + List metadataPartitions = tableMetadata.getAllPartitionPaths(); Collections.sort(fsPartitions); Collections.sort(metadataPartitions); @@ -665,8 +770,9 @@ private void validateMetadata(HoodieWriteClient client) throws IOException { } else { partitionPath = new Path(basePath, partition); } + FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath); - FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath); + FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath); List fsFileNames = Arrays.stream(fsStatuses) .map(s -> s.getPath().getName()).collect(Collectors.toList()); List metadataFilenames = Arrays.stream(metaStatuses) @@ -687,9 +793,9 @@ private void validateMetadata(HoodieWriteClient client) throws IOException { // FileSystemView should expose the same data List fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList()); - fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g)); - fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b))); - fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s))); + fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g)); + fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b))); + fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s))); long numFiles = fileGroups.stream() .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum()) @@ -702,10 +808,18 @@ private void validateMetadata(HoodieWriteClient client) throws IOException { } }); - HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + // Validate write config for metadata table + HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); + assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); + 
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); // Metadata table should be in sync with the dataset - assertTrue(metadataWriter.metadata().isInSync()); + assertTrue(metadata(client).isInSync()); + + HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); // Metadata table is MOR assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR"); @@ -742,7 +856,7 @@ private HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteClient client) private HoodieBackedTableMetadata metadata(HoodieWriteClient client) { HoodieWriteConfig clientConfig = client.getConfig(); - return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(), + return (HoodieBackedTableMetadata) AbstractHoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(), clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index e1dc4cefa1452..da5f434c98976 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -46,6 +46,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -91,6 +92,11 @@ public void setUp() throws Exception { initDFSMetaClient(); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testLeftOverUpdatedPropFileCleanup() throws IOException { testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index f1e3f175ea9a2..cf49174225c1c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -189,6 +190,10 @@ protected void cleanupFileSystem() throws IOException { * @throws IOException */ protected void initMetaClient() throws IOException { + initMetaClient(getTableType()); + } + + protected void initMetaClient(HoodieTableType tableType) throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -197,7 +202,7 @@ protected void initMetaClient() throws IOException { throw new IllegalStateException("The Spark context has not been initialized."); } - metaClient = HoodieTestUtils.init(hadoopConf, basePath, 
getTableType()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java new file mode 100644 index 0000000000000..15b9ebfd35f9e --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Interface that supports querying various pieces of metadata about a hudi table. 
+ */ +public abstract class AbstractHoodieTableMetadata implements HoodieTableMetadata { + + private static final Logger LOG = LogManager.getLogger(AbstractHoodieTableMetadata.class); + + static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; + static final int BUFFER_SIZE = 10 * 1024 * 1024; + + protected final SerializableConfiguration hadoopConf; + protected final Option metrics; + protected final String datasetBasePath; + + // Directory used for Spillable Map when merging records + final String spillableMapDirectory; + + protected boolean enabled; + private final boolean validateLookups; + private final boolean assumeDatePartitioning; + + private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner; + + protected AbstractHoodieTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory, + boolean enabled, boolean validateLookups, boolean enableMetrics, + boolean assumeDatePartitioning) { + this.hadoopConf = new SerializableConfiguration(hadoopConf); + this.datasetBasePath = datasetBasePath; + this.spillableMapDirectory = spillableMapDirectory; + + this.enabled = enabled; + this.validateLookups = validateLookups; + this.assumeDatePartitioning = assumeDatePartitioning; + + if (enableMetrics) { + this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata"))); + } else { + this.metrics = Option.empty(); + } + } + + public static AbstractHoodieTableMetadata create(Configuration conf, String datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata, + boolean verifyListings, boolean enableMetrics, boolean shouldAssumeDatePartitioning) { + return new HoodieBackedTableMetadata(conf, datasetBasePath, spillableMapPath, useFileListingFromMetadata, verifyListings, + enableMetrics, shouldAssumeDatePartitioning); + } + + + /** + * Return the list of files in a partition. + * + * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of + * partitions is retrieved directly from the underlying {@code FileSystem}. + * + * On any errors retrieving the listing from the metadata, defaults to using the file system listings. + * + * @param partitionPath The absolute path of the partition to list + */ + + public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException { + if (enabled) { + try { + return fetchAllFilesInPartition(partitionPath); + } catch (Exception e) { + LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e); + } + } + return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath); + } + + /** + * Return the list of partitions in the dataset. + * + * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of + * partitions is retrieved directly from the underlying {@code FileSystem}. + * + * On any errors retrieving the listing from the metadata, defaults to using the file system listings. + * + */ + public List getAllPartitionPaths() throws IOException { + if (enabled) { + try { + return fetchAllPartitionPaths(); + } catch (Exception e) { + LOG.error("Failed to retrieve list of partition from metadata", e); + } + } + + return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths(); + } + + /** + * Returns a list of all partitions. + */ + /** + * Returns a list of all partitions. 
+ */ + protected List fetchAllPartitionPaths() throws IOException { + HoodieTimer timer = new HoodieTimer().startTimer(); + Option> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); + + List partitions = Collections.emptyList(); + if (hoodieRecord.isPresent()) { + if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { + throw new HoodieMetadataException("Metadata partition list record is inconsistent: " + + hoodieRecord.get().getData()); + } + + partitions = hoodieRecord.get().getData().getFilenames(); + // Partition-less tables have a single empty partition + if (partitions.contains(NON_PARTITIONED_NAME)) { + partitions.remove(NON_PARTITIONED_NAME); + partitions.add(""); + } + } + + if (validateLookups) { + // Validate the Metadata Table data by listing the partitions from the file system + timer.startTimer(); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning); + List actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths(); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer())); + + Collections.sort(actualPartitions); + Collections.sort(partitions); + if (!actualPartitions.equals(partitions)) { + LOG.error("Validation of metadata partition list failed. Lists do not match."); + LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray())); + LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray())); + + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); + } + + // Return the direct listing as it should be correct + partitions = actualPartitions; + } + + LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); + return partitions; + } + + /** + * Return all the files from the partition. 
+ * + * @param partitionPath The absolute path of the partition + */ + private FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { + String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath); + if (partitionName.isEmpty()) { + partitionName = NON_PARTITIONED_NAME; + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + Option> hoodieRecord = getMergedRecordByKey(partitionName); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); + + FileStatus[] statuses = {}; + if (hoodieRecord.isPresent()) { + if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { + throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: " + + hoodieRecord.get().getData()); + } + statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath); + } + + if (validateLookups) { + // Validate the Metadata Table data by listing the partitions from the file system + timer.startTimer(); + + // Ignore partition metadata file + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); + FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath, + p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer())); + + List directFilenames = Arrays.stream(directStatuses) + .map(s -> s.getPath().getName()).sorted() + .collect(Collectors.toList()); + + List metadataFilenames = Arrays.stream(statuses) + .map(s -> s.getPath().getName()).sorted() + .collect(Collectors.toList()); + + if (!metadataFilenames.equals(directFilenames)) { + LOG.error("Validation of metadata file listing for partition " + partitionName + " failed."); + LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray())); + LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray())); + + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); + } + + // Return the direct listing as it should be correct + statuses = directStatuses; + } + + LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length); + return statuses; + } + + /** + * Retrieve the merged {@code HoodieRecord} mapped to the given key. 
+ * + * @param key The key of the record + */ + private Option> getMergedRecordByKey(String key) throws IOException { + Option> mergedRecord; + openTimelineScanner(); + + Option> metadataHoodieRecord = getRecordByKeyFromMetadata(key); + // Retrieve record from unsynced timeline instants + Option> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key); + if (timelineHoodieRecord.isPresent()) { + if (metadataHoodieRecord.isPresent()) { + HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData()); + mergedRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload)); + } else { + mergedRecord = timelineHoodieRecord; + } + } else { + mergedRecord = metadataHoodieRecord; + } + return mergedRecord; + } + + protected abstract Option> getRecordByKeyFromMetadata(String key) throws IOException; + + private void openTimelineScanner() throws IOException { + if (timelineRecordScanner != null) { + // Already opened + return; + } + + HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); + List unsyncedInstants = findInstantsToSync(datasetMetaClient); + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + timelineRecordScanner = + new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); + } + + protected List findInstantsToSync() { + HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); + return findInstantsToSync(datasetMetaClient); + } + + protected abstract List findInstantsToSync(HoodieTableMetaClient datasetMetaClient); + + public boolean isInSync() { + return enabled && findInstantsToSync().isEmpty(); + } + + protected void closeReaders() { + timelineRecordScanner = null; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 3a1c7bfd9f2a3..bf5aa763da7cb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -19,8 +19,6 @@ package org.apache.hudi.metadata; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,17 +27,13 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -52,7 +46,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import 
org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -65,24 +58,13 @@ * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system. * No updates are applied to the table and it is not synced. */ -public class HoodieBackedTableMetadata implements HoodieTableMetadata { +public class HoodieBackedTableMetadata extends AbstractHoodieTableMetadata { private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class); - private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; - private static final int BUFFER_SIZE = 10 * 1024 * 1024; - private final SerializableConfiguration hadoopConf; - private final String datasetBasePath; private final String metadataBasePath; - private final Option metrics; private HoodieTableMetaClient metaClient; - private boolean enabled; - private final boolean validateLookups; - private final boolean assumeDatePartitioning; - // Directory used for Spillable Map when merging records - private final String spillableMapDirectory; - // Readers for the base and log file which store the metadata private transient HoodieFileReader baseFileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; @@ -95,13 +77,8 @@ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, Str public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) { - this.hadoopConf = new SerializableConfiguration(conf); - this.datasetBasePath = datasetBasePath; + super(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning); this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); - this.validateLookups = validateLookups; - this.spillableMapDirectory = spillableMapDirectory; - this.enabled = enabled; - this.assumeDatePartitioning = assumeDatePartitioning; if (enabled) { try { @@ -116,164 +93,6 @@ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, Str } else { LOG.info("Metadata table is disabled."); } - - if (enableMetrics) { - this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata"))); - } else { - this.metrics = Option.empty(); - } - } - - /** - * Return the list of partitions in the dataset. - * - * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of - * partitions is retrieved directly from the underlying {@code FileSystem}. - * - * On any errors retrieving the listing from the metadata, defaults to using the file system listings. - * - */ - @Override - public List getAllPartitionPaths() - throws IOException { - if (enabled) { - try { - return fetchAllPartitionPaths(); - } catch (Exception e) { - LOG.error("Failed to retrieve list of partition from metadata", e); - } - } - - return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths(); - } - - /** - * Return the list of files in a partition. - * - * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. 
Otherwise, the list of - * partitions is retrieved directly from the underlying {@code FileSystem}. - * - * On any errors retrieving the listing from the metadata, defaults to using the file system listings. - * - * @param partitionPath The absolute path of the partition to list - */ - @Override - public FileStatus[] getAllFilesInPartition(Path partitionPath) - throws IOException { - if (enabled) { - try { - return fetchAllFilesInPartition(partitionPath); - } catch (Exception e) { - LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e); - } - } - - return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath); - } - - /** - * Returns a list of all partitions. - */ - protected List fetchAllPartitionPaths() throws IOException { - HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); - - List partitions = Collections.emptyList(); - if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata partition list record is inconsistent: " - + hoodieRecord.get().getData()); - } - - partitions = hoodieRecord.get().getData().getFilenames(); - // Partition-less tables have a single empty partition - if (partitions.contains(NON_PARTITIONED_NAME)) { - partitions.remove(NON_PARTITIONED_NAME); - partitions.add(""); - } - } - - if (validateLookups) { - // Validate the Metadata Table data by listing the partitions from the file system - timer.startTimer(); - FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning); - List actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths(); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer())); - - Collections.sort(actualPartitions); - Collections.sort(partitions); - if (!actualPartitions.equals(partitions)) { - LOG.error("Validation of metadata partition list failed. Lists do not match."); - LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray())); - LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray())); - - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); - } - - // Return the direct listing as it should be correct - partitions = actualPartitions; - } - - LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); - return partitions; - } - - /** - * Return all the files from the partition. 
- * - * @param partitionPath The absolute path of the partition - */ - FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath); - if (partitionName.isEmpty()) { - partitionName = NON_PARTITIONED_NAME; - } - - HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getMergedRecordByKey(partitionName); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - - FileStatus[] statuses = {}; - if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: " - + hoodieRecord.get().getData()); - } - statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath); - } - - if (validateLookups) { - // Validate the Metadata Table data by listing the partitions from the file system - timer.startTimer(); - - // Ignore partition metadata file - FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath, - p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer())); - - List directFilenames = Arrays.stream(directStatuses) - .map(s -> s.getPath().getName()).sorted() - .collect(Collectors.toList()); - - List metadataFilenames = Arrays.stream(statuses) - .map(s -> s.getPath().getName()).sorted() - .collect(Collectors.toList()); - - if (!metadataFilenames.equals(directFilenames)) { - LOG.error("Validation of metadata file listing for partition " + partitionName + " failed."); - LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray())); - LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray())); - - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); - } - - // Return the direct listing as it should be correct - statuses = directStatuses; - } - - LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length); - return statuses; } /** @@ -281,7 +100,8 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { * * @param key The key of the record */ - private Option> getMergedRecordByKey(String key) throws IOException { + @Override + protected Option> getRecordByKeyFromMetadata(String key) throws IOException { openBaseAndLogFiles(); // Retrieve record from base file @@ -314,7 +134,7 @@ private Option> getMergedRecordByKey(String /** * Open readers to the base and log files. */ - private synchronized void openBaseAndLogFiles() throws IOException { + protected synchronized void openBaseAndLogFiles() throws IOException { if (logRecordScanner != null) { // Already opened return; @@ -363,7 +183,9 @@ private synchronized void openBaseAndLogFiles() throws IOException { metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer())); } + @Override protected void closeReaders() { + super.closeReaders(); if (baseFileReader != null) { baseFileReader.close(); baseFileReader = null; @@ -371,19 +193,6 @@ protected void closeReaders() { logRecordScanner = null; } - /** - * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table. 
- */ - @Override - public boolean isInSync() { - return enabled && findInstantsToSync().isEmpty(); - } - - private List findInstantsToSync() { - HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); - return findInstantsToSync(datasetMetaClient); - } - /** * Return an ordered list of instants which have not been synced to the Metadata Table. diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java new file mode 100644 index 0000000000000..c98f48cb14130 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify + * a filter to limit keys that are merged and stored in memory. + */ +public class HoodieMetadataMergedInstantRecordScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class); + + HoodieTableMetaClient metaClient; + private List instants; + private Option lastSyncTs; + private Set mergeKeyFilter; + protected final ExternalSpillableMap> records; + + public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient metaClient, List instants, + Option lastSyncTs, Schema readerSchema, Long maxMemorySizeInBytes, + String spillableMapBasePath, Set mergeKeyFilter) throws IOException { + this.metaClient = metaClient; + this.instants = instants; + this.lastSyncTs = lastSyncTs; + this.mergeKeyFilter = mergeKeyFilter != null ? 
+    this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(readerSchema));
+
+    scan();
+  }
+
+  /**
+   * Converts the instants in the scanner to metadata table records and merges each record by key.
+   */
+  private void scan() {
+    for (HoodieInstant instant : instants) {
+      try {
+        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs);
+        if (records.isPresent()) {
+          records.get().forEach(record -> processNextRecord(record));
+        }
+      } catch (Exception e) {
+        LOG.error(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e);
+        throw new HoodieException(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e);
+      }
+    }
+  }
+
+  /**
+   * Processes a metadata table record by merging it with the existing record for the same key, if the key is part
+   * of the key filter.
+   *
+   * @param hoodieRecord record to process and merge
+   */
+  private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
+    String key = hoodieRecord.getRecordKey();
+    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) {
+      if (records.containsKey(key)) {
+        // Merge and store the merged record
+        HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
+        records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+      } else {
+        // Put the record as is
+        records.put(key, hoodieRecord);
+      }
+    }
+  }
+
+  /**
+   * Retrieves the merged hoodie record for the given key.
+   *
+   * @param key key of the record to retrieve
+   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
+   */
+  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
+    return Option.ofNullable((HoodieRecord) records.get(key));
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
new file mode 100644
index 0000000000000..9a25825a1f89e
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+
+/**
+ * A utility to convert timeline information to metadata table records.
+ */
+public class HoodieTableMetadataUtil {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
+
+  /**
+   * Converts a timeline instant to metadata table records.
+   *
+   * @param datasetMetaClient The meta client associated with the timeline instant
+   * @param instant The instant to fetch and convert to metadata table records
+   * @param lastSyncTs The timestamp of the latest instant already synced to the metadata table
+   * @return a list of metadata table records
+   * @throws IOException
+   */
+  public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
+    HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
+    Option<List<HoodieRecord>> records = Option.empty();
+    ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
+
+    switch (instant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION:
+        HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
+        records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp()));
+        break;
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.COMPACTION_ACTION:
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp()));
+        break;
+      case HoodieTimeline.ROLLBACK_ACTION:
+        HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.RESTORE_ACTION:
+        HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.SAVEPOINT_ACTION:
+        // Nothing to be done here
+        break;
+      default:
+        throw new HoodieException("Unknown type of action " + instant.getAction());
+    }
+
+    return records;
+  }
+
+  /**
+   * Finds all new files/partitions created as part of a commit and creates metadata table records for them.
+   *
+   * @param commitMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
+
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> allPartitions = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+      final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
+      allPartitions.add(partition);
+
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
+        }
+
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
+        String filename = pathWithPartition.substring(offset);
+        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
+        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
+      });
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+          partition, Option.of(newFiles), Option.empty());
+      records.add(record);
+    });
+
+    // New partitions created
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
+    records.add(record);
+
+    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+        + ". #partitions_updated=" + records.size());
+    return records;
+  }
+
+  /**
+   * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them.
+   *
+   * @param cleanerPlan The cleaner plan from the timeline to convert
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan cleanerPlan, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+
+    int[] fileDeleteCount = {0};
+    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
+      fileDeleteCount[0] += deletedPathInfo.size();
+
+      // Files deleted from a partition
+      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
+          .collect(Collectors.toList());
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(deletedFilenames));
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+
+    return records;
+  }
+
+  /**
+   * Finds all files that were deleted as part of a clean and creates metadata table records for them.
+   *
+   * @param cleanMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+
+    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(new ArrayList<>(deletedFiles)));
+
+      records.add(record);
+      fileDeleteCount[0] += deletedFiles.size();
+    });
+
+    LOG.info("Found at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+
+    return records;
+  }
+
+  /**
+   * Aggregates all files deleted or appended to across the rollbacks associated with a restore operation, then
+   * creates metadata table records for them.
+   *
+   * @param restoreMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
+    });
+
+    return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
+  }
+
+  /**
+   * Finds all files deleted or appended to as part of a rollback and creates metadata table records for them.
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) {
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
+    return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
+  }
+
+  /**
+   * Extracts information about the deleted and appended files from the {@code HoodieRollbackMetadata}.
+   *
+   * During a rollback, files may be deleted (COW, MOR) or rollback blocks may be appended to files (MOR only). This
+   * function extracts these file changes for each partition.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
+   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
+   */
+  private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
+                                              Map<String, List<String>> partitionToDeletedFiles,
+                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                              Option<String> lastSyncTs) {
+
+    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
+      if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) {
+        return;
+      }
+
+      final String partition = pm.getPartitionPath();
+      if (!pm.getSuccessDeleteFiles().isEmpty()) {
+        if (!partitionToDeletedFiles.containsKey(partition)) {
+          partitionToDeletedFiles.put(partition, new ArrayList<>());
+        }
+
+        // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
+        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
+            .collect(Collectors.toList());
+        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
+      }
+
+      if (!pm.getAppendFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in getAppendFiles()
+        pm.getAppendFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;
+          });
+        });
+      }
+    });
+  }
+
+  private static List<HoodieRecord> convertFilesToRecords(Map<String, List<String>> partitionToDeletedFiles,
+                                                          Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
+                                                          String operation) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileChangeCount = {0, 0}; // deletes, appends
+
+    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
+      fileChangeCount[0] += deletedFiles.size();
+
+      Option<Map<String, Long>> filesAdded = Option.empty();
+      if (partitionToAppendedFiles.containsKey(partition)) {
+        filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
+      }
+
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+          Option.of(new ArrayList<>(deletedFiles)));
+      records.add(record);
+    });
+
+    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
+      fileChangeCount[1] += appendedFileMap.size();
+
+      // Validate that no appended file has been deleted
+      ValidationUtils.checkState(
+          !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
+          "Rollback file cannot both be appended and deleted");
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
+          Option.empty());
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);

+    return records;
+  }
+
+}
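
Editor's note for reviewers: a minimal usage sketch, not part of this patch, showing how the new HoodieMetadataMergedInstantRecordScanner could be driven to serve a lookup that also covers instants which have not yet been synced to the metadata table. Only the scanner constructor and getRecordByKey come from the code above; the example class, its parameters, and the 1 GB spill threshold are illustrative assumptions.

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataMergedInstantRecordScanner;
import org.apache.hudi.metadata.HoodieMetadataPayload;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class UnsyncedInstantLookupSketch {

  // Merges the given un-synced instants in timeline order and returns the merged
  // metadata record for a single key (e.g. a partition name), if any instant touched it.
  public static Option<HoodieRecord<HoodieMetadataPayload>> lookupAcrossUnsyncedInstants(
      HoodieTableMetaClient datasetMetaClient,
      List<HoodieInstant> unsyncedInstants,   // completed instants not yet synced (hypothetical input)
      Option<String> lastSyncTs,              // timestamp of the last instant already synced
      Schema readerSchema,                    // schema of the metadata payload records
      String spillableMapBasePath,            // local directory for the spillable map (assumed)
      String key) throws IOException {
    // Restrict merging to the single requested key so only that record is kept in memory.
    HoodieMetadataMergedInstantRecordScanner scanner = new HoodieMetadataMergedInstantRecordScanner(
        datasetMetaClient, unsyncedInstants, lastSyncTs, readerSchema,
        1024 * 1024 * 1024L,                  // assumed in-memory budget before spilling to disk
        spillableMapBasePath, Collections.singleton(key));
    return scanner.getRecordByKey(key);
  }
}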