';
+ return qNameHtml;
+ },
+ "orderable": true
+ }
+ ]
+ }
+ return streamingQueriesGridConf;
+}
+
+function addDataTableSingleRowSelectionHandler(tableId) {
+ $('#' + tableId + ' tbody').on( 'click', 'tr', function () {
+ // Clear the previous selection, then highlight the clicked row.
+ $('#' + tableId + ' tbody').children('.queryselected').removeClass('queryselected');
+ $(this).addClass('queryselected');
+ displayQueryStatistics($(this).children().children().first().text());
+ } );
+}
+
+function loadStreamingStatsInfo() {
+
+ if(!isGoogleChartLoaded) {
+ $.ajax({
+ url: "https://www.gstatic.com/charts/loader.js",
+ dataType: "script",
+ success: function() {
+ loadGoogleCharts();
+ }
+ });
+ }
+
+ $.ajax({
+ url:"/snappy-streaming/services/streams",
+ dataType: 'json',
+ success: function (response, status, jqXHR) {
+ // Hide error message, if displayed
+ $("#AutoUpdateErrorMsg").hide();
+
+ streamingQueriesGridData = response[0].allQueries;
+ streamingQueriesGrid.clear().rows.add(streamingQueriesGridData).draw();
+
+ // Display stats for the currently selected query
+ displayQueryStatistics(selectedQueryUUID);
+
+ },
+ error: ajaxRequestErrorHandler
+ });
+}
+
+function loadGoogleCharts() {
+
+ if (typeof google === 'object' && typeof google.charts === 'object') {
+ $("#googleChartsErrorMsg").hide();
+ google.charts.load('current', {'packages':['corechart']});
+ google.charts.setOnLoadCallback(googleChartsLoaded);
+ isGoogleChartLoaded = true;
+ } else {
+ $("#googleChartsErrorMsg").show();
+ }
+
+}
+
+function googleChartsLoaded() {
+ loadStreamingStatsInfo();
+}
+
+var isGoogleChartLoaded = false;
+var streamingQueriesGrid;
+var streamingQueriesGridData = [];
+var selectedQueryUUID = "";
+var selectedQuerySourcesGrid;
+var selectedQuerySourcesGridData = [];
+var selectedQuerySinkGrid;
+var selectedQuerySinkGridData = [];
+
+$(document).ready(function() {
+
+ loadGoogleCharts();
+
+ $.ajaxSetup({
+ cache : false
+ });
+
+ // Streaming Queries Grid Data Table
+ streamingQueriesGrid = $('#streamingQueriesGrid').DataTable( getStreamingQueriesGridConf() );
+ addDataTableSingleRowSelectionHandler('streamingQueriesGrid');
+
+ selectedQuerySourcesGrid = $('#querySourcesGrid').DataTable( getQuerySourcesGridConf() );
+ selectedQuerySinkGrid = $('#querySinkGrid').DataTable( getQuerySinkGridConf() );
+
+ // Refresh the streaming statistics every 5 seconds.
+ var streamingStatsUpdateInterval = setInterval(function() {
+ loadStreamingStatsInfo();
+ }, 5000);
+});
\ No newline at end of file
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 315f9a28c76c6..4e4545df78409 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -645,6 +645,22 @@ object SQLConf {
.intConf
.createWithDefault(100)
+ // For SnappyData
+ val STREAMING_UI_RUNNING_QUERIES_DISPLAY_LIMIT =
+ SQLConfigBuilder("spark.sql.streaming.uiRunningQueriesDisplayLimit")
+ .doc("The number of running streaming queries to be displayed on UI." +
+ "Default value is 20")
+ .intConf
+ .createWithDefault(20)
+
+ // For SnappyData
+ val STREAMING_UI_TRENDS_MAX_SAMPLE_SIZE =
+ SQLConfigBuilder("spark.sql.streaming.uiTrendsMaxSampleSize")
+ .doc("The number of maximum historical data points to be displayed on UI." +
+ "Default value is 60 (i.e 60 data points)")
+ .intConf
+ .createWithDefault(60)
+
val NDV_MAX_ERROR =
SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
.internal()
@@ -720,6 +736,11 @@ class SQLConf extends Serializable with Logging {
def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
+ def streamingUIRunningQueriesDisplayLimit: Int =
+ getConf(STREAMING_UI_RUNNING_QUERIES_DISPLAY_LIMIT)
+
+ def streamingUITrendsMaxSampleSize: Int = getConf(STREAMING_UI_TRENDS_MAX_SAMPLE_SIZE)
+
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
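The two configs introduced above behave like any other SQLConf entry. A minimal usage sketch; the builder-based session setup and app name are illustrative, not part of this patch:

    import org.apache.spark.sql.SparkSession

    // Hypothetical session that raises both SnappyData UI limits.
    val spark = SparkSession.builder()
      .appName("streaming-ui-demo")
      .config("spark.sql.streaming.uiRunningQueriesDisplayLimit", "50")
      .config("spark.sql.streaming.uiTrendsMaxSampleSize", "120")
      .getOrCreate()

    // Reading a value back through the public runtime config:
    spark.conf.get("spark.sql.streaming.uiRunningQueriesDisplayLimit") // "50"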
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f3dde480eabe0..97caf55bb4850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -46,6 +46,8 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.status.api.v1.SnappyStreamingApiRootResource
+import org.apache.spark.ui.SnappyStreamingTab
import org.apache.spark.util.Utils
@@ -711,6 +713,38 @@ class SparkSession private(
}
}
+ private def updateUI() = {
+ val ssqListener = new SnappyStreamingQueryListener(sparkContext)
+ this.streams.addListener(ssqListener)
+
+ if (sparkContext.ui.isDefined) {
+ logInfo("Updating Web UI to add structure streaming tab.")
+ sparkContext.ui.foreach(ui => {
+ // Check whether the Structured Streaming tab has already been attached
+ val structStreamTabPresent = ui.getTabs.exists { tab =>
+ val present = tab.prefix.equalsIgnoreCase("structurestreaming")
+ if (present) logInfo("Structured Streaming UI tab is already present.")
+ present
+ }
+ // Attach the Structured Streaming tab only if it is not already present
+ if (!structStreamTabPresent) {
+ logInfo("Creating Structured Streaming UI tab")
+ // Streaming web service
+ ui.attachHandler(SnappyStreamingApiRootResource.getServletHandler(ui))
+ // Streaming tab
+ new SnappyStreamingTab(ui, ssqListener)
+ }
+ })
+ logInfo("Updating Web UI to add structure streaming tab is Done.")
+ }
+ }
+
+ updateUI()
+
}
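updateUI() runs in the SparkSession constructor, so every new session registers a SnappyStreamingQueryListener and attaches the tab at most once. Doing the same wiring by hand, for illustration only (spark is an assumed existing session), would look like:

    // Normally unnecessary after this patch; shown only to make the wiring explicit.
    val listener =
      new org.apache.spark.sql.streaming.SnappyStreamingQueryListener(spark.sparkContext)
    spark.streams.addListener(listener)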
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6d8ba79317f97..cc871a84d1203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -246,7 +246,7 @@ class StreamExecution(
}
// `postEvent` does not throw non fatal exception.
- postEvent(new QueryStartedEvent(id, runId, name))
+ postEvent(new QueryStartedEvent(id, runId, name, trigger))
// Unblock starting thread
startLatch.countDown()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala
new file mode 100644
index 0000000000000..1aca5daf80e1e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala
@@ -0,0 +1,54 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkContext
+
+class SnappyStreamingQueryListener(sparkContext: SparkContext) extends StreamingQueryListener {
+
+ val streamingRepo = StreamingRepository.getInstance
+
+ override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+ val queryName = {
+ if (event.name == null || event.name.isEmpty) {
+ event.id.toString
+ } else {
+ event.name
+ }
+ }
+
+ streamingRepo.addQuery(event.id,
+ new StreamingQueryStatistics(
+ event.id,
+ queryName,
+ event.runId,
+ System.currentTimeMillis(),
+ event.trigger))
+ }
+
+ override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ streamingRepo.updateQuery(event.progress)
+ }
+
+ override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+ streamingRepo.setQueryStatus(event.id, false)
+ }
+
+}
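Because onQueryStarted falls back to the query id when no name is set, giving queries an explicit name keeps the UI readable. A sketch, where df is assumed to be a streaming DataFrame and the name and sink are placeholders:

    val query = df.writeStream
      .queryName("sensor-aggregates") // hypothetical name shown on the tab
      .format("console")
      .start()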
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 6b871b1fe6857..703865fe0b878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -96,7 +96,8 @@ object StreamingQueryListener {
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
- val name: String) extends Event
+ val name: String,
+ val trigger: Trigger = ProcessingTime(0L)) extends Event
/**
* :: Experimental ::
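With the trigger field added to QueryStartedEvent, any listener can recover the processing interval at start time. A sketch of such a listener; the class name is illustrative:

    import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}

    class TriggerLoggingListener extends StreamingQueryListener {
      override def onQueryStarted(e: StreamingQueryListener.QueryStartedEvent): Unit =
        e.trigger match {
          case pt: ProcessingTime => println(s"query ${e.id} fires every ${pt.intervalMs} ms")
          case other => println(s"query ${e.id} uses trigger $other")
        }
      override def onQueryProgress(e: StreamingQueryListener.QueryProgressEvent): Unit = ()
      override def onQueryTerminated(e: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
    }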
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala
new file mode 100644
index 0000000000000..9f2da07eea14c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala
@@ -0,0 +1,227 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.text.SimpleDateFormat
+import java.util.{Date, TimeZone, UUID}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.ui.UIUtils
+
+class StreamingRepository extends Logging {
+
+ private lazy val MAX_RUNNING_QUERIES_TO_RETAIN =
+ SparkSession.getActiveSession.get.sqlContext.conf.streamingUIRunningQueriesDisplayLimit
+
+ private val allQueries = HashMap.empty[UUID, StreamingQueryStatistics]
+
+ def getAllQueries: HashMap[UUID, StreamingQueryStatistics] = this.allQueries
+
+ def addQuery(qid: UUID, sqs: StreamingQueryStatistics): Unit = {
+ if (allQueries.size < MAX_RUNNING_QUERIES_TO_RETAIN) {
+ this.allQueries.put(qid, sqs)
+ } else {
+ // Evict the oldest inactive query, if any, to make room for the new one.
+ val inactiveQueries = this.allQueries.values.filterNot(_.isActive)
+ if (inactiveQueries.nonEmpty) {
+ val oldest = inactiveQueries.minBy(_.queryStartTime)
+ this.allQueries.remove(oldest.queryUUID)
+ this.allQueries.put(qid, sqs)
+ } else {
+ logWarning("Cannot add the new streaming query: the display limit of " +
+ s"$MAX_RUNNING_QUERIES_TO_RETAIN running streaming queries has been reached.")
+ }
+ }
+ }
+
+ def updateQuery(sqp: StreamingQueryProgress): Unit = {
+ this.allQueries.get(sqp.id) match {
+ case Some(sqs) => sqs.updateQueryStatistics(sqp)
+ case None =>
+ logWarning(s"Streaming query ${sqp.id} is not present in the streaming queries repository.")
+ }
+ }
+
+ def setQueryStatus(qid: UUID, status: Boolean): Unit = {
+ // Ignore unknown query ids rather than failing on a missing entry.
+ this.allQueries.get(qid).foreach(_.setStatus(status))
+ }
+}
+
+object StreamingRepository {
+
+ private val _instance: StreamingRepository = new StreamingRepository
+
+ def getInstance: StreamingRepository = _instance
+}
+
+
+class StreamingQueryStatistics (
+ qId: UUID,
+ qName: String,
+ runId: UUID,
+ startTime: Long,
+ trigger: Trigger = ProcessingTime(0L)) {
+
+ private val MAX_SAMPLE_SIZE =
+ SparkSession.getActiveSession.get.sqlContext.conf.streamingUITrendsMaxSampleSize
+
+ // Use yyyy (calendar year) and HH (24-hour clock); YYYY is the ISO week year.
+ private val simpleDateFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss")
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+
+ val queryUUID: UUID = qId
+ val queryName: String = qName
+ val queryStartTime: Long = startTime
+ val queryStartTimeText: String = simpleDateFormat.format(startTime)
+ var queryUptime: Long = 0L
+ var queryUptimeText: String = ""
+
+ var runUUID: UUID = runId
+ val trendEventsInterval: Long = trigger match {
+ case pt: ProcessingTime => pt.intervalMs
+ case _ => 0L // non-ProcessingTime triggers have no fixed interval
+ }
+
+ var isActive: Boolean = true
+
+ var currentBatchId: Long = -1L
+
+ var sources = Array.empty[SourceProgress]
+ var sink: SinkProgress = null
+
+ var totalInputRows: Long = 0L
+ var avgInputRowsPerSec: Double = 0.0
+ var avgProcessedRowsPerSec: Double = 0.0
+ var totalProcessingTime: Long = 0L
+ var avgProcessingTime: Double = 0.0
+ var numBatchesProcessed: Long = 0L
+
+ val timeLine = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val numInputRowsTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val inputRowsPerSecondTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val processedRowsPerSecondTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val processingTimeTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val batchIds = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+
+ var currStateOpNumRowsTotal = 0L
+ var currStateOpNumRowsUpdated = 0L
+ val stateOpNumRowsTotalTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+ val stateOpNumRowsUpdatedTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
+
+ def updateQueryStatistics(progress: StreamingQueryProgress): Unit = {
+
+ if (this.currentBatchId < progress.batchId) {
+ this.numBatchesProcessed = this.numBatchesProcessed + 1
+ }
+
+ this.currentBatchId = progress.batchId
+ this.batchIds.add(progress.batchId)
+
+ val currDateTime: Date = timestampFormat.parse(progress.timestamp)
+ this.queryUptime = currDateTime.getTime - this.queryStartTime
+ this.queryUptimeText = UIUtils.formatDurationVerbose(this.queryUptime)
+
+ this.timeLine.add(currDateTime.getTime)
+
+ val tmpNumInpRows = {
+ if (progress.numInputRows.isNaN) 0 else progress.numInputRows
+ }
+ this.numInputRowsTrend.add(tmpNumInpRows)
+ this.totalInputRows = this.totalInputRows + tmpNumInpRows
+
+ val tmpInputRowsPerSec = {
+ if (progress.inputRowsPerSecond.isNaN) 0.0 else progress.inputRowsPerSecond
+ }
+ this.inputRowsPerSecondTrend.add(tmpInputRowsPerSec)
+ this.avgInputRowsPerSec = calcAvgOfGivenTrend(this.inputRowsPerSecondTrend)
+
+ val tmpProcessedRowsPerSec = {
+ if (progress.processedRowsPerSecond.isNaN) 0.0 else progress.processedRowsPerSecond
+ }
+ this.processedRowsPerSecondTrend.add(tmpProcessedRowsPerSec)
+ this.avgProcessedRowsPerSec = calcAvgOfGivenTrend(this.processedRowsPerSecondTrend)
+
+ // durationMs is a java.util.Map, so guard against a missing entry (null).
+ val tmpProcessingTime: Long =
+ Option(progress.durationMs.get("triggerExecution")).map(_.longValue()).getOrElse(0L)
+ this.processingTimeTrend.add(tmpProcessingTime)
+ this.totalProcessingTime = this.totalProcessingTime + tmpProcessingTime
+ this.avgProcessingTime = this.totalProcessingTime / this.numBatchesProcessed
+
+ this.sources = progress.sources
+ this.sink = progress.sink
+
+ val stateOperators = progress.stateOperators
+ if (stateOperators.nonEmpty) {
+ var sumAllSTNumRowsTotal = 0L
+ var sumAllSTNumRowsUpdated = 0L
+
+ stateOperators.foreach(so => {
+ sumAllSTNumRowsTotal = sumAllSTNumRowsTotal + so.numRowsTotal
+ sumAllSTNumRowsUpdated = sumAllSTNumRowsUpdated + so.numRowsUpdated
+ })
+
+ if (currStateOpNumRowsTotal < sumAllSTNumRowsTotal) {
+ this.currStateOpNumRowsTotal = sumAllSTNumRowsTotal
+ }
+ this.stateOpNumRowsTotalTrend.add(sumAllSTNumRowsTotal)
+
+ if (currStateOpNumRowsUpdated < sumAllSTNumRowsUpdated) {
+ this.currStateOpNumRowsUpdated = sumAllSTNumRowsUpdated
+ }
+ this.stateOpNumRowsUpdatedTrend.add(sumAllSTNumRowsUpdated)
+
+ }
+ }
+
+ def calcAvgOfGivenTrend(trend: CircularFifoBuffer): Double = {
+ val values = trend.toArray()
+ // Guard against an empty buffer, which would otherwise return NaN (0.0 / 0).
+ if (values.isEmpty) 0.0
+ else values.map(_.asInstanceOf[Double]).sum / values.length
+ }
+
+ def setStatus (status: Boolean): Unit = {
+ this.isActive = status
+ }
+
+}
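The trend fields above rely on CircularFifoBuffer's eviction semantics: once the buffer holds MAX_SAMPLE_SIZE elements, each add silently drops the oldest one, so every trend is a sliding window of the newest samples. A standalone sketch of that behavior:

    import org.apache.commons.collections.buffer.CircularFifoBuffer

    val trend = new CircularFifoBuffer(3)
    trend.add(1.0); trend.add(2.0); trend.add(3.0)
    trend.add(4.0) // capacity reached: 1.0 is evicted
    // trend.toArray() now yields Array(2.0, 3.0, 4.0)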
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SnappyStreamingApiRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SnappyStreamingApiRootResource.scala
new file mode 100644
index 0000000000000..dbce172bdcf81
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SnappyStreamingApiRootResource.scala
@@ -0,0 +1,62 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs._
+
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+import org.glassfish.jersey.server.ServerProperties
+import org.glassfish.jersey.servlet.ServletContainer
+
+
+/**
+ * Main entry point for serving snappy/spark web application data as json, using JAX-RS.
+ *
+ * Each resource should have endpoints that return **public** classes defined in snappy-api.scala.
+ * Mima binary compatibility checks ensure that we don't inadvertently make changes that break the
+ * api.
+ * The returned objects are automatically converted to json by jackson with JacksonMessageWriter.
+ * In addition, there are a number of tests in HistoryServerSuite that compare the json to "golden
+ * files". Any changes and additions should be reflected there as well -- see the notes in
+ * HistoryServerSuite.
+ */
+
+// TODO: add tests for the APIs below.
+
+@Path("/services")
+class SnappyStreamingApiRootResource extends ApiRequestContext {
+
+ @Path("streams")
+ def getStreams(): StreamsInfoResource = {
+ new StreamsInfoResource
+ }
+}
+
+private[spark] object SnappyStreamingApiRootResource {
+
+ def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
+ val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+ jerseyContext.setContextPath("/snappy-streaming")
+ val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
+ holder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.spark.status.api.v1")
+ UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+ jerseyContext.addServlet(holder, "/*")
+ jerseyContext
+ }
+}
\ No newline at end of file
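Once the handler is attached (see updateUI() in SparkSession), the summary JSON can be fetched directly. A sketch assuming the application UI runs on the default port 4040 of the local host:

    import scala.io.Source

    val json = Source
      .fromURL("http://localhost:4040/snappy-streaming/services/streams")
      .mkString
    println(json)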
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/StreamsInfoResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/StreamsInfoResource.scala
new file mode 100644
index 0000000000000..182dd04a2a8b3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/StreamsInfoResource.scala
@@ -0,0 +1,39 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.core.MediaType
+import javax.ws.rs.{GET, Produces}
+
+import org.apache.spark.sql.streaming.StreamingRepository
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class StreamsInfoResource {
+ @GET
+ def streamInfo(): Seq[StreamsSummary] = {
+ // A single summary wrapping all tracked queries is serialized to JSON.
+ Seq(new StreamsSummary(StreamingRepository.getInstance.getAllQueries.values.toList))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/streamapi.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/streamapi.scala
new file mode 100644
index 0000000000000..e40b141d7f6eb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/streamapi.scala
@@ -0,0 +1,28 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.spark.status.api.v1
+
+import org.apache.spark.sql.streaming.StreamingQueryStatistics
+
+
+class StreamsSummary private[spark](
+ val allQueries: Seq[StreamingQueryStatistics]
+)
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/ui/SnappyStreamingTab.scala b/sql/core/src/main/scala/org/apache/spark/ui/SnappyStreamingTab.scala
new file mode 100644
index 0000000000000..1964ae9e204a6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/ui/SnappyStreamingTab.scala
@@ -0,0 +1,38 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.streaming.SnappyStreamingQueryListener
+
+class SnappyStreamingTab (sparkUI: SparkUI, streamingListener: SnappyStreamingQueryListener)
+ extends SparkUITab(sparkUI, "structurestreaming") with Logging {
+
+ override val name = "Structured Streaming"
+
+ val parent = sparkUI
+ val listener = streamingListener
+
+ attachPage(new SnappyStructuredStreamingPage(this))
+ // Attach Tab
+ parent.attachTab(this)
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/ui/SnappyStructuredStreamingPage.scala b/sql/core/src/main/scala/org/apache/spark/ui/SnappyStructuredStreamingPage.scala
new file mode 100644
index 0000000000000..9005a4a1231ac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/ui/SnappyStructuredStreamingPage.scala
@@ -0,0 +1,373 @@
+/*
+ * Changes for SnappyData data platform.
+ *
+ * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.spark.ui
+
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.internal.Logging
+
+private[ui] class SnappyStructuredStreamingPage(parent: SnappyStreamingTab)
+ extends WebUIPage("") with Logging {
+
+ def commonHeaderNodesSnappy: Seq[Node] = {
+
+
+
+
+
+
+ }
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+
+ val pageHeaderText: String = SnappyStructuredStreamingPage.pageHeaderText
+
+ val pageContent = commonHeaderNodesSnappy ++ createMainContent
+
+ UIUtils.headerSparkPage(pageHeaderText, pageContent, parent, Some(500), useDataTables = true)
+
+ }
+
+ private def createMainContent: Seq[Node] = {
+ val navPanel = createNavigationPanel
+ val detailsPanel = createQueryDetailsPanel
+ val connErrorMsgNode = {
+