[SPARK-33593][SQL] Vector reader got incorrect data with binary partition value

### What changes were proposed in this pull request?

Currently, when the Parquet vectorized reader is enabled, using a binary type as a partition column returns an incorrect value, as shown by the UT below:
```scala
test("Parquet vector reader incorrect with binary partition value") {
  Seq(false, true).foreach(tag => {
    withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
      withTable("t1") {
        sql(
          """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
            | USING PARQUET PARTITIONED BY (part)""".stripMargin)
        sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
        if (tag) {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", ""))
        } else {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", "Spark SQL"))
        }
      }
    }
  })
}
```
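
For reference, the failure is easy to reproduce outside the test harness. The following is a minimal `spark-shell` sketch derived from the UT above (the table name `t1`, the SQL statements, and the config key are taken from the test; nothing else is assumed):

```scala
// Reproduction sketch: with the vectorized Parquet reader enabled, the binary
// partition value reads back as an empty string before this fix.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.sql("CREATE TABLE t1(name STRING, id BINARY, part BINARY) USING PARQUET PARTITIONED BY (part)")
spark.sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
spark.sql("SELECT name, cast(id AS STRING), cast(part AS STRING) FROM t1").show()
// Expected: 'Spark SQL' in the last column; observed before the fix: ''
```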

### Why are the changes needed?
Fix a data correctness issue.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #30824 from AngersZhuuuu/SPARK-33593.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
AngersZhuuuu authored and dongjoon-hyun committed Dec 18, 2020
1 parent b0da2bc commit 0603913
Showing 4 changed files with 114 additions and 3 deletions.
```diff
@@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx)
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx)
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+          " in column vectorized reader.", t.sql()));
       }
     }
   }
```
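
The hunk above is the actual fix. Judging from the method signature in the hunk header, this is the `populate` helper in `ColumnVectorUtils`, which fills a constant partition-column vector from the partition `InternalRow`; it previously had no `BinaryType` branch, so a binary partition value was never written into the vector. The sketch below is not part of the patch; it only illustrates the two `WritableColumnVector` calls the new branch relies on, exercised directly through `OnHeapColumnVector`:

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.BinaryType

// Illustrative only: write a byte array into row 0 of a binary column vector
// and read it back, mirroring what the new BinaryType branch does.
val col = new OnHeapColumnVector(4096, BinaryType)
val value = "Spark SQL".getBytes("UTF-8")
col.putByteArray(0, value)
assert(col.getBinary(0).sameElements(value))
col.close()
```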
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (26 additions, 0 deletions)
```diff
@@ -3745,6 +3745,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper
       }
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }

 case class Foo(bar: Option[String])
```
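
One small reading aid (not in the PR text): the hex literal `X'537061726B2053514C'` used by these tests is just the UTF-8 bytes of `"Spark SQL"`, which is why the expected answer repeats that string. A quick check:

```scala
// Decode the hex literal used in the INSERT statements.
val hex = "537061726B2053514C"
val bytes = hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
assert(new String(bytes, "UTF-8") == "Spark SQL")
```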
```diff
@@ -17,16 +17,29 @@

 package org.apache.spark.sql.execution.datasources.orc

+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription

 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.unsafe.types.UTF8String.fromString

 class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
   private val dataSchema = StructType.fromDDL("col1 int, col2 int")
   private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
   private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
```
```diff
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
     }
   }
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(schema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            schema.toArray,
+            Array(0, -1),
+            Array(-1, 0),
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
```
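
A note on the `sameElements` comparison added for `BinaryType` (my reading of the change, not text from the PR): JVM arrays compare by reference, so `==` between two distinct but identical byte arrays is false, which is why the test cannot simply reuse `assert(actual == expected)` for binary values. A tiny illustration:

```scala
// Arrays use reference equality on the JVM; compare contents explicitly.
val a = "Spark SQL".getBytes("UTF-8")
val b = "Spark SQL".getBytes("UTF-8")
assert(!(a == b))           // different array instances
assert(a sameElements b)    // same contents
```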
```diff
@@ -790,14 +790,15 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)

       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

       val constantValues =
         Seq(
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }
```
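
Also worth noting for anyone extending these suites (an observation, not PR text): the `constantValues` are given in Spark's internal representation, which is what the partition `InternalRow` passed to the vectorized readers carries: strings as `UTF8String`, binary as `Array[Byte]`, dates as days since the epoch, timestamps as microseconds since the epoch. For example:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Internal forms used by the partition-value rows in the tests above.
val s = UTF8String.fromString("a string")  // UTF8String, not java.lang.String
val days = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))  // Int: days since 1970-01-01
val micros = DateTimeUtils.fromJavaTimestamp(
  java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))  // Long: microseconds since the epoch
println(s"$s / $days / $micros")
```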
