[SPARK-33593][SQL] Vector reader got incorrect data with binary partition value

Currently, when the Parquet vectorized reader is enabled, using a binary type as the partition column returns an incorrect value, as shown by the UT below:
```scala
test("Parquet vector reader incorrect with binary partition value") {
  Seq(false, true).foreach(tag => {
    withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
      withTable("t1") {
        sql(
          """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
            | USING PARQUET PARTITIONED BY (part)""".stripMargin)
        sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
        if (tag) {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", ""))
        } else {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", "Spark SQL"))
        }
      }
    }
  })
}
```
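For reference (an illustration only, not part of the patch), the hex literal `X'537061726B2053514C'` in the `INSERT` above is just the UTF-8 encoding of "Spark SQL", so both `id` and the `part` partition value should read back as "Spark SQL"; the UT shows that with the vectorized reader enabled the partition value instead comes back empty. A quick Scala check of the literal:
```scala
// Illustration only: decode the hex literal used in the UT above.
import java.nio.charset.StandardCharsets
val hex = "537061726B2053514C"
val bytes = hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
assert(new String(bytes, StandardCharsets.UTF_8) == "Spark SQL")
```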

Fixes the data correctness issue described above.

No user-facing change.

Added UTs covering both the Parquet and ORC vectorized readers.

Closes #30824 from AngersZhuuuu/SPARK-33593.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 0603913)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
AngersZhuuuu authored and dongjoon-hyun committed Dec 18, 2020
1 parent 0f9f950 commit 8a269c7
Showing 4 changed files with 114 additions and 3 deletions.
@@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
    } else {
      if (t == DataTypes.BooleanType) {
        col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
      } else if (t == DataTypes.BinaryType) {
        col.putByteArray(0, row.getBinary(fieldIdx));
      } else if (t == DataTypes.ByteType) {
        col.putBytes(0, capacity, row.getByte(fieldIdx));
      } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
        col.putInts(0, capacity, row.getInt(fieldIdx));
      } else if (t instanceof TimestampType) {
        col.putLongs(0, capacity, row.getLong(fieldIdx));
      } else {
        throw new RuntimeException(String.format("DataType %s is not supported" +
          " in column vectorized reader.", t.sql()));
      }
    }
  }
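The hunks above touch the constant-column population helper shown in the hunk context (the `populate()` method, in Spark's vectorized execution code, presumably `ColumnVectorUtils`): previously there was no `BinaryType` branch, so a binary partition value was silently left unset, and any other unhandled type now fails fast with an explicit error. A minimal sketch of what the new branch does, using `OnHeapColumnVector` directly — illustrative only, not part of the commit, and assuming `putByteArray`/`getBinary` behave as used in the diff above:
```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.BinaryType

// Sketch: materialize a constant binary "partition" value into a writable
// column vector and read it back, mirroring the new BinaryType branch.
val col = new OnHeapColumnVector(4, BinaryType)
val partValue = "Spark SQL".getBytes(StandardCharsets.UTF_8)
col.putByteArray(0, partValue) // the call added by this patch
assert(new String(col.getBinary(0), StandardCharsets.UTF_8) == "Spark SQL")
col.close()
```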
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3732,6 +3732,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
      checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true))
    }
  }

  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
    Seq("false", "true").foreach(value => {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
        withTable("t1") {
          sql(
            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", "Spark SQL"))
        }
      }

      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
        withTable("t2") {
          sql(
            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
              |USING ORC PARTITIONED BY (part)""".stripMargin)
          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
            Row("a", "Spark SQL", "Spark SQL"))
        }
      }
    })
  }
}

case class Foo(bar: Option[String])
@@ -17,16 +17,29 @@

package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.TypeDescription

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.UTF8String.fromString

class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {

  import testImplicits._

  private val dataSchema = StructType.fromDDL("col1 int, col2 int")
  private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
  private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
      assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
    }
  }

  test("SPARK-33593: partition column types") {
    withTempPath { dir =>
      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)

      val dataTypes =
        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

      val constantValues =
        Seq(
          UTF8String.fromString("a string"),
          true,
          1.toByte,
          "Spark SQL".getBytes,
          2.toShort,
          3,
          Long.MaxValue,
          0.25.toFloat,
          0.75D,
          Decimal("1234.23456"),
          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))

      dataTypes.zip(constantValues).foreach { case (dt, v) =>
        val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
        val partitionValues = new GenericInternalRow(Array(v))
        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
        val taskConf = sqlContext.sessionState.newHadoopConf()
        val orcFileSchema = TypeDescription.fromString(schema.simpleString)
        val vectorizedReader = new OrcColumnarBatchReader(4096)
        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)

        try {
          vectorizedReader.initialize(fileSplit, taskAttemptContext)
          vectorizedReader.initBatch(
            orcFileSchema,
            schema.toArray,
            Array(0, -1),
            Array(-1, 0),
            partitionValues)
          vectorizedReader.nextKeyValue()
          val row = vectorizedReader.getCurrentValue.getRow(0)

          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
          val actual = row.copy().get(1, dt)
          val expected = v
          if (dt.isInstanceOf[BinaryType]) {
            assert(actual.asInstanceOf[Array[Byte]]
              sameElements expected.asInstanceOf[Array[Byte]])
          } else {
            assert(actual == expected)
          }
        } finally {
          vectorizedReader.close()
        }
      }
    }
  }
}
@@ -790,14 +790,15 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)

      val dataTypes =
        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

      val constantValues =
        Seq(
          UTF8String.fromString("a string"),
          true,
          1.toByte,
          "Spark SQL".getBytes,
          2.toShort,
          3,
          Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
          val actual = row.copy().get(1, dt)
          val expected = v
          assert(actual == expected)
          if (dt.isInstanceOf[BinaryType]) {
            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
          } else {
            assert(actual == expected)
          }
        } finally {
          vectorizedReader.close()
        }
