From 3b27af36f82580c2171df965140c9a14e62fd5f0 Mon Sep 17 00:00:00 2001
From: jeanlyn
Date: Fri, 30 Jan 2015 21:48:21 +0800
Subject: [PATCH] SPARK-5498: fix bug when querying data when the partition
 schema does not match the table schema

---
 .../apache/spark/sql/hive/TableReader.scala        | 44 ++++++++++++++++---
 .../sql/hive/InsertIntoHiveTableSuite.scala        | 15 +++++++
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index c368715f7c6f5..8f955b699981e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -188,9 +188,13 @@ class HadoopTableReader(
       createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
         deserializer.initialize(hconf, partProps)
+        // get the table deserializer
+        val tableSerDe = tableDesc.getDeserializerClass.newInstance()
+        tableSerDe.initialize(hconf, tableDesc.getProperties)
 
         // fill the non partition key attributes
-        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
+          mutableRow, Some(tableSerDe))
       }
     }.toSeq
@@ -264,15 +268,31 @@ private[hive] object HadoopTableReader extends HiveInspectors {
    * @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
    *                             positions in the output schema
    * @param mutableRow A reusable `MutableRow` that should be filled
+   * @param convertdeserializer An optional table `Deserializer` to convert deserialized rows to
    * @return An `Iterator[Row]` transformed from `iterator`
    */
   def fillObject(
       iterator: Iterator[Writable],
       deserializer: Deserializer,
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-      mutableRow: MutableRow): Iterator[Row] = {
+      mutableRow: MutableRow,
+      convertdeserializer: Option[Deserializer] = None): Iterator[Row] = {
+
+    val soi = convertdeserializer match {
+      case Some(convert) =>
+        // only convert when the partition and table object inspectors differ
+        if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
+          deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+        }
+        else {
+          ObjectInspectorConverters.getConvertedOI(
+            deserializer.getObjectInspector(), convert.getObjectInspector(), true)
+            .asInstanceOf[StructObjectInspector]
+        }
+      case None =>
+        deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+    }
 
-    val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
       soi.getStructFieldRef(attr.name) -> ordinal
     }.unzip
@@ -315,9 +335,23 @@
       }
     }
 
+    val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+      deserializer.getObjectInspector, soi)
+
     // Map each tuple to a row object
     iterator.map { value =>
-      val raw = deserializer.deserialize(value)
+      val raw = convertdeserializer match {
+        case Some(convert) =>
+          if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
+            deserializer.deserialize(value)
+          }
+          // if the partition schema does not match the table schema, convert the row to match
+          else {
+            partTblObjectInspectorConverter.convert(deserializer.deserialize(value))
+          }
+        case None =>
+          deserializer.deserialize(value)
+      }
       var i = 0
       while (i < fieldRefs.length) {
         val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 4dd96bd5a1b77..de87da1b7f82e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -172,4 +172,19 @@ class InsertIntoHiveTableSuite extends QueryTest {
 
     sql("DROP TABLE hiveTableWithStructValue")
   }
+
+  test("SPARK-5498: partition schema does not match table schema") {
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString)))
+    testData.registerTempTable("testData")
+    val tmpDir = Files.createTempDir()
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
+    checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
+      testData.toSchemaRDD.collect.toSeq
+    )
+    sql("DROP TABLE table_with_partition")
+
+  }
 }
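
Note (outside the patch): a minimal sketch of the ObjectInspector conversion that the new `fillObject` path performs, using illustrative names `partitionSerDe`, `tableSerDe`, and `deserializeToTableSchema` that are not identifiers from the patch. It relies only on Hive calls already used above (`ObjectInspectorConverters.getConvertedOI`, `ObjectInspectorConverters.getConverter`, `Converter.convert`).

    import org.apache.hadoop.hive.serde2.Deserializer
    import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
    import org.apache.hadoop.io.Writable

    // Deserialize a raw Writable with the partition SerDe and, when the partition
    // schema differs from the table schema (e.g. after ALTER TABLE ... CHANGE COLUMN),
    // convert the result so it matches the table's object inspector.
    def deserializeToTableSchema(
        partitionSerDe: Deserializer,
        tableSerDe: Deserializer,
        value: Writable): AnyRef = {
      val partOI = partitionSerDe.getObjectInspector
      val tableOI = tableSerDe.getObjectInspector
      if (partOI.equals(tableOI)) {
        // Schemas already match; no conversion needed.
        partitionSerDe.deserialize(value)
      } else {
        // Build an inspector that bridges the partition and table schemas,
        // then convert each deserialized row to that shape.
        val convertedOI = ObjectInspectorConverters
          .getConvertedOI(partOI, tableOI, true)
          .asInstanceOf[StructObjectInspector]
        val converter = ObjectInspectorConverters.getConverter(partOI, convertedOI)
        converter.convert(partitionSerDe.deserialize(value))
      }
    }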