Allow specifying column definitions for a ParquetRowWriter (#284)
* Allow providing an array of Column instances to CreateRowWriter

This gives users more control over the logical type mappings for columns,
e.g. to specify IsAdjustedToUtc for Timestamps, and it works when using a tuple
type where attributes can't be added to fields.

* Move column mapping out of write delegate cache

This prevents the wrong columns being used when the write delegate is re-used
after columns have been manually specified.

Introduces a new MappedField struct, replacing the previous tuple, to make
this more manageable.

* Comment fixes
adamreeve authored Jun 14, 2022
1 parent 8a2c65f commit b5ecca4
Showing 4 changed files with 251 additions and 45 deletions.
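
As a rough illustration of the new overload exercised by the tests below, here is a minimal sketch of passing explicit Column definitions to CreateRowWriter for a tuple type. The column names "id" and "timestamp" and the row values are invented for this example; the buffer types come from ParquetSharp.IO, as in the tests.

using System;
using ParquetSharp;
using ParquetSharp.IO;
using ParquetSharp.RowOriented;

// Columns are matched positionally to the tuple fields, so logical type details
// such as a non-UTC-adjusted timestamp can be specified without attributes.
var columns = new Column[]
{
    new Column<int>("id"),
    new Column<DateTime>("timestamp", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)),
};

using var buffer = new ResizableBuffer();
using var output = new BufferOutputStream(buffer);
using var writer = ParquetFile.CreateRowWriter<(int, DateTime)>(output, columns);
writer.WriteRows(new[] {(1, new DateTime(2022, 6, 14, 12, 0, 0))});
writer.Close();

Without the columns argument the writer derives the schema from the tuple type as before; passing Column instances simply overrides those defaults.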
104 changes: 104 additions & 0 deletions csharp.test/TestRowOrientedParquetFile.cs
@@ -157,6 +157,110 @@ public static void TestWriterPropertiesArgument()
    Assert.AreEqual("This unit test", writer.WriterProperties.CreatedBy);
}

/// <summary>
/// Test specifying columns using Column instances rather than just column names.
/// </summary>
[Test]
public static void TestColumnsSpecifiedForTuple()
{
    var columns = new Column[]
    {
        new Column<int>("a"),
        new Column<DateTime>("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)),
        // Include a decimal column to check we handle not having a ParquetDecimalScale attribute
        new Column<decimal>("c", LogicalType.Decimal(precision: 29, scale: 4)),
    };
    var rows = new[]
    {
        (42, new DateTime(2022, 6, 14, 9, 58, 0), decimal.One),
        (24, new DateTime(2022, 6, 14, 9, 58, 1), decimal.One / 2),
    };
    using var buffer = new ResizableBuffer();
    using (var outputStream = new BufferOutputStream(buffer))
    {
        using var writer = ParquetFile.CreateRowWriter<(int, DateTime, decimal)>(outputStream, columns);
        writer.WriteRows(rows);
        writer.Close();
    }

    using var inputStream = new BufferReader(buffer);
    using var reader = ParquetFile.CreateRowReader<(int, DateTime, decimal)>(inputStream);
    var rowsRead = reader.ReadRows(0);

    Assert.That(rowsRead, Is.EqualTo(rows));
    Assert.That(reader.FileMetaData.Schema.Column(1).LogicalType, Is.EqualTo(
        LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)));
    Assert.That(reader.FileMetaData.Schema.Column(2).LogicalType, Is.EqualTo(
        LogicalType.Decimal(precision: 29, scale: 4)));
}

[Test]
public static void TestColumnsSpecifiedForStruct()
{
    var columns = new Column[]
    {
        new Column<int>("a"),
        new Column<float>("b"),
        new Column<DateTime>("c", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)),
        // Note: Scale here takes precedence over the scale from the ParquetDecimalScale attribute
        new Column<decimal>("d", LogicalType.Decimal(precision: 29, scale: 4)),
    };
    var rows = new[]
    {
        new Row1 {A = 1, B = 1.5f, C = new DateTime(2022, 6, 14, 10, 7, 1), D = decimal.One / 2},
        new Row1 {A = 2, B = 2.5f, C = new DateTime(2022, 6, 14, 10, 7, 2), D = decimal.One / 4},
    };
    using var buffer = new ResizableBuffer();
    using (var outputStream = new BufferOutputStream(buffer))
    {
        using var writer = ParquetFile.CreateRowWriter<Row1>(outputStream, columns);
        writer.WriteRows(rows);
        writer.Close();
    }

    using var inputStream = new BufferReader(buffer);
    using var reader = ParquetFile.CreateRowReader<Row1>(inputStream);
    var rowsRead = reader.ReadRows(0);

    Assert.That(rowsRead, Is.EqualTo(rows));
    Assert.That(reader.FileMetaData.Schema.Column(2).LogicalType, Is.EqualTo(
        LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)));
    Assert.That(reader.FileMetaData.Schema.Column(3).LogicalType, Is.EqualTo(
        LogicalType.Decimal(precision: 29, scale: 4)));
}

[Test]
public static void TestColumnLengthMismatch()
{
    var columns = new Column[]
    {
        new Column<int>("a"),
        new Column<DateTime>("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)),
    };
    using var buffer = new ResizableBuffer();
    using var outputStream = new BufferOutputStream(buffer);
    var exception = Assert.Throws<ArgumentException>(() =>
        ParquetFile.CreateRowWriter<(int, DateTime, decimal)>(outputStream, columns));
    Assert.That(exception!.Message, Does.Contain(
        "The number of columns specified (2) does not mach the number of public fields and properties (3)"));
}

[Test]
public static void TestColumnTypeMismatch()
{
    var columns = new Column[]
    {
        new Column<int>("a"),
        new Column<DateTime>("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)),
    };
    using var buffer = new ResizableBuffer();
    using var outputStream = new BufferOutputStream(buffer);
    var exception = Assert.Throws<ArgumentException>(() =>
        ParquetFile.CreateRowWriter<(int?, DateTime)>(outputStream, columns));
    Assert.That(exception!.Message, Does.Contain(
        "Expected a system type of 'System.Nullable`1[System.Int32]' for column 0 (a) but received 'System.Int32'"));
}

private static void TestRoundtrip<TTuple>(TTuple[] rows)
{
    RoundTripAndCompare(rows, rows, columnNames: null);
24 changes: 24 additions & 0 deletions csharp/RowOriented/MappedField.cs
@@ -0,0 +1,24 @@
using System;
using System.Reflection;

namespace ParquetSharp.RowOriented
{
    /// <summary>
    /// Represents a field or property of a type that is to be mapped to a Parquet column
    /// </summary>
    internal struct MappedField
    {
        public readonly string Name;
        public readonly string? MappedColumn;
        public readonly Type Type;
        public readonly MemberInfo Info;

        public MappedField(string name, string? mappedColumn, Type type, MemberInfo info)
        {
            Name = name;
            MappedColumn = mappedColumn;
            Type = type;
            Info = info;
        }
    }
}
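
For context on how MappedField relates to column creation, here is a hypothetical sketch (not part of this commit; the actual mapping logic lives in the row-oriented writer internals) of deriving a default Column from a MappedField:

using ParquetSharp;
using ParquetSharp.RowOriented;

internal static class MappedFieldExample
{
    // Hypothetical helper: build a default Column from a MappedField, preferring an
    // explicitly mapped column name (e.g. from a MapToColumn attribute) over the member name.
    public static Column ToColumn(MappedField field)
    {
        var columnName = field.MappedColumn ?? field.Name;
        return new Column(field.Type, columnName);
    }
}

When explicit Column instances are supplied to CreateRowWriter, they take the place of such derived defaults, which is why the mapping can no longer live inside the cached write delegate.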