Fix reading string array values #282

Merged · 5 commits · Jun 12, 2022

80 changes: 80 additions & 0 deletions csharp.test/TestLogicalTypeRoundtrip.cs
@@ -494,6 +494,86 @@ public static void TestArrayOfEmptyStringArraysRoundtrip()
             Assert.AreEqual(expected, allData);
         }
 
+        [Test]
+        public static void TestLargeStringArrays()
+        {
+            // This test was added after finding that when we buffer ByteArray values without immediately converting
+            // them, we can later get AccessViolationExceptions thrown due to trying to convert ByteArrays that end
+            // up pointing to memory that was freed when the internal Arrow library read a new page of data.
+            // This test didn't reproduce the AccessViolationExceptions but did read garbage data.
+
+            const int numArrays = 1_000;
+            const int arrayLength = 100;
+            const int dataLength = numArrays * arrayLength;
+
+            var chars = "0123456789abcdefghijklmnopqrstuvwxyz".ToArray();
+            var random = new Random(0);
+
+            string GetRandomString() => string.Join(
+                "", Enumerable.Range(0, random!.Next(50, 101)).Select(_ => chars![random.Next(chars.Length)]));
+
+            var stringValues = Enumerable.Range(0, 10)
+                .Select(_ => GetRandomString())
+                .ToArray();
+            var stringData = Enumerable.Range(0, dataLength)
+                .Select(_ => stringValues[random.Next(0, stringValues.Length)])
+                .ToArray();
+
+            var defLevels = new short[dataLength];
+            var repLevels = new short[dataLength];
+            for (var i = 0; i < dataLength; ++i)
+            {
+                repLevels[i] = (short) (i % arrayLength == 0 ? 0 : 1);
+                defLevels[i] = 3;
+            }
+
+            var expected = Enumerable.Range(0, numArrays)
+                .Select(arrayIdx => stringData.AsSpan(arrayIdx * arrayLength, arrayLength).ToArray())
+                .ToArray();
+
+            using var buffer = new ResizableBuffer();
+            using (var outStream = new BufferOutputStream(buffer))
+            {
+                using var propertiesBuilder = new WriterPropertiesBuilder();
+                propertiesBuilder.DisableDictionary();
+                propertiesBuilder.Encoding(Encoding.Plain);
+                propertiesBuilder.Compression(Compression.Snappy);
+                propertiesBuilder.DataPagesize(1024);
+                using var writerProperties = propertiesBuilder.Build();
+
+                using var fileWriter = new ParquetFileWriter(outStream, new Column[] {new Column<string[]>("a")},
+                    writerProperties);
+                using var rowGroupWriter = fileWriter.AppendRowGroup();
+                using var colWriter = (ColumnWriter<ByteArray>) rowGroupWriter.NextColumn();
+
+                // We write values with the low-level column writer rather than a LogicalColumnWriter, as due to
+                // the way the LogicalColumnWriter interacts with the ColumnWriter, all leaf-level arrays end up in
+                // the same data page and so data written with a ParquetSharp LogicalColumnWriter doesn't reproduce
+                // the issue with invalid data being read.
+                const int batchSize = 64;
+                for (var offset = 0; offset < dataLength; offset += batchSize)
+                {
+                    using var byteBuffer = new ByteBuffer(1024);
+                    var thisBatchSize = Math.Min(batchSize, dataLength - offset);
+                    var batchStringValues = stringData.AsSpan(offset, thisBatchSize);
+                    var batchDefLevels = defLevels.AsSpan(offset, thisBatchSize);
+                    var batchRepLevels = repLevels.AsSpan(offset, thisBatchSize);
+                    var batchPhysicalValues = batchStringValues.ToArray().Select(s => LogicalWrite.FromString(s, byteBuffer)).ToArray();
+                    colWriter.WriteBatch(thisBatchSize, batchDefLevels, batchRepLevels, batchPhysicalValues);
+                }
+
+                fileWriter.Close();
+            }
+
+            using var inStream = new BufferReader(buffer);
+            using var fileReader = new ParquetFileReader(inStream);
+            using var rowGroup = fileReader.RowGroup(0);
+            using var columnReader = rowGroup.Column(0).LogicalReader<string[]>();
+
+            var values = columnReader.ReadAll(expected.Length);
+            Assert.That(values, Is.EqualTo(expected));
+        }
+
         private static ExpectedColumn[] CreateExpectedColumns()
         {
             return new[]
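
For context on the levels used in this test: with the three-level list schema ParquetSharp generates for a string[] column (optional list group, repeated element group, optional string leaf), a definition level of 3 marks a fully defined string value, and a repetition level of 0 starts a new outer array while 1 continues the current one. As a minimal sketch of that encoding (a hypothetical helper, not part of ParquetSharp or this PR), decoding the flattened values back into arrays looks like:

using System.Collections.Generic;

static class RepetitionLevelDemo
{
    // Rebuild string arrays from flattened leaf values and their repetition levels.
    // Assumes every value is defined (definition level 3), as in the test above.
    public static List<string[]> DecodeArrays(string[] values, short[] repLevels)
    {
        var arrays = new List<string[]>();
        var current = new List<string>();
        for (var i = 0; i < values.Length; ++i)
        {
            // A repetition level of 0 closes the previous array and starts a new one.
            if (repLevels[i] == 0 && current.Count > 0)
            {
                arrays.Add(current.ToArray());
                current.Clear();
            }
            current.Add(values[i]);
        }
        if (current.Count > 0) arrays.Add(current.ToArray());
        return arrays;
    }
}
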
48 changes: 40 additions & 8 deletions csharp/BufferedReader.cs
@@ -5,17 +5,28 @@ namespace ParquetSharp
     /// <summary>
     /// Buffer the reads from the low-level Parquet API when dealing with multi-level structs.
     /// </summary>
-    internal sealed class BufferedReader<TPhysicalvalue> where TPhysicalvalue : unmanaged
+    internal sealed class BufferedReader<TLogical, TPhysical> where TPhysical : unmanaged
     {
-        public BufferedReader(ColumnReader reader, TPhysicalvalue[] values, short[]? defLevels, short[]? repLevels)
+        public BufferedReader(
+            ColumnReader reader,
+            LogicalRead<TLogical, TPhysical>.Converter converter,
+            TPhysical[] values,
+            short[]? defLevels,
+            short[]? repLevels,
+            short leafDefinitionLevel,
+            bool nullableLeafValues)
         {
             _columnReader = reader;
+            _converter = converter;
             _values = values;
             _defLevels = defLevels;
             _repLevels = repLevels;
+            _leafDefinitionLevel = leafDefinitionLevel;
+            _logicalValues = new TLogical[values.Length];
+            _nullableLeafValues = nullableLeafValues;
         }
 
-        public TPhysicalvalue ReadValue()
+        public TLogical ReadValue()
         {
             if (_valueIndex >= _numValues)
             {
@@ -25,7 +36,8 @@ public TPhysicalvalue ReadValue()
                 }
             }
 
-            return _values[_valueIndex++];
+            var valueIndex = _nullableLeafValues ? _valueIndex : _valueIndex++;
+            return _logicalValues[valueIndex];
         }
 
         public (short DefLevel, short RepLevel) GetCurrentDefinition()
@@ -49,11 +61,15 @@ public TPhysicalvalue ReadValue()
         public void NextDefinition()
        {
             _levelIndex++;
+            if (_nullableLeafValues)
+            {
+                _valueIndex++;
+            }
         }
 
         private bool FillBuffer()
         {
-            var columnReader = (ColumnReader<TPhysicalvalue>) _columnReader;
+            var columnReader = (ColumnReader<TPhysical>) _columnReader;
 
             if (_levelIndex < _numLevels || _valueIndex < _numValues)
             {
@@ -62,18 +78,34 @@ private bool FillBuffer()
 
             if (columnReader.HasNext)
             {
-                _numLevels = columnReader.ReadBatch(_values.Length, _defLevels, _repLevels, _values, out _numValues);
+                _numLevels = columnReader.ReadBatch(_values.Length, _defLevels, _repLevels, _values, out var numValues);
                 _valueIndex = 0;
                 _levelIndex = 0;
+                // For non-nullable leaf values, converters will ignore definition levels and produce compacted
+                // values, otherwise definition levels are used and the number of values will match the number of levels.
+                _numValues = _nullableLeafValues ? _numLevels : numValues;
+                // It's important that we immediately convert the read values. In the case of ByteArray physical values,
+                // these are pointers to internal Arrow memory that may be invalidated if we perform any other operation
+                // on the column reader, for example calling HasNext will trigger a new page load if the Arrow column
+                // reader is at the end of a data page.
+                _converter(
+                    _values.AsSpan(0, (int) _numValues),
+                    _defLevels.AsSpan(0, (int) _numLevels),
+                    _logicalValues.AsSpan(0, (int) _numValues),
+                    _leafDefinitionLevel);
             }
 
-            return _numValues > 0 || _numLevels > 0;
+            return _numLevels > 0;
         }
 
         private readonly ColumnReader _columnReader;
-        private readonly TPhysicalvalue[] _values;
+        private readonly LogicalRead<TLogical, TPhysical>.Converter _converter;
+        private readonly TPhysical[] _values;
+        private readonly TLogical[] _logicalValues;
         private readonly short[]? _defLevels;
         private readonly short[]? _repLevels;
+        private readonly short _leafDefinitionLevel;
+        private readonly bool _nullableLeafValues;
 
         private long _numValues;
         private int _valueIndex;
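
The heart of the fix is in FillBuffer above: physical values are now converted to logical values in the same step as the ReadBatch call, before any further operation on the column reader can trigger a page load that frees the memory behind ByteArray values. A stand-alone sketch of that hazard, using a hypothetical stand-in for ParquetSharp's ByteArray (a pointer plus length into natively owned memory), not the library's actual type:

using System;
using System.Runtime.InteropServices;

// Stand-in for a physical value that points into natively-owned page memory.
readonly struct FakeByteArray
{
    public FakeByteArray(IntPtr pointer, int length) { Pointer = pointer; Length = length; }
    public readonly IntPtr Pointer;
    public readonly int Length;
}

static class DanglingPointerDemo
{
    static void Main()
    {
        var page = Marshal.StringToHGlobalAnsi("hello"); // simulate a loaded data page
        var value = new FakeByteArray(page, 5);

        // Converting immediately is safe: the page is still alive.
        var converted = Marshal.PtrToStringAnsi(value.Pointer, value.Length);

        Marshal.FreeHGlobal(page); // simulate the reader moving on to a new page

        // Converting only now would read freed memory and produce garbage or an
        // access violation, which is the failure mode this PR fixes:
        // var garbage = Marshal.PtrToStringAnsi(value.Pointer, value.Length);

        Console.WriteLine(converted);
    }
}
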
46 changes: 22 additions & 24 deletions csharp/LogicalColumnReader.cs
@@ -1,6 +1,7 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
+using System.Linq;
 using ParquetSharp.Schema;
 
 namespace ParquetSharp
@@ -144,9 +145,11 @@ internal LogicalColumnReader(ColumnReader columnReader, int bufferLength)
         {
             var converterFactory = columnReader.LogicalReadConverterFactory;
 
-            _bufferedReader = new BufferedReader<TPhysical>(Source, (TPhysical[]) Buffer, DefLevels, RepLevels);
-            _directReader = (LogicalRead<TLogical, TPhysical>.DirectReader?) converterFactory.GetDirectReader<TLogical, TPhysical>();
             _converter = (LogicalRead<TLogical, TPhysical>.Converter) converterFactory.GetConverter<TLogical, TPhysical>(ColumnDescriptor, columnReader.ColumnChunkMetaData);
+            var leafDefinitionLevel = (short) SchemaNodesPath!.Count(n => n.Repetition != Repetition.Required);
+            var nullableLeafValues = ColumnDescriptor.SchemaNode.Repetition == Repetition.Optional;
+            _bufferedReader = new BufferedReader<TLogical, TPhysical>(Source, _converter, (TPhysical[]) Buffer, DefLevels, RepLevels, leafDefinitionLevel, nullableLeafValues);
+            _directReader = (LogicalRead<TLogical, TPhysical>.DirectReader?) converterFactory.GetDirectReader<TLogical, TPhysical>();
         }
 
         /*
@@ -186,7 +189,7 @@ public override int ReadBatch(Span<TElement> destination)
             // Handle arrays
             if (elementType != typeof(byte[]) && elementType.IsArray)
             {
-                var result = (Span<TElement>) (TElement[]) ReadArray(schemaNodes, typeof(TElement), _converter, _bufferedReader, destination.Length, 0, definitionLevel);
+                var result = (Span<TElement>) (TElement[]) ReadArray(schemaNodes, typeof(TElement), _bufferedReader, destination.Length, 0, definitionLevel);
                 result.CopyTo(destination);
                 return result.Length;
             }
@@ -205,8 +208,8 @@ public override int ReadBatch(Span<TElement> destination)
         }
 
         private static Array ReadArray(
-            ReadOnlySpan<Node> schemaNodes, Type elementType, LogicalRead<TLogical, TPhysical>.Converter converter,
-            BufferedReader<TPhysical> valueReader, int numArrayEntriesToRead, short repetitionLevel, short definitionLevel)
+            ReadOnlySpan<Node> schemaNodes, Type elementType, BufferedReader<TLogical, TPhysical> valueReader,
+            int numArrayEntriesToRead, short repetitionLevel, short definitionLevel)
         {
             // Handle structs
             var (definitionLevelDelta, schemaSlice) = StructSkip(schemaNodes);
@@ -220,7 +223,8 @@ private static Array ReadArray(
             if (schemaNodes[0] is GroupNode {LogicalType: ListLogicalType, Repetition: Repetition.Optional} &&
                 schemaNodes[1] is GroupNode {LogicalType: NoneLogicalType, Repetition: Repetition.Repeated})
             {
-                return ReadArrayIntermediateLevel(schemaNodes, valueReader, elementType, converter, numArrayEntriesToRead, repetitionLevel, definitionLevel);
+                return ReadArrayIntermediateLevel(
+                    schemaNodes, valueReader, elementType, numArrayEntriesToRead, repetitionLevel, definitionLevel);
             }
         }
 
@@ -229,14 +233,15 @@ private static Array ReadArray(
 
             if (schemaNodes.Length == 1)
             {
-                return ReadArrayLeafLevel(schemaNodes[0], valueReader, converter, repetitionLevel, definitionLevel);
+                return ReadArrayLeafLevel(schemaNodes[0], valueReader, repetitionLevel, definitionLevel);
             }
 
             throw new Exception("ParquetSharp does not understand the schema used");
         }
 
-        private static Array ReadArrayIntermediateLevel(ReadOnlySpan<Node> schemaNodes, BufferedReader<TPhysical> valueReader, Type elementType,
-            LogicalRead<TLogical, TPhysical>.Converter converter, int numArrayEntriesToRead, short repetitionLevel, short definitionLevel)
+        private static Array ReadArrayIntermediateLevel(
+            ReadOnlySpan<Node> schemaNodes, BufferedReader<TLogical, TPhysical> valueReader, Type elementType,
+            int numArrayEntriesToRead, short repetitionLevel, short definitionLevel)
         {
             var acc = new List<Array?>();
 
@@ -248,7 +253,7 @@ private static Array ReadArrayIntermediateLevel(ReadOnlySpan<Node> schemaNodes,
 
             if (defn.DefLevel >= definitionLevel + 2)
             {
-                newItem = ReadArray(schemaNodes.Slice(2), elementType.GetElementType(), converter, valueReader, -1, (short) (repetitionLevel + 1), (short) (definitionLevel + 2));
+                newItem = ReadArray(schemaNodes.Slice(2), elementType.GetElementType(), valueReader, -1, (short) (repetitionLevel + 1), (short) (definitionLevel + 2));
             }
             else
             {
@@ -270,10 +275,9 @@ private static Array ReadArrayIntermediateLevel(ReadOnlySpan<Node> schemaNodes,
             return ListToArray(acc, elementType);
         }
 
-        private static Array ReadArrayLeafLevel(Node node, BufferedReader<TPhysical> valueReader, LogicalRead<TLogical, TPhysical>.Converter converter, short repetitionLevel, short definitionLevel)
+        private static Array ReadArrayLeafLevel(Node node, BufferedReader<TLogical, TPhysical> valueReader, short repetitionLevel, short definitionLevel)
         {
-            var defnLevel = new List<short>();
-            var values = new List<TPhysical>();
+            var values = new List<TLogical>();
             var firstValue = true;
             var innerNodeIsOptional = node.Repetition == Repetition.Optional;
             definitionLevel += (short) (innerNodeIsOptional ? 1 : 0);
@@ -287,28 +291,22 @@ private static Array ReadArrayLeafLevel(Node node, BufferedReader<TPhysical> val
                     break;
                 }
 
-                if (defn.DefLevel == definitionLevel)
+                if (defn.DefLevel == definitionLevel || innerNodeIsOptional)
                 {
+                    // Note that when the inner node is optional,
+                    // the converter will handle definition levels and create null values.
                    values.Add(valueReader.ReadValue());
                 }
-                else if (innerNodeIsOptional && (defn.DefLevel != definitionLevel))
-                {
-                    // Value is null, skip
-                }
                 else
                 {
                     throw new Exception("Definition levels read from file do not match up with schema.");
                 }
 
-                defnLevel.Add(defn.DefLevel);
-
                 valueReader.NextDefinition();
                 firstValue = false;
             }
 
-            var dest = new TLogical[defnLevel.Count];
-            converter(values.ToArray(), defnLevel.ToArray(), dest, definitionLevel);
-            return dest;
+            return values.ToArray();
         }
 
         private static Array ListToArray(List<Array?> list, Type elementType)
@@ -379,7 +377,7 @@ private int ReadBatchSimple<TTLogical>(
             return rowsRead;
         }
 
-        private readonly BufferedReader<TPhysical> _bufferedReader;
+        private readonly BufferedReader<TLogical, TPhysical> _bufferedReader;
         private readonly LogicalRead<TLogical, TPhysical>.DirectReader? _directReader;
         private readonly LogicalRead<TLogical, TPhysical>.Converter _converter;
     }
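
For comparison with the low-level writes in the test, a minimal round trip of string array values through the high-level API (a sketch based on the reader and writer types this PR's test uses, not code from the PR itself):

using System;
using ParquetSharp;
using ParquetSharp.IO;

static class StringArrayRoundTrip
{
    static void Main()
    {
        var expected = new[] {new[] {"a", "b"}, new string[0], new[] {"c"}};

        using var buffer = new ResizableBuffer();
        using (var outStream = new BufferOutputStream(buffer))
        {
            using var fileWriter = new ParquetFileWriter(outStream, new Column[] {new Column<string[]>("a")});
            using var rowGroupWriter = fileWriter.AppendRowGroup();
            using var colWriter = rowGroupWriter.NextColumn().LogicalWriter<string[]>();
            colWriter.WriteBatch(expected);
            fileWriter.Close();
        }

        using var inStream = new BufferReader(buffer);
        using var fileReader = new ParquetFileReader(inStream);
        using var rowGroup = fileReader.RowGroup(0);
        using var columnReader = rowGroup.Column(0).LogicalReader<string[]>();

        // After this fix, values read back correctly even when string data spans multiple data pages.
        var values = columnReader.ReadAll((int) rowGroup.MetaData.NumRows);
        Console.WriteLine(values.Length == expected.Length);
    }
}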