Skip to content

Commit

Permalink
Fixes complex type support in Hive 0.13.1 shim
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Nov 15, 2014
1 parent a92882a commit 26fa955
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.ServerSocket
import java.sql.{Date, DriverManager, Statement}
import java.util.concurrent.TimeoutException

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
Expand Down Expand Up @@ -275,9 +276,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

queries.foreach(statement.execute)

val resultSet = statement.executeQuery("SELECT key FROM test_4292")

Seq(238, 86, 311, 27, 165).foreach { key =>
val resultSet = statement.executeQuery("SELECT key FROM test_4292")
resultSet.next()
assert(resultSet.getInt(1) === key)
}
Expand All @@ -295,13 +295,36 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

queries.foreach(statement.execute)

val resultSet = statement.executeQuery(
"SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")

assertResult(Date.valueOf("2011-01-01")) {
val resultSet = statement.executeQuery(
"SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
resultSet.next()
resultSet.getDate(1)
}
}
}

// Regression test for SPARK-4407: complex Hive types (MAP, ARRAY) must come back
// through the JDBC/Thrift interface as Hive-formatted strings instead of failing
// or returning raw objects. (This exercises the toHiveString conversion added to
// SparkExecuteStatementOperation in this same commit.)
test("SPARK-4407 regression: Complex type support") {
withJdbcStatement() { statement =>
// Build a small two-column table from the shared smallKv fixture file.
val queries = Seq(
"DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")

queries.foreach(statement.execute)

// A MAP column should serialize in Hive's {key:"value"} notation when read
// via ResultSet.getString. (First row of smallKv is key=238, value=val_238.)
assertResult("""{238:"val_238"}""") {
val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
resultSet.next()
resultSet.getString(1)
}

// An ARRAY column should serialize in Hive's ["a","b"] notation.
assertResult("""["238","val_238"]""") {
val resultSet = statement.executeQuery(
"SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
resultSet.next()
resultSet.getString(1)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,21 @@ private[hive] class SparkExecuteStatementOperation(
case TimestampType =>
to += from.getAs[Timestamp](ordinal)
case StringType | BinaryType | _: ArrayType | _: StructType | _: MapType =>
to += from.getAs[String](ordinal)
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString(from.get(ordinal) -> dataTypes(ordinal))
to += hiveString
}
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
reultRowSet
resultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
Expand All @@ -160,10 +164,10 @@ private[hive] class SparkExecuteStatementOperation(
}
curCol += 1
}
reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
}
reultRowSet
resultRowSet
}
}

Expand Down

0 comments on commit 26fa955

Please sign in to comment.