Skip to content

Commit

Permalink
Allow getting a ParquetFileReader and the SchemaManifest from an Arrow.FileReader (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve authored Mar 21, 2024
1 parent 2879d32 commit c98ee2b
Show file tree
Hide file tree
Showing 13 changed files with 548 additions and 8 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ add_library(ParquetSharpNative SHARED
arrow/ArrowWriterPropertiesBuilder.cpp
arrow/FileReader.cpp
arrow/FileWriter.cpp
arrow/SchemaField.cpp
arrow/SchemaManifest.cpp
encryption/CryptoFactory.cpp
encryption/DecryptionConfiguration.cpp
encryption/EncryptionConfiguration.cpp
Expand Down
15 changes: 15 additions & 0 deletions cpp/arrow/FileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"
Expand Down Expand Up @@ -104,6 +105,20 @@ extern "C"
)
}

// Writes a pointer to the ParquetFileReader used by the given Arrow FileReader
// into *parquet_reader. The pointer comes straight from reader->parquet_reader();
// no new object is created here.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_ParquetReader(
  FileReader* reader,
  parquet::ParquetFileReader** parquet_reader)
{
  TRYCATCH(
    parquet::ParquetFileReader* const underlying = reader->parquet_reader();
    *parquet_reader = underlying;
  )
}

// Writes the address of the reader's schema manifest into *manifest.
// The address is taken from the reference returned by reader->manifest(),
// so the manifest itself is not copied.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_Manifest(
  FileReader* reader,
  const SchemaManifest** manifest)
{
  TRYCATCH(
    const SchemaManifest& schema_manifest = reader->manifest();
    *manifest = &schema_manifest;
  )
}

PARQUETSHARP_EXPORT void FileReader_Free(FileReader* reader)
{
delete reader;
Expand Down
42 changes: 42 additions & 0 deletions cpp/arrow/SchemaField.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <arrow/c/abi.h>
#include <arrow/c/bridge.h>
#include <parquet/arrow/schema.h>
#include <parquet/exception.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet::arrow;

extern "C"
{
// Writes the number of direct children of the schema field into *length,
// narrowed to a 32-bit signed integer.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_ChildrenLength(const SchemaField* field, int32_t* length)
{
  TRYCATCH(
    const auto child_count = field->children.size();
    *length = static_cast<int32_t>(child_count);
  )
}

// Writes a pointer to the child field at position `index` into *child.
// The pointer refers to an element of field->children; nothing is copied.
// Throws std::out_of_range (reported to the caller through the returned
// ExceptionInfo) when index is negative or not less than the number of children.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_Child(const SchemaField* field, int32_t index, const SchemaField** child)
{
  TRYCATCH(
    // Negative indices must be rejected explicitly: vector::operator[] performs
    // no bounds checking, so a negative value would read outside the vector.
    if (index < 0 || index >= static_cast<int32_t>(field->children.size()))
    {
      throw std::out_of_range("Child field index out of range");
    }
    *child = &(field->children[index]);
  )
}

// Copies the field's column_index member into *column_index.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_ColumnIndex(const SchemaField* field, int32_t* column_index)
{
  TRYCATCH(*column_index = field->column_index;)
}

// Exports the Arrow field held by this schema field into the caller-provided
// ArrowSchema struct via arrow::ExportField. A failed export status is turned
// into an exception by PARQUET_THROW_NOT_OK.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_Field(const SchemaField* field, struct ArrowSchema* arrow_field)
{
  TRYCATCH(
    const auto& wrapped_field = *(field->field);
    PARQUET_THROW_NOT_OK(arrow::ExportField(wrapped_field, arrow_field));
  )
}
}
38 changes: 38 additions & 0 deletions cpp/arrow/SchemaManifest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include <parquet/arrow/schema.h>
#include <parquet/exception.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet::arrow;

extern "C"
{
// Writes the number of entries in manifest->schema_fields into *length,
// narrowed to a 32-bit signed integer.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_SchemaFieldsLength(const SchemaManifest* manifest, int32_t* length)
{
  TRYCATCH(
    const auto field_count = manifest->schema_fields.size();
    *length = static_cast<int32_t>(field_count);
  )
}

// Writes a pointer to the schema field at position `index` into *field.
// The pointer refers to an element of manifest->schema_fields; nothing is copied.
// Throws std::out_of_range (reported to the caller through the returned
// ExceptionInfo) when index is negative or not less than the number of fields.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_SchemaField(const SchemaManifest* manifest, int32_t index, const SchemaField** field)
{
  TRYCATCH(
    // Negative indices must be rejected explicitly: vector::operator[] performs
    // no bounds checking, so a negative value would read outside the vector.
    if (index < 0 || index >= static_cast<int32_t>(manifest->schema_fields.size()))
    {
      throw std::out_of_range("Field index out of range");
    }
    *field = &(manifest->schema_fields[index]);
  )
}

// Looks up the schema field for the given column index by delegating to
// SchemaManifest::GetColumnField, which writes the result into *field.
// A non-OK status from the lookup is turned into an exception by PARQUET_THROW_NOT_OK.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_GetColumnField(const SchemaManifest* manifest, int32_t column_index, const SchemaField** field)
{
  TRYCATCH(
    const auto lookup_status = manifest->GetColumnField(column_index, field);
    PARQUET_THROW_NOT_OK(lookup_status);
  )
}

// Writes the result of SchemaManifest::GetParent(field) into *parent.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_GetParent(const SchemaManifest* manifest, const SchemaField* field, const SchemaField** parent)
{
  TRYCATCH(
    const SchemaField* const parent_field = manifest->GetParent(field);
    *parent = parent_field;
  )
}
}
201 changes: 201 additions & 0 deletions csharp.test/Arrow/TestFileReader.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow;
Expand Down Expand Up @@ -195,6 +196,168 @@ public async Task TestReadSelectedColumns()
Assert.That(rowsRead, Is.EqualTo(RowsPerRowGroup * NumRowGroups));
}

[Test]
public void TestAccessUnderlyingReader()
{
    using var buffer = new ResizableBuffer();
    WriteTestFile(buffer);

    using var input = new BufferReader(buffer);
    using var arrowReader = new FileReader(input);
    using var underlyingReader = arrowReader.ParquetReader;

    // The low-level reader exposes per-row-group column statistics,
    // which the Arrow reader itself does not provide.
    foreach (var groupIndex in Enumerable.Range(0, NumRowGroups))
    {
        var expectedMin = groupIndex * RowsPerRowGroup;
        var expectedMax = (groupIndex + 1) * RowsPerRowGroup - 1;

        using var rowGroup = underlyingReader.RowGroup(groupIndex);
        using var columnMetadata = rowGroup.MetaData.GetColumnChunkMetaData(1);
        using var intStats = columnMetadata.Statistics as Statistics<int>;

        Assert.That(intStats, Is.Not.Null);
        Assert.That(intStats!.HasMinMax);
        Assert.That(intStats.Min, Is.EqualTo(expectedMin));
        Assert.That(intStats.Max, Is.EqualTo(expectedMax));
    }
}

[Test]
public void TestAccessUnderlyingReaderAfterDisposed()
{
    using var buffer = new ResizableBuffer();
    WriteTestFile(buffer);

    using var input = new BufferReader(buffer);

    // Obtain the underlying reader, then dispose the Arrow reader it came from.
    ParquetFileReader orphanedReader;
    using (var arrowReader = new FileReader(input))
    {
        orphanedReader = arrowReader.ParquetReader;
    }

    using (orphanedReader)
    {
        // Any access through the orphaned reader must fail with a descriptive error.
        var error = Assert.Throws<NullReferenceException>(() =>
        {
            _ = orphanedReader.FileMetaData;
        });
        Assert.That(error!.Message, Does.Contain("owning parent has been disposed"));
    }
}

[Test]
public void TestSchemaManifest()
{
    using var buffer = new ResizableBuffer();
    WriteNestedTestFile(buffer);

    using var input = new BufferReader(buffer);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;
    var topLevel = schemaManifest.SchemaFields;

    Assert.That(topLevel.Count, Is.EqualTo(2));

    // First top-level field: the struct, which has no leaf column of its own.
    var structNode = topLevel[0];
    var structArrow = structNode.Field;

    Assert.That(structArrow.Name, Is.EqualTo("test_struct"));
    Assert.That(structArrow.DataType.TypeId, Is.EqualTo(ArrowTypeId.Struct));

    Assert.That(structNode.ColumnIndex, Is.EqualTo(-1));
    var members = structNode.Children;
    Assert.That(members.Count, Is.EqualTo(2));

    // The struct members map to leaf columns 0 and 1, in declaration order.
    var expectedMembers = new[]
    {
        (Name: "a", TypeId: ArrowTypeId.Int32),
        (Name: "b", TypeId: ArrowTypeId.Float),
    };
    for (var i = 0; i < expectedMembers.Length; ++i)
    {
        Assert.That(members[i].ColumnIndex, Is.EqualTo(i));
    }
    for (var i = 0; i < expectedMembers.Length; ++i)
    {
        var memberArrow = members[i].Field;
        Assert.That(memberArrow.Name, Is.EqualTo(expectedMembers[i].Name));
        Assert.That(memberArrow.DataType.TypeId, Is.EqualTo(expectedMembers[i].TypeId));
    }

    // Second top-level field: a plain leaf column following the struct's leaves.
    var leafNode = topLevel[1];
    Assert.That(leafNode.Children.Count, Is.EqualTo(0));
    Assert.That(leafNode.ColumnIndex, Is.EqualTo(2));
    var leafArrow = leafNode.Field;
    Assert.That(leafArrow.Name, Is.EqualTo("x"));
    Assert.That(leafArrow.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));
}

[Test]
public void TestSchemaManifestGetSingleField()
{
    using var buffer = new ResizableBuffer();
    WriteNestedTestFile(buffer);

    using var input = new BufferReader(buffer);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Index 1 is the last valid top-level field.
    var lastField = schemaManifest.SchemaField(1);
    Assert.That(lastField, Is.Not.Null);
    var lastArrowField = lastField.Field;
    Assert.That(lastArrowField.Name, Is.EqualTo("x"));
    Assert.That(lastArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));

    // One past the end must be rejected by the native bounds check.
    var error = Assert.Throws<ParquetException>(() =>
    {
        schemaManifest.SchemaField(2);
    });
    Assert.That(error!.Message, Does.Contain("out of range"));
}

[Test]
public void TestSchemaManifestGetColumnField()
{
    using var buffer = new ResizableBuffer();
    WriteNestedTestFile(buffer);

    using var input = new BufferReader(buffer);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Leaf column 2 is the top-level "x" column.
    var leafField = schemaManifest.GetColumnField(2);
    Assert.That(leafField, Is.Not.Null);
    var leafArrowField = leafField.Field;
    Assert.That(leafArrowField.Name, Is.EqualTo("x"));
    Assert.That(leafArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));

    // The file has only three leaf columns, so index 3 must fail.
    var error = Assert.Throws<ParquetException>(() =>
    {
        schemaManifest.GetColumnField(3);
    });
    Assert.That(error!.Message, Does.Contain("Column index 3"));
}

[Test]
public void TestSchemaManifestGetFieldParent()
{
    using var buffer = new ResizableBuffer();
    WriteNestedTestFile(buffer);

    using var input = new BufferReader(buffer);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Leaf column 1 is a member of the struct, so its parent is the struct field.
    var structMember = schemaManifest.GetColumnField(1);
    var owner = schemaManifest.GetParent(structMember);

    Assert.That(owner, Is.Not.Null);
    var ownerArrowField = owner!.Field;
    Assert.That(ownerArrowField.Name, Is.EqualTo("test_struct"));
    Assert.That(ownerArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Struct));

    // The struct is a top-level field and therefore has no parent.
    Assert.That(schemaManifest.GetParent(owner), Is.Null);
}

[Test]
public void TestAccessSchemaManifestFieldAfterDisposed()
{
    using var buffer = new ResizableBuffer();
    WriteTestFile(buffer);

    using var input = new BufferReader(buffer);

    // Grab a schema field, then dispose the reader that owns it.
    SchemaField orphanedField;
    using (var arrowReader = new FileReader(input))
    {
        var schemaManifest = arrowReader.SchemaManifest;
        orphanedField = schemaManifest.SchemaFields[0];
    }

    // Accessing the field afterwards must fail with a descriptive error.
    var error = Assert.Throws<NullReferenceException>(() =>
    {
        _ = orphanedField.Field;
    });
    Assert.That(error!.Message, Does.Contain("owning parent has been disposed"));
}

private static void WriteTestFile(ResizableBuffer buffer)
{
var columns = new Column[]
Expand Down Expand Up @@ -226,6 +389,44 @@ private static void WriteTestFile(ResizableBuffer buffer)
fileWriter.Close();
}

/// <summary>
/// Writes a Parquet file with a nullable struct column "test_struct" (members "a": int32, "b": float)
/// and a top-level int32 column "x" into the given buffer, one record batch per row group.
/// </summary>
private static void WriteNestedTestFile(ResizableBuffer buffer)
{
    var structMembers = new[]
    {
        new Field("a", new Int32Type(), false),
        new Field("b", new FloatType(), false),
    };
    var fields = new[]
    {
        new Field("test_struct", new StructType(structMembers), true),
        new Field("x", new Int32Type(), false),
    };
    var schema = new Apache.Arrow.Schema(fields, null);

    using var outStream = new BufferOutputStream(buffer);
    using var writer = new FileWriter(outStream, schema);

    var groupIndex = 0;
    while (groupIndex < NumRowGroups)
    {
        // Values continue from where the previous row group stopped.
        var firstValue = groupIndex * RowsPerRowGroup;
        var intValues = Enumerable.Range(firstValue, RowsPerRowGroup).ToArray();
        var floatValues = Enumerable.Range(firstValue, RowsPerRowGroup).Select(i => i * 0.1f).ToArray();

        var memberArrays = new IArrowArray[]
        {
            new Int32Array.Builder().AppendRange(intValues).Build(),
            new FloatArray.Builder().AppendRange(floatValues).Build(),
        };
        // Validity bitmap with every bit set: no struct value is null.
        var validity = new ArrowBuffer.BitmapBuilder().AppendRange(true, RowsPerRowGroup).Build();

        var columns = new List<IArrowArray>
        {
            new StructArray(fields[0].DataType, RowsPerRowGroup, memberArrays, validity),
            new Int32Array.Builder().AppendRange(Enumerable.Range(firstValue, RowsPerRowGroup).ToArray()).Build()
        };

        var batch = new RecordBatch(schema, columns, RowsPerRowGroup);
        writer.WriteRecordBatch(batch);

        ++groupIndex;
    }

    writer.Close();
}

private const int NumRowGroups = 4;
private const int RowsPerRowGroup = 100;
}
Expand Down
30 changes: 30 additions & 0 deletions csharp/Arrow/FileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ public unsafe IArrowArrayStream GetRecordBatchReader(
return CArrowArrayStreamImporter.ImportArrayStream(&cStream);
}

/// <summary>
/// Get the underlying ParquetFileReader used by this Arrow FileReader.
/// The returned reader is backed by a child handle tied to this FileReader's handle.
/// </summary>
public ParquetFileReader ParquetReader
{
    get
    {
        var nativeReader = ExceptionInfo.Return<IntPtr>(_handle, FileReader_ParquetReader);
        var childHandle = new ChildParquetHandle(nativeReader, _handle);
        return new ParquetFileReader(childHandle);
    }
}

/// <summary>
/// Get the schema manifest, which describes the relationship between the Arrow schema and Parquet schema.
/// The returned manifest is backed by a child handle tied to this FileReader's handle.
/// </summary>
public SchemaManifest SchemaManifest
{
    get
    {
        var nativeManifest = ExceptionInfo.Return<IntPtr>(_handle, FileReader_Manifest);
        var childHandle = new ChildParquetHandle(nativeManifest, _handle);
        return new SchemaManifest(childHandle);
    }
}

public void Dispose()
{
_handle.Dispose();
Expand Down Expand Up @@ -165,6 +189,12 @@ private static extern IntPtr FileReader_OpenFile(
private static extern unsafe IntPtr FileReader_GetRecordBatchReader(
IntPtr reader, int* rowGroups, int rowGroupsCount, int* columns, int columnsCount, CArrowArrayStream* stream);

// Native export: writes a pointer to the reader's underlying ParquetFileReader into parquetReader.
// The IntPtr return value is the native ExceptionInfo pointer.
[DllImport(ParquetDll.Name)]
private static extern IntPtr FileReader_ParquetReader(IntPtr reader, out IntPtr parquetReader);

// Native export: writes a pointer to the reader's SchemaManifest into manifest.
// The IntPtr return value is the native ExceptionInfo pointer.
[DllImport(ParquetDll.Name)]
private static extern IntPtr FileReader_Manifest(IntPtr reader, out IntPtr manifest);

[DllImport(ParquetDll.Name)]
private static extern void FileReader_Free(IntPtr reader);

Expand Down
Loading

0 comments on commit c98ee2b

Please sign in to comment.