Commit

[HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
Ryan Pifer authored and vinothchandar committed Jan 4, 2021
1 parent 2bd4a68 commit 4b94529
Showing 11 changed files with 910 additions and 469 deletions.

Large diffs are not rendered by default.

@@ -277,7 +277,7 @@ public SyncableFileSystemView getHoodieView() {
private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
if (config.useFileListingMetadata()) {
FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
return new HoodieMetadataFileSystemView(metaClient, this.metadata, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline, viewConfig.isIncrementalTimelineSyncEnabled());
} else {
return getViewManager().getFileSystemView(metaClient);
}
@@ -37,6 +37,7 @@

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -70,6 +71,11 @@ public void setUp() throws Exception {
client = new CompactionAdminClient(context, basePath);
}

@AfterEach
public void cleanUp() throws Exception {
cleanupResources();
}

@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
@@ -79,8 +79,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestHoodieFsMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class);
public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);

@TempDir
public java.nio.file.Path tempFolder;
@@ -95,7 +95,7 @@ public void init(HoodieTableType tableType) throws IOException {
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
initMetaClient();
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);

@@ -371,7 +371,41 @@ public void testRollbackOperations() throws Exception {
client.syncTableMetadata();
validateMetadata(client);
}
}

/**
* Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op
* occurs to metadata.
* @throws Exception
*/
@Test
public void testRollbackUnsyncedCommit() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Initialize table with metadata
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}

String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Commit with metadata disabled
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
}

try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
validateMetadata(client);
}
}

/**
@@ -636,33 +670,104 @@ public void testMetadataMetrics() throws Exception {
}
}

/**
* Test when reading from metadata table which is out of sync with dataset that results are still consistent.
*/
// @ParameterizedTest
// @EnumSource(HoodieTableType.class)
@Test
public void testMetadataOutOfSync() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true));

// Enable metadata so table is initialized
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Perform Bulk Insert
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
}

// Perform commit operations with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Perform Insert
String newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();

// Perform Upsert
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 20);
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();

// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "004";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
}
}

assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);

// Perform clean operation with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// One more commit needed to trigger clean so upsert and compact
String newCommitTime = "005";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();

if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "006";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
}

// Clean
newCommitTime = "007";
client.clean(newCommitTime);
}

assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);

// Perform restore with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.restoreToInstant("004");
}

assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);
}


/**
* Validate the metadata tables contents to ensure it matches what is on the file system.
*
* @throws IOException
*/
private void validateMetadata(SparkRDDWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");

HoodieBackedTableMetadata tableMetadata = metadata(client);
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
if (!config.useFileListingMetadata()) {
return;
}

HoodieTimer timer = new HoodieTimer().startTimer();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

// Validate write config for metadata table
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");

// Metadata table should be in sync with the dataset
assertTrue(metadata(client).isInSync());

// Partitions should match
List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths();
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();

Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);
@@ -684,7 +789,7 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
partitionPath = new Path(basePath, partition);
}
FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath);
FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -705,9 +810,9 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());

fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));

long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -720,10 +825,17 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
}
});

HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");

// Validate write config for metadata table
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");

// Metadata table should be in sync with the dataset
assertTrue(metadataWriter.metadata().isInSync());
assertTrue(metadata(client).isInSync());
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);

// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
@@ -46,6 +46,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -91,6 +92,11 @@ public void setUp() throws Exception {
initDFSMetaClient();
}

@AfterEach
public void cleanUp() throws Exception {
cleanupResources();
}

@Test
public void testLeftOverUpdatedPropFileCleanup() throws IOException {
testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
@@ -24,6 +24,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -204,6 +205,10 @@ protected void cleanupFileSystem() throws IOException {
* @throws IOException
*/
protected void initMetaClient() throws IOException {
initMetaClient(getTableType());
}

protected void initMetaClient(HoodieTableType tableType) throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
@@ -212,7 +217,7 @@ protected void initMetaClient() throws IOException {
throw new IllegalStateException("The Spark context has not been initialized.");
}

metaClient = HoodieTestUtils.init(context.getHadoopConf().get(), basePath, getTableType());
metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
}

/**