Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Arrow: add support for null vectors #10953

Open
wants to merge 45 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ac6440a
#10275 - fix NullPointerException
sl255051 May 8, 2024
becf6f7
Change how the unit test asserts the correct exception is thrown
sl255051 May 8, 2024
4e2cb86
Remove test dependency on Apache Spark
sl255051 May 8, 2024
1193d02
Merge branch 'main' into issue-10275
sl255051 May 28, 2024
12bc3de
Add new unit test
sl255051 Jun 11, 2024
d8f3e13
Merge branch 'apache:main' into issue-10275
slessard Jun 11, 2024
bb4e010
Add comments to unit test
sl255051 Jun 12, 2024
6e7a1aa
Merge branch 'issue-10275' of https://github.com/slessard/iceberg int…
sl255051 Jun 12, 2024
28451a5
Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR…
slessard Jun 14, 2024
24a9932
Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR…
slessard Jun 14, 2024
9bcb2b1
Address code review comments
sl255051 Jun 14, 2024
7a25b52
Merge branch 'apache:main' into issue-10275
slessard Jun 14, 2024
a31bf94
Merge branch 'main' into issue-10275
sl255051 Jul 29, 2024
44a7f91
Merge branch 'main' into issue-10275
sl255051 Aug 5, 2024
c2eaf24
Merge branch 'apache:main' into issue-10275
slessard Aug 9, 2024
e323db7
DRAFT: alternate solution 2: hack in support for NullVector
slessard Aug 10, 2024
061ab02
Merge branch 'apache:main' into issue-10275-alt2
slessard Aug 12, 2024
bf0c905
Issue 10275 - Add rough draft vector support for null columns
slessard Aug 13, 2024
5610dd4
Merge branch 'issue-10275-alt2' into issue-10275-alt3
slessard Aug 13, 2024
a13415d
Merge branch 'main' into issue-10275-alt3
slessard Aug 13, 2024
2eaa63f
Merge branch 'main' into issue-10275-alt3
slessard Aug 16, 2024
62108da
remove obsolete comment; adapt unit test to match new functionality
slessard Aug 16, 2024
7115e93
Merge branch 'apache:main' into issue-10275-alt3
slessard Aug 16, 2024
08bb07c
Address code review feedback
slessard Sep 5, 2024
442b381
Add a NullabilityHolder instance to the NullVector instance
slessard Sep 5, 2024
5e7668e
Merge branch 'apache:main' into issue-10275-alt3
slessard Sep 6, 2024
83913a0
Remove test class GenericArrowVectorAccessorFactoryTest
slessard Sep 9, 2024
e2b428e
Fix compile error; format source code
slessard Sep 9, 2024
7ffa7ed
Address code review comments
slessard Sep 11, 2024
cda0423
Adopt changes suggested by @nastra in code review
slessard Sep 17, 2024
9aec9e5
Update unit test to add a second row to the table being tested
slessard Sep 17, 2024
0c87dc7
Code cleanup
slessard Sep 19, 2024
e5eebd0
Undo adding a second row to the table
slessard Sep 20, 2024
fe60793
Expand calls to checkAllVectorTypes and checkAllVectorValues
slessard Sep 20, 2024
1a3896b
replace hard-coded magic values with descriptively named variables
slessard Sep 20, 2024
5c3b460
Add unit tests for VectorHolder
slessard Sep 24, 2024
a2df95c
Update `isDummy` method to remove one condition that would never be r…
slessard Sep 24, 2024
bbc776d
Fix code style issues
slessard Sep 25, 2024
2bf5b2f
Update VectorHolder unit tests for isDummy method
slessard Sep 26, 2024
1edd680
Convert to fluent assertions
slessard Sep 26, 2024
e1b3931
inline variables that are only used once; remove `this.` prefix
slessard Sep 26, 2024
e574623
Merge branch 'main' into issue-10275-alt3
slessard Sep 26, 2024
c8bcc1c
Update arrow/src/main/java/org/apache/iceberg/arrow/vectorized/Vector…
nastra Sep 27, 2024
da9e514
Only create a NullVector when the constant value is null
slessard Sep 27, 2024
fe83726
Merge remote-tracking branch 'origin/issue-10275-alt3' into issue-102…
slessard Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
// Handle null vector for constant case
columnVectors[i] = new ColumnVector(vectorHolders[i]);
}
return new ColumnarBatch(numRowsToRead, columnVectors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
Expand Down Expand Up @@ -177,6 +178,7 @@ public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVecto
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlainVectorAccessor(
FieldVector vector, PrimitiveType primitive) {
Preconditions.checkArgument(null != vector, "Invalid field vector: null");
if (vector instanceof BitVector) {
return new BooleanAccessor<>((BitVector) vector);
} else if (vector instanceof IntVector) {
Expand Down Expand Up @@ -220,6 +222,8 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlai
}
return new FixedSizeBinaryAccessor<>(
(FixedSizeBinaryVector) vector, stringFactorySupplier.get());
} else if (vector instanceof NullVector) {
return new NullAccessor<>((NullVector) vector);
slessard marked this conversation as resolved.
Show resolved Hide resolved
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
}
Expand All @@ -244,6 +248,15 @@ public final boolean getBoolean(int rowId) {
}
}

/**
 * Accessor for a {@link NullVector}, i.e. a column with no physical data (for example a column
 * added by schema evolution that is absent from older data files). It adds no getter overrides,
 * so reads fall through to the base {@code ArrowVectorAccessor} behavior; callers are expected
 * to consult the null state rather than fetch typed values from this accessor.
 */
private static class NullAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

NullAccessor(NullVector vector) {
super(vector);
}
}

private static class IntAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullVector;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -140,12 +141,18 @@ public static class ConstantVectorHolder<T> extends VectorHolder {
private final int numRows;

/**
 * Creates a holder backed by an all-null {@link NullVector} (named "_dummy_") with no Iceberg
 * field attached; the constant value is {@code null}.
 *
 * @param numRows number of rows represented by this holder
 */
public ConstantVectorHolder(int numRows) {
super(new NullVector("_dummy_", numRows), null, new NullabilityHolder(numRows));
// Every position is null, so mark the entire range in the nullability holder up front.
nullabilityHolder().setNulls(0, numRows);
this.numRows = numRows;
this.constantValue = null;
}

public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
super(icebergField);
super(
new NullVector(icebergField.name(), numRows),
slessard marked this conversation as resolved.
Show resolved Hide resolved
slessard marked this conversation as resolved.
Show resolved Hide resolved
icebergField,
new NullabilityHolder(numRows));
nullabilityHolder().setNulls(0, numRows);
this.numRows = numRows;
this.constantValue = constantValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ public static VectorizedArrowReader nulls() {
return NullVectorReader.INSTANCE;
}

/**
 * Returns a reader that produces all-null vectors carrying the given Iceberg field's metadata —
 * used when the requested column has no corresponding data in the underlying file.
 *
 * @param icebergField the Iceberg field the null vectors are reported under
 */
public static VectorizedArrowReader nulls(Types.NestedField icebergField) {
return new NullVectorReader(icebergField);
}

/**
 * Returns a reader that produces row-position vectors. The {@code false} flag presumably leaves
 * the Arrow validity vector unset — contrast with {@code positionsWithSetArrowValidityVector()};
 * confirm against {@code PositionVectorReader}.
 */
public static VectorizedArrowReader positions() {
return new PositionVectorReader(false);
}
Expand All @@ -464,11 +468,15 @@ public static VectorizedArrowReader positionsWithSetArrowValidityVector() {
}

private static final class NullVectorReader extends VectorizedArrowReader {
private static final NullVectorReader INSTANCE = new NullVectorReader();
private static final NullVectorReader INSTANCE = new NullVectorReader(null);

/**
 * @param icebergField the field this reader produces null vectors for; may be {@code null}
 *     (the shared {@code INSTANCE} is constructed with {@code null})
 */
private NullVectorReader(Types.NestedField icebergField) {
super(icebergField);
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.dummyHolder(numValsToRead);
return new VectorHolder.ConstantVectorHolder<>(icebergField(), numValsToRead, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public VectorizedReader<?> message(
} else if (reader != null) {
reorderedFields.add(reader);
} else {
reorderedFields.add(VectorizedArrowReader.nulls());
nastra marked this conversation as resolved.
Show resolved Hide resolved
reorderedFields.add(VectorizedArrowReader.nulls(field));
}
}
return vectorizedReader(reorderedFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
Expand All @@ -70,6 +71,7 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
Expand Down Expand Up @@ -262,6 +264,120 @@ public void testReadColumnFilter2() throws Exception {
scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
}

/**
 * Verifies reading a column added by schema evolution that is absent from an older data file:
 * the first data file has no "z" column, so its rows must surface "z" as null via a NULL-typed
 * Arrow vector; a second data file written after the schema change stores "z" physically.
 */
@Test
public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
rowsWritten = Lists.newArrayList();
tables = new HadoopTables();

// Expected Arrow schema: "z" is surfaced with MinorType.NULL because the reader backs the
// missing column with a NullVector rather than a typed vector.
List<Field> expectedFields =
ImmutableList.of(
new Field("a", new FieldType(false, MinorType.INT.getType(), null), null),
new Field("b", new FieldType(true, MinorType.INT.getType(), null), null),
new Field("z", new FieldType(true, MinorType.NULL.getType(), null), null));
org.apache.arrow.vector.types.pojo.Schema expectedSchema =
new org.apache.arrow.vector.types.pojo.Schema(expectedFields);

Schema originalSchema =
new Schema(
Types.NestedField.required(1, "a", Types.IntegerType.get()),
Types.NestedField.optional(2, "b", Types.IntegerType.get()));

PartitionSpec spec = PartitionSpec.builderFor(originalSchema).build();
Table table = tables.create(originalSchema, spec, tableLocation);

// Add one record to the table under the original schema ("b" is left unset, i.e. null).
{
slessard marked this conversation as resolved.
Show resolved Hide resolved
GenericRecord rec = GenericRecord.create(originalSchema);
rec.setField("a", 1);
List<GenericRecord> genericRecords = Lists.newArrayList();
genericRecords.add(rec);

AppendFiles appendFiles = table.newAppend();
appendFiles.appendFile(writeParquetFile(table, genericRecords));
appendFiles.commit();
}

// Alter the table schema by adding a new, optional column. No data is backfilled for the
// existing row, i.e. there is no default value for "z".
UpdateSchema updateSchema = table.updateSchema().addColumn("z", Types.IntegerType.get());
Schema newSchema = updateSchema.apply();
updateSchema.commit();

// Add one more record to the table under the evolved schema, with all columns populated.
{
slessard marked this conversation as resolved.
Show resolved Hide resolved
GenericRecord rec = GenericRecord.create(newSchema);
rec.setField("a", 2);
rec.setField("b", 2);
rec.setField("z", 2);
List<GenericRecord> genericRecords = Lists.newArrayList();
genericRecords.add(rec);

AppendFiles appendFiles = table.newAppend();
appendFiles.appendFile(writeParquetFile(table, genericRecords));
appendFiles.commit();
}

// Select all columns, all rows from the table
TableScan scan = table.newScan().select("*");

Set<String> columns = ImmutableSet.of("a", "b", "z");
// Read the data (batch size 1, one batch per written row) and verify that the returned
// ColumnarBatches match expected rows. Nullable columns are read via isNullAt before getInt.
try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
int rowIndex = 0;
for (ColumnarBatch batch : itr) {
List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
rowIndex++;

assertThat(batch.numRows()).isEqualTo(1);
assertThat(batch.numCols()).isEqualTo(columns.size());

checkColumnarArrayValues(
1,
expectedRows,
batch,
0,
columns,
"a",
(records, i) -> records.get(i).getField("a"),
ColumnVector::getInt);
checkColumnarArrayValues(
1,
expectedRows,
batch,
1,
columns,
"b",
(records, i) -> records.get(i).getField("b"),
(columnVector, i) -> columnVector.isNullAt(i) ? null : columnVector.getInt(i));
checkColumnarArrayValues(
1,
expectedRows,
batch,
2,
columns,
"z",
(records, i) -> records.get(i).getField("z"),
(columnVector, i) -> columnVector.isNullAt(i) ? null : columnVector.getInt(i));
}
}

// Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
slessard marked this conversation as resolved.
Show resolved Hide resolved
try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
int totalRows = 0;
int rowIndex = 0;
for (ColumnarBatch batch : itr) {
List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
rowIndex++;
VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
// NOTE(review): expectedSchema types "z" as NULL, but the second data file stores "z" as a
// physical INT column — confirm the second batch really reports a NULL-typed vector here.
assertThat(root.getSchema()).isEqualTo(expectedSchema);
checkAllVectorTypes(root, columns);
checkAllVectorValues(1, expectedRows, root, columns);
totalRows += root.getRowCount();
// NOTE(review): this assertion runs inside the loop; with two rows and batch size 1 the
// second iteration sees totalRows == 2. Should this compare against rowIndex, or move
// after the loop as isEqualTo(2)? TODO confirm.
assertThat(totalRows).isEqualTo(1);
}
}
}

/**
* The test asserts that {@link CloseableIterator#hasNext()} returned by the {@link ArrowReader}
* is idempotent.
Expand Down