diff --git a/csharp.test/TestRowOrientedParquetFile.cs b/csharp.test/TestRowOrientedParquetFile.cs index f697c8ba..fbbccc90 100644 --- a/csharp.test/TestRowOrientedParquetFile.cs +++ b/csharp.test/TestRowOrientedParquetFile.cs @@ -157,6 +157,110 @@ public static void TestWriterPropertiesArgument() Assert.AreEqual("This unit test", writer.WriterProperties.CreatedBy); } + /// + /// Test specifying columns using Column instances rather than just column names. + /// + [Test] + public static void TestColumnsSpecifiedForTuple() + { + var columns = new Column[] + { + new Column("a"), + new Column("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)), + // Include a decimal column to check we handle not having a ParquetDecimalScale attribute + new Column("c", LogicalType.Decimal(precision: 29, scale: 4)), + }; + var rows = new[] + { + (42, new DateTime(2022, 6, 14, 9, 58, 0), decimal.One), + (24, new DateTime(2022, 6, 14, 9, 58, 1), decimal.One / 2), + }; + using var buffer = new ResizableBuffer(); + using (var outputStream = new BufferOutputStream(buffer)) + { + using var writer = ParquetFile.CreateRowWriter<(int, DateTime, decimal)>(outputStream, columns); + writer.WriteRows(rows); + writer.Close(); + } + + using var inputStream = new BufferReader(buffer); + using var reader = ParquetFile.CreateRowReader<(int, DateTime, decimal)>(inputStream); + var rowsRead = reader.ReadRows(0); + + Assert.That(rowsRead, Is.EqualTo(rows)); + Assert.That(reader.FileMetaData.Schema.Column(1).LogicalType, Is.EqualTo( + LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros))); + Assert.That(reader.FileMetaData.Schema.Column(2).LogicalType, Is.EqualTo( + LogicalType.Decimal(precision: 29, scale: 4))); + } + + [Test] + public static void TestColumnsSpecifiedForStruct() + { + var columns = new Column[] + { + new Column("a"), + new Column("b"), + new Column("c", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)), + // Note: Scale here takes precedence over 
the scale from the ParquetDecimalScale attribute + new Column("d", LogicalType.Decimal(precision: 29, scale: 4)), + }; + var rows = new[] + { + new Row1 {A = 1, B = 1.5f, C = new DateTime(2022, 6, 14, 10, 7, 1), D = decimal.One / 2}, + new Row1 {A = 2, B = 2.5f, C = new DateTime(2022, 6, 14, 10, 7, 2), D = decimal.One / 4}, + }; + using var buffer = new ResizableBuffer(); + using (var outputStream = new BufferOutputStream(buffer)) + { + using var writer = ParquetFile.CreateRowWriter(outputStream, columns); + writer.WriteRows(rows); + writer.Close(); + } + + using var inputStream = new BufferReader(buffer); + using var reader = ParquetFile.CreateRowReader(inputStream); + var rowsRead = reader.ReadRows(0); + + Assert.That(rowsRead, Is.EqualTo(rows)); + Assert.That(reader.FileMetaData.Schema.Column(2).LogicalType, Is.EqualTo( + LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros))); + Assert.That(reader.FileMetaData.Schema.Column(3).LogicalType, Is.EqualTo( + LogicalType.Decimal(precision: 29, scale: 4))); + } + + [Test] + public static void TestColumnLengthMismatch() + { + var columns = new Column[] + { + new Column("a"), + new Column("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)), + }; + using var buffer = new ResizableBuffer(); + using var outputStream = new BufferOutputStream(buffer); + var exception = Assert.Throws(() => + ParquetFile.CreateRowWriter<(int, DateTime, decimal)>(outputStream, columns)); + Assert.That(exception!.Message, Does.Contain( + "The number of columns specified (2) does not mach the number of public fields and properties (3)")); + } + + [Test] + public static void TestColumnTypeMismatch() + { + var columns = new Column[] + { + new Column("a"), + new Column("b", LogicalType.Timestamp(isAdjustedToUtc: false, TimeUnit.Micros)), + }; + using var buffer = new ResizableBuffer(); + using var outputStream = new BufferOutputStream(buffer); + var exception = Assert.Throws(() => + ParquetFile.CreateRowWriter<(int?, 
DateTime)>(outputStream, columns)); + Assert.That(exception!.Message, Does.Contain( + "Expected a system type of 'System.Nullable`1[System.Int32]' for column 0 (a) but received 'System.Int32'")); + } + private static void TestRoundtrip(TTuple[] rows) { RoundTripAndCompare(rows, rows, columnNames: null); diff --git a/csharp/RowOriented/MappedField.cs b/csharp/RowOriented/MappedField.cs new file mode 100644 index 00000000..d095c244 --- /dev/null +++ b/csharp/RowOriented/MappedField.cs @@ -0,0 +1,24 @@ +using System; +using System.Reflection; + +namespace ParquetSharp.RowOriented +{ + /// + /// Represents a field or property of a type that is to be mapped to a Parquet column + /// + internal struct MappedField + { + public readonly string Name; + public readonly string? MappedColumn; + public readonly Type Type; + public readonly MemberInfo Info; + + public MappedField(string name, string? mappedColumn, Type type, MemberInfo info) + { + Name = name; + MappedColumn = mappedColumn; + Type = type; + Info = info; + } + } +} diff --git a/csharp/RowOriented/ParquetFile.cs b/csharp/RowOriented/ParquetFile.cs index b68431ed..9e06dd96 100644 --- a/csharp/RowOriented/ParquetFile.cs +++ b/csharp/RowOriented/ParquetFile.cs @@ -97,19 +97,78 @@ public static ParquetRowWriter CreateRowWriter( return new ParquetRowWriter(outputStream, columns, writerProperties, keyValueMetadata, writeDelegate); } - private static ParquetRowReader.ReadAction GetOrCreateReadDelegate((string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + /// + /// Create a row-oriented writer to a file path using the specified column definitions. + /// Note that any MapToColumn or ParquetDecimalScale attributes will be overridden by the column definitions. + /// + public static ParquetRowWriter CreateRowWriter( + string path, + Column[] columns, + Compression compression = Compression.Snappy, + IReadOnlyDictionary? 
keyValueMetadata = null) + { + var (columnsToUse, writeDelegate) = GetOrCreateWriteDelegate(columns); + return new ParquetRowWriter(path, columnsToUse, compression, keyValueMetadata, writeDelegate); + } + + /// + /// Create a row-oriented writer to a file path using the specified writerProperties and column definitions. + /// Note that any MapToColumn or ParquetDecimalScale attributes will be overridden by the column definitions. + /// + public static ParquetRowWriter CreateRowWriter( + string path, + WriterProperties writerProperties, + Column[] columns, + IReadOnlyDictionary? keyValueMetadata = null) + { + var (columnsToUse, writeDelegate) = GetOrCreateWriteDelegate(columns); + return new ParquetRowWriter(path, columnsToUse, writerProperties, keyValueMetadata, writeDelegate); + } + + /// + /// Create a row-oriented writer to an output stream using the specified column definitions. + /// Note that any MapToColumn or ParquetDecimalScale attributes will be overridden by the column definitions. + /// + public static ParquetRowWriter CreateRowWriter( + OutputStream outputStream, + Column[] columns, + Compression compression = Compression.Snappy, + IReadOnlyDictionary? keyValueMetadata = null) + { + var (columnsToUse, writeDelegate) = GetOrCreateWriteDelegate(columns); + return new ParquetRowWriter(outputStream, columnsToUse, compression, keyValueMetadata, writeDelegate); + } + + /// + /// Create a row-oriented writer to an output stream using the specified writerProperties and column definitions. + /// Note that any MapToColumn or ParquetDecimalScale attributes will be overridden by the column definitions. + /// + public static ParquetRowWriter CreateRowWriter( + OutputStream outputStream, + WriterProperties writerProperties, + Column[] columns, + IReadOnlyDictionary? 
keyValueMetadata = null) + { + var (columnsToUse, writeDelegate) = GetOrCreateWriteDelegate(columns); + return new ParquetRowWriter(outputStream, columnsToUse, writerProperties, keyValueMetadata, writeDelegate); + } + + private static ParquetRowReader.ReadAction GetOrCreateReadDelegate(MappedField[] fields) { return (ParquetRowReader.ReadAction) ReadDelegatesCache.GetOrAdd(typeof(TTuple), k => CreateReadDelegate(fields)); } private static (Column[] columns, ParquetRowWriter.WriteAction writeDelegate) GetOrCreateWriteDelegate(string[]? columnNames) { - var (columns, writeDelegate) = WriteDelegates.GetOrAdd(typeof(TTuple), k => CreateWriteDelegate()); + var (fields, writeDelegate) = WriteDelegates.GetOrAdd(typeof(TTuple), k => CreateWriteDelegate()); + var columns = fields.Select(GetColumn).ToArray(); if (columnNames != null) { if (columnNames.Length != columns.Length) { - throw new ArgumentException("the length of column names does not mach the number of public fields and properties", nameof(columnNames)); + throw new ArgumentException( + $"The length of column names ({columnNames.Length}) does not mach the number of " + + $"public fields and properties ({columns.Length})", nameof(columnNames)); } columns = columns.Select((c, i) => new Column(c.LogicalSystemType, columnNames[i], c.LogicalTypeOverride, c.Length)).ToArray(); @@ -118,10 +177,31 @@ private static (Column[] columns, ParquetRowWriter.WriteAction writeDele return (columns, (ParquetRowWriter.WriteAction) writeDelegate); } + private static (Column[] columns, ParquetRowWriter.WriteAction writeDelegate) GetOrCreateWriteDelegate(Column[] columns) + { + var (fields, writeDelegate) = WriteDelegates.GetOrAdd(typeof(TTuple), k => CreateWriteDelegate()); + if (columns.Length != fields.Length) + { + throw new ArgumentException( + $"The number of columns specified ({columns.Length}) does not mach the number of public " + + $"fields and properties ({fields.Length})", nameof(columns)); + } + for (var i = 0; i < 
columns.Length; ++i) + { + if (columns[i].LogicalSystemType != fields[i].Type) + { + throw new ArgumentException( + $"Expected a system type of '{fields[i].Type}' for column {i} ({columns[i].Name}) " + + $"but received '{columns[i].LogicalSystemType}'", nameof(columns)); + } + } + return (columns, (ParquetRowWriter.WriteAction) writeDelegate); + } + /// /// Returns a delegate to read rows from individual Parquet columns. /// - private static ParquetRowReader.ReadAction CreateReadDelegate((string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + private static ParquetRowReader.ReadAction CreateReadDelegate(MappedField[] fields) { // Parameters var reader = Expression.Parameter(typeof(ParquetRowReader), "reader"); @@ -129,14 +209,14 @@ private static ParquetRowReader.ReadAction CreateReadDelegate((s var length = Expression.Parameter(typeof(int), "length"); // Use constructor or the property setters. - var ctor = typeof(TTuple).GetConstructor(BindingFlags.Public | BindingFlags.Instance, null, fields.Select(f => f.type).ToArray(), null); + var ctor = typeof(TTuple).GetConstructor(BindingFlags.Public | BindingFlags.Instance, null, fields.Select(f => f.Type).ToArray(), null); // Buffers. - var buffers = fields.Select(f => Expression.Variable(f.type.MakeArrayType(), $"buffer_{f.name}")).ToArray(); - var bufferAssigns = fields.Select((f, i) => (Expression) Expression.Assign(buffers[i], Expression.NewArrayBounds(f.type, length))).ToArray(); + var buffers = fields.Select(f => Expression.Variable(f.Type.MakeArrayType(), $"buffer_{f.Name}")).ToArray(); + var bufferAssigns = fields.Select((f, i) => (Expression) Expression.Assign(buffers[i], Expression.NewArrayBounds(f.Type, length))).ToArray(); // Read the columns from Parquet and populate the buffers. 
- var reads = buffers.Select((buffer, i) => Expression.Call(reader, GetReadMethod(fields[i].type), Expression.Constant(i), buffer, length)).ToArray(); + var reads = buffers.Select((buffer, i) => Expression.Call(reader, GetReadMethod(fields[i].Type), Expression.Constant(i), buffer, length)).ToArray(); // Loop over the tuples, constructing them from the column buffers. var index = Expression.Variable(typeof(int), "index"); @@ -144,7 +224,7 @@ private static ParquetRowReader.ReadAction CreateReadDelegate((s Expression.Assign( Expression.ArrayAccess(tuples, index), ctor == null - ? Expression.MemberInit(Expression.New(typeof(TTuple)), fields.Select((f, i) => Expression.Bind(f.info, Expression.ArrayAccess(buffers[i], index)))) + ? Expression.MemberInit(Expression.New(typeof(TTuple)), fields.Select((f, i) => Expression.Bind(f.Info, Expression.ArrayAccess(buffers[i], index)))) : (Expression) Expression.New(ctor, fields.Select((f, i) => (Expression) Expression.ArrayAccess(buffers[i], index))) ) ); @@ -156,12 +236,11 @@ private static ParquetRowReader.ReadAction CreateReadDelegate((s } /// - /// Return a delegate to write rows to individual Parquet columns, as well the column types and names. + /// Return a delegate to write rows to individual Parquet columns, as well as the fields to be mapped to columns. 
/// - private static (Column[] columns, ParquetRowWriter.WriteAction writeDelegate) CreateWriteDelegate() + private static (MappedField[] fields, ParquetRowWriter.WriteAction writeDelegate) CreateWriteDelegate() { var fields = GetFieldsAndProperties(typeof(TTuple)); - var columns = fields.Select(GetColumn).ToArray(); // Parameters var writer = Expression.Parameter(typeof(ParquetRowWriter), "writer"); @@ -171,9 +250,9 @@ private static (Column[] columns, ParquetRowWriter.WriteAction writeDele var columnBodies = fields.Select(f => { // Column buffer - var bufferType = f.type.MakeArrayType(); - var buffer = Expression.Variable(bufferType, $"buffer_{f.name}"); - var bufferAssign = Expression.Assign(buffer, Expression.NewArrayBounds(f.type, length)); + var bufferType = f.Type.MakeArrayType(); + var buffer = Expression.Variable(bufferType, $"buffer_{f.Name}"); + var bufferAssign = Expression.Assign(buffer, Expression.NewArrayBounds(f.Type, length)); var bufferReset = Expression.Assign(buffer, Expression.Constant(null, bufferType)); // Loop over the tuples and fill the current column buffer. 
@@ -181,7 +260,7 @@ private static (Column[] columns, ParquetRowWriter.WriteAction writeDele var loop = For(index, Expression.Constant(0), Expression.NotEqual(index, length), Expression.PreIncrementAssign(index), Expression.Assign( Expression.ArrayAccess(buffer, index), - Expression.PropertyOrField(Expression.ArrayAccess(tuples, index), f.name) + Expression.PropertyOrField(Expression.ArrayAccess(tuples, index), f.Name) ) ); @@ -199,7 +278,7 @@ private static (Column[] columns, ParquetRowWriter.WriteAction writeDele var body = Expression.Block(columnBodies); var lambda = Expression.Lambda.WriteAction>(body, writer, tuples, length); OnWriteExpressionCreated?.Invoke(lambda); - return (columns, lambda.Compile()); + return (fields, lambda.Compile()); } private static MethodInfo GetReadMethod(Type type) @@ -245,9 +324,9 @@ private static Expression For( ); } - private static (string name, string? mappedColumn, Type type, MemberInfo info)[] GetFieldsAndProperties(Type type) + private static MappedField[] GetFieldsAndProperties(Type type) { - var list = new List<(string name, string? mappedColumn, Type type, MemberInfo info)>(); + var list = new List(); var flags = BindingFlags.Public | BindingFlags.Instance; if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(ValueTuple<,,,,,,,>)) @@ -258,13 +337,13 @@ private static (string name, string? 
mappedColumn, Type type, MemberInfo info)[] foreach (var field in type.GetFields(flags)) { var mappedColumn = field.GetCustomAttribute()?.ColumnName; - list.Add((field.Name, mappedColumn, field.FieldType, field)); + list.Add(new MappedField(field.Name, mappedColumn, field.FieldType, field)); } foreach (var property in type.GetProperties(flags)) { var mappedColumn = property.GetCustomAttribute()?.ColumnName; - list.Add((property.Name, mappedColumn, property.PropertyType, property)); + list.Add(new MappedField(property.Name, mappedColumn, property.PropertyType, property)); } // The order in which fields are processed is important given that when a tuple type is used in @@ -283,33 +362,33 @@ private static (string name, string? mappedColumn, Type type, MemberInfo info)[] // Note that most of the time GetFields() and GetProperties() _do_ return in declaration order and the times when they don't // are determined at runtime and not by the type. As a result it is pretty much impossible to cover this with a unit test. Hence this // rather long comment aimed at avoiding accidental removal! - return list.OrderBy(x => x.info.MetadataToken).ToArray(); + return list.OrderBy(x => x.Info.MetadataToken).ToArray(); } - private static Column GetColumn((string name, string? 
mappedColumn, Type type, MemberInfo info) field) + private static Column GetColumn(MappedField field) { - var isDecimal = field.type == typeof(decimal) || field.type == typeof(decimal?); - var decimalScale = field.info.GetCustomAttributes(typeof(ParquetDecimalScaleAttribute)) + var isDecimal = field.Type == typeof(decimal) || field.Type == typeof(decimal?); + var decimalScale = field.Info.GetCustomAttributes(typeof(ParquetDecimalScaleAttribute)) .Cast() .SingleOrDefault(); if (!isDecimal && decimalScale != null) { - throw new ArgumentException($"field '{field.name}' has a {nameof(ParquetDecimalScaleAttribute)} despite not being a decimal type"); + throw new ArgumentException($"field '{field.Name}' has a {nameof(ParquetDecimalScaleAttribute)} despite not being a decimal type"); } if (isDecimal && decimalScale == null) { - throw new ArgumentException($"field '{field.name}' has no {nameof(ParquetDecimalScaleAttribute)} despite being a decimal type"); + throw new ArgumentException($"field '{field.Name}' has no {nameof(ParquetDecimalScaleAttribute)} despite being a decimal type"); } - return new Column(field.type, field.mappedColumn ?? field.name, isDecimal ? LogicalType.Decimal(29, decimalScale!.Scale) : null); + return new Column(field.Type, field.MappedColumn ?? field.Name, isDecimal ? 
LogicalType.Decimal(29, decimalScale!.Scale) : null); } private static readonly ConcurrentDictionary ReadDelegatesCache = new ConcurrentDictionary(); - private static readonly ConcurrentDictionary WriteDelegates = - new ConcurrentDictionary(); + private static readonly ConcurrentDictionary WriteDelegates = + new ConcurrentDictionary(); } } diff --git a/csharp/RowOriented/ParquetRowReader.cs b/csharp/RowOriented/ParquetRowReader.cs index cab4616a..f0b5cdfd 100644 --- a/csharp/RowOriented/ParquetRowReader.cs +++ b/csharp/RowOriented/ParquetRowReader.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Reflection; using System.Linq; using ParquetSharp.IO; @@ -14,31 +13,31 @@ public sealed class ParquetRowReader : IDisposable { internal delegate void ReadAction(ParquetRowReader parquetRowReader, TTuple[] rows, int length); - internal ParquetRowReader(string path, ReadAction readAction, (string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + internal ParquetRowReader(string path, ReadAction readAction, MappedField[] fields) : this(new ParquetFileReader(path), readAction, fields) { } - internal ParquetRowReader(string path, ReaderProperties readerProperties, ReadAction readAction, (string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + internal ParquetRowReader(string path, ReaderProperties readerProperties, ReadAction readAction, MappedField[] fields) : this(new ParquetFileReader(path, readerProperties), readAction, fields) { } - internal ParquetRowReader(RandomAccessFile randomAccessFile, ReadAction readAction, (string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + internal ParquetRowReader(RandomAccessFile randomAccessFile, ReadAction readAction, MappedField[] fields) : this(new ParquetFileReader(randomAccessFile), readAction, fields) { } - internal ParquetRowReader(RandomAccessFile randomAccessFile, ReaderProperties readerProperties, ReadAction readAction, (string name, string? 
mappedColumn, Type type, MemberInfo info)[] fields) + internal ParquetRowReader(RandomAccessFile randomAccessFile, ReaderProperties readerProperties, ReadAction readAction, MappedField[] fields) : this(new ParquetFileReader(randomAccessFile, readerProperties), readAction, fields) { } - internal ParquetRowReader(ParquetFileReader parquetFileReader, ReadAction readAction, (string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + internal ParquetRowReader(ParquetFileReader parquetFileReader, ReadAction readAction, MappedField[] fields) { _parquetFileReader = parquetFileReader; _readAction = readAction; - _columnMapping = HasExplicitColumndMapping(fields) ? new ExplicitColumnMapping(this, fields) : null; + _columnMapping = HasExplicitColumnMapping(fields) ? new ExplicitColumnMapping(this, fields) : null; } public void Dispose() @@ -72,10 +71,10 @@ internal void ReadColumn(int column, TValue[] values, int length) } } - private static bool HasExplicitColumndMapping((string name, string? mappedColumn, Type type, MemberInfo info)[] fields) + private static bool HasExplicitColumnMapping(MappedField[] fields) { - var noneMapped = Array.TrueForAll(fields, f => f.mappedColumn == null); - var allMapped = Array.TrueForAll(fields, f => f.mappedColumn != null); + var noneMapped = Array.TrueForAll(fields, f => f.MappedColumn == null); + var allMapped = Array.TrueForAll(fields, f => f.MappedColumn != null); if (!allMapped && !noneMapped) { @@ -90,9 +89,9 @@ private static bool HasExplicitColumndMapping((string name, string? mappedColumn /// private sealed class ExplicitColumnMapping { - public ExplicitColumnMapping(ParquetRowReader parquetRowReader, (string name, string? 
mappedColumn, Type type, MemberInfo info)[] fields) + public ExplicitColumnMapping(ParquetRowReader parquetRowReader, MappedField[] fields) { - var allUnique = fields.GroupBy(x => x.mappedColumn).All(g => g.Count() == 1); + var allUnique = fields.GroupBy(x => x.MappedColumn).All(g => g.Count() == 1); if (!allUnique) { throw new ArgumentException("when using MapToColumnAttribute, each field must map to a unique column"); @@ -108,13 +107,13 @@ public ExplicitColumnMapping(ParquetRowReader parquetRowReader, (string for (var fieldIndex = 0; fieldIndex < fields.Length; ++fieldIndex) { - var mappedColumn = fields[fieldIndex].mappedColumn ?? throw new InvalidOperationException("mapped column name is null"); + var mappedColumn = fields[fieldIndex].MappedColumn ?? throw new InvalidOperationException("mapped column name is null"); if (!fileColumns.TryGetValue(mappedColumn, out _)) { throw new ArgumentException( - $"{typeof(TTuple)} maps field '{fields[fieldIndex].name}' to parquet column " + - $"'{fields[fieldIndex].mappedColumn}' but the target column does not exist in the input parquet file." + $"{typeof(TTuple)} maps field '{fields[fieldIndex].Name}' to parquet column " + + $"'{fields[fieldIndex].MappedColumn}' but the target column does not exist in the input parquet file." ); }