Skip to content

Commit

Permalink
Fix TestHiveSkipEmptyFiles temporal files and directories creation
Browse files Browse the repository at this point in the history
  • Loading branch information
cvarelad-denodo authored and tdcmeehan committed Jun 13, 2024
1 parent b712b34 commit 24d3b6c
Showing 1 changed file with 92 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,26 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.net.URL;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.io.Files.createTempDir;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static java.lang.String.format;
import static java.nio.file.Files.createTempFile;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
Expand All @@ -52,6 +60,7 @@ public class TestHiveSkipEmptyFiles
private DistributedQueryRunner queryFailRunner;
private DistributedQueryRunner queryBucketRunner;
private DistributedQueryRunner queryBucketFailRunner;
File temporaryDirectory;

@Override
protected QueryRunner createQueryRunner()
Expand Down Expand Up @@ -80,6 +89,25 @@ protected QueryRunner createQueryRunner()
return this.queryRunner;
}

@BeforeClass
private void generateMetadataDirectory()
throws Exception
{
temporaryDirectory = createTempDir();
generateMetadata("skip_empty_files_success");
generateMetadata("skip_empty_files_fail");
generateBucketedMetadataWithEmptyFiles(queryBucketRunner, "skip_empty_files_bucket_success", false);
generateBucketedMetadataWithEmptyFiles(queryBucketFailRunner, "skip_empty_files_bucket_insert_fail", false);
generateBucketedMetadataWithEmptyFiles(queryBucketFailRunner, "skip_empty_files_bucket_replace_fail", true);
}

@AfterClass(alwaysRun = true)
private void tearDown()
throws IOException
{
deleteRecursively(temporaryDirectory.toPath(), ALLOW_INSECURE);
}

@BeforeClass
private void createQueryFailRunner()
throws Exception
Expand Down Expand Up @@ -159,168 +187,85 @@ private void createQueryBucketFailRunner()
* Generates a temporary directory and creates two parquet files inside, one is empty and the other is not
*
* @param tableName a {@link String} containing the desired table name
* @return a {@link File} pointing to the newly created temporary directory
*/
private static File generateMetadata(String tableName)
private void generateMetadata(String tableName)
throws Exception
{
// obtains the root resource directory in order to create temporary tables
URL url = TestHiveSkipEmptyFiles.class.getClassLoader().getResource(".");
if (url == null) {
throw new RuntimeException("Could not obtain resource URL");
}
File temporaryDirectory = new File(url.getPath(), tableName);
boolean created = temporaryDirectory.mkdirs();
if (!created) {
throw new RuntimeException("Could not create resource directory: " + temporaryDirectory.getPath());
}
File firstParquetFile = new File(temporaryDirectory, randomUUID().toString());
ParquetTester.writeParquetFileFromPresto(firstParquetFile,
Path tempDirectory = Files.createDirectory(Paths.get(temporaryDirectory.getPath(), tableName));
Path firstParquetFile = createTempFile(tempDirectory, randomUUID().toString(), randomUUID().toString());
ParquetTester.writeParquetFileFromPresto(firstParquetFile.toFile(),
ImmutableList.of(IntegerType.INTEGER),
Collections.singletonList("field"),
new Iterable[] {Collections.singleton(1)},
1,
GZIP,
PARQUET_2_0);
File secondParquetFile = new File(temporaryDirectory, randomUUID().toString());
if (!secondParquetFile.createNewFile()) {
throw new RuntimeException("Could not create empty file");
}
return temporaryDirectory;
createTempFile(tempDirectory, randomUUID().toString(), randomUUID().toString());
}

/**
* Generates a temporary directory and inserts data in every partition of the bucketed table, including an empty file in the first partition
*
* @param queryRunner a {@link QueryRunner} with the desired configuration properties
* @param tableName a {@link String} containing the desired table name
* @return a {@link File} pointing to the newly created temporary directory
* @param replaceDataFileByEmptyFile a {@code true} if it is necessary to delete a partition file
*/
private File generateBucketedMetadata(DistributedQueryRunner queryRunner, String tableName, boolean replace) throws Exception
private void generateBucketedMetadataWithEmptyFiles(DistributedQueryRunner queryRunner, String tableName, boolean replaceDataFileByEmptyFile) throws Exception
{
URL url = TestHiveSkipEmptyFiles.class.getClassLoader().getResource(".");
if (url == null) {
throw new RuntimeException("Could not obtain resource URL");
}
File temporaryDirectory = new File(url.getPath(), tableName);
boolean created = temporaryDirectory.mkdirs();
if (!created) {
throw new RuntimeException("Could not create resource directory: " + temporaryDirectory.getPath());
}
Path tempDirectory = Files.createDirectory(Paths.get(temporaryDirectory.getPath(), tableName));
@Language("SQL") String createQuery = format("CREATE TABLE %s.\"%s\".\"%s\" (id %s, field %s) WITH (external_location = '%s'," +
"format = 'Parquet',partitioned_by = ARRAY['field']," +
" bucketed_by = ARRAY['id']," +
" bucket_count = 3)",
CATALOG, SCHEMA, tableName, IntegerType.INTEGER, VarcharType.VARCHAR, getResourceUrl(tableName));
CATALOG, SCHEMA, tableName, IntegerType.INTEGER, VarcharType.VARCHAR, tempDirectory.toUri());
queryRunner.execute(createQuery);
String partitionDirectoryPath = temporaryDirectory.getPath() + "/field=field1";
String name = "field";
Path partitionDirectory = Paths.get(tempDirectory + "/field=field1");
String partitionName = "field";
@Language("SQL") String insertQuery;
for (int i = 1; i <= 5; i++) {
insertQuery = format("INSERT INTO %s.\"%s\".\"%s\" VALUES (%s,'%s')",
CATALOG, SCHEMA, tableName, i, name + i);
CATALOG, SCHEMA, tableName, i, partitionName + i);
queryRunner.execute(insertQuery);
if (i == 1) {
File secondParquetFile = new File(partitionDirectoryPath, randomUUID().toString());
if (!secondParquetFile.createNewFile()) {
throw new RuntimeException("Could not create empty file");
}
}
}
if (replace) {
File partitionDirectory = new File(partitionDirectoryPath);
deleteMetadata(partitionDirectory, true);
}
return temporaryDirectory;
}

/**
* Deletes the given directory and all of its contents recursively
* Does not follow symbolic links
*
* @param temporaryDirectory a {@link File} pointing to the directory to delete
* @param hasPattern a {@code true} if it is necessary to delete only one directory file, {@code false} otherwise
*/
private static void deleteMetadata(File temporaryDirectory, boolean hasPattern)
{
@Language("RegExp") String nameStart = "000000_.*";
File[] metadataFiles = temporaryDirectory.listFiles();
if (metadataFiles != null) {
for (File file : metadataFiles) {
if ((!Files.isSymbolicLink(file.toPath()) && !hasPattern) || (hasPattern && file.getName().matches(nameStart))) {
deleteMetadata(file, false);
}
}
}
if (!hasPattern && !temporaryDirectory.delete()) {
throw new RuntimeException("Could not to delete metadata");
if (replaceDataFileByEmptyFile) {
Files.delete(Arrays.stream(requireNonNull(partitionDirectory.toFile().listFiles((dir, name) -> !name.endsWith(".crc")))).iterator().next().toPath());
}
createTempFile(partitionDirectory, randomUUID().toString(), randomUUID().toString());
}

@Test
public void testSkipEmptyFilesSuccessful()
throws Exception
{
String tableName = "skip_empty_files_success";
File resourcesLocation = generateMetadata(tableName);
executeCreationTestAndDropCycle(queryRunner, tableName, getResourceUrl(tableName), false, null);
deleteMetadata(resourcesLocation, false);
executeCreationTestAndDropCycle(queryRunner, tableName, temporaryDirectory.toURI() + tableName);
}

@Test
public void testSkipEmptyFilesError()
throws Exception
{
String tableName = "skip_empty_files_fail";
File resourcesLocation = generateMetadata(tableName);
executeCreationTestAndDropCycle(queryFailRunner, tableName, getResourceUrl(tableName), true,
".* is not a valid Parquet File");
deleteMetadata(resourcesLocation, false);
executeCreationTestAndDropCycleFail(queryFailRunner, tableName, temporaryDirectory.toURI() + tableName);
}

@Test
public void testSkipEmptyFilesBucketSuccessful()
throws Exception
{
String tableName = "skip_empty_files_bucket_success";
File resourcesLocation = generateBucketedMetadata(queryBucketRunner, tableName, false);
checkBucketedResult(queryBucketRunner, tableName, false, null);
deleteMetadata(resourcesLocation, false);
checkBucketedResult(queryBucketRunner, tableName);
}

@Test
public void testSkipEmptyFilesBucketInsertFileFail()
throws Exception
{
String tableName = "skip_empty_files_bucket_insert_fail";
File resourcesLocation = generateBucketedMetadata(queryBucketFailRunner, tableName, false);
checkBucketedResult(queryBucketFailRunner, tableName, true, ".* is corrupt.* does not match the standard naming pattern, and the number of files in the directory .* does not match the declared bucket count.*");
deleteMetadata(resourcesLocation, false);
checkBucketedResultFail(queryBucketFailRunner, tableName, ".* is corrupt.* does not match the standard naming pattern, and the number of files in the directory .* does not match the declared bucket count.*");
}

@Test
public void testSkipEmptyFilesBucketReplaceFileFail()
throws Exception
{
String tableName = "skip_empty_files_bucket_replace_fail";
File resourcesLocation = generateBucketedMetadata(queryBucketFailRunner, tableName, true);
checkBucketedResult(queryBucketFailRunner, tableName, true, ".* is not a valid Parquet File");
deleteMetadata(resourcesLocation, false);
}

/**
* Obtains the external location from the local resources directory of the project
*
* @param tableName a {@link String} containing the directory name to search for
* @return a {@link String} with the external location for the given table_name
*/
private static String getResourceUrl(String tableName)
{
URL resourceUrl = TestHiveSkipEmptyFiles.class.getClassLoader().getResource(tableName);
if (resourceUrl == null) {
throw new RuntimeException("Cannot find resource path for table name: " + tableName);
}
return resourceUrl.toString();
checkBucketedResultFail(queryBucketFailRunner, tableName, ".* is not a valid Parquet File");
}

/**
Expand All @@ -329,21 +274,28 @@ private static String getResourceUrl(String tableName)
*
* @param queryRunner a {@link QueryRunner} with the desired configuration properties
* @param tableName a {@link String} containing the desired table name
* @param shouldFail {@code true} if the table creation should fail, {@code false} otherwise
* @param errorMessage a {@link String} containing the expected error message. Will be checked if {@code shouldFail} is {@code true}
*/
private void checkBucketedResult(DistributedQueryRunner queryRunner, String tableName, boolean shouldFail, @Language("RegExp") String errorMessage)
private void checkBucketedResult(DistributedQueryRunner queryRunner, String tableName)
{
try {
@Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
if (shouldFail) {
assertQueryFails(queryRunner, selectQuery, errorMessage);
}
else {
MaterializedResult result = queryRunner.execute(selectQuery);
assertEquals(5, result.getRowCount());
}
MaterializedResult result = queryRunner.execute(selectQuery);
assertEquals(5, result.getRowCount());
}
finally {
@Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
queryRunner.execute(dropQuery);
}
}

private void checkBucketedResultFail(DistributedQueryRunner queryRunner, String tableName, @Language("RegExp") String errorMessage)
{
try {
@Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
assertQueryFails(queryRunner, selectQuery, errorMessage);
}
finally {
@Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
Expand All @@ -359,12 +311,34 @@ private void checkBucketedResult(DistributedQueryRunner queryRunner, String tabl
* @param queryRunner a {@link QueryRunner} with the desired configuration properties
* @param tableName a {@link String} containing the desired table name
* @param externalLocation a {@link String} with the external location to create the table against it
* @param shouldFail {@code true} if the table creation should fail, {@code false} otherwise
* @param errorMessage a {@link String} containing the expected error message. Will be checked if {@code shouldFail} is {@code true}
*/
private void executeCreationTestAndDropCycle(DistributedQueryRunner queryRunner, String tableName, String externalLocation, boolean shouldFail, @Language("RegExp") String errorMessage)
private void executeCreationTestAndDropCycle(DistributedQueryRunner queryRunner, String tableName, String externalLocation)
{
try {
@Language("SQL") String createQuery = format(
"CREATE TABLE %s.\"%s\".\"%s\" (field %s) WITH (external_location = '%s')",
CATALOG,
SCHEMA,
tableName,
IntegerType.INTEGER,
externalLocation);
queryRunner.execute(createQuery);
@Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
MaterializedResult result = queryRunner.execute(selectQuery);
assertEquals(1, result.getRowCount());
}
finally {
@Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
queryRunner.execute(dropQuery);
}
}

private void executeCreationTestAndDropCycleFail(DistributedQueryRunner queryRunner, String tableName, String externalLocation)
{
try {
@Language("RegExp") String errorMessage = ".* is not a valid Parquet File";
@Language("SQL") String createQuery = format(
"CREATE TABLE %s.\"%s\".\"%s\" (field %s) WITH (external_location = '%s')",
CATALOG,
Expand All @@ -375,13 +349,7 @@ private void executeCreationTestAndDropCycle(DistributedQueryRunner queryRunner,
queryRunner.execute(createQuery);
@Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
SCHEMA, tableName);
if (shouldFail) {
assertQueryFails(queryRunner, selectQuery, errorMessage);
}
else {
MaterializedResult result = queryRunner.execute(selectQuery);
assertEquals(1, result.getRowCount());
}
assertQueryFails(queryRunner, selectQuery, errorMessage);
}
finally {
@Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
Expand Down

0 comments on commit 24d3b6c

Please sign in to comment.