From 853f068a59cd6873d3cd2a306256fc651794963d Mon Sep 17 00:00:00 2001
From: pulakk
Date: Mon, 10 Jun 2024 23:22:43 +0530
Subject: [PATCH] Add spark.app.name tag to InfluxDBSink metrics

---
 .../scala/ch/cern/sparkmeasure/InfluxDBSink.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala b/src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala
index 037a67f..f504da2 100644
--- a/src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala
+++ b/src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala
@@ -75,6 +75,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
   }
 
   var appId = "noAppId"
+  var appName = "noAppName"
 
   appId = SparkSession.getActiveSession match {
     case Some(sparkSession) => sparkSession.sparkContext.applicationId
@@ -87,6 +88,7 @@
     val startTime = executorAdded.time
     val point = Point.measurement("executors_started")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .addField("executorId", executorId)
       .addField("executorHost", executorInfo.executorHost)
       .addField("totalCores", executorInfo.totalCores)
@@ -101,6 +103,7 @@
     val stageId = stageSubmitted.stageInfo.stageId
     val point = Point.measurement("stages_started")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .addField("stageId", stageId)
       .addField("attemptNUmber", attemptNumber)
       .time(submissionTime, TimeUnit.MILLISECONDS)
@@ -116,6 +119,7 @@
 
     val point1 = Point.measurement("stages_ended")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(completionTime, TimeUnit.MILLISECONDS)
       .addField("stageId", stageId)
       .addField("attemptNumber", attemptNumber)
@@ -127,6 +131,7 @@
     val taskmetrics = stageCompleted.stageInfo.taskMetrics
     val point2 = Point.measurement("stage_metrics")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(completionTime, TimeUnit.MILLISECONDS)
       .addField("stageId", stageId)
       .addField("attemptNumber", attemptNumber)
@@ -173,6 +178,7 @@
 
     val point = Point.measurement("queries_started")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(startTime, TimeUnit.MILLISECONDS)
       .addField("description", description)
       .addField("queryId", queryId)
@@ -185,6 +191,7 @@
 
     val point = Point.measurement("queries_ended")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(endTime, TimeUnit.MILLISECONDS)
       .addField("queryId", queryId)
       .build()
@@ -200,6 +207,7 @@
 
     val point = Point.measurement("jobs_started")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(startTime, TimeUnit.MILLISECONDS)
       .addField("jobID", jobId)
       .build()
@@ -212,6 +220,7 @@
 
     val point = Point.measurement("jobs_ended")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(completionTime, TimeUnit.MILLISECONDS)
       .addField("jobID", jobId)
       .build()
@@ -220,7 +229,7 @@
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
     appId = applicationStart.appId.getOrElse("noAppId")
-    // val appName = applicationStart.appName
+    appName = applicationStart.appName
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
@@ -242,6 +251,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
     val taskInfo = taskStart.taskInfo
     val point = Point.measurement("tasks_started")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(taskInfo.launchTime, TimeUnit.MICROSECONDS)
       .addField("taskId", taskInfo.taskId)
       .addField("attemptNumber", taskInfo.attemptNumber)
@@ -256,6 +266,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
 
     val point1 = Point.measurement("tasks_ended")
       .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(taskInfo.finishTime, TimeUnit.MILLISECONDS)
       .addField("taskId", taskInfo.taskId)
       .addField("attemptNumber", taskInfo.attemptNumber)
@@ -266,6 +277,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
 
     val point2 = Point.measurement("task_metrics")
      .tag("applicationId", appId)
+      .tag("spark.app.name", appName)
       .time(taskInfo.finishTime, TimeUnit.MILLISECONDS)
       // task info
       .addField("taskId", taskInfo.taskId)
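
After this patch every point written by the sink carries the application name as an
InfluxDB tag next to applicationId, so series can be filtered or grouped per application.
Below is a minimal sketch of how the tag gets populated end to end; it assumes the sink
is attached through spark.extraListeners and that spark.sparkmeasure.influxdbURL is the
endpoint key for this version of the sink (both are assumptions here, not part of the
patch; check the sparkmeasure InfluxDB sink docs for your release):

    import org.apache.spark.sql.SparkSession

    object AppNameTagDemo {
      def main(args: Array[String]): Unit = {
        // The appName set here reaches the listener through
        // SparkListenerApplicationStart.appName and becomes the value of the
        // new spark.app.name tag on every point the sink writes afterwards.
        val spark = SparkSession.builder()
          .appName("appNameTagDemo") // hypothetical name, for illustration only
          .config("spark.extraListeners", "ch.cern.sparkmeasure.InfluxDBSink")
          // Assumed config key and placeholder endpoint; adjust to your setup.
          .config("spark.sparkmeasure.influxdbURL", "http://localhost:8086")
          .getOrCreate()

        // Run a trivial job so jobs_started/jobs_ended (and stage/task)
        // points are emitted with the new tag attached.
        spark.range(1000).selectExpr("sum(id)").collect()
        spark.stop()
      }
    }

One caveat visible in the diff: appName is only assigned in onApplicationStart, so any
point built before that event fires is tagged with the "noAppName" default.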