
Commit

clean the code
jeanlyn committed Feb 7, 2015
1 parent 2a91a87 commit c879aa1
Showing 4 changed files with 21 additions and 36 deletions.
47 changes: 18 additions & 29 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
@@ -115,7 +116,7 @@ class HadoopTableReader(
       val hconf = broadcastedHiveConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
+      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
     }

     deserializedHadoopRDD
@@ -194,7 +195,7 @@ class HadoopTableReader(

         // fill the non partition key attributes
         HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
-          mutableRow, Some(tableSerDe))
+          mutableRow, tableSerDe)
       }
     }.toSeq
@@ -264,37 +265,27 @@ private[hive] object HadoopTableReader extends HiveInspectors {
    * Transform all given raw `Writable`s into `Row`s.
    *
    * @param iterator Iterator of all `Writable`s to be transformed
-   * @param deserializer The `Deserializer` associated with the input `Writable`
+   * @param rawDeser The `Deserializer` associated with the input `Writable`
    * @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
    *                             positions in the output schema
    * @param mutableRow A reusable `MutableRow` that should be filled
-   * @param convertdeserializer The `Deserializer` covert the `deserializer`
+   * @param tableDeser Table Deserializer
    * @return An `Iterator[Row]` transformed from `iterator`
    */
   def fillObject(
       iterator: Iterator[Writable],
-      deserializer: Deserializer,
+      rawDeser: Deserializer,
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
       mutableRow: MutableRow,
-      convertdeserializer: Option[Deserializer] = None): Iterator[Row] = {
+      tableDeser: Deserializer): Iterator[Row] = {

-    val soi = convertdeserializer match {
-      case Some(convert) =>
-        // check need to convert
-        if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
-          deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
-        }
-        else {
-          HiveShim.getConvertedOI(
-            deserializer.getObjectInspector(),
-            convert.getObjectInspector()).asInstanceOf[StructObjectInspector]
-        }
-      case None =>
-        deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
-    }
+    val soi = HiveShim.getConvertedOI(
+      rawDeser.getObjectInspector, tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]

+    val inputFields = soi.getAllStructFieldRefs
+
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
-      soi.getStructFieldRef(attr.name) -> ordinal
+      (inputFields.get(ordinal), ordinal)
     }.unzip

     // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
@@ -335,17 +326,15 @@ private[hive] object HadoopTableReader extends HiveInspectors {
       }
     }

-    /**
-     * when the soi and deserializer.getObjectInspector is equal,
-     * we will get `IdentityConverter`,which mean it won't convert the
-     * value when schema match
-     */
-    val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-      deserializer.getObjectInspector, soi)
+    val converter = if (rawDeser == tableDeser) {
+      new IdentityConverter
+    } else {
+      ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)
+    }

     // Map each tuple to a row object
     iterator.map { value =>
-      val raw = partTblObjectInspectorConverter.convert(deserializer.deserialize(value))
+      val raw = converter.convert(rawDeser.deserialize(value))
       var i = 0
       while (i < fieldRefs.length) {
         val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
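For context, the change above is what lets a partition written with an older column type still be read against the table's current schema: rawDeser is the partition's own SerDe, tableDeser is the table's, and Hive's converter machinery bridges their object inspectors. Below is a minimal standalone sketch of that mechanism (a hypothetical example using Hive's primitive inspectors, not code from this commit; it assumes a partition wrote key as int while the table now declares bigint):

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object OIConverterSketch {
  def main(args: Array[String]): Unit = {
    // Inspector for the data as the partition wrote it (key int) ...
    val partitionKeyOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector
    // ... and the inspector the table schema expects today (key bigint).
    val tableKeyOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector

    // getConverter bridges the two inspectors; fillObject applies the same idea at the
    // struct level, short-circuiting to an IdentityConverter when rawDeser == tableDeser.
    val converter = ObjectInspectorConverters.getConverter(partitionKeyOI, tableKeyOI)

    val fromPartition: java.lang.Integer = 42
    println(converter.convert(fromPartition)) // expected to print 42, now boxed as a java.lang.Long
  }
}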
@@ -187,7 +187,7 @@ class InsertIntoHiveTableSuite extends QueryTest
     sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
     sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")

-    //test schema is the same
+    // test schema the same between partition and table
     sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
     checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
       testData.toSchemaRDD.collect.toSeq
@@ -242,12 +242,9 @@ private[hive] object HiveShim {
     }
   }

-  // make getConvertedOI compatible between 0.12.0 and 0.13.1
   def getConvertedOI(inputOI: ObjectInspector,
-                     outputOI: ObjectInspector,
-                     equalsCheck: java.lang.Boolean =
-                       new java.lang.Boolean(true)): ObjectInspector = {
-    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, equalsCheck)
+                     outputOI: ObjectInspector): ObjectInspector = {
+    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
   }

   def prepareWritable(w: Writable): Writable = {
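The wrapper above exists because the two bundled Hive versions disagree on ObjectInspectorConverters.getConvertedOI's signature (this shim forwards a third equalsCheck argument fixed to true, while the other shim calls a two-argument overload), so HadoopTableReader can call HiveShim.getConvertedOI uniformly. A rough sketch of what the converted struct inspector gives the reader, assuming a Hive build that exposes the two-argument overload directly (hypothetical field names, not code from this commit):

import java.util.Arrays.asList

import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory._

import scala.collection.JavaConverters._

object ConvertedOISketch {
  def main(args: Array[String]): Unit = {
    // Struct inspector for rows as the partition wrote them: (key int, value string).
    val partitionOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      asList("key", "value"),
      asList[ObjectInspector](javaIntObjectInspector, javaStringObjectInspector))

    // Struct inspector for the schema the table declares now: (key bigint, value string).
    val tableOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      asList("key", "value"),
      asList[ObjectInspector](javaLongObjectInspector, javaStringObjectInspector))

    // The converted inspector follows the table-side schema; fillObject casts it to
    // StructObjectInspector and resolves its field refs against the converted rows.
    val soi = ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI)
      .asInstanceOf[StructObjectInspector]

    for (f <- soi.getAllStructFieldRefs.asScala) {
      println(s"${f.getFieldName}: ${f.getFieldObjectInspector.getTypeName}")
    }
  }
}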
@@ -399,7 +399,6 @@ private[hive] object HiveShim {
     }
   }

-  // make getConvertedOI compatible between 0.12.0 and 0.13.1
   def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
     ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
   }

0 comments on commit c879aa1
