Added support to write metadata files in parquet (#5105)
Also:
- Added support for partitioned parquet writing.
- Offset index information is now read from parquet files on demand, rather than eagerly while reading column chunks.
malhotrashivam authored Apr 2, 2024
1 parent 765934b commit a43948f
Showing 41 changed files with 2,564 additions and 344 deletions.
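
As a hypothetical usage sketch of the two new writing features: the writeKeyValuePartitionedTable method and the setGenerateMetadataFiles option are names assumed from this commit's description, not verified against the final public API.

import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class PartitionedWriteSketch {
    static void write(final Table table) {
        // Assumed option: also emit _metadata and _common_metadata files alongside the data
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setGenerateMetadataFiles(true)
                .build();
        // Assumed method: write one parquet file per key-value partition under the destination directory
        ParquetTools.writeKeyValuePartitionedTable(table, "/tmp/partitioned_output", instructions);
    }
}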
1 change: 1 addition & 0 deletions BenchmarkSupport/BenchmarkSupport.gradle
@@ -30,6 +30,7 @@ dependencies {
implementation project(':Base')
implementation project(':engine-table')
implementation project(':extensions-parquet-table')
implementation project(':extensions-parquet-base')
implementation project(':Configuration')
implementation 'org.openjdk.jmh:jmh-core:1.20'

@@ -8,11 +8,12 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.engine.table.impl.util.TableBuilder;
import io.deephaven.benchmarking.BenchmarkTools;
import org.openjdk.jmh.infra.BenchmarkParams;

import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
@@ -44,7 +45,7 @@ public void init() {

public void logOutput() throws IOException {
final Path outputPath = BenchmarkTools.dataDir()
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + ParquetTableWriter.PARQUET_FILE_EXTENSION);
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + PARQUET_FILE_EXTENSION);

final Table output = outputBuilder.build();
ParquetTools.writeTable(output, outputPath.toFile(), RESULT_DEF);
@@ -5,8 +5,6 @@

import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class Channels {

1 change: 1 addition & 0 deletions engine/table/build.gradle
@@ -66,6 +66,7 @@ dependencies {
testImplementation project(':BenchmarkSupport')
testImplementation project(':extensions-csv')
testImplementation project(':extensions-parquet-table')
testImplementation project(':extensions-parquet-base')
testImplementation project(':extensions-source-support')
testImplementation project(':Numerics')
testImplementation project(':extensions-suanshu')
157 changes: 157 additions & 0 deletions engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/PartitionFormatter.java
@@ -0,0 +1,157 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations.util;

import io.deephaven.time.DateTimeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

/**
 * This class takes a partition value object and formats it to a {@link String}. Useful when generating partitioning
 * paths for tables. Complementary to {@link PartitionParser}.
*/
public enum PartitionFormatter {
ForString {
@Override
public String formatObject(@NotNull final Object value) {
return (String) value;
}
},
ForBoolean {
@Override
public String formatObject(@NotNull final Object value) {
return ((Boolean) value).toString();
}
},
ForChar {
@Override
public String formatObject(@NotNull final Object value) {
return ((Character) value).toString();
}
},
ForByte {
@Override
public String formatObject(@NotNull final Object value) {
return ((Byte) value).toString();
}
},
ForShort {
@Override
public String formatObject(@NotNull final Object value) {
return ((Short) value).toString();
}
},
ForInt {
@Override
public String formatObject(@NotNull final Object value) {
return ((Integer) value).toString();
}
},
ForLong {
@Override
public String formatObject(@NotNull final Object value) {
return ((Long) value).toString();
}
},
ForFloat {
@Override
public String formatObject(@NotNull final Object value) {
return ((Float) value).toString();
}
},
ForDouble {
@Override
public String formatObject(@NotNull final Object value) {
return ((Double) value).toString();
}
},
ForBigInteger {
@Override
public String formatObject(@NotNull final Object value) {
return ((BigInteger) value).toString();
}
},
ForBigDecimal {
@Override
public String formatObject(@NotNull final Object value) {
return ((BigDecimal) value).toString();
}
},
ForInstant {
@Override
public String formatObject(@NotNull final Object value) {
return ((Instant) value).toString();
}
},
ForLocalDate {
@Override
public String formatObject(@NotNull final Object value) {
return DateTimeUtils.formatDate((LocalDate) value);
}
},
ForLocalTime {
@Override
public String formatObject(@NotNull final Object value) {
return ((LocalTime) value).toString();
}
};

private static final Map<Class<?>, PartitionFormatter> typeMap = new HashMap<>();
static {
typeMap.put(String.class, ForString);
typeMap.put(Boolean.class, ForBoolean);
typeMap.put(boolean.class, ForBoolean);
typeMap.put(Character.class, ForChar);
typeMap.put(char.class, ForChar);
typeMap.put(Byte.class, ForByte);
typeMap.put(byte.class, ForByte);
typeMap.put(Short.class, ForShort);
typeMap.put(short.class, ForShort);
typeMap.put(Integer.class, ForInt);
typeMap.put(int.class, ForInt);
typeMap.put(Long.class, ForLong);
typeMap.put(long.class, ForLong);
typeMap.put(Float.class, ForFloat);
typeMap.put(float.class, ForFloat);
typeMap.put(Double.class, ForDouble);
typeMap.put(double.class, ForDouble);
typeMap.put(BigInteger.class, ForBigInteger);
typeMap.put(BigDecimal.class, ForBigDecimal);
typeMap.put(Instant.class, ForInstant);
typeMap.put(LocalDate.class, ForLocalDate);
typeMap.put(LocalTime.class, ForLocalTime);
}

abstract String formatObject(@NotNull final Object value);

/**
* Takes a partition value object and returns a formatted string. Returns an empty string if the object is null.
*/
public String format(@Nullable final Object value) {
if (value == null) {
return "";
}
return formatObject(value);
}

/**
* Takes a partitioning column type and returns the corresponding formatter.
*/
public static PartitionFormatter getFormatterForType(@NotNull final Class<?> clazz) {
final PartitionFormatter formatter = typeMap.get(clazz);
if (formatter != null) {
return formatter;
} else {
throw new UnsupportedOperationException("Unsupported type: " + clazz.getSimpleName());
}
}
}
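
A quick usage sketch for the new enum; the formatted date is illustrative, assuming DateTimeUtils.formatDate yields an ISO-style yyyy-MM-dd string.

import io.deephaven.engine.table.impl.locations.util.PartitionFormatter;
import java.time.LocalDate;

public class PartitionFormatterSketch {
    public static void main(String[] args) {
        // Look up the formatter for a partitioning column's type and format a value:
        final PartitionFormatter formatter = PartitionFormatter.getFormatterForType(LocalDate.class);
        System.out.println("Date=" + formatter.format(LocalDate.of(2024, 4, 2))); // e.g. "Date=2024-04-02"
        // Null partition values format to the empty string:
        System.out.println(formatter.format(null).isEmpty()); // true
        // Unsupported types fail fast with UnsupportedOperationException:
        // PartitionFormatter.getFormatterForType(java.util.Date.class);
    }
}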
engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java
@@ -37,7 +37,6 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
import io.deephaven.test.types.OutOfBandTest;
@@ -75,6 +74,7 @@
import static io.deephaven.util.QueryConstants.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION;

@Category(OutOfBandTest.class)
public class QueryTableAggregationTest {
@@ -3951,7 +3951,7 @@ private Table makeDiskTable(File directory) throws IOException {
final TableDefaults result = testTable(stringCol("Symbol", syms),
intCol("Value", values));

final File outputFile = new File(directory, "disk_table" + ParquetTableWriter.PARQUET_FILE_EXTENSION);
final File outputFile = new File(directory, "disk_table" + PARQUET_FILE_EXTENSION);

ParquetTools.writeTable(result, outputFile, result.getDefinition());

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java
@@ -31,10 +31,16 @@ public interface ColumnChunkReader {
int getMaxRl();

/**
 * @return The offset index for this column chunk, or null if it is not found in the metadata.
* @return Whether the column chunk has offset index information set in the metadata or not.
*/
@Nullable
OffsetIndex getOffsetIndex();
boolean hasOffsetIndex();

/**
* @param context The channel context to use for reading the offset index.
 * @return The offset index for this column chunk.
* @throws UnsupportedOperationException If the column chunk does not have an offset index.
*/
OffsetIndex getOffsetIndex(final SeekableChannelContext context);

/**
 * Used to iterate over column page readers for each page with the capability to set channel context for reading
@@ -69,9 +75,9 @@ interface ColumnPageDirectAccessor {
}

/**
* @return An accessor for individual parquet pages.
* @return An accessor for individual parquet pages which uses the provided offset index.
*/
ColumnPageDirectAccessor getPageAccessor();
ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex);

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page.
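
Taken together, the interface changes imply a check-then-read pattern for callers. A minimal sketch using only the methods shown above; obtaining the reader and channel context, and the nesting of the iterator/accessor types, are assumed from this diff:

import io.deephaven.util.channel.SeekableChannelContext;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

class OffsetIndexUsageSketch {
    static void accessPages(final ColumnChunkReader reader, final SeekableChannelContext channelContext) {
        if (reader.hasOffsetIndex()) {
            // Offset index is read lazily, on first request:
            final OffsetIndex offsetIndex = reader.getOffsetIndex(channelContext);
            final ColumnChunkReader.ColumnPageDirectAccessor accessor = reader.getPageAccessor(offsetIndex);
            // Random-access an individual page, e.g.:
            // accessor.getPageReader(0, channelContext);
        } else {
            // No offset index in the metadata: fall back to sequential page iteration
            final ColumnChunkReader.ColumnPageReaderIterator iterator = reader.getPageIterator();
        }
    }
}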
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -10,6 +10,7 @@
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
import io.deephaven.util.datastructures.LazyCachingFunction;
import org.apache.commons.io.FilenameUtils;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -41,18 +42,13 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {

private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializerFactory nullMaterializerFactory;

private URI uri;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
*/
@@ -63,11 +59,9 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
MessageType type, List<Type> fieldTypes, final long numRows, final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootURI = rootURI;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
@@ -76,12 +70,22 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
} else {
decompressor = CompressorAdapter.PASSTHRU;
}
this.offsetIndex = offsetIndex;
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new LazyCachingFunction<>(this::getDictionary);
this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
this.columnChunkURI = rootURI;
}
// Construct the reader object but don't read the offset index yet
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())
? new OffsetIndexReaderImpl(channelsProvider, columnChunk, columnChunkURI)
: OffsetIndexReader.NULL;
}

@Override
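
The URI resolution in the constructor above combines a metadata file's parent directory (the rootURI) with each column chunk's relative file_path. With illustrative values:

// rootURI                    = file:///data/partitioned/  (parent directory of the _metadata file)
// columnChunk.getFile_path() = "Date=2024-04-02/table.parquet"
// columnChunkURI             = file:///data/partitioned/Date=2024-04-02/table.parquet
// For a single parquet file (no file_path set), columnChunkURI is simply rootURI itself.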
@@ -99,8 +103,16 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public OffsetIndex getOffsetIndex() {
return offsetIndex;
@Override
public boolean hasOffsetIndex() {
return columnChunk.isSetOffset_index_offset();
}

@Override
public OffsetIndex getOffsetIndex(final SeekableChannelContext context) {
// Reads and caches the offset index if it hasn't been read yet. Throws an exception if the offset index cannot
// be read from this source
return offsetIndexReader.getOffsetIndex(context);
}

@Override
@@ -109,23 +121,15 @@ public ColumnPageReaderIterator getPageIterator() {
}

@Override
public final ColumnPageDirectAccessor getPageAccessor() {
public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl();
return new ColumnPageDirectAccessorImpl(offsetIndex);
}

private URI getURI() {
if (uri != null) {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
}
return columnChunkURI;
}

@Override
@@ -308,7 +312,11 @@ private static int getNumValues(PageHeader pageHeader) {

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}
private final OffsetIndex offsetIndex;

ColumnPageDirectAccessorImpl(final OffsetIndex offsetIndex) {
this.offsetIndex = offsetIndex;
}

@Override
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
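
ColumnChunkReaderImpl's constructor (earlier in this diff) installs a null-object OffsetIndexReader so that getOffsetIndex(context) fails uniformly when no offset index exists for the chunk. A hypothetical sketch of that pattern; the real OffsetIndexReader interface is not shown in this diff and may differ:

import io.deephaven.util.channel.SeekableChannelContext;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

interface OffsetIndexReader {
    // Read (and typically cache) the offset index using the given channel context
    OffsetIndex getOffsetIndex(SeekableChannelContext context);

    // Shared sentinel for column chunks without an offset index
    OffsetIndexReader NULL = context -> {
        throw new UnsupportedOperationException("Offset index not available for this column chunk");
    };
}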
(Diffs for the remaining changed files were not loaded.)
