SPARK-5498: fix bug when querying data whose partition schema does not match the table schema
jeanlyn committed Feb 2, 2015
1 parent 1ca0a10 commit 3b27af3
Showing 2 changed files with 54 additions and 5 deletions.
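Background: a Hive partition stores its own SerDe properties, captured when the partition was written. After an ALTER TABLE ... CHANGE COLUMN, existing partitions keep their original column types, so the ObjectInspector produced by a partition's deserializer can disagree with the current table schema, and reads must convert each row. The following is a condensed, hypothetical restatement of the scenario in a HiveContext session; the regression test added at the end of this diff exercises the same flow, and the table and data names here are illustrative only.

// Hypothetical HiveContext session mirroring the regression test added below;
// `testData` is a temporary table of (int, string) rows registered beforehand.
sql("CREATE TABLE t (key INT, value STRING) PARTITIONED BY (ds STRING)")
sql("INSERT OVERWRITE TABLE t PARTITION (ds='1') SELECT key, value FROM testData")
// The partition ds='1' keeps `key` as INT in its own SerDe properties ...
sql("ALTER TABLE t CHANGE COLUMN key key BIGINT")
// ... so before this fix the read below could fail (e.g. with a ClassCastException),
// because the partition deserializer's ObjectInspector no longer matched the table schema.
sql("SELECT key, value FROM t WHERE ds='1'").collect()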
HadoopTableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -188,9 +188,13 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
+// Get the table-level SerDe, initialized from the table properties (the partition
+// deserializer above was initialized from the partition properties).
+val tableSerDe = tableDesc.getDeserializerClass.newInstance()
+tableSerDe.initialize(hconf, tableDesc.getProperties)

// fill the non partition key attributes
-HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
+  mutableRow, Some(tableSerDe))
}
}.toSeq

@@ -264,15 +268,31 @@ private[hive] object HadoopTableReader extends HiveInspectors {
* @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
* positions in the output schema
* @param mutableRow A reusable `MutableRow` that should be filled
+* @param convertdeserializer Optional table-schema `Deserializer`; when its ObjectInspector
+* differs from that of `deserializer`, deserialized rows are converted to match it
* @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
iterator: Iterator[Writable],
deserializer: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-mutableRow: MutableRow): Iterator[Row] = {
+mutableRow: MutableRow,
+convertdeserializer: Option[Deserializer] = None): Iterator[Row] = {

+// `soi` is the StructObjectInspector used below to pull field values out of each row.
+// When a table deserializer is supplied and its ObjectInspector differs from the
+// partition's, use a converted ObjectInspector that matches the table schema.
+val soi = convertdeserializer match {
+  case Some(convert) =>
+    if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
+      deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+    } else {
+      ObjectInspectorConverters.getConvertedOI(
+        deserializer.getObjectInspector(), convert.getObjectInspector(), true)
+        .asInstanceOf[StructObjectInspector]
+    }
+  case None =>
+    deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+}

-val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
@@ -315,9 +335,23 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}

+// Converter from the partition deserializer's ObjectInspector to `soi` (the table's).
+val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+  deserializer.getObjectInspector, soi)
+
// Map each tuple to a row object
iterator.map { value =>
-val raw = deserializer.deserialize(value)
+val raw = convertdeserializer match {
+  case Some(convert) =>
+    if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
+      deserializer.deserialize(value)
+    } else {
+      // The partition schema does not match the table schema, so convert the
+      // deserialized row to the table's ObjectInspector before reading fields.
+      partTblObjectInspectorConverter.convert(deserializer.deserialize(value))
+    }
+  case None =>
+    deserializer.deserialize(value)
+}
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
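The conversion above leans on Hive's ObjectInspectorConverters utility. The standalone sketch below is not part of the patch; the object and variable names are illustrative, and it assumes Hive's serde2 classes are on the classpath. It shows the same pattern in isolation: one struct ObjectInspector for the layout the partition was written with (key as int), one for the layout the table now declares (key as bigint), and a converter that rewrites a deserialized row from the former to the latter. In the patch itself the output inspector is derived from the table SerDe via getConvertedOI, but the per-row conversion uses the same getConverter call shown here.

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object OIConversionSketch {
  def main(args: Array[String]): Unit = {
    // Layout the partition data was written with: key is an int.
    val partOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("key").asJava,
      Seq[ObjectInspector](PrimitiveObjectInspectorFactory.javaIntObjectInspector).asJava)

    // Layout the table declares after ALTER TABLE ... CHANGE COLUMN: key is a bigint.
    val tableOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("key").asJava,
      Seq[ObjectInspector](PrimitiveObjectInspectorFactory.javaLongObjectInspector).asJava)

    // A converter from the partition layout to the table layout; applying it to a
    // deserialized row re-boxes the int field as a java.lang.Long.
    val converter = ObjectInspectorConverters.getConverter(partOI, tableOI)
    val row = Seq[AnyRef](Int.box(1)).asJava
    println(converter.convert(row)) // prints [1], with the field now a java.lang.Long
  }
}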
InsertIntoHiveTableSuite.scala
@@ -172,4 +172,19 @@ class InsertIntoHiveTableSuite extends QueryTest {

sql("DROP TABLE hiveTableWithStructValue")
}

test("SPARK-5498:partition schema does not match table schema"){
val testData = TestHive.sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString)))
testData.registerTempTable("testData")
val tmpDir = Files.createTempDir()
sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.toSchemaRDD.collect.toSeq
)
sql("DROP TABLE table_with_partition")

}
}
