Skip to content
This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Commit

Permalink
fix selectItr pagination issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Harish Butani committed Dec 24, 2016
1 parent aed1c08 commit d25c2cf
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.json4s._
import org.json4s.jackson.Json4sScalaModule
import org.json4s.jackson.JsonMethods._
import org.sparklinedata.druid.client._
import org.sparklinedata.druid.{CloseableIterator, SelectSpec, Utils}
import org.sparklinedata.druid._
import scala.collection.mutable.{Map => MMap}


private class DruidSelectResultIterator(val useSmile : Boolean,
Expand All @@ -52,10 +53,16 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
private var jValDeser: JsonDeserializer[JValue] = _
private var t: JsonToken = _
private var currSelectResultContainerTS: String = _
private var nextPagingIdentifiers: Map[String, Int] = _
private val nextPagingIdentifiers = MMap[String, Int]()

consumeNextStream(is)

private def setNextPagingIdentifiers(resultPgIds : Map[String, Int]) = {
resultPgIds.foreach {
case (k,v) => nextPagingIdentifiers(k) = v
}
}

private def transferState(nextIt : DruidSelectResultIterator) : Unit = {
m = nextIt.m
jF = nextIt.jF
Expand All @@ -66,7 +73,7 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
jValDeser = nextIt.jValDeser
t = nextIt.t
currSelectResultContainerTS = nextIt.currSelectResultContainerTS
nextPagingIdentifiers = nextIt.nextPagingIdentifiers
setNextPagingIdentifiers(nextIt.nextPagingIdentifiers.toMap)
}

def consumeNextStream(is: InputStream): Unit = {
Expand All @@ -92,7 +99,7 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
t = jp.nextToken() // 1.2.1.v pIds value // START_OBJECT
// t = jp.nextToken // FIELD_NAME
val pagingIdentifiersJV: JsonAST.JValue = jValDeser.deserialize(jp, ctxt)
nextPagingIdentifiers = pagingIdentifiersJV.extract[Map[String, Int]]
setNextPagingIdentifiers(pagingIdentifiersJV.extract[Map[String, Int]])

// nextPagingIdentifiers = nextPagingIdentifiers.map {
// case (s,i) if !selectSpec.descending => (s, i + 1)
Expand All @@ -114,12 +121,12 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
null
} else {
thisRoundHadData = false
val nextSelectSpec = selectSpec.withPagingIdentifier(nextPagingIdentifiers)
transferState(
val nextSelectSpec = selectSpec.withPagingIdentifier(nextPagingIdentifiers.toMap)
transferState(
nextSelectSpec.executeQuery(druidQuerySvrConn).asInstanceOf[DruidSelectResultIterator]
)
getNext
}
)
getNext
}
} else {
val o: JsonAST.JValue = jValDeser.deserialize(jp, ctxt)
val r = o.extract[SelectResultRow]
Expand All @@ -132,6 +139,10 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
override protected def close(): Unit = {
onDone()
}

override def toString : String = {
s"DruidSelectResultIterator : ${hashCode()}"
}
}

private class DruidSelectResultIterator2(val useSmile : Boolean,
Expand All @@ -146,17 +157,26 @@ private class DruidSelectResultIterator2(val useSmile : Boolean,
import Utils._

protected var thisRoundHadData: Boolean = false
private val nextPagingIdentifiers = MMap[String, Int]()

consumeNextStream(is)

var onDone = initialOnDone
var currResult: SelectResult = _
var currIt: Iterator[SelectResultRow] = _

private def setNextPagingIdentifiers(resultPgIds : Map[String, Int]) = {
resultPgIds.foreach {
case (k,v) => nextPagingIdentifiers(k) = v
}
}

private def transferState(nextIt : DruidSelectResultIterator2) : Unit = {
thisRoundHadData = nextIt.thisRoundHadData
onDone = nextIt.onDone
currResult = nextIt.currResult
currIt = nextIt.currIt
setNextPagingIdentifiers(nextIt.currResult.pagingIdentifiers)
}

private def consumeNextStream(is: InputStream): Unit = {
Expand All @@ -168,6 +188,7 @@ private class DruidSelectResultIterator2(val useSmile : Boolean,
val jV = parse(s)
currResult = jV.extract[SelectResultContainer].result
currIt = currResult.events.iterator
setNextPagingIdentifiers(currResult.pagingIdentifiers)
}

override protected def getNext(): SelectResultRow = {
Expand All @@ -181,7 +202,7 @@ private class DruidSelectResultIterator2(val useSmile : Boolean,
null
} else {
thisRoundHadData = false
val nextSelectSpec = selectSpec.withPagingIdentifier(currResult.pagingIdentifiers)
val nextSelectSpec = selectSpec.withPagingIdentifier(nextPagingIdentifiers.toMap)
transferState(
nextSelectSpec.executeQuery(druidQuerySvrConn).asInstanceOf[DruidSelectResultIterator2]
)
Expand All @@ -190,7 +211,11 @@ private class DruidSelectResultIterator2(val useSmile : Boolean,
}
}

override protected def close(): Unit = onDone
override protected def close(): Unit = onDone()

override def toString : String = {
s"DruidSelectResultIterator2 : ${hashCode()}"
}
}

object DruidSelectResultIterator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,19 @@ class SelectQueryTest extends StarSchemaBaseTest with BeforeAndAfterAll with Log
true
)

test("selectIterator",
"""
|select count(*)
|from (
|select sin(o_orderkey/100)
|from orderLineItemPartSupplier_select
|where l_shipdate <= date '2000-02-01'
|group by sin(o_orderkey/100)
|) q
""".stripMargin,
1,
true,
true
)

}

0 comments on commit d25c2cf

Please sign in to comment.