Skip to content

Commit

Permalink
yin's comment: outputOrdering, join suite refine
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 15, 2015
1 parent ec8061b commit f515cd2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
// True iff any of the children are incorrectly sorted.
def needsAnySort: Boolean =
operator.requiredChildOrdering.zip(operator.children).exists {
case (required, child) => required.nonEmpty && required != child
case (required, child) => required.nonEmpty && required != child.outputOrdering
}

// True iff outputPartitionings of children are compatible with each other.
Expand Down Expand Up @@ -233,7 +233,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
}
}

if (meetsRequirements && compatible && !needsAnySort) {
if (meetsRequirements && compatible && !needsAnySort) {
operator
} else {
// At least one child does not satisfy its required data distribution or
Expand Down
12 changes: 11 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
case j: CartesianProduct => j
case j: BroadcastNestedLoopJoin => j
case j: BroadcastLeftSemiJoinHash => j
case j: ShuffledHashJoin => j
case j: SortMergeJoin => j
}

Expand Down Expand Up @@ -110,11 +109,22 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
cacheManager.clearCache()
sql("CACHE TABLE testData")

val SORTMERGEJOIN_ENABLED: Boolean = conf.sortMergeJoinEnabled
Seq(
("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a and key = 2", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a where key = 2", classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
conf.setConf("spark.sql.planner.sortMergeJoin", "true")
Seq(
("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a and key = 2", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a where key = 2", classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
}

sql("UNCACHE TABLE testData")
}
Expand Down

0 comments on commit f515cd2

Please sign in to comment.