-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-1495][SQL]add support for left semi join #837
Changes from all commits
14cff80
83a3c8a
8d4a121
4c726e5
5ec6fa4
035b73e
6713c09
d39cd12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,6 +140,137 @@ case class HashJoin( | |
} | ||
} | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realize that we aren't particularly good about this in most of the other physical operators, but could you add some Scala doc here about how this operator works and what the expected performance characteristics are? Same below. The goal of the Scala doc for physical operators should be to make it easy for people to understand query plans that are printed out by EXPLAIN. |
||
* Build the right table's join keys into a HashSet, and iteratively go through the left | ||
* table, to find the if join keys are in the Hash set. | ||
*/ | ||
@DeveloperApi | ||
case class LeftSemiJoinHash( | ||
leftKeys: Seq[Expression], | ||
rightKeys: Seq[Expression], | ||
left: SparkPlan, | ||
right: SparkPlan) extends BinaryNode { | ||
|
||
override def outputPartitioning: Partitioning = left.outputPartitioning | ||
|
||
override def requiredChildDistribution = | ||
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil | ||
|
||
val (buildPlan, streamedPlan) = (right, left) | ||
val (buildKeys, streamedKeys) = (rightKeys, leftKeys) | ||
|
||
def output = left.output | ||
|
||
@transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) | ||
@transient lazy val streamSideKeyGenerator = | ||
() => new MutableProjection(streamedKeys, streamedPlan.output) | ||
|
||
def execute() = { | ||
|
||
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => | ||
val hashTable = new java.util.HashSet[Row]() | ||
var currentRow: Row = null | ||
|
||
// Create a Hash set of buildKeys | ||
while (buildIter.hasNext) { | ||
currentRow = buildIter.next() | ||
val rowKey = buildSideKeyGenerator(currentRow) | ||
if(!rowKey.anyNull) { | ||
val keyExists = hashTable.contains(rowKey) | ||
if (!keyExists) { | ||
hashTable.add(rowKey) | ||
} | ||
} | ||
} | ||
|
||
new Iterator[Row] { | ||
private[this] var currentStreamedRow: Row = _ | ||
private[this] var currentHashMatched: Boolean = false | ||
|
||
private[this] val joinKeys = streamSideKeyGenerator() | ||
|
||
override final def hasNext: Boolean = | ||
streamIter.hasNext && fetchNext() | ||
|
||
override final def next() = { | ||
currentStreamedRow | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct if the operator is created with BuildLeft instead of BuildRight? I think that would turn it into a RightSemiJoin. Perhaps we should just remove the option to build on the other side. I think you can then also safely simplify this to use a HashSet instead of a HashMap, which will reduce memory consumption significantly. |
||
} | ||
|
||
/** | ||
* Searches the streamed iterator for the next row that has at least one match in hashtable. | ||
* | ||
* @return true if the search is successful, and false the streamed iterator runs out of | ||
* tuples. | ||
*/ | ||
private final def fetchNext(): Boolean = { | ||
currentHashMatched = false | ||
while (!currentHashMatched && streamIter.hasNext) { | ||
currentStreamedRow = streamIter.next() | ||
if (!joinKeys(currentStreamedRow).anyNull) { | ||
currentHashMatched = hashTable.contains(joinKeys.currentValue) | ||
} | ||
} | ||
currentHashMatched | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
* Using BroadcastNestedLoopJoin to calculate left semi join result when there's no join keys | ||
* for hash join. | ||
*/ | ||
@DeveloperApi | ||
case class LeftSemiJoinBNL( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this operator is exercised by the included test cases. We should add a test where the join condition can be calculated with hash keys. |
||
streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression]) | ||
(@transient sc: SparkContext) | ||
extends BinaryNode { | ||
// TODO: Override requiredChildDistribution. | ||
|
||
override def outputPartitioning: Partitioning = streamed.outputPartitioning | ||
|
||
override def otherCopyArgs = sc :: Nil | ||
|
||
def output = left.output | ||
|
||
/** The Streamed Relation */ | ||
def left = streamed | ||
/** The Broadcast relation */ | ||
def right = broadcast | ||
|
||
@transient lazy val boundCondition = | ||
InterpretedPredicate( | ||
condition | ||
.map(c => BindReferences.bindReference(c, left.output ++ right.output)) | ||
.getOrElse(Literal(true))) | ||
|
||
|
||
def execute() = { | ||
val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) | ||
|
||
streamed.execute().mapPartitions { streamedIter => | ||
val joinedRow = new JoinedRow | ||
|
||
streamedIter.filter(streamedRow => { | ||
var i = 0 | ||
var matched = false | ||
|
||
while (i < broadcastedRelation.value.size && !matched) { | ||
val broadcastedRow = broadcastedRelation.value(i) | ||
if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { | ||
matched = true | ||
} | ||
i += 1 | ||
} | ||
matched | ||
}) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
Hank 2 | ||
Hank 2 | ||
Joe 2 | ||
Joe 2 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
Hank 2 | ||
Hank 2 | ||
Joe 2 | ||
Joe 2 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
2 Tie | ||
2 Tie |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
1 | ||
1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
1 | ||
1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 | ||
1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
1 | ||
1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in general we should avoid making too many assumptions in the planner about what optimizations have occurred. For example, in the future we might avoid pushing down predicates that are very expensive to evaluate as it might be cheaper to run them on an already filtered set of data. However, in the case of LEFT SEMI JOIN, I think it is actually okay to push all evaluation into the join condition, even if they only refer to the left table. Is that true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think LEFT SEMI JOIN would not suffer by pushing down predicates.