Skip to content

Commit

Permalink
add single sync mode testing
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Dec 9, 2024
1 parent 0878ec8 commit 82c0d59
Showing 1 changed file with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -42,6 +44,21 @@

class ITRunSync {

/**
 * Exercises {@code RunSync.main} without the {@code --continuousMode} flag.
 *
 * <p>The test creates a copy-on-write Hudi test table under {@code tempDir}, calls
 * {@code insertRows(20)} on it, writes a dataset config for it via {@code writeConfigFile}, and
 * invokes {@code RunSync.main} directly on the test thread. It then calls
 * {@code waitForNumIcebergCommits} on {@code <basePath>/metadata} with an expected count of 2.
 *
 * @param tempDir JUnit-managed temporary directory that hosts the table
 * @throws IOException declared for the config writing and the {@code RunSync.main} call
 */
@Test
void testSingleSyncMode(@TempDir Path tempDir) throws IOException {
  final String tableName = "test-table";
  try (GenericTable hudiTable =
      TestJavaHudiTable.forStandardSchema(
          tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
    hudiTable.insertRows(20);

    // Only the dataset config is passed on the command line; no continuous-mode flag.
    String datasetConfigPath = writeConfigFile(tempDir, hudiTable, tableName).getPath();
    RunSync.main(new String[] {"--datasetConfig", datasetConfigPath});

    // The metadata directory checked is the "metadata" folder directly under the table base path.
    URI metadataUri = URI.create(hudiTable.getBasePath() + "/metadata");
    waitForNumIcebergCommits(Paths.get(metadataUri), 2);
  }
}

@Test
void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
ExecutorService runner = Executors.newSingleThreadExecutor();
Expand All @@ -50,19 +67,7 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
RunSync.DatasetConfig config =
RunSync.DatasetConfig.builder()
.sourceFormat("HUDI")
.targetFormats(Collections.singletonList("ICEBERG"))
.datasets(
Collections.singletonList(
RunSync.DatasetConfig.Table.builder()
.tableBasePath(table.getBasePath())
.tableName(tableName)
.build()))
.build();
File configFile = new File(tempDir + "config.yaml");
RunSync.YAML_MAPPER.writeValue(configFile, config);
File configFile = writeConfigFile(tempDir, table, tableName);
String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--continuousMode"};
runner.submit(
() -> {
Expand All @@ -72,17 +77,34 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
throw new UncheckedIOException(ex);
}
});
Path metadataPath = tempDir.resolve(tableName + "_v1").resolve("metadata");
waitForNumIcebergCommits(metadataPath, 2);
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
waitForNumIcebergCommits(icebergMetadataPath, 2);
// write more data now that table is initialized and data is synced
table.insertRows(20);
waitForNumIcebergCommits(metadataPath, 3);
assertEquals(3, numIcebergMetadataJsonFiles(metadataPath));
waitForNumIcebergCommits(icebergMetadataPath, 3);
assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
} finally {
runner.shutdownNow();
}
}

/**
 * Writes a {@code RunSync} dataset config describing a single table to {@code config.yaml}
 * inside {@code tempDir}.
 *
 * <p>The config declares {@code HUDI} as the source format, {@code ICEBERG} as the only target
 * format, and one dataset entry built from the table's base path and the given table name. It is
 * serialized with {@code RunSync.YAML_MAPPER}.
 *
 * @param tempDir directory in which the config file is created
 * @param table table whose {@code getBasePath()} value is used as the dataset's base path
 * @param tableName name recorded for the dataset entry
 * @return the config file that was written
 * @throws IOException if the mapper fails to write the file
 */
private static File writeConfigFile(Path tempDir, GenericTable table, String tableName)
    throws IOException {
  RunSync.DatasetConfig config =
      RunSync.DatasetConfig.builder()
          .sourceFormat("HUDI")
          .targetFormats(Collections.singletonList("ICEBERG"))
          .datasets(
              Collections.singletonList(
                  RunSync.DatasetConfig.Table.builder()
                      .tableBasePath(table.getBasePath())
                      .tableName(tableName)
                      .build()))
          .build();
  // Resolve against tempDir instead of concatenating strings: `tempDir + "config.yaml"` has no
  // path separator, so the file would be created next to the temp directory rather than in it.
  File configFile = tempDir.resolve("config.yaml").toFile();
  RunSync.YAML_MAPPER.writeValue(configFile, config);
  return configFile;
}

@SneakyThrows
private static void waitForNumIcebergCommits(Path metadataPath, int count) {
long start = System.currentTimeMillis();
Expand Down

0 comments on commit 82c0d59

Please sign in to comment.