Skip to content

Commit

Permalink
[SPARK-6851][SQL] Create new instance for each converted parquet rela…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
marmbrus committed Apr 10, 2015
1 parent b9baa4c commit 22df77c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}

if (metastoreRelation.hiveQlTable.isPartitioned) {
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
val partitions = metastoreRelation.hiveQlPartitions.map { p =>
Expand Down Expand Up @@ -314,6 +314,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

parquetRelation
}

result.newInstance()
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,90 @@ case class Nested3(f3: Int)
case class NestedArray2(b: Seq[Int])
case class NestedArray1(a: NestedArray2)

case class Order(
id: Int,
make: String,
`type`: String,
price: Int,
pdate: String,
customer: String,
city: String,
state: String,
month: Int)

/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
* valid, but Hive currently cannot execute it.
*/
class SQLQuerySuite extends QueryTest {

test("SPARK-6851: Self-joined converted parquet tables") {
val orders = Seq(
Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))

val orderUpdates = Seq(
Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))

orders.toDF.registerTempTable("orders1")
orderUpdates.toDF.registerTempTable("orderupdates1")

sql(
"""CREATE TABLE orders(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)

sql(
"""CREATE TABLE orderupdates(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)

sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")

checkAnswer(
sql(
"""
|select orders.state, orders.month
|from orders
|join (
| select distinct orders.state,orders.month
| from orders
| join orderupdates
| on orderupdates.id = orders.id) ao
| on ao.state = orders.state and ao.month = orders.month
""".stripMargin),
(1 to 6).map(_ => Row("CA", 20151)))
}

test("SPARK-5371: union with null and sum") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.registerTempTable("table1")
Expand Down Expand Up @@ -478,5 +555,4 @@ class SQLQuerySuite extends QueryTest {
sql("select d from dn union all select d * 2 from dn")
.queryExecution.analyzed
}

}

0 comments on commit 22df77c

Please sign in to comment.