From 26fa9555398559535f73baafbe648adf8f794b92 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 15 Nov 2014 01:50:58 +0800
Subject: [PATCH] Fixes complex type support in Hive 0.13.1 shim

---
 .../thriftserver/HiveThriftServer2Suite.scala | 33 ++++++++++++++++---
 .../spark/sql/hive/thriftserver/Shim13.scala  | 14 +++++---
 2 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 55dfa599b70c0..e03514f2c0c41 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -22,6 +22,7 @@ import java.net.ServerSocket
 import java.sql.{Date, DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
@@ -275,9 +276,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
       queries.foreach(statement.execute)
 
-      val resultSet = statement.executeQuery("SELECT key FROM test_4292")
-
       Seq(238, 86, 311, 27, 165).foreach { key =>
+        val resultSet = statement.executeQuery("SELECT key FROM test_4292")
         resultSet.next()
         assert(resultSet.getInt(1) === key)
       }
@@ -295,13 +295,36 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
       queries.foreach(statement.execute)
 
-      val resultSet = statement.executeQuery(
-        "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
-
       assertResult(Date.valueOf("2011-01-01")) {
+        val resultSet = statement.executeQuery(
+          "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
         resultSet.next()
         resultSet.getDate(1)
       }
     }
   }
+
+  test("SPARK-4407 regression: Complex type support") {
+    withJdbcStatement() { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      assertResult("""{238:"val_238"}""") {
+        val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+
+      assertResult("""["238","val_238"]""") {
+        val resultSet = statement.executeQuery(
+          "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+    }
+  }
 }
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 9caba6648ad52..a00795c411128 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -133,7 +133,11 @@ private[hive] class SparkExecuteStatementOperation(
       case TimestampType =>
         to += from.getAs[Timestamp](ordinal)
       case StringType | BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        to += from.getAs[String](ordinal)
+        val hiveString = result
+          .queryExecution
+          .asInstanceOf[HiveContext#QueryExecution]
+          .toHiveString(from.get(ordinal) -> dataTypes(ordinal))
+        to += hiveString
     }
   }
 
@@ -141,9 +145,9 @@ private[hive] class SparkExecuteStatementOperation(
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
     if (!iter.hasNext) {
-      reultRowSet
+      resultRowSet
     } else {
       // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
       val maxRows = maxRowsL.toInt
@@ -160,10 +164,10 @@ private[hive] class SparkExecuteStatementOperation(
         }
         curCol += 1
       }
-      reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+      resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
      curRow += 1
     }
-    reultRowSet
+    resultRowSet
   }
 }
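
Note: the Shim13.scala change above hands complex-typed column values to
HiveContext#QueryExecution.toHiveString instead of casting them to String.
The snippet below is only a standalone sketch of the kind of Hive-style
rendering the new SPARK-4407 regression test asserts for MAP and ARRAY
results; the object name, method name, and quoting rules are illustrative
assumptions, not Spark's actual toHiveString implementation.

    // Minimal, self-contained sketch: renders Scala maps and sequences in the
    // Hive-style form asserted by the SPARK-4407 regression test above.
    // NOT the actual HiveContext#QueryExecution.toHiveString code.
    object ComplexTypeRenderingSketch {
      private def render(value: Any): String = value match {
        case m: Map[_, _] =>
          // Map(238 -> "val_238")  =>  {238:"val_238"}
          m.map { case (k, v) => s"${render(k)}:${render(v)}" }.mkString("{", ",", "}")
        case s: Seq[_] =>
          // Seq("238", "val_238")  =>  ["238","val_238"]
          s.map(render).mkString("[", ",", "]")
        case s: String =>
          // Strings nested inside complex values are quoted.
          "\"" + s + "\""
        case other =>
          other.toString
      }

      def main(args: Array[String]): Unit = {
        println(render(Map(238 -> "val_238")))   // prints {238:"val_238"}
        println(render(Seq("238", "val_238")))   // prints ["238","val_238"]
      }
    }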