forked from oleksandrabovkun/spark-ui-simulator-experiments
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Exp #0000-S - Exploring the Spark UI.scala
83 lines (54 loc) · 2.17 KB
/
Exp #0000-S - Exploring the Spark UI.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Databricks notebook source
sc.setJobDescription("Step A: Basic initialization")

// Root of the staging parquet dataset used by every step below.
val dataSourcePath = "dbfs:/mnt/training/wikipedia/pagecounts/staging_parquet_en_only_clean"

// List the directory contents and print each entry's file name.
dbutils.fs.ls(dataSourcePath).foreach(entry => println(entry.name))
// COMMAND ----------
sc.setJobDescription("Step B: Read and cache the initial DataFrame")

// Load the parquet dataset and mark it for caching. cache() is lazy: the
// cache is only populated when an action runs.
val initialDF = spark.read.parquet(dataSourcePath).cache()

// Touch every row with a no-op action so the cache is fully materialized now.
initialDF.foreach(_ => ())
// COMMAND ----------
sc.setJobDescription("Step C: A bunch of random transformations")

import org.apache.spark.sql.functions.upper

// Uppercase letters A..Z, generated instead of the original 26 hand-typed
// varargs literals.
val firstLetters = ('A' to 'Z').map(_.toString)

// Derive the uppercased first character of each article title, keep only rows
// whose first character is an ASCII letter, aggregate per (project, first
// letter), and keep only the busy groups (> 10000 requests).
val someDF = initialDF
  // Spark SQL substr is 1-based; the original substr(0, 1) only produced the
  // first character via Spark's lenient handling of position 0.
  .withColumn("first", upper($"article".substr(1, 1)))
  .where($"first".isin(firstLetters: _*))
  .groupBy($"project", $"first").sum()
  .drop("sum(bytes_served)")
  .orderBy($"first", $"project")
  .select($"first", $"project", $"sum(requests)".as("total"))
  .filter($"total" > 10000)

// Number of qualifying rows; Step D takes exactly this many records.
val total = someDF.count().toInt
// COMMAND ----------
sc.setJobDescription("Step D: Take N records")
// Pull `total` rows back to the driver as a local array. Since `total` was
// computed as someDF.count() in Step C, this retrieves every row of someDF.
val all = someDF.take(total)
// COMMAND ----------
sc.setJobDescription("Step E: Create a really big DataFrame")

// Double the DataFrame 7 times (2^7 = 128x the original row count),
// repartitioning after each union to keep the partition count at the cluster
// default. foldLeft replaces the original `var` + for-loop with an immutable
// pipeline that produces the same plan.
val bigDF = (0 until 7).foldLeft(initialDF) { (df, _) =>
  df.union(df).repartition(sc.defaultParallelism)
}

// The "noop" sink forces full execution of the plan without writing data.
bigDF.write.format("noop").mode("overwrite").save()
// COMMAND ----------
sc.setJobDescription("Step F: Streaming Job")

import org.apache.spark.sql.functions.window

// Small shuffle-partition count for this low-volume demo stream.
spark.conf.set("spark.sql.shuffle.partitions", 8)

// Source location and explicit schema (streaming JSON reads require one).
val dataPath = "dbfs:/mnt/training/definitive-guide/data/activity-data-stream.json"
val dataSchema = "Recorded_At timestamp, Device string, Index long, Model string, User string, _corrupt_record String, gt string, x double, y double, z double"

// Read one file per trigger so the stream progresses slowly enough to watch.
val activityStream = spark.readStream
  .option("maxFilesPerTrigger", 1)
  .schema(dataSchema)
  .json(dataPath)

// Count events per device over tumbling 20-second windows and expose the
// window start alongside the device and its count.
val streamingDF = activityStream
  .groupBy($"Device", window($"Recorded_At", "20 seconds"))
  .count()
  .select($"window.start".as("start"), $"Device", $"count")

display(streamingDF, streamName = "Sample_Stream")
// COMMAND ----------
sc.setJobDescription("Step G: Stop stream after 30 sec")

// Give the stream 30 seconds to run before shutting it down.
// NOTE(review): sleep-based coordination is acceptable for a demo notebook;
// production code should prefer awaitTermination with a timeout.
Thread.sleep(30 * 1000)

// Stop every streaming query still active on this session.
spark.streams.active.foreach(_.stop())