/*
 * Utility function to convert a duration given in milliseconds into a human
 * readable string, e.g. "2 days 14 hrs 2 mins".
 *
 * Mirrors Spark's UIUtils.formatDurationVerbose: the result is truncated to
 * at most the three most significant units, and the millisecond component is
 * omitted for exact multiples of a second.
 */
function formatDurationVerbose(ms) {

  // Render a count with a singular/plural unit name, or "" when the count is 0.
  function stringify(num, unit) {
    if (num <= 0) {
      return "";
    } else if (num == 1) {
      return num + " " + unit;
    } else {
      return num + " " + unit + "s";
    }
  }

  var second = 1000;
  var minute = 60 * second;
  var hour = 60 * minute;
  var day = 24 * hour;
  var week = 7 * day;
  var year = 365 * day;

  // Drop the "x ms" part for exact multiples of a second (but keep "0 ms"
  // for sub-second durations, including ms == 0).
  var msString = (ms >= second && ms % second == 0) ? "" : (ms % second) + " ms";

  var secString = stringify(Math.floor((ms % minute) / second), "sec");
  var minString = stringify(Math.floor((ms % hour) / minute), "min");
  var hrString = stringify(Math.floor((ms % day) / hour), "hr");
  var dayString = stringify(Math.floor((ms % week) / day), "day");
  var wkString = stringify(Math.floor((ms % year) / week), "wk");
  var yrString = stringify(Math.floor(ms / year), "yr");

  // Pick the three most significant components for the magnitude of ms.
  // BUG FIX: the original chain of non-exclusive "if (ms >= X)" updates
  // appended hrString instead of dayString for year-scale durations
  // (dropping the day count entirely) and stacked four components for
  // week-scale durations.
  var finalString;
  if (ms < second) {
    finalString = msString;
  } else if (ms < minute) {
    finalString = secString + " " + msString;
  } else if (ms < hour) {
    finalString = minString + " " + secString;
  } else if (ms < day) {
    finalString = hrString + " " + minString + " " + secString;
  } else if (ms < week) {
    finalString = dayString + " " + hrString + " " + minString;
  } else if (ms < year) {
    finalString = wkString + " " + dayString + " " + hrString;
  } else {
    finalString = yrString + " " + wkString + " " + dayString;
  }

  // Zero-valued components stringify to "", which leaves double spaces and
  // trailing whitespace; normalize them away.
  return finalString.replace(/\s+/g, " ").trim();
}
given value in Bytes to KB or MB or GB or TB * diff --git a/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.css b/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.css new file mode 100644 index 0000000000000..2621385fd4628 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.css @@ -0,0 +1,172 @@ +/* Snappy streaming CSS */ + +#AutoUpdateErrorMsgContainer { + position: absolute; + width: 100%; + margin-top: -60px; +} + +#AutoUpdateErrorMsg { + width: 30%; + max-height: 60px; + background-color: #F8DFDF; + border: 2px solid red; + border-radius: 10px; + z-index: 2; + position: relative; + margin: 5px auto; + padding: 0px 10px; + overflow: auto; + display: none; + text-align: center; + font-weight: bold; +} + +.main-container { + width: 100%; + margin-top: 15px; +} + +.left-navigation-panel { + float: left; + width: 15%; + min-width: 250px; + height: 100%; + border: solid #B1B1B1 1px; + background-color: #F1F1F1; +} + +.right-details-panel { + width: 84%; + height: 100%; + float: right; + padding-left: 10px; + border: solid #B1B1B1 1px; + background-color: #F1F1F1; +} + +.vertical-menu-heading { + width: 100%; +} + +.vertical-menu-heading div { + padding: 12px; + font-weight: bold; + font-size: large; + text-align: center; + background: #A0A0A0; +} + +.vertical-menu { + width: 100%; +} + +.vertical-menu a { + background-color: #EEE; + color: black; + display: block; + padding: 12px; + text-decoration: none; +} + +.vertical-menu a:hover { + background-color: #E5E5E5; +} + +.vertical-menu a.active { + background-color: #CCC; + color: black; +} + +.details-section { + text-align: center; + padding-left: 5px; + padding-right: 5px; +} + +.basic-details { + width: 98%; + min-width: 250px; + float: left; + margin: 10px; + border: solid 2px darkgray; + border-radius: 10px; + line-height: 25px; +} + +.basic-details-title { + float: left; + padding: 10px; + width: 50%; + 
font-size: medium; + font-weight: bold; +} + +.basic-details > div { + text-align: left; + width: 25%; + float: left; +} + +.basic-details-value { + padding: 10px; +} + +.stats-block { + min-width: 200px; + width: 15%; + height: 100px; + display: inline-block; + margin: 10px; + border: solid 2px darkgray; + border-radius: 10px; +} + +.stats-block > div { + margin: 10px; + width: auto; + height: 80%; +} + +.stats-block-title { + height: 50px; + font-size: large; + font-weight: bold; +} + +.stats-block-value { + font-size: 20px; +} + +.graph-container { + min-width: 250px; + width: 23%; + height: 200px; + display: inline-block; + margin: 10px; + border: solid 1px darkgray; + /*box-shadow: 5px 5px 5px grey;*/ +} + +#selectedQueryTitle { + float: left; + margin: 10px; + padding: 10px; + font-size: 18px; + font-weight: bold; + text-align: left; +} + +#selectedQueryName { + float: left; + margin: 10px; + padding: 10px; + font-size: 18px; + /*font-weight: bold;*/ + text-align: left; +} + +/* datatable row selection */ +table.dataTable tbody tr.queryselected { + background-color: #c6ccd7; +} \ No newline at end of file diff --git a/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.js b/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.js new file mode 100644 index 0000000000000..9693b16184b72 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/snappydata/snappy-streaming.js @@ -0,0 +1,505 @@ + +function displayQueryStatistics(queryId) { + var queryStats = {}; + if (streamingQueriesGridData.length > 0) { + if (selectedQueryUUID == "") { + queryStats = streamingQueriesGridData[0]; + } else { + queryStats = streamingQueriesGridData.find(obj => obj.queryUUID == queryId); + if (queryStats == undefined) { + queryStats = streamingQueriesGridData[0]; + } + } + } else { // return if data is not present + return; + } + + // set current selected query and highlight it in query navigation panel + 
selectedQueryUUID = queryStats.queryUUID; + + var divList = $('#streamingQueriesGrid tbody tr td div'); + for (var i=0 ; i< divList.length ; i++) { + if (divList[i].innerText == selectedQueryUUID) { + var tr = divList[i].parentNode.parentNode; + $(tr).toggleClass('queryselected'); + break; + } + } + + $("#selectedQueryName").html(queryStats.queryName); + $("#startDateTime").html(queryStats.queryStartTimeText); + $("#uptime").html( + formatDurationVerbose(queryStats.queryUptime).toLocaleString(navigator.language)); + $("#triggerInterval").html( + formatDurationVerbose(queryStats.trendEventsInterval).toLocaleString(navigator.language)); + $("#numBatchesProcessed").html(queryStats.numBatchesProcessed); + var statusText = ""; + if (queryStats.isActive) { + statusText = 'Active'; + } else { + statusText = 'Inactive'; + } + $("#status").html(statusText); + + $("#totalInputRows").html(queryStats.totalInputRows.toLocaleString(navigator.language)); + + var qIRPSTrend = queryStats.inputRowsPerSecondTrend; + $("#currInputRowsPerSec").html( + qIRPSTrend[qIRPSTrend.length - 1].toLocaleString(navigator.language)); + + var qPRPSTrend = queryStats.processedRowsPerSecondTrend; + $("#currProcessedRowsPerSec").html( + qPRPSTrend[qPRPSTrend.length - 1].toLocaleString(navigator.language)); + + var qTPT = queryStats.totalProcessingTime; + $("#totalProcessingTime").html( + formatDurationVerbose(qTPT).toLocaleString(navigator.language)); + + var qAPT = queryStats.avgProcessingTime; + $("#avgProcessingTime").html( + formatDurationVerbose(qAPT).toLocaleString(navigator.language)); + + updateCharts(queryStats); + + $("#sourcesDetailsContainer").html(generateSourcesStats(queryStats.sources)); + $("#sinkDetailsContainer").html(generateSinkStats(queryStats.sink)); + +} + +function generateSourcesStats(sources) { + selectedQuerySourcesGridData = sources; + selectedQuerySourcesGrid.clear().rows.add(selectedQuerySourcesGridData).draw(); +} + +function generateSinkStats(sink) { + 
selectedQuerySinkGridData = [sink]; + selectedQuerySinkGrid.clear().rows.add(selectedQuerySinkGridData).draw(); +} + +// Streaming Sources +const SOURCETYPE_JVM = "JVMSOURCE"; +const SOURCETYPE_JDBC = "JDBCSOURCE"; +const SOURCETYPE_FILESTREAM = "FILESTREAMSOURCE"; +const SOURCETYPE_TEXTSOCKET = "TEXTSOCKETSOURCE"; +const SOURCETYPE_MEMORY = "MEMORYSTREAM"; +const SOURCETYPE_STREAMING = "STREAMINGSOURCE"; +const SOURCETYPE_KAFKA = "KAFKASOURCE"; + +// Streaming Sinks +const SINKTYPE_CONSOLE = "CONSOLESINK"; +const SINKTYPE_MEMORY = "MEMORYSINK"; +const SINKTYPE_FOREACH = "FOREACHSINK"; +const SINKTYPE_FILESTREAM = "FILESTREAMSINK"; +const SINKTYPE_SNAPPYSTORE = "SNAPPYSTORESINK"; +const SINKTYPE_KAFKA = "KAFKASINK"; +const SINKTYPE_CSV = "CSVSINK"; +const SINKTYPE_JMX = "JMXSINK"; +const SINKTYPE_SLF4J = "SLF4JSINK"; +const SINKTYPE_METRICSSERVLET = "METRICSSERVLET"; +const SINKTYPE_GRAPHITE = "GRAPHITESINK"; +const SINKTYPE_GANGLIA = "GANGLIASINK"; + +function getStreamingSourceType(srcDesc) { + var srcType = ""; + if (srcDesc.toUpperCase().includes(SOURCETYPE_JVM)) { + srcType = "JVM"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_JDBC)) { + srcType = "JDBC"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_FILESTREAM)) { + srcType = "File Stream"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_TEXTSOCKET)) { + srcType = "Text Socket"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_MEMORY)) { + srcType = "Memory"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_STREAMING)) { + srcType = "Streaming"; + } else if (srcDesc.toUpperCase().includes(SOURCETYPE_KAFKA)) { + srcType = "KAFKA"; + } + return srcType; +} + +function getStreamingSinkType(sinkDesc) { + var sinkType = ""; + if (sinkDesc.toUpperCase().includes(SINKTYPE_CONSOLE)) { + sinkType = "Console"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_MEMORY)) { + sinkType = "Memory"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_FOREACH)) { + sinkType = 
"ForEach"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_FILESTREAM)) { + sinkType = "File Stream"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_SNAPPYSTORE)) { + sinkType = "Snappy Store"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_KAFKA)) { + sinkType = "KAFKA"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_CSV)) { + sinkType = "CSV"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_JMX)) { + sinkType = "JMX"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_SLF4J)) { + sinkType = "SLF4J"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_METRICSSERVLET)) { + sinkType = "Metrics Servlet"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_GRAPHITE)) { + sinkType = "Graphite"; + } else if (sinkDesc.toUpperCase().includes(SINKTYPE_GANGLIA)) { + sinkType = "Ganglia"; + } + return sinkType; +} + +function updateCharts(queryStats) { + // Load charts library if not already loaded + if(!isGoogleChartLoaded) { + // Set error message + $("#googleChartsErrorMsg").show(); + return; + } + + var numInputRowsChartData = new google.visualization.DataTable(); + numInputRowsChartData.addColumn('datetime', 'Time of Day'); + numInputRowsChartData.addColumn('number', 'Input Records'); + + var inputVsProcessedRowsChartData = new google.visualization.DataTable(); + inputVsProcessedRowsChartData.addColumn('datetime', 'Time of Day'); + inputVsProcessedRowsChartData.addColumn('number', 'Input Records Per Sec'); + inputVsProcessedRowsChartData.addColumn('number', 'Processed Records Per Sec'); + + var processingTimeChartData = new google.visualization.DataTable(); + processingTimeChartData.addColumn('datetime', 'Time of Day'); + processingTimeChartData.addColumn('number', 'Processing Threshold'); + processingTimeChartData.addColumn('number', 'Processing Time'); + + var stateOperatorsStatsChartData = new google.visualization.DataTable(); + stateOperatorsStatsChartData.addColumn('datetime', 'Time of Day'); + 
stateOperatorsStatsChartData.addColumn('number', 'Total Records'); + + var intervalValue = queryStats.trendEventsInterval; + var timeLine = queryStats.timeLine; + var numInputRowsTrend = queryStats.numInputRowsTrend; + var inputRowsPerSecondTrend = queryStats.inputRowsPerSecondTrend; + var processedRowsPerSecondTrend = queryStats.processedRowsPerSecondTrend; + var processingTimeTrend = queryStats.processingTimeTrend; + var stateOpNumRowsTotalTrend = queryStats.stateOpNumRowsTotalTrend; + + for(var i=0 ; i < timeLine.length ; i++) { + var timeX = new Date(timeLine[i]); + + numInputRowsChartData.addRow([ + timeX, + numInputRowsTrend[i]]); + + inputVsProcessedRowsChartData.addRow([ + timeX, + inputRowsPerSecondTrend[i], + processedRowsPerSecondTrend[i]]); + + processingTimeChartData.addRow([ + timeX, + intervalValue, + processingTimeTrend[i]]); + + stateOperatorsStatsChartData.addRow([ + timeX, + stateOpNumRowsTotalTrend[i]]); + } + + numInputRowsChartOptions = { + title: 'Input Records', + // curveType: 'function', + legend: { position: 'bottom' }, + colors:['#2139EC'], + crosshair: { trigger: 'focus' }, + hAxis: { + format: 'HH:mm' + } + }; + + inputVsProcessedRowsChartOptions = { + title: 'Input Rate vs Processing Rate', + // curveType: 'function', + legend: { position: 'bottom' }, + colors:['#2139EC', '#E67E22'], + crosshair: { trigger: 'focus' }, + hAxis: { + format: 'HH:mm' + } + }; + + processingTimeChartOptions = { + title: 'Processing Time', + // curveType: 'function', + legend: { position: 'bottom' }, + colors:['#ff0000', '#2139EC'], + crosshair: { trigger: 'focus' }, + hAxis: { + format: 'HH:mm' + }, + series: { + 0: { + lineWidth: 1, + visibleInLegend: false, + pointsVisible: false + } + } + }; + + stateOperatorsStatsChartOptions = { + title: 'Aggregation States', + // curveType: 'function', + legend: { position: 'bottom' }, + colors:['#2139EC'], + crosshair: { trigger: 'focus' }, + hAxis: { + format: 'HH:mm' + } + }; + + // display state operator chart 
and other charts resizing accordingly + if(stateOpNumRowsTotalTrend.length == 0) { + $('#stateOperatorContainer').css("display", "none"); + $('#inputTrendsContainer').css("width", "31%"); + $('#processingTrendContainer').css("width", "31%"); + $('#processingTimeContainer').css("width", "31%"); + } else { + $('#inputTrendsContainer').css("width", "23%"); + $('#processingTrendContainer').css("width", "23%"); + $('#processingTimeContainer').css("width", "23%"); + $('#stateOperatorContainer').css("display", ""); + $('#stateOperatorContainer').css("width", "23%"); + var stateOperatorsStatsChart = new google.visualization.LineChart( + document.getElementById('stateOperatorContainer')); + stateOperatorsStatsChart.draw(stateOperatorsStatsChartData, + stateOperatorsStatsChartOptions); + } + + var numInputRowsChart = new google.visualization.LineChart( + document.getElementById('inputTrendsContainer')); + numInputRowsChart.draw(numInputRowsChartData, + numInputRowsChartOptions); + + var inputVsProcessedRowsChart = new google.visualization.LineChart( + document.getElementById('processingTrendContainer')); + inputVsProcessedRowsChart.draw(inputVsProcessedRowsChartData, + inputVsProcessedRowsChartOptions); + + var processingTimeChart = new google.visualization.LineChart( + document.getElementById('processingTimeContainer')); + processingTimeChart.draw(processingTimeChartData, + processingTimeChartOptions); + +} + +function getQuerySourcesGridConf() { + // Streaming Queries Source Grid Data Table Configurations + var querySourcesGridConf = { + data: selectedQuerySourcesGridData, + "dom": '', + "columns": [ + { // Source type + data: function(row, type) { + var descHtml = '
' + + getStreamingSourceType(row.description) + + '
'; + return descHtml; + }, + "orderable": true + }, + { // Source description + data: function(row, type) { + var descHtml = '
' + + row.description + + '
'; + return descHtml; + }, + "orderable": true + }, + { // Input Rows + data: function(row, type) { + var irValue = ""; + if (isNaN(row.numInputRows)) { + irValue = "NA"; + } else{ + irValue = row.numInputRows.toLocaleString(navigator.language); + } + var irHtml = '
' + + irValue + + '
'; + return irHtml; + }, + "orderable": false + }, + { // Input Rows Per Second + data: function(row, type) { + var irpsValue = ""; + if (isNaN(row.inputRowsPerSecond)) { + irpsValue = "NA"; + } else{ + irpsValue = Math.round(row.inputRowsPerSecond).toLocaleString(navigator.language); + } + var irpsHtml = '
' + + irpsValue + + '
'; + return irpsHtml; + }, + "orderable": false + }, + { // Processed Rows Per Second + data: function(row, type) { + var prpsValue = ""; + if (isNaN(row.processedRowsPerSecond)) { + prpsValue = "NA"; + } else{ + prpsValue = Math.round(row.processedRowsPerSecond).toLocaleString(navigator.language); + } + var prpsHtml = '
' + + prpsValue + + '
'; + return prpsHtml; + }, + "orderable": false + } + ] + } + return querySourcesGridConf; +} + +function getQuerySinkGridConf() { + // Streaming Queries Sink Grid Data Table Configurations + var querySinkGridConf = { + data: selectedQuerySinkGridData, + "dom": '', + "columns": [ + { // Sink type + data: function(row, type) { + var descHtml = '
' + + getStreamingSinkType(row.description) + + '
'; + return descHtml; + }, + "orderable": true + }, + { // Sink description + data: function(row, type) { + var descHtml = '
' + + row.description + + '
'; + return descHtml; + }, + "orderable": true + } + ] + } + return querySinkGridConf; +} + +function getStreamingQueriesGridConf() { + // Streaming Queries Grid Data Table Configurations + var streamingQueriesGridConf = { + data: streamingQueriesGridData, + "dom": '', + "columns": [ + { // Query Names + data: function(row, type) { + var qNameHtml = '
' + row.queryUUID + '
' + + '
' + + row.queryName + + '
'; + return qNameHtml; + }, + "orderable": true + } + ] + } + return streamingQueriesGridConf; +} + +function addDataTableSingleRowSelectionHandler(tableId) { + $('#' + tableId + ' tbody').on( 'click', 'tr', function () { + $('#' + tableId + ' tbody').children('.queryselected').toggleClass('queryselected'); + // $(this).toggleClass('queryselected'); + displayQueryStatistics($(this).children().children().first().text()); + } ); +} + +function loadStreamingStatsInfo() { + + if(!isGoogleChartLoaded) { + $.ajax({ + url: "https://www.gstatic.com/charts/loader.js", + dataType: "script", + success: function() { + loadGoogleCharts(); + } + }); + } + + $.ajax({ + url:"/snappy-streaming/services/streams", + dataType: 'json', + // timeout: 5000, + success: function (response, status, jqXHR) { + // Hide error message, if displayed + $("#AutoUpdateErrorMsg").hide(); + + streamingQueriesGridData = response[0].allQueries; + streamingQueriesGrid.clear().rows.add(streamingQueriesGridData).draw(); + + // Display currently selected queries stats + displayQueryStatistics(selectedQueryUUID); + + }, + error: ajaxRequestErrorHandler + }); +} + +function loadGoogleCharts() { + + if((typeof google === 'object' && typeof google.charts === 'object')) { + $("#googleChartsErrorMsg").hide(); + google.charts.load('current', {'packages':['corechart']}); + google.charts.setOnLoadCallback(googleChartsLoaded); + isGoogleChartLoaded = true; + } else { + $("#googleChartsErrorMsg").show(); + } + +} + +function googleChartsLoaded() { + loadStreamingStatsInfo(); +} + +var isGoogleChartLoaded = false; +var streamingQueriesGrid; +var streamingQueriesGridData = []; +var selectedQueryUUID = ""; +var selectedQuerySourcesGrid; +var selectedQuerySourcesGridData = []; +var selectedQuerySinkGrid; +var selectedQuerySinkGridData = []; + +$(document).ready(function() { + + loadGoogleCharts(); + + $.ajaxSetup({ + cache : false + }); + + // Members Grid Data Table + streamingQueriesGrid = 
$('#streamingQueriesGrid').DataTable( getStreamingQueriesGridConf() ); + addDataTableSingleRowSelectionHandler('streamingQueriesGrid'); + + selectedQuerySourcesGrid = $('#querySourcesGrid').DataTable( getQuerySourcesGridConf() ); + selectedQuerySinkGrid = $('#querySinkGrid').DataTable( getQuerySinkGridConf() ); + + var streamingStatsUpdateInterval = setInterval(function() { + loadStreamingStatsInfo(); + }, 5000); + + + +}); \ No newline at end of file diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 315f9a28c76c6..4e4545df78409 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -645,6 +645,22 @@ object SQLConf { .intConf .createWithDefault(100) + // For SnappyData + val STREAMING_UI_RUNNING_QUERIES_DISPLAY_LIMIT = + SQLConfigBuilder("spark.sql.streaming.uiRunningQueriesDisplayLimit") + .doc("The number of running streaming queries to be displayed on UI." + + "Default value is 20") + .intConf + .createWithDefault(20) + + // For SnappyData + val STREAMING_UI_TRENDS_MAX_SAMPLE_SIZE = + SQLConfigBuilder("spark.sql.streaming.uiTrendsMaxSampleSize") + .doc("The number of maximum historical data points to be displayed on UI." 
+ + "Default value is 60 (i.e 60 data points)") + .intConf + .createWithDefault(60) + val NDV_MAX_ERROR = SQLConfigBuilder("spark.sql.statistics.ndv.maxError") .internal() @@ -720,6 +736,11 @@ class SQLConf extends Serializable with Logging { def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION) + def streamingUIRunningQueriesDisplayLimit: Int = + getConf(STREAMING_UI_RUNNING_QUERIES_DISPLAY_LIMIT) + + def streamingUITrendsMaxSampleSize: Int = getConf(STREAMING_UI_TRENDS_MAX_SAMPLE_SIZE) + def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f3dde480eabe0..97caf55bb4850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -46,6 +46,8 @@ import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.status.api.v1.SnappyStreamingApiRootResource +import org.apache.spark.ui.SnappyStreamingTab import org.apache.spark.util.Utils @@ -711,6 +713,38 @@ class SparkSession private( } } + private def updateUI() = { + val ssqListener = new SnappyStreamingQueryListener(sparkContext) + this.streams.addListener(ssqListener) + + if (sparkContext.ui.isDefined) { + logInfo("Updating Web UI to add structure streaming tab.") + sparkContext.ui.foreach(ui => { + var structStreamTabPresent: Boolean = false + val tabsList = ui.getTabs + // Add remaining tabs in tabs list + tabsList.foreach(tab => { + // Check if Structure Streaming Tab is present or not + if (tab.prefix.equalsIgnoreCase("structurestreaming")) { + structStreamTabPresent = true + logInfo("Structure 
Streaming UI Tab is already present.") + } + }) + // Add Structure Streaming Tab, iff not present + if (!structStreamTabPresent) { + logInfo("Creating Structure Streaming UI Tab") + // Streaming web service + ui.attachHandler(SnappyStreamingApiRootResource.getServletHandler(ui)) + // Streaming tab + new SnappyStreamingTab(ui, ssqListener) + } + }) + logInfo("Updating Web UI to add structure streaming tab is Done.") + } + } + + updateUI(); + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6d8ba79317f97..cc871a84d1203 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -246,7 +246,7 @@ class StreamExecution( } // `postEvent` does not throw non fatal exception. - postEvent(new QueryStartedEvent(id, runId, name)) + postEvent(new QueryStartedEvent(id, runId, name, trigger)) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala new file mode 100644 index 0000000000000..1aca5daf80e1e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingQueryListener.scala @@ -0,0 +1,54 @@ +/* + * Changes for SnappyData data platform. + * + * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkContext + +class SnappyStreamingQueryListener(sparkContext: SparkContext) extends StreamingQueryListener { + + val streamingRepo = StreamingRepository.getInstance + + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + val queryName = { + if (event.name == null || event.name.isEmpty) { + event.id.toString + } else { + event.name + } + } + + streamingRepo.addQuery(event.id, + new StreamingQueryStatistics( + event.id, + queryName, + event.runId, + System.currentTimeMillis(), + event.trigger)) + } + + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + streamingRepo.updateQuery(event.progress) + } + + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + streamingRepo.setQueryStatus(event.id, false) + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 6b871b1fe6857..703865fe0b878 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -96,7 +96,8 @@ object StreamingQueryListener { class QueryStartedEvent private[sql]( val id: UUID, val runId: UUID, - val name: String) extends Event + val name: String, + val trigger: Trigger = ProcessingTime(0L)) 
extends Event /** * :: Experimental :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala new file mode 100644 index 0000000000000..9f2da07eea14c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingRepository.scala @@ -0,0 +1,227 @@ +/* + * Changes for SnappyData data platform. + * + * Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package org.apache.spark.sql.streaming + +import java.text.SimpleDateFormat +import java.util.{Date, TimeZone, UUID} + +import scala.collection.mutable.HashMap + +import org.apache.commons.collections.buffer.CircularFifoBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.ui.UIUtils + +class StreamingRepository extends Logging { + + private lazy val MAX_RUNNING_QUERIES_TO_RETAIN = + SparkSession.getActiveSession.get.sqlContext.conf.streamingUIRunningQueriesDisplayLimit + + private val allQueries = HashMap.empty[UUID, StreamingQueryStatistics] + + def getAllQueries: HashMap[UUID, StreamingQueryStatistics] = this.allQueries + + def addQuery(qid: UUID, sqs: StreamingQueryStatistics): Unit = { + if (allQueries.size < MAX_RUNNING_QUERIES_TO_RETAIN) { + this.allQueries.put(qid, sqs) + } else { + var qidToRemove: Option[UUID] = None + var queryStartTime: Option[Long] = None + val qidList = this.allQueries.keySet + qidList.foreach(qid => { + val sqs = this.allQueries.get(qid).get + if (!sqs.isActive) { + if (queryStartTime.isEmpty) { + queryStartTime = Some(sqs.queryStartTime) + qidToRemove = Some(sqs.queryUUID) + } else if (sqs.queryStartTime < queryStartTime.get) { + queryStartTime = Some(sqs.queryStartTime) + qidToRemove = Some(sqs.queryUUID) + } + } + }) + + if (qidToRemove.nonEmpty) { + this.allQueries.remove(qidToRemove.get) + this.allQueries.put(qid, sqs) + } else { + logWarning(s" Can not add new streaming queries as limit of " + + "running streaming queries to be displayed is reached to max limit" + + MAX_RUNNING_QUERIES_TO_RETAIN) + } + } + } + + def updateQuery(sqp: StreamingQueryProgress): Unit = { + if (this.allQueries.contains(sqp.id)) { + val sqs = this.allQueries.get(sqp.id).get + sqs.updateQueryStatistics(sqp) + } else { + logWarning("Streaming query entry is not present in streaming queries repository object.") + } + } + + def setQueryStatus(qid: UUID, status: Boolean): Unit = { + 
object StreamingRepository {

  private val _instance: StreamingRepository = new StreamingRepository

  /** Process-wide singleton holding statistics for all streaming queries. */
  def getInstance: StreamingRepository = _instance
}

/**
 * Accumulates cumulative and trend statistics for a single structured
 * streaming query. Trend buffers are bounded (CircularFifoBuffer), so only
 * the most recent MAX_SAMPLE_SIZE samples are retained.
 *
 * NOTE(review): this class is mutated from the query-progress listener
 * thread and read by the UI/REST layer without synchronization — confirm
 * single-writer access is guaranteed by the caller.
 *
 * @param qId       unique id of the streaming query
 * @param qName     user-visible query name
 * @param runId     id of the current run of the query
 * @param startTime query start time in epoch milliseconds
 * @param trigger   query trigger; assumed to be a ProcessingTime trigger
 *                  (trendEventsInterval casts unconditionally)
 */
class StreamingQueryStatistics (
    qId: UUID,
    qName: String,
    runId: UUID,
    startTime: Long,
    trigger: Trigger = ProcessingTime(0L)) {

  private val MAX_SAMPLE_SIZE =
    SparkSession.getActiveSession.get.sqlContext.conf.streamingUITrendsMaxSampleSize

  // Display format for the query start time.
  // Fixed: "YYYY" is SimpleDateFormat's week-based year and produces wrong
  // dates around year boundaries (e.g. 31-Dec-2019 rendered as 2020); use
  // calendar year "yyyy". Also use 24-hour "HH" since no AM/PM marker is
  // printed, making 12-hour "hh" ambiguous.
  private val simpleDateFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss")
  // ISO8601 format used by StreamingQueryProgress.timestamp (always UTC).
  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

  val queryUUID: UUID = qId
  val queryName: String = qName
  val queryStartTime: Long = startTime
  val queryStartTimeText: String = simpleDateFormat.format(startTime)
  var queryUptime: Long = 0L
  var queryUptimeText: String = ""

  var runUUID: UUID = runId
  val trendEventsInterval: Long = trigger.asInstanceOf[ProcessingTime].intervalMs

  var isActive: Boolean = true

  var currentBatchId: Long = -1L

  var sources = Array.empty[SourceProgress]
  var sink: SinkProgress = null

  var totalInputRows: Long = 0L
  var avgInputRowsPerSec: Double = 0.0
  var avgProcessedRowsPerSec: Double = 0.0
  var totalProcessingTime: Long = 0L
  var avgProcessingTime: Double = 0.0
  var numBatchesProcessed: Long = 0L

  // Bounded ring buffers holding the most recent samples for UI charts.
  val timeLine = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val numInputRowsTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val inputRowsPerSecondTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val processedRowsPerSecondTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val processingTimeTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val batchIds = new CircularFifoBuffer(MAX_SAMPLE_SIZE)

  var currStateOpNumRowsTotal = 0L
  var currStateOpNumRowsUpdated = 0L
  val stateOpNumRowsTotalTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)
  val stateOpNumRowsUpdatedTrend = new CircularFifoBuffer(MAX_SAMPLE_SIZE)

  /**
   * Folds one StreamingQueryProgress event into the cumulative counters and
   * trend buffers. Expected to be invoked once per progress event, in order.
   */
  def updateQueryStatistics(progress: StreamingQueryProgress): Unit = {

    // A progress event may repeat the current batch id; count a batch as
    // processed only when the id advances.
    if (this.currentBatchId < progress.batchId) {
      this.numBatchesProcessed = this.numBatchesProcessed + 1
    }

    this.currentBatchId = progress.batchId
    this.batchIds.add(progress.batchId)

    val currDateTime: Date = timestampFormat.parse(progress.timestamp)
    this.queryUptime = currDateTime.getTime - this.queryStartTime
    this.queryUptimeText = UIUtils.formatDurationVerbose(this.queryUptime)

    this.timeLine.add(currDateTime.getTime)

    // NaN is reported by Spark when a rate/count is not yet available;
    // normalize to zero so sums and averages stay well-defined.
    val tmpNumInpRows = {
      if (progress.numInputRows.isNaN) 0 else progress.numInputRows
    }
    this.numInputRowsTrend.add(tmpNumInpRows)
    this.totalInputRows = this.totalInputRows + tmpNumInpRows

    val tmpInputRowsPerSec = {
      if (progress.inputRowsPerSecond.isNaN) 0.0 else progress.inputRowsPerSecond
    }
    this.inputRowsPerSecondTrend.add(tmpInputRowsPerSec)
    this.avgInputRowsPerSec = calcAvgOfGivenTrend(this.inputRowsPerSecondTrend)

    val tmpProcessedRowsPerSec = {
      if (progress.processedRowsPerSecond.isNaN) 0.0 else progress.processedRowsPerSecond
    }
    this.processedRowsPerSecondTrend.add(tmpProcessedRowsPerSec)
    this.avgProcessedRowsPerSec = calcAvgOfGivenTrend(this.processedRowsPerSecondTrend)

    val tmpProcessingTime = progress.durationMs.get("triggerExecution")
    this.processingTimeTrend.add(tmpProcessingTime)
    this.totalProcessingTime = this.totalProcessingTime + tmpProcessingTime
    // Fixed: Long / Long truncated before the assignment widened to Double;
    // convert first so the average keeps its fractional part.
    this.avgProcessingTime = this.totalProcessingTime.toDouble / this.numBatchesProcessed

    this.sources = progress.sources
    this.sink = progress.sink

    val stateOperators = progress.stateOperators
    if (stateOperators.nonEmpty) {

      var sumAllSTNumRowsTotal = 0L
      var sumAllSTNumRowsUpdated = 0L

      stateOperators.foreach(so => {
        sumAllSTNumRowsTotal = sumAllSTNumRowsTotal + so.numRowsTotal
        sumAllSTNumRowsUpdated = sumAllSTNumRowsUpdated + so.numRowsUpdated
      })

      // Current values track the high-water mark across progress events.
      if (currStateOpNumRowsTotal < sumAllSTNumRowsTotal) {
        this.currStateOpNumRowsTotal = sumAllSTNumRowsTotal
      }
      this.stateOpNumRowsTotalTrend.add(sumAllSTNumRowsTotal)

      if (currStateOpNumRowsUpdated < sumAllSTNumRowsUpdated) {
        this.currStateOpNumRowsUpdated = sumAllSTNumRowsUpdated
      }
      this.stateOpNumRowsUpdatedTrend.add(sumAllSTNumRowsUpdated)

    }
  }

  /**
   * Average of a trend buffer whose entries are boxed Doubles.
   * Returns 0.0 for an empty buffer (previously divided by zero → NaN).
   */
  def calcAvgOfGivenTrend (trend: CircularFifoBuffer) : Double = {
    val arrValues = trend.toArray()
    if (arrValues.isEmpty) {
      0.0
    } else {
      var sumOfElements = 0.0
      arrValues.foreach(value => {
        sumOfElements = sumOfElements + value.asInstanceOf[Double]
      })
      sumOfElements / arrValues.size
    }
  }

  /** Marks the query active (true) or inactive (false). */
  def setStatus (status: Boolean): Unit = {
    this.isActive = status
  }

}
/**
 * Root JAX-RS resource serving SnappyData streaming statistics as JSON.
 * Endpoints return the public classes declared in streamapi.scala; jackson
 * (via JacksonMessageWriter) handles serialization automatically.
 */
@Path("/services")
class SnappyStreamingApiRootResource extends ApiRequestContext {

  // Sub-resource exposing summaries of all structured streaming queries.
  @Path("streams")
  def getStreams(): StreamsInfoResource = new StreamsInfoResource
}

private[spark] object SnappyStreamingApiRootResource {

  /**
   * Builds the Jetty context handler that mounts the Jersey servlet under
   * the "/snappy-streaming" context path.
   */
  def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
    val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
    handler.setContextPath("/snappy-streaming")
    // Make the UIRoot reachable from resource classes via the servlet context.
    UIRootFromServletContext.setUiRoot(handler, uiRoot)

    val servletHolder = new ServletHolder(classOf[ServletContainer])
    servletHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES,
      "org.apache.spark.status.api.v1")
    handler.addServlet(servletHolder, "/*")
    handler
  }
}
/**
 * JAX-RS resource returning a JSON summary of every structured streaming
 * query tracked by the singleton StreamingRepository.
 */
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class StreamsInfoResource {

  /**
   * Returns a single-element sequence wrapping the statistics of all known
   * queries (active and inactive).
   */
  @GET
  def streamInfo(): Seq[StreamsSummary] = {
    val streamingRepo = StreamingRepository.getInstance
    // Build the result directly; the previous ListBuffer accumulation was an
    // unnecessary round-trip for a single element.
    List(new StreamsSummary(streamingRepo.getAllQueries.values.toList))
  }
}
/**
 * Public, JSON-serializable summary of all structured streaming queries,
 * returned by StreamsInfoResource.
 *
 * @param allQueries statistics of every tracked query, active and inactive
 */
class StreamsSummary private[spark](
    val allQueries: Seq[StreamingQueryStatistics]
)
/**
 * Spark UI tab ("Structured Streaming") that displays statistics of
 * structured streaming queries. Attaching the page and the tab happens
 * eagerly in the constructor, so constructing this class mutates sparkUI.
 */
class SnappyStreamingTab (sparkUI: SparkUI, streamingListener: SnappyStreamingQueryListener)
  extends SparkUITab(sparkUI, "structurestreaming") with Logging {

  // Title shown in the Spark UI navigation bar.
  override val name = "Structured Streaming"

  val parent = sparkUI
  val listener = streamingListener

  // Register the page that renders the query statistics, then the tab itself.
  attachPage(new SnappyStructuredStreamingPage(this))
  // Attach Tab
  parent.attachTab(this)
  // NOTE(review): REST handler attachment is intentionally disabled here —
  // confirm whether it is attached elsewhere before removing this line.
  // parent.attachHandler(SnappyStreamingApiRootResource.getServletHandler(parent))

}
+ */ + +package org.apache.spark.ui + + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.internal.Logging + +private[ui] class SnappyStructuredStreamingPage(parent: SnappyStreamingTab) + extends WebUIPage("") with Logging { + + def commonHeaderNodesSnappy: Seq[Node] = { + + + + + + + } + + override def render(request: HttpServletRequest): Seq[Node] = { + + val pageHeaderText: String = SnappyStructuredStreamingPage.pageHeaderText + + val pageContent = commonHeaderNodesSnappy ++ createMainContent + + UIUtils.headerSparkPage(pageHeaderText, pageContent, parent, Some(500), useDataTables = true) + + } + + private def createMainContent: Seq[Node] = { + val navPanel = createNavigationPanel + val detailsPanel = createQueryDetailsPanel + val connErrorMsgNode = { +
+
+
+
+ } + val mainPanel = { +
+ {navPanel ++ detailsPanel} +
+ } + + connErrorMsgNode ++ mainPanel + + } + + private def createNavigationPanel: Seq[Node] = { +
+
+ + + + + + +
+ + { SnappyStructuredStreamingPage.leftNavPanelTitle } + +
+
+
+ } + + private def createSourcesTable: Seq[Node] = { +
+ + + + + + + + + + +
+ + { SnappyStructuredStreamingPage.streamingStats("srcType") } + + + + { SnappyStructuredStreamingPage.streamingStats("srcDescription") } + + + + { SnappyStructuredStreamingPage.streamingStats("srcInputRecords") } + + + + { SnappyStructuredStreamingPage.streamingStats("srcInputRate") } + + + + { SnappyStructuredStreamingPage.streamingStats("srcProcessingRate") } + +
+
+ } + + private def createSinkTable: Seq[Node] = { +
+ + + + + + + +
+ + { SnappyStructuredStreamingPage.streamingStats("snkType") } + + + + { SnappyStructuredStreamingPage.streamingStats("snkDescription") } + +
+
+ } + + private def createQueryDetailsPanel: Seq[Node] = { +
+ {createQueryDetailsEntry} +
+ } + + private def createQueryDetailsEntry: Seq[Node] = { +
+
+
+ { SnappyStructuredStreamingPage.streamingStats("queryName") }: +
+
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("startDateTime") } +
+
 
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("uptime") } +
+
 
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("triggerInterval") } +
+
 
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("batchesProcessed") } +
+
 
+
+
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("status") } +
+
 
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("totalInputRows") } +
+
 
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("currInputRowsPerSec") } +
+
 
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("currProcessedRowsPerSec") } +
+
 
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("totalProcessingTime") } +
+
 
+
+
+
+
+
+ { SnappyStructuredStreamingPage.streamingStats("avgProcessingTime") } +
+
 
+
+
+
+
+ +
+
+
+
+
+
+
+
+
+
+ +
+
+
+
+ { SnappyStructuredStreamingPage.sourcesTitle } +
+
+
+
+ { createSourcesTable } +
+
+
+
+ { SnappyStructuredStreamingPage.sinkTitle } +
+
+
+
+ { createSinkTable } +
+
/**
 * Companion object holding the static display strings used by
 * SnappyStructuredStreamingPage: column/stat labels, tooltip texts and
 * section titles. Keys are looked up by the page via apply(), so entries
 * must exist for every key the page references.
 */
object SnappyStructuredStreamingPage {
  val pageHeaderText = "Structured Streaming Queries"

  // Labels for the statistics shown on the page, keyed by stat id.
  val streamingStats: scala.collection.mutable.HashMap[String, Any] =
    scala.collection.mutable.HashMap[String, Any](
      "queryName" -> "Query Name",
      "startDateTime" -> "Start Date & Time",
      "uptime" -> "Uptime",
      "status" -> "Status",
      "triggerInterval" -> "Trigger Interval",
      "batchesProcessed" -> "Batches Processed",
      "totalInputRows" -> "Total Input Records",
      "currInputRowsPerSec" -> "Current Input Rate",
      "currProcessedRowsPerSec" -> "Current Processing Rate",
      "totalProcessingTime" -> "Total Batch Processing Time",
      "avgProcessingTime" -> "Avg. Batch Processing Time",
      "srcType" -> "Type",
      "srcDescription" -> "Description",
      "srcInputRecords" -> "Input Records",
      "srcInputRate" -> "Input Rate",
      "srcProcessingRate" -> "Processing Rate",
      "snkType" -> "Type",
      "snkDescription" -> "Description")

  // Tooltip texts, keyed by stat id. Grammar fixed in two entries:
  // "since execution its started" and "since execution is started".
  val tooltips: scala.collection.mutable.HashMap[String, String] =
    scala.collection.mutable.HashMap[String, String](
      "leftNavPanelTitle" -> "Streaming Query Names",
      "queryName" -> "Streaming Query Name",
      "startDateTime" -> "Date & time when streaming query started its execution",
      "uptime" -> "Total time since streaming query started its execution",
      "triggerInterval" -> "Configured triggering interval for batches",
      "batchesProcessed" -> "Number of batches processed since its execution started",
      "status" -> "Streaming query status (Active / Inactive)",
      "totalInputRows" -> "Total number of input records received since execution started",
      "currInputRowsPerSec" -> "Records / second received in current trigger interval",
      "currProcessedRowsPerSec" -> "Records processed / second in current trigger interval",
      "totalProcessingTime" ->
          "Total processing time of all batches received since execution started",
      "avgProcessingTime" -> "Average processing time per batch",
      "sources" -> "Streaming queries sources",
      "srcType" -> "Type of streaming query source",
      "srcDescription" -> "Description of streaming query source",
      "srcInputRecords" -> "Number of records received from source in current interval",
      "srcInputRate" -> "Number of records / second received from source in current interval",
      "srcProcessingRate" -> "Number of records processed / second in current interval",
      "sink" -> "Streaming queries sink",
      "snkDescription" -> "Description of streaming query sink",
      "snkType" -> "Type of streaming query sink")

  // Shown when the Google Charts loader cannot be reached.
  val googleChartsErrorMsg = "Error while loading charts. Please check your internet connection."

  val leftNavPanelTitle = "Query Names"
  val sourcesTitle = "Sources"
  val sinkTitle = "Sink"

}