Modify layout of grouping files (#4525)
Also, did some renaming from grouping->indexing
malhotrashivam authored Oct 31, 2023
1 parent 242742c commit bc26812
Showing 7 changed files with 305 additions and 97 deletions.
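In effect, this commit moves the per-column grouping ("index") files out of the table's own directory and into a hidden .dh_metadata/indexes/ subtree. Based on the path helpers changed below, the before/after layout for a table "table.parquet" with an indexed column "MyCol" looks roughly like this (names illustrative):

Legacy layout:
  table.parquet
  table_MyCol_grouping.parquet

New layout:
  table.parquet
  .dh_metadata/indexes/MyCol/index_MyCol_table.parquet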
@@ -123,9 +123,14 @@ public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
public FileVisitResult preVisitDirectory(
@NotNull final Path dir,
@NotNull final BasicFileAttributes attrs) {
final String dirName = dir.getFileName().toString();
// Skip dot directories
if (!dirName.isEmpty() && dirName.charAt(0) == '.') {
return FileVisitResult.SKIP_SUBTREE;
}
if (++columnCount > 0) {
// We're descending and past the root
final String[] components = dir.getFileName().toString().split("=", 2);
final String[] components = dirName.split("=", 2);
if (components.length != 2) {
throw new TableDataException(
"Unexpected directory name format (not key=value) at " + dir);
@@ -51,7 +51,6 @@
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.function.Function;

import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;
@@ -237,9 +236,34 @@ private static String minusParquetSuffix(@NotNull final String s) {
return s;
}

public static Function<String, String> defaultGroupingFileName(@NotNull final String path) {
final String prefix = minusParquetSuffix(path);
return columnName -> prefix + "_" + columnName + "_grouping.parquet";
/**
* Generates the index file path relative to the table destination file path.
*
* @param tableDest Destination path for the main table containing these indexing columns
* @param columnName Name of the indexing column
*
* @return The relative index file path. For example, for a table with destination {@code "table.parquet"} and
* indexing column {@code "IndexingColName"}, the method will return
* {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"}
*/
public static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String columnName) {
return String.format(".dh_metadata/indexes/%s/index_%s_%s", columnName, columnName, tableDest.getName());
}

/**
* Legacy method for generating a grouping file name. We used to place grouping files right next to the original
* table destination.
*
* @param tableDest Destination path for the main table containing these grouping columns
* @param columnName Name of the grouping column
*
* @return The relative grouping file path. For example, for a table with destination {@code "table.parquet"} and
* grouping column {@code "GroupingColName"}, the method will return
* {@code "table_GroupingColName_grouping.parquet"}
*/
public static String legacyGroupingFileName(@NotNull final File tableDest, @NotNull final String columnName) {
final String prefix = minusParquetSuffix(tableDest.getName());
return prefix + "_" + columnName + "_grouping.parquet";
}
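A quick usage sketch for the two helpers above; the expected results follow directly from the javadoc examples:

final File tableDest = new File("/data/table.parquet");
// New-style location, relative to the table's parent directory:
// ".dh_metadata/indexes/MyCol/index_MyCol_table.parquet"
final String indexPath = getRelativeIndexFilePath(tableDest, "MyCol");
// Legacy location, right next to the table file itself:
// "table_MyCol_grouping.parquet"
final String legacyName = legacyGroupingFileName(tableDest, "MyCol");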

/**
@@ -285,7 +309,7 @@ private static void installShadowFile(@NotNull final File destFile, @NotNull fin
}

/**
* Roll back any changes made in the {@link #installShadowFile} in a best effort manner
* Roll back any changes made in the {@link #installShadowFile} in a best-effort manner
*/
private static void rollbackFile(@NotNull final File destFile) {
final File backupDestFile = getBackupFile(destFile);
@@ -297,7 +321,7 @@ private static void rollbackFile(@NotNull final File destFile) {
/**
* Make any missing ancestor directories of {@code destination}.
*
* @param destination The destination file
* @param destination The destination parquet file
* @return The first created directory, or null if no directories were made.
*/
private static File prepareDestinationFileLocation(@NotNull File destination) {
@@ -362,12 +386,13 @@ private static Map<String, ParquetTableWriter.GroupingColumnWritingInfo> groupin
for (int gci = 0; gci < groupingColumnNames.length; gci++) {
final String groupingColumnName = groupingColumnNames[gci];
final String parquetColumnName = parquetColumnNames[gci];
final String groupingFilePath = defaultGroupingFileName(destFile.getPath()).apply(parquetColumnName);
final File groupingFile = new File(groupingFilePath);
deleteBackupFile(groupingFile);
final File shadowGroupingFile = getShadowFile(groupingFile);
final String indexFileRelativePath = getRelativeIndexFilePath(destFile, parquetColumnName);
final File indexFile = new File(destFile.getParent(), indexFileRelativePath);
prepareDestinationFileLocation(indexFile);
deleteBackupFile(indexFile);
final File shadowIndexFile = getShadowFile(indexFile);
gcwim.put(groupingColumnName, new ParquetTableWriter.GroupingColumnWritingInfo(parquetColumnName,
groupingFile, shadowGroupingFile));
indexFile, shadowIndexFile));
}
return gcwim;
}
@@ -397,15 +422,12 @@ public static void writeParquetTables(@NotNull final Table[] sources,
}
Arrays.stream(destinations).forEach(ParquetTools::deleteBackupFile);

// Write tables at temporary shadow file paths in the same directory to prevent overwriting any existing files
// Write tables and index files at temporary shadow file paths in the same directory to prevent overwriting
// any existing files
final File[] shadowDestFiles =
Arrays.stream(destinations)
.map(ParquetTools::getShadowFile)
.toArray(File[]::new);
Arrays.stream(destinations).map(ParquetTools::getShadowFile).toArray(File[]::new);
final File[] firstCreatedDirs =
Arrays.stream(shadowDestFiles)
.map(ParquetTools::prepareDestinationFileLocation)
.toArray(File[]::new);
Arrays.stream(shadowDestFiles).map(ParquetTools::prepareDestinationFileLocation).toArray(File[]::new);

// List of shadow files, to clean up in case of exceptions
final List<File> shadowFiles = new ArrayList<>();
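The shadow-file machinery above exists so that a failed write can never corrupt a previously written table: everything is first written to a shadow path in the same directory, then swapped into place, with backups rolled back on error. A self-contained sketch of the core idea (simplified; the real code also keeps backup files and uses rollbackFile, and the ".NEW_" prefix is an assumed naming scheme):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

public class ShadowWriteDemo {
    public static void main(final String[] args) throws IOException {
        final Path dest = Paths.get("table.parquet");
        // Shadow file lives in the same directory so the final move stays on one file system
        final Path shadow = dest.resolveSibling(".NEW_" + dest.getFileName());
        try {
            // Stand-in for the real parquet write; the destination is untouched while this runs
            Files.write(shadow, "new contents".getBytes(StandardCharsets.UTF_8));
            // Only a complete, fully written file is ever moved over the destination
            Files.move(shadow, dest, StandardCopyOption.REPLACE_EXISTING);
        } catch (final IOException e) {
            Files.deleteIfExists(shadow); // clean up the partial shadow file on failure
            throw e;
        }
    }
}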
@@ -426,7 +448,7 @@ public static void writeParquetTables(@NotNull final Table[] sources,
// Create grouping info for each table and write the table and grouping files to shadow path
groupingColumnWritingInfoMaps = new ArrayList<>(sources.length);

// Shared parquet column names across all tables
// Same parquet column names across all tables
final String[] parquetColumnNames = Arrays.stream(groupingColumns)
.map(writeInstructions::getParquetColumnNameFromColumnNameOrDefault)
.toArray(String[]::new);
@@ -455,10 +477,10 @@ public static void writeParquetTables(@NotNull final Table[] sources,
final Map<String, ParquetTableWriter.GroupingColumnWritingInfo> gcwim =
groupingColumnWritingInfoMaps.get(tableIdx);
for (final ParquetTableWriter.GroupingColumnWritingInfo gfwi : gcwim.values()) {
final File groupingDestFile = gfwi.metadataFilePath;
final File shadowGroupingFile = gfwi.destFile;
destFiles.add(groupingDestFile);
installShadowFile(groupingDestFile, shadowGroupingFile);
final File indexDestFile = gfwi.metadataFilePath;
final File shadowIndexFile = gfwi.destFile;
destFiles.add(indexDestFile);
installShadowFile(indexDestFile, shadowIndexFile);
}
}
}
@@ -570,7 +592,12 @@ private static Table readTableInternal(
return readPartitionedTableWithMetadata(source, instructions);
}
final Path firstEntryPath;
try (final DirectoryStream<Path> sourceStream = Files.newDirectoryStream(sourcePath)) {
// Ignore dot files while looking for the first entry
try (final DirectoryStream<Path> sourceStream =
Files.newDirectoryStream(sourcePath, (path) -> {
final String filename = path.getFileName().toString();
return !filename.isEmpty() && filename.charAt(0) != '.';
})) {
final Iterator<Path> entryIterator = sourceStream.iterator();
if (!entryIterator.hasNext()) {
throw new TableDataException("Source directory " + source + " is empty");
@@ -1,12 +1,15 @@
package io.deephaven.parquet.table.layout;

import java.nio.file.Path;
import io.deephaven.parquet.table.ParquetTableWriter;

import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
import java.nio.file.Path;

final class ParquetFileHelper {

public static boolean fileNameMatches(Path path) {
return path.getFileName().toString().endsWith(PARQUET_FILE_EXTENSION);
/**
* Used as a filter to select relevant parquet files while reading all files in a directory.
*/
static boolean fileNameMatches(final Path path) {
final String fileName = path.getFileName().toString();
return fileName.endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
}
}
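Both dot-file filters introduced in this commit (the DirectoryStream filter in readTableInternal and fileNameMatches here) implement the same policy: accept regular *.parquet files and ignore hidden entries such as .dh_metadata. A standalone sketch of that policy (hypothetical demo, not Deephaven code):

import java.io.IOException;
import java.nio.file.*;

public class ListParquetFiles {
    private static final String PARQUET_FILE_EXTENSION = ".parquet";

    public static void main(final String[] args) throws IOException {
        try (final DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get("data"), path -> {
            final String name = path.getFileName().toString();
            // Mirrors ParquetFileHelper.fileNameMatches: keep *.parquet, drop dot files
            return !name.isEmpty() && name.charAt(0) != '.' && name.endsWith(PARQUET_FILE_EXTENSION);
        })) {
            for (final Path p : stream) {
                System.out.println(p);
            }
        }
    }
}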
@@ -45,6 +45,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
@@ -146,33 +147,55 @@ private ParquetTableLocation tl() {
private static final ColumnDefinition<Long> LAST_KEY_COL_DEF =
ColumnDefinition.ofLong("__lastKey__");

/**
* Helper method for logging a warning on failure in reading an index file
*/
private void logWarnFailedToRead(final String indexFilePath) {
log.warn().append("Failed to read expected index file ").append(indexFilePath)
.append(" for table location ").append(tl()).append(", column ")
.append(getName())
.endl();
}

@Override
@Nullable
public <METADATA_TYPE> METADATA_TYPE getMetadata(
@NotNull final ColumnDefinition<?> columnDefinition) {
public <METADATA_TYPE> METADATA_TYPE getMetadata(@NotNull final ColumnDefinition<?> columnDefinition) {
if (!hasGroupingTable) {
return null;
}

final Function<String, String> defaultGroupingFilenameByColumnName =
ParquetTools.defaultGroupingFileName(tl().getParquetFile().getAbsolutePath());
final File parquetFile = tl().getParquetFile();
try {
final GroupingColumnInfo groupingColumnInfo =
tl().getGroupingColumns().get(parquetColumnName);
final ParquetFileReader parquetFileReader;
final String groupingFileName = groupingColumnInfo == null
? defaultGroupingFilenameByColumnName.apply(parquetColumnName)
: tl().getParquetFile().toPath().getParent()
.resolve(groupingColumnInfo.groupingTablePath()).toString();
try {
parquetFileReader =
new ParquetFileReader(groupingFileName, tl().getChannelProvider());
} catch (Exception e) {
log.warn().append("Failed to read expected grouping file ").append(groupingFileName)
.append(" for table location ").append(tl()).append(", column ")
.append(getName())
.endl();
return null;
ParquetFileReader parquetFileReader;
final String indexFilePath;
final GroupingColumnInfo groupingColumnInfo = tl().getGroupingColumns().get(parquetColumnName);
if (groupingColumnInfo != null) {
final String indexFileRelativePath = groupingColumnInfo.groupingTablePath();
indexFilePath = parquetFile.toPath().getParent().resolve(indexFileRelativePath).toString();
try {
parquetFileReader = new ParquetFileReader(indexFilePath, tl().getChannelProvider());
} catch (final RuntimeException e) {
logWarnFailedToRead(indexFilePath);
return null;
}
} else {
final String relativeIndexFilePath =
ParquetTools.getRelativeIndexFilePath(parquetFile, parquetColumnName);
indexFilePath = parquetFile.toPath().getParent().resolve(relativeIndexFilePath).toString();
try {
parquetFileReader = new ParquetFileReader(indexFilePath, tl().getChannelProvider());
} catch (final RuntimeException e1) {
// Retry with legacy grouping file path
final String legacyGroupingFileName =
ParquetTools.legacyGroupingFileName(parquetFile, parquetColumnName);
final File legacyGroupingFile = new File(parquetFile.getParent(), legacyGroupingFileName);
try {
parquetFileReader =
new ParquetFileReader(legacyGroupingFile.getAbsolutePath(), tl().getChannelProvider());
} catch (final RuntimeException e2) {
logWarnFailedToRead(indexFilePath);
return null;
}
}
}
final Map<String, ColumnTypeInfo> columnTypes = ParquetSchemaReader.parseMetadata(
new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData)
Expand All @@ -187,7 +210,7 @@ public <METADATA_TYPE> METADATA_TYPE getMetadata(
final ColumnChunkReader endPosReader =
rowGroupReader.getColumnChunk(Collections.singletonList(END_POS));
if (groupingKeyReader == null || beginPosReader == null || endPosReader == null) {
log.warn().append("Grouping file ").append(groupingFileName)
log.warn().append("Index file ").append(indexFilePath)
.append(" is missing one or more expected columns for table location ")
.append(tl()).append(", column ").append(getName());
return null;
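Stripped of logging detail, the new getMetadata lookup above amounts to a two-step fallback when the table metadata does not record an explicit grouping table path: try the .dh_metadata index location first, then retry the legacy side-by-side grouping location before giving up. A condensed sketch of that control flow (openReader is a hypothetical helper standing in for the ParquetFileReader construction above):

final String indexFilePath = parquetFile.toPath().getParent()
        .resolve(ParquetTools.getRelativeIndexFilePath(parquetFile, parquetColumnName)).toString();
ParquetFileReader reader;
try {
    reader = openReader(indexFilePath);
} catch (final RuntimeException e1) {
    // Fall back to the pre-#4525 location next to the table file
    final File legacyFile = new File(parquetFile.getParent(),
            ParquetTools.legacyGroupingFileName(parquetFile, parquetColumnName));
    try {
        reader = openReader(legacyFile.getAbsolutePath());
    } catch (final RuntimeException e2) {
        logWarnFailedToRead(indexFilePath); // warn against the preferred path
        return null;
    }
}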