Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
weiqingy committed Nov 6, 2016
1 parent 5684b27 commit 0a0cc25
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,34 @@ object HBaseFilter extends Logging{
HRF(Array(ScanRange.empty[Array[Byte]]), TypedFilter(Some(filter), FilterType.Atomic), true)
case In(attribute: String, values: Array[Any]) =>
  // Convert a "key in (x1, x2, x3, ...)" filter to
  // (key == x1) or (key == x2) or ...
  // Each value becomes an EqualTo sub-filter; the resulting HRFs are
  // merged pairwise with or(...) instead of accumulating every scan
  // range into one buffer (the old, now-deleted buffer-based approach
  // is what the surrounding change replaced).
  val perValue = values.map { v => buildFilter(EqualTo(attribute, v), relation) }

  // NOTE(review): reduce throws on an empty array; this assumes Spark
  // never pushes down an empty IN list — confirm against the caller.
  val ret = perValue.reduce[HRF[Array[Byte]]] {
    case (lhs, rhs) => or(lhs, rhs)
  }
  // The disjunction of exact-match ranges fully captures the IN
  // predicate, so mark it as handled (no Spark-side re-evaluation).
  ret.isHandled = true
  ret

// Convert "not(key in (x1, x2, x3, ...))" to
// (key != x1) and (key != x2) and ... — one Not(EqualTo) sub-filter
// per value, combined pairwise with and(...).
case Not(In(attribute: String, values: Array[Any])) =>
//converting a "not(key in (x1, x2, x3..)) filter to (key != x1) and (key != x2) and ..
val ret1 = values.map{v => buildFilter(Not(EqualTo(attribute, v)),relation)}

// reduceOption keeps an empty value list from throwing; an empty
// NOT IN list degenerates to an empty (no-op) HRF.
val hrf = ret1.reduceOption[HRF[Array[Byte]]]{
case (lhs, rhs) => and(lhs,rhs)
}.getOrElse(HRF.empty[Array[Byte]])

// NOTE(review): marked as NOT fully handled so Spark re-evaluates the
// predicate on returned rows — presumably the scan ranges cannot
// express the exclusion exactly; confirm against HRF semantics.
hrf.isHandled = false
hrf

case _ => HRF.empty[Array[Byte]]
}
logDebug(s"""start filter $filter: ${f.ranges.map(_.toString).mkString(" ")}""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,18 @@ object ScanRange {
/**
 * Merges two arrays of scan ranges into their union by folding one
 * array's elements into the other via the element-level
 * [[ScanRange.or]].
 *
 * The smaller array is always the one folded over: when `left` is
 * larger the arguments are swapped with a single recursive call, so
 * the number of element-level merge passes is minimized. (The
 * pre-existing unconditional fold over `left` was replaced by this
 * size check in the surrounding change; the commented-out remnant of
 * the old fold has been removed.)
 *
 * @param left     one set of scan ranges
 * @param right    the other set of scan ranges
 * @param ordering ordering over the range point type `T`
 * @return the union of `left` and `right` as scan ranges
 */
def or[T](
    left: Array[ScanRange[T]],
    right: Array[ScanRange[T]])(implicit ordering: Ordering[T]): Array[ScanRange[T]] = {
  if (left.size <= right.size) {
    left.foldLeft(right) { case (x, y) =>
      ScanRange.or(y, x)
    }
  } else {
    // Swap once so the smaller side is the one folded; the recursion
    // terminates immediately on the next call.
    or(right, left)
  }
}

// Construct multi-dimensional scan ranges.
Expand Down
42 changes: 22 additions & 20 deletions core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ class DefaultSourceSuite extends SHC with Logging {
.save()
}

// Regression test: a large NOT IN list previously triggered a stack
// overflow in the filter push-down path.
test("NOT IN filter stack overflow") {
  val df = withCatalog(catalog)
  // 2001 synthetic keys; none of the "xazN" values exist in the table,
  // so the NOT IN predicate should keep every row.
  val items = (0 to 2000).map { i => s"xaz$i" }

  val s = df.filter(not($"col0" isin (items: _*))).select("col0")
  s.explain(true)
  // show() has a side effect (prints rows), so call it with parens.
  s.show()
  assert(s.count() == df.count())
}

// Regression test for the IN-list push-down path. Only "row001" exists
// in the table; the five synthetic "xazN" keys do not, so exactly one
// row must survive the filter.
// NOTE(review): despite the name, this variant uses only 5 extra keys —
// presumably the large-list case is covered by the NOT IN test; confirm.
test("IN filter stack overflow") {
val df = withCatalog(catalog)
val items = (0 to 4).map{i => s"xaz$i"}
// Prepend the one real row key to the non-existent ones.
val filterInItems = Seq("row001") ++: items

val s = df.filter($"col0" isin(filterInItems:_*)).select("col0")
s.explain(true)
s.show()
assert(s.count() == 1)
}

test("empty column") {
val df = withCatalog(catalog)
df.registerTempTable("table0")
Expand All @@ -114,27 +136,7 @@ class DefaultSourceSuite extends SHC with Logging {
assert(s.count() == 1)
}

test("IN filter stack overflow") {
val df = withCatalog(catalog)
val items = (0 to 2000).map{i => s"xaz$i"}
val filterInItems = Seq("row001") ++: items

val s = df.filter($"col0" isin(filterInItems:_*)).select("col0")
s.explain(true)
s.show()
assert(s.count() == 1)
}

test("NOT IN filter stack overflow") {
val df = withCatalog(catalog)
val items = (0 to 2000).map{i => s"xaz$i"}
val filterNotInItems = items

val s = df.filter(not($"col0" isin(filterNotInItems:_*))).select("col0")
s.explain(true)
s.show
assert(s.count() == df.count())
}

test("IN filter, RDD") {
val scan = prunedFilterScan(catalog)
Expand Down

0 comments on commit 0a0cc25

Please sign in to comment.