From 6358768d41829269f4af47f1198a140bfa4f3f6b Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Mon, 1 Jun 2020 22:30:46 +0300
Subject: [PATCH] Added notes on monitoring Spark applications via Python and
 the Spark UI

---
 notes/stv/20200601-spark-monitoring.txt | 145 ++++++++++++++++++++++++
 1 file changed, 145 insertions(+)
 create mode 100644 notes/stv/20200601-spark-monitoring.txt

diff --git a/notes/stv/20200601-spark-monitoring.txt b/notes/stv/20200601-spark-monitoring.txt
new file mode 100644
index 00000000..2366ee7f
--- /dev/null
+++ b/notes/stv/20200601-spark-monitoring.txt
@@ -0,0 +1,145 @@
#
#
#
# Copyright (c) 2020, ROE (http://www.roe.ac.uk/)
#
# This information is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This information is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
#


# Monitoring and Using the Spark UI
# ---------------------------------

In the UI:


- The Jobs Tab

Shows currently executing and completed jobs, the number of tasks per job, the duration of execution, etc.


- The Stages Tab

Shows all the stages across all jobs, along with the amount of input read and output written by each stage. Here we can monitor how much data is being shuffled; excessive shuffling may indicate poor partitioning of our data.


- DAG Page

Shows how Spark schedules the stages of a job, which allows us to visualize potential bottlenecks and shuffle steps.


- Storage Tab

Use it to find out the status of our cache, RDD storage levels, etc.

Use "SparkContext.getRDDStorageInfo()" to get storage information for an RDD (i.e. size of the cache in memory and on disk).

%spark.pyspark
from operator import truediv

# Fetch RDD storage info from the underlying Scala SparkContext.
storage_info = sc._jsc.sc().getRDDStorageInfo()

# Summarize memory use and cache coverage for each persisted RDD.
[{
    "memSize": s.memSize(),
    "numPartitions": s.numPartitions(),
    "numCachedPartitions": s.numCachedPartitions(),
    "fractionCached": truediv(s.numCachedPartitions(), s.numPartitions())
} for s in storage_info]
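
The Jobs and Stages tabs can also be polled programmatically from the driver through "SparkContext.statusTracker()". A minimal sketch of that approach (the print formatting here is purely illustrative):

%spark.pyspark
# Minimal sketch: ask the driver's status tracker which jobs and stages
# are currently active, and how far along their tasks are.
tracker = sc.statusTracker()

for job_id in tracker.getActiveJobsIds():
    job = tracker.getJobInfo(job_id)
    if job is not None:
        print("job", job.jobId, job.status, "stages:", list(job.stageIds))

for stage_id in tracker.getActiveStageIds():
    stage = tracker.getStageInfo(stage_id)
    if stage is not None:
        print("stage", stage.stageId, stage.name,
              "{0}/{1} tasks active".format(stage.numActiveTasks, stage.numTasks))
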
+""" + + +import requests +url = baseurl + "storage/rdd/" +print(url) + +[r.json() for r in [ + requests.get("{0}{1}".format(url, rdd.get("id"))) for + rdd in requests.get(url).json() +] if r.status_code == 200] + + + + +## Get Job info + +import requests +url = baseurl + "jobs" +jobs_json = requests.get(url).json() +print([[obj.get("name"),obj.get("status")] for obj in jobs_json]) + + + +## Get Stages + +import requests +url = baseurl + "stages" +jobs_json = requests.get(url).json() +print(jobs_json) + + + +## Get Executors + +import requests +url = baseurl + "executors" +jobs_json = requests.get(url).json() +print(jobs_json) + + + + +# Debugging Query plans +# ---------------------------------- +We can get a description of an RDD and its history using the "toDebugString()" method on an RDD +We can also check the query plan through the Dataframe API using the "explain()" method. + + +