Skip to content

Commit

Permalink
fix new style and add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 10, 2015
1 parent 925203b commit 068c35d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: I
extends Expression
with Partitioning {

override def children = expressions
override def nullable = false
override def dataType = IntegerType
override def children: Seq[Expression] = expressions
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

private[this] lazy val clusteringSet = expressions.toSet

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.autoSortMergeJoin =>
val mergeJoin =
joins.SortMergeJoin(leftKeys, rightKeys, Inner, planLater(left), planLater(right))
joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
Expand All @@ -33,7 +34,6 @@ import org.apache.spark.util.collection.CompactBuffer
case class SortMergeJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
left: SparkPlan,
right: SparkPlan) extends BinaryNode {

Expand All @@ -52,7 +52,7 @@ case class SortMergeJoin(
@transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
@transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)

override def execute() = {
override def execute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.hive.test.TestHive

/**
 * Runs the test cases that are included in the hive distribution with sort merge join enabled.
 */
class SortMergeCompatibilitySuite extends HiveCompatibilitySuite {
  // Turn sort merge join on for the whole suite, and restore the default ("false")
  // afterwards so other suites sharing the TestHive singleton are not affected.
  // NOTE(review): this assumes the pre-suite value was "false" — if a caller ever runs
  // with it enabled globally, consider saving/restoring the original conf instead.
  override def beforeAll(): Unit = {
    super.beforeAll()
    TestHive.setConf(SQLConf.AUTO_SORTMERGEJOIN, "true")
  }

  override def afterAll(): Unit = {
    TestHive.setConf(SQLConf.AUTO_SORTMERGEJOIN, "false")
    super.afterAll()
  }

  /** Hive query tests expected to pass with sort merge join enabled. */
  override def whiteList: Seq[String] = Seq(
    "auto_join0",
    "auto_join1",
    "auto_join10",
    "auto_join11",
    "auto_join12",
    "auto_join13",
    "auto_join14",
    "auto_join14_hadoop20",
    "auto_join15",
    "auto_join17",
    "auto_join18",
    "auto_join19",
    "auto_join2",
    "auto_join20",
    "auto_join21",
    "auto_join22",
    "auto_join23",
    "auto_join24",
    "auto_join25",
    "auto_join26",
    "auto_join27",
    "auto_join28",
    "auto_join3",
    "auto_join30",
    "auto_join31",
    "auto_join32",
    "auto_join4",
    "auto_join5",
    "auto_join6",
    "auto_join7",
    "auto_join8",
    "auto_join9",
    "auto_join_filters",
    "auto_join_nulls",
    "auto_join_reordering_values",
    "auto_smb_mapjoin_14",
    "auto_sortmerge_join_1",
    "auto_sortmerge_join_10",
    "auto_sortmerge_join_11",
    "auto_sortmerge_join_12",
    "auto_sortmerge_join_13",
    "auto_sortmerge_join_14",
    "auto_sortmerge_join_15",
    "auto_sortmerge_join_16",
    "auto_sortmerge_join_2",
    "auto_sortmerge_join_3",
    "auto_sortmerge_join_4",
    "auto_sortmerge_join_5",
    "auto_sortmerge_join_6",
    "auto_sortmerge_join_7",
    "auto_sortmerge_join_8",
    "auto_sortmerge_join_9",
    "join0",
    "join1",
    "join10",
    "join11",
    "join12",
    "join13",
    "join14",
    "join14_hadoop20",
    "join15",
    "join16",
    "join17",
    "join18",
    "join19",
    "join2",
    "join20",
    "join21",
    "join22",
    "join23",
    "join24",
    "join25",
    "join26",
    "join27",
    "join28",
    "join29",
    "join3",
    "join30",
    "join31",
    "join32",
    "join32_lessSize",
    "join33",
    "join34",
    "join35",
    "join36",
    "join37",
    "join38",
    "join39",
    "join4",
    "join40",
    "join41",
    "join5",
    "join6",
    "join7",
    "join8",
    "join9",
    "join_1to1",
    "join_array",
    "join_casesensitive",
    "join_empty",
    "join_filters",
    "join_hive_626",
    "join_map_ppr",
    "join_nulls",
    "join_nullsafe",
    "join_rc",
    "join_reorder2",
    "join_reorder3",
    "join_reorder4",
    "join_star"
  )
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
expectedAnswer: Seq[Row],
ct: ClassTag[_]) = {
before()
conf.setConf("spark.sql.autoSortMergeJoin", "false")

var df = sql(query)

Expand Down Expand Up @@ -179,7 +178,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
}

conf.setConf("spark.sql.autoSortMergeJoin", "true")
after()
}

Expand Down

0 comments on commit 068c35d

Please sign in to comment.