diff --git a/src/main/scala/org/sparklinedata/druid/DruidRDD.scala b/src/main/scala/org/sparklinedata/druid/DruidRDD.scala index 7b6dc85..e2f003d 100644 --- a/src/main/scala/org/sparklinedata/druid/DruidRDD.scala +++ b/src/main/scala/org/sparklinedata/druid/DruidRDD.scala @@ -139,19 +139,24 @@ class DruidRDD(sqlContext: SQLContext, * the ''Spark Executor'', so it is safe to return an empty iterator. * 4. Always clear this Druid Query from the TaskCancelHandler */ + var cancelCallback : TaskCancelHandler.TaskCancelHolder = null + var dr: CloseableIterator[ResultRow] = null + var client : DruidQueryServerClient = null val qryId = mQry.context.map(_.queryId).getOrElse(s"q-${System.nanoTime()}") - val cancelCallback = TaskCancelHandler.registerQueryId(qryId, context) - val client = p.queryClient(useSmile, httpMaxPerRoute, httpMaxTotal) - client.setCancellableHolder(cancelCallback) - - val qrySTime = System.currentTimeMillis() - val qrySTimeStr = s"${new java.util.Date()}" - var dr : CloseableIterator[ResultRow] = null + var qrySTime = System.currentTimeMillis() + var qrySTimeStr = s"${new java.util.Date()}" try { + cancelCallback = TaskCancelHandler.registerQueryId(qryId, context) + client = p.queryClient(useSmile, httpMaxPerRoute, httpMaxTotal) + client.setCancellableHolder(cancelCallback) + qrySTime = System.currentTimeMillis() + qrySTimeStr = s"${new java.util.Date()}" dr = mQry.executeQuery(client) } catch { - case _ if cancelCallback.wasCancelTriggered => { dr = new DummyResultIterator() } - case e : Throwable => throw e + case _ if cancelCallback.wasCancelTriggered && client != null => { + dr = new DummyResultIterator() + } + case e: Throwable => throw e } finally { TaskCancelHandler.clearQueryId(qryId)