Add application start and end metrics
pulakk authored and LucaCanali committed Jun 12, 2024
1 parent 7810d76 commit e2c9c9d
Showing 2 changed files with 65 additions and 22 deletions.
51 changes: 29 additions & 22 deletions docs/Flight_recorder_mode_InfluxDBSink.md
@@ -84,6 +84,7 @@ docker run --name influx --network=host -d influxdb:1.8.10
- Start Spark with the InfluxDBSink Listener
```
bin/spark-shell \
--name MyAppName \
--conf spark.sparkmeasure.influxdbURL="http://localhost:8086" \
--conf spark.extraListeners=ch.cern.sparkmeasure.InfluxDBSink,ch.cern.sparkmeasure.InfluxDBSinkExtended \
--conf spark.sparkmeasure.influxdbStagemetrics=true
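# Note: the value passed to --name is what appears in the spark.app.name tag
# attached to the measurements written by the sink (see the series listed below)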
@@ -112,11 +113,14 @@ Using database sparkmeasure
name: measurements
name
----
applications_ended
applications_started
executors_started
jobs_ended
jobs_started
queries_ended
queries_started
stage_metrics
stages_ended
stages_started
task_metrics
@@ -126,34 +130,37 @@ tasks_started
> show series
key
---
executors_started,applicationId=noAppId
jobs_ended,applicationId=local-1660122150941
jobs_started,applicationId=local-1660122150941
queries_ended,applicationId=local-1660122150941
queries_started,applicationId=local-1660122150941
stage_metrics,applicationId=local-1660122150941
stages_ended,applicationId=local-1660122150941
stages_started,applicationId=local-1660122150941
task_metrics,applicationId=local-1660122150941
tasks_ended,applicationId=local-1660122150941
tasks_started,applicationId=local-1660122150941
applications_ended,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
applications_started,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
executors_started,applicationId=noAppId,spark.app.name=noAppName,spark.dynamicAllocation.enabled=false
jobs_ended,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
jobs_started,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
queries_ended,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
queries_started,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
stage_metrics,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
stages_ended,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
stages_started,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
task_metrics,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
tasks_ended,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
tasks_started,applicationId=local-1718107775270,spark.app.name=MyAppName,spark.dynamicAllocation.enabled=false
> select * from queries_started
name: queries_started
time applicationId description queryId
---- ------------- ----------- -------
1660122211786000000 local-1660122150941 show at <console>:23 0
time applicationId description queryId spark.app.name spark.dynamicAllocation.enabled
---- ------------- ----------- ------- -------------- -------------------------------
1718107788602000000 local-1718107775270 show at <console>:23 0 MyAppName false
> select * from /executors/
name: executors_started
time applicationId executorHost executorId totalCores
---- ------------- ------------ ---------- ----------
1660122151091000000 noAppId pcitdbgpu1.dyndns.cern.ch driver 8
time applicationId executorHost executorId spark.app.name spark.dynamicAllocation.enabled totalCores
---- ------------- ------------ ---------- -------------- ------------------------------- ----------
1718107775713000000 noAppId ais-dev1.alphonso.tv driver noAppName false 32
> select * from stage_metrics
name: stage_metrics
time applicationId attemptNumber bytesRead bytesWritten completionTime executorCpuTime executorDeserializeCpuTime executorDeserializeTime executorRunTime failureReason jvmGCTime memoryBytesSpilled peakExecutionMemory recordsRead recordsWritten resultSerializationTime resultSize shuffleBytesWritten shuffleFetchWaitTime shuffleLocalBlocksFetched shuffleLocalBytesRead shuffleRecordsRead shuffleRecordsWritten shuffleRemoteBlocksFetched shuffleRemoteBytesRead shuffleRemoteBytesReadToDisk shuffleTotalBlocksFetched shuffleTotalBytesRead shuffleWriteTime stageId submissionTime
---- ------------- ------------- --------- ------------ -------------- --------------- -------------------------- ----------------------- --------------- ------------- --------- ------------------ ------------------- ----------- -------------- ----------------------- ---------- ------------------- -------------------- ------------------------- --------------------- ------------------ --------------------- -------------------------- ---------------------- ---------------------------- ------------------------- --------------------- ---------------- ------- --------------
1660122213061000000 local-1660122150941 0 0 0 1660122213061 82265168 1039200151 3171 301 144 0 0 1000 0 7 12119 0 0 0 0 0 0 0 0 0 0 0 0 0 1660122212450
1660122213630000000 local-1660122150941 0 0 0 1660122213630 2180200879 49937229 155 2491 0 0 0 1000 0 7 16134 472 0 0 0 0 8 0 0 0 0 0 65588303 1 1660122213240
1660122213764000000 local-1660122150941 0 0 0 1660122213764 38334075 2650585 2 39 0 0 0 0 0 0 2667 0 0 8 472 8 0 0 0 0 8 472 0 3 1660122213711
time applicationId attemptNumber bytesRead bytesWritten completionTime executorCpuTime executorDeserializeCpuTime executorDeserializeTime executorRunTime failureReason jvmGCTime memoryBytesSpilled peakExecutionMemory recordsRead recordsWritten resultSerializationTime resultSize shuffleBytesWritten shuffleFetchWaitTime shuffleLocalBlocksFetched shuffleLocalBytesRead shuffleRecordsRead shuffleRecordsWritten shuffleRemoteBlocksFetched shuffleRemoteBytesRead shuffleRemoteBytesReadToDisk shuffleTotalBlocksFetched shuffleTotalBytesRead shuffleWriteTime spark.app.name spark.dynamicAllocation.enabled stageId submissionTime
---- ------------- ------------- --------- ------------ -------------- --------------- -------------------------- ----------------------- --------------- ------------- --------- ------------------ ------------------- ----------- -------------- ----------------------- ---------- ------------------- -------------------- ------------------------- --------------------- ------------------ --------------------- -------------------------- ---------------------- ---------------------------- ------------------------- --------------------- ---------------- -------------- ------------------------------- ------- --------------
1718107790934000000 local-1718107775270 0 0 0 1718107790934 600768868 6769045421 30598 4508 9248 0 0 1000 0 273 50592 0 0 0 0 0 0 0 0 0 0 0 0 MyAppName false 0 1718107789534
1718107791715000000 local-1718107775270 0 0 0 1718107791715 7957128399 312604886 958 14359 0 0 0 1000 0 35 64934 1880 0 0 0 0 32 0 0 0 0 0 419858111 MyAppName false 1 1718107791154
1718107791910000000 local-1718107775270 0 0 0 1718107791910 81533655 4458869 4 90 0 0 0 0 0 0 4006 0 0 32 1880 32 0 0 0 0 32 1880 0 MyAppName false 3 1718107791799
```
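
The application-level measurements added here can be inspected the same way; a minimal sketch of the queries (output omitted), using the measurement, tag, and field names written by the listener (see the InfluxDBSink.scala changes below):
```
> select * from applications_started
> select "duration" from applications_ended where "spark.app.name" = 'MyAppName'
```
The `duration` field is reported in seconds, computed from `spark.app.startTime` to the application end timestamp.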

## List of the Task Metrics collected by InfluxDBSink
36 changes: 36 additions & 0 deletions src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala
@@ -76,6 +76,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {

var appId = "noAppId"
var appName = "noAppName"
var dynamicAllocationEnabled = conf.get("spark.dynamicAllocation.enabled", "false")

appId = SparkSession.getActiveSession match {
case Some(sparkSession) => sparkSession.sparkContext.applicationId
@@ -89,6 +90,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("executors_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.addField("executorId", executorId)
.addField("executorHost", executorInfo.executorHost)
.addField("totalCores", executorInfo.totalCores)
@@ -104,6 +106,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("stages_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.addField("stageId", stageId)
.addField("attemptNUmber", attemptNumber)
.time(submissionTime, TimeUnit.MILLISECONDS)
@@ -120,6 +123,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point1 = Point.measurement("stages_ended")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(completionTime, TimeUnit.MILLISECONDS)
.addField("stageId", stageId)
.addField("attemptNumber", attemptNumber)
@@ -132,6 +136,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point2 = Point.measurement("stage_metrics")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(completionTime, TimeUnit.MILLISECONDS)
.addField("stageId", stageId)
.addField("attemptNumber", attemptNumber)
@@ -179,6 +184,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("queries_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(startTime, TimeUnit.MILLISECONDS)
.addField("description", description)
.addField("queryId", queryId)
@@ -192,6 +198,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("queries_ended")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(endTime, TimeUnit.MILLISECONDS)
.addField("queryId", queryId)
.build()
@@ -208,6 +215,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("jobs_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(startTime, TimeUnit.MILLISECONDS)
.addField("jobID", jobId)
.build()
@@ -221,6 +229,7 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
val point = Point.measurement("jobs_ended")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(completionTime, TimeUnit.MILLISECONDS)
.addField("jobID", jobId)
.build()
@@ -230,10 +239,34 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
appId = applicationStart.appId.getOrElse("noAppId")
appName = applicationStart.appName

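// Record the application start as an "applications_started" point, tagged like the
// other measurements; fields carry the start/submit timestamps and a few driver and
// deploy settings read from the Spark configuration (0 or "" when not set)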
val point = Point.measurement("applications_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(applicationStart.time, TimeUnit.MILLISECONDS)
.field("startTime", conf.getLong("spark.app.startTime", 0))
.field("submitTime", conf.getLong("spark.app.submitTime", 0))
.field("totalCoresRequested", conf.getLong("spark.cores.max", 0))
.field("sparkDriverHost", conf.get("spark.driver.host", ""))
.field("sparkDriverPort", conf.getInt("spark.driver.port", 0))
.field("deployMode", conf.get("spark.submit.deployMode", ""))
.build()
database.write(point)
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
logger.info(s"Spark application ended, timestamp = ${applicationEnd.time}, closing InfluxDB connection.")

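// Record the application end as an "applications_ended" point; "duration" is in
// seconds, measured from spark.app.startTime to the application end timestamp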
val point = Point.measurement("applications_ended")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(applicationEnd.time, TimeUnit.MILLISECONDS)
.field("duration", (applicationEnd.time - conf.getLong("spark.app.startTime", 0))/1000)
.build()
database.write(point)

influxDB.close()
}

@@ -252,6 +285,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
val point = Point.measurement("tasks_started")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(taskInfo.launchTime, TimeUnit.MICROSECONDS)
.addField("taskId", taskInfo.taskId)
.addField("attemptNumber", taskInfo.attemptNumber)
@@ -267,6 +301,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
val point1 = Point.measurement("tasks_ended")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(taskInfo.finishTime, TimeUnit.MILLISECONDS)
.addField("taskId", taskInfo.taskId)
.addField("attemptNumber", taskInfo.attemptNumber)
@@ -278,6 +313,7 @@ class InfluxDBSinkExtended(conf: SparkConf) extends InfluxDBSink(conf: SparkConf
val point2 = Point.measurement("task_metrics")
.tag("applicationId", appId)
.tag("spark.app.name", appName)
.tag("spark.dynamicAllocation.enabled", dynamicAllocationEnabled)
.time(taskInfo.finishTime, TimeUnit.MILLISECONDS)
// task info
.addField("taskId", taskInfo.taskId)
