From cfa37b026a4f2a313ec7113e3ab529261fda4a4e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 12 May 2019 19:59:56 +0900 Subject: [PATCH] [SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch ## What changes were proposed in this pull request? To move DS v2 API to the catalyst module, we can't refer to an internal class (`MutableColumnarRow`) in `ColumnarBatch`. This PR creates a read-only version of `MutableColumnarRow`, and uses it in `ColumnarBatch`. close https://github.com/apache/spark/pull/24546 ## How was this patch tested? existing tests Closes #24581 from cloud-fan/mutable-row. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../spark/sql/vectorized/ColumnarBatch.java | 175 +++++++++++++++++- .../vectorized/MutableColumnarRow.java | 44 ++--- 2 files changed, 189 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java index 07546a54013ec..9f917ea11d72a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java @@ -20,7 +20,10 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.execution.vectorized.MutableColumnarRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; /** * This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this @@ -33,7 +36,7 @@ public final class ColumnarBatch { private final ColumnVector[] columns; // Staging row returned from `getRow`. - private final MutableColumnarRow row; + private final ColumnarBatchRow row; /** * Called to close all the columns in this batch. 
It is not valid to access the data after @@ -50,7 +53,7 @@ public void close() { */ public Iterator rowIterator() { final int maxRows = numRows; - final MutableColumnarRow row = new MutableColumnarRow(columns); + final ColumnarBatchRow row = new ColumnarBatchRow(columns); return new Iterator() { int rowId = 0; @@ -108,6 +111,170 @@ public InternalRow getRow(int rowId) { public ColumnarBatch(ColumnVector[] columns) { this.columns = columns; - this.row = new MutableColumnarRow(columns); + this.row = new ColumnarBatchRow(columns); } } + +/** + * An internal class, which wraps an array of {@link ColumnVector} and provides a row view. + */ +class ColumnarBatchRow extends InternalRow { + public int rowId; + private final ColumnVector[] columns; + + ColumnarBatchRow(ColumnVector[] columns) { + this.columns = columns; + } + + @Override + public int numFields() { return columns.length; } + + @Override + public InternalRow copy() { + GenericInternalRow row = new GenericInternalRow(columns.length); + for (int i = 0; i < numFields(); i++) { + if (isNullAt(i)) { + row.setNullAt(i); + } else { + DataType dt = columns[i].dataType(); + if (dt instanceof BooleanType) { + row.setBoolean(i, getBoolean(i)); + } else if (dt instanceof ByteType) { + row.setByte(i, getByte(i)); + } else if (dt instanceof ShortType) { + row.setShort(i, getShort(i)); + } else if (dt instanceof IntegerType) { + row.setInt(i, getInt(i)); + } else if (dt instanceof LongType) { + row.setLong(i, getLong(i)); + } else if (dt instanceof FloatType) { + row.setFloat(i, getFloat(i)); + } else if (dt instanceof DoubleType) { + row.setDouble(i, getDouble(i)); + } else if (dt instanceof StringType) { + row.update(i, getUTF8String(i).copy()); + } else if (dt instanceof BinaryType) { + row.update(i, getBinary(i)); + } else if (dt instanceof DecimalType) { + DecimalType t = (DecimalType)dt; + row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision()); + } else if (dt instanceof DateType) { + row.setInt(i, 
getInt(i)); + } else if (dt instanceof TimestampType) { + row.setLong(i, getLong(i)); + } else { + throw new RuntimeException("Not implemented. " + dt); + } + } + } + return row; + } + + @Override + public boolean anyNull() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); } + + @Override + public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); } + + @Override + public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); } + + @Override + public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); } + + @Override + public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); } + + @Override + public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); } + + @Override + public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); } + + @Override + public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return columns[ordinal].getDecimal(rowId, precision, scale); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return columns[ordinal].getUTF8String(rowId); + } + + @Override + public byte[] getBinary(int ordinal) { + return columns[ordinal].getBinary(rowId); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return columns[ordinal].getInterval(rowId); + } + + @Override + public ColumnarRow getStruct(int ordinal, int numFields) { + return columns[ordinal].getStruct(rowId); + } + + @Override + public ColumnarArray getArray(int ordinal) { + return columns[ordinal].getArray(rowId); + } + + @Override + public ColumnarMap getMap(int ordinal) { + return columns[ordinal].getMap(rowId); + } + + @Override + public Object get(int ordinal, DataType dataType) { + if (dataType instanceof BooleanType) { + return getBoolean(ordinal); 
+ } else if (dataType instanceof ByteType) { + return getByte(ordinal); + } else if (dataType instanceof ShortType) { + return getShort(ordinal); + } else if (dataType instanceof IntegerType) { + return getInt(ordinal); + } else if (dataType instanceof LongType) { + return getLong(ordinal); + } else if (dataType instanceof FloatType) { + return getFloat(ordinal); + } else if (dataType instanceof DoubleType) { + return getDouble(ordinal); + } else if (dataType instanceof StringType) { + return getUTF8String(ordinal); + } else if (dataType instanceof BinaryType) { + return getBinary(ordinal); + } else if (dataType instanceof DecimalType) { + DecimalType t = (DecimalType) dataType; + return getDecimal(ordinal, t.precision(), t.scale()); + } else if (dataType instanceof DateType) { + return getInt(ordinal); + } else if (dataType instanceof TimestampType) { + return getLong(ordinal); + } else if (dataType instanceof ArrayType) { + return getArray(ordinal); + } else if (dataType instanceof StructType) { + return getStruct(ordinal, ((StructType)dataType).fields().length); + } else if (dataType instanceof MapType) { + return getMap(ordinal); + } else { + throw new UnsupportedOperationException("Datatype not supported " + dataType); + } + } + + @Override + public void update(int ordinal, Object value) { throw new UnsupportedOperationException(); } + + @Override + public void setNullAt(int ordinal) { throw new UnsupportedOperationException(); } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java index 4e4242fe8d9b9..fca7e36859126 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java @@ -26,7 +26,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; import 
org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.sql.vectorized.ColumnarRow; -import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -39,17 +38,10 @@ */ public final class MutableColumnarRow extends InternalRow { public int rowId; - private final ColumnVector[] columns; - private final WritableColumnVector[] writableColumns; - - public MutableColumnarRow(ColumnVector[] columns) { - this.columns = columns; - this.writableColumns = null; - } + private final WritableColumnVector[] columns; public MutableColumnarRow(WritableColumnVector[] writableColumns) { this.columns = writableColumns; - this.writableColumns = writableColumns; } @Override @@ -228,54 +220,54 @@ public void update(int ordinal, Object value) { @Override public void setNullAt(int ordinal) { - writableColumns[ordinal].putNull(rowId); + columns[ordinal].putNull(rowId); } @Override public void setBoolean(int ordinal, boolean value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putBoolean(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putBoolean(rowId, value); } @Override public void setByte(int ordinal, byte value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putByte(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putByte(rowId, value); } @Override public void setShort(int ordinal, short value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putShort(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putShort(rowId, value); } @Override public void setInt(int ordinal, int value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putInt(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putInt(rowId, value); } @Override public void setLong(int ordinal, long value) { - 
writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putLong(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putLong(rowId, value); } @Override public void setFloat(int ordinal, float value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putFloat(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putFloat(rowId, value); } @Override public void setDouble(int ordinal, double value) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putDouble(rowId, value); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putDouble(rowId, value); } @Override public void setDecimal(int ordinal, Decimal value, int precision) { - writableColumns[ordinal].putNotNull(rowId); - writableColumns[ordinal].putDecimal(rowId, value, precision); + columns[ordinal].putNotNull(rowId); + columns[ordinal].putDecimal(rowId, value, precision); } }