Skip to content

Commit

Permalink
move run sync test into an IT, update default assertions in TestRunSync
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Dec 9, 2024
1 parent 7368ec0 commit 0878ec8
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.utilities;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestJavaHudiTable;

class ITRunSync {

@Test
void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
ExecutorService runner = Executors.newSingleThreadExecutor();
String tableName = "test-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
RunSync.DatasetConfig config =
RunSync.DatasetConfig.builder()
.sourceFormat("HUDI")
.targetFormats(Collections.singletonList("ICEBERG"))
.datasets(
Collections.singletonList(
RunSync.DatasetConfig.Table.builder()
.tableBasePath(table.getBasePath())
.tableName(tableName)
.build()))
.build();
File configFile = new File(tempDir + "config.yaml");
RunSync.YAML_MAPPER.writeValue(configFile, config);
String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--continuousMode"};
runner.submit(
() -> {
try {
RunSync.main(args);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
Path metadataPath = tempDir.resolve(tableName + "_v1").resolve("metadata");
waitForNumIcebergCommits(metadataPath, 2);
// write more data now that table is initialized and data is synced
table.insertRows(20);
waitForNumIcebergCommits(metadataPath, 3);
assertEquals(3, numIcebergMetadataJsonFiles(metadataPath));
} finally {
runner.shutdownNow();
}
}

@SneakyThrows
private static void waitForNumIcebergCommits(Path metadataPath, int count) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(5)) {
if (numIcebergMetadataJsonFiles(metadataPath) == count) {
break;
}
Thread.sleep(5000);
}
}

@SneakyThrows
private static long numIcebergMetadataJsonFiles(Path path) {
long count = 0;
if (Files.exists(path)) {
count = Files.list(path).filter(p -> p.toString().endsWith("metadata.json")).count();
}
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,14 @@
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestJavaHudiTable;
import org.apache.xtable.iceberg.IcebergCatalogConfig;
import org.apache.xtable.utilities.RunSync.TableFormatConverters;
import org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
Expand All @@ -61,7 +45,7 @@ public void testLoadDefaultHadoopConfig() {

conf = RunSync.loadHadoopConf(null);
value = conf.get("fs.file.impl");
assertEquals("org.apache.hadoop.fs.LocalFileSystem", value);
Assertions.assertEquals("org.apache.hadoop.fs.LocalFileSystem", value);
}

/** Tests that the custom hadoop configs are loaded and can override defaults. */
Expand All @@ -87,29 +71,38 @@ public void testLoadCustomHadoopConfig() {

conf = RunSync.loadHadoopConf(customXmlConfig.getBytes());
value = conf.get("fs.file.impl");
assertEquals("override_default_value", value);
Assertions.assertEquals("override_default_value", value);
value = conf.get("fs.azure.account.oauth2.client.endpoint");
assertEquals("https://login.microsoftonline.com/", value);
Assertions.assertEquals("https://login.microsoftonline.com/", value);
}

@Test
public void testTableFormatConverterConfigDefault() throws IOException {
TableFormatConverters converters = RunSync.loadTableFormatConversionConfigs(null);
Map<String, ConversionConfig> tfConverters = converters.getTableFormatConverters();
assertEquals(3, tfConverters.size());
Assertions.assertEquals(3, tfConverters.size());
Assertions.assertNotNull(tfConverters.get(DELTA));
Assertions.assertNotNull(tfConverters.get(HUDI));
Assertions.assertNotNull(tfConverters.get(ICEBERG));

assertEquals(
Assertions.assertEquals(
"org.apache.xtable.hudi.HudiConversionSourceProvider",
tfConverters.get(HUDI).getConversionSourceProviderClass());
assertEquals(
Assertions.assertEquals(
"org.apache.xtable.hudi.HudiConversionTarget",
tfConverters.get(HUDI).getConversionTargetProviderClass());
Assertions.assertEquals(
"org.apache.xtable.iceberg.IcebergConversionTarget",
tfConverters.get(ICEBERG).getConversionTargetProviderClass());
assertEquals(
Assertions.assertEquals(
"org.apache.xtable.iceberg.IcebergConversionSourceProvider",
tfConverters.get(ICEBERG).getConversionSourceProviderClass());
Assertions.assertEquals(
"org.apache.xtable.delta.DeltaConversionTarget",
tfConverters.get(DELTA).getConversionTargetProviderClass());
Assertions.assertEquals(
"org.apache.xtable.delta.DeltaConversionSourceProvider",
tfConverters.get(DELTA).getConversionSourceProviderClass());
}

@Test
Expand All @@ -127,17 +120,18 @@ public void testTableFormatConverterCustom() throws IOException {
TableFormatConverters converters =
RunSync.loadTableFormatConversionConfigs(customConverters.getBytes());
Map<String, ConversionConfig> tfConverters = converters.getTableFormatConverters();
assertEquals(4, tfConverters.size());
Assertions.assertEquals(4, tfConverters.size());

Assertions.assertNotNull(tfConverters.get("NEW_FORMAT"));
assertEquals("bar", tfConverters.get("NEW_FORMAT").getConversionSourceProviderClass());
Assertions.assertEquals(
"bar", tfConverters.get("NEW_FORMAT").getConversionSourceProviderClass());

assertEquals("foo", tfConverters.get(HUDI).getConversionSourceProviderClass());
Assertions.assertEquals("foo", tfConverters.get(HUDI).getConversionSourceProviderClass());

Map<String, String> deltaConfigs = tfConverters.get(DELTA).getConfiguration();
assertEquals(3, deltaConfigs.size());
assertEquals("local[4]", deltaConfigs.get("spark.master"));
assertEquals("bar", deltaConfigs.get("foo"));
Assertions.assertEquals(3, deltaConfigs.size());
Assertions.assertEquals("local[4]", deltaConfigs.get("spark.master"));
Assertions.assertEquals("bar", deltaConfigs.get("foo"));
}

@Test
Expand All @@ -149,71 +143,10 @@ public void testIcebergCatalogConfig() throws IOException {
+ " option1: value1\n"
+ " option2: value2";
IcebergCatalogConfig catalogConfig = RunSync.loadIcebergCatalogConfig(icebergConfig.getBytes());
assertEquals("org.apache.xtable.CatalogImpl", catalogConfig.getCatalogImpl());
assertEquals("test", catalogConfig.getCatalogName());
assertEquals(2, catalogConfig.getCatalogOptions().size());
assertEquals("value1", catalogConfig.getCatalogOptions().get("option1"));
assertEquals("value2", catalogConfig.getCatalogOptions().get("option2"));
}

@Test
void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
ExecutorService runner = Executors.newSingleThreadExecutor();
String tableName = "test-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
RunSync.DatasetConfig config =
RunSync.DatasetConfig.builder()
.sourceFormat("HUDI")
.targetFormats(Collections.singletonList("ICEBERG"))
.datasets(
Collections.singletonList(
RunSync.DatasetConfig.Table.builder()
.tableBasePath(table.getBasePath())
.tableName(tableName)
.build()))
.build();
File configFile = new File(tempDir + "config.yaml");
RunSync.YAML_MAPPER.writeValue(configFile, config);
String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--continuousMode"};
runner.submit(
() -> {
try {
RunSync.main(args);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
Path metadataPath = tempDir.resolve(tableName + "_v1").resolve("metadata");
waitForNumIcebergCommits(metadataPath, 2);
// write more data now that table is initialized and data is synced
table.insertRows(20);
waitForNumIcebergCommits(metadataPath, 3);
assertEquals(3, numIcebergMetadataJsonFiles(metadataPath));
} finally {
runner.shutdownNow();
}
}

@SneakyThrows
private static void waitForNumIcebergCommits(Path metadataPath, int count) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(5)) {
if (numIcebergMetadataJsonFiles(metadataPath) == count) {
break;
}
Thread.sleep(5000);
}
}

@SneakyThrows
private static long numIcebergMetadataJsonFiles(Path path) {
long count = 0;
if (Files.exists(path)) {
count = Files.list(path).filter(p -> p.toString().endsWith("metadata.json")).count();
}
return count;
Assertions.assertEquals("org.apache.xtable.CatalogImpl", catalogConfig.getCatalogImpl());
Assertions.assertEquals("test", catalogConfig.getCatalogName());
Assertions.assertEquals(2, catalogConfig.getCatalogOptions().size());
Assertions.assertEquals("value1", catalogConfig.getCatalogOptions().get("option1"));
Assertions.assertEquals("value2", catalogConfig.getCatalogOptions().get("option2"));
}
}

0 comments on commit 0878ec8

Please sign in to comment.