Exp #1241-S - maxPartitionBytes.scala
// Databricks notebook source
// MAGIC %md
// MAGIC <table>
// MAGIC <tr>
// MAGIC <td></td>
// MAGIC <td>VM</td>
// MAGIC <td>Quantity</td>
// MAGIC <td>Total Cores</td>
// MAGIC <td>Total RAM</td>
// MAGIC </tr>
// MAGIC <tr>
// MAGIC <td>Driver:</td>
// MAGIC <td>**i3.xlarge**</td>
// MAGIC <td>**1**</td>
// MAGIC <td>**4 cores**</td>
// MAGIC <td>**30.5 GB**</td>
// MAGIC </tr>
// MAGIC <tr>
// MAGIC <td>Workers:</td>
// MAGIC <td>**i3.xlarge**</td>
// MAGIC <td>**2**</td>
// MAGIC <td>**8 cores**</td>
// MAGIC <td>**61 GB**</td>
// MAGIC </tr>
// MAGIC </table>
// COMMAND ----------
sc.setJobDescription("Step A-1: Basic initialization")
// Disable the Delta IO Cache (reduce side effects)
spark.conf.set("spark.databricks.io.cache.enabled", "false")
// What is the maximum size of each spark-partition (default value)?
val defaultMaxPartitionBytes = spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b","").toLong
// What is the cost in bytes for each file (default value)?
val openCostInBytes = spark.conf.get("spark.sql.files.openCostInBytes").toLong
displayHTML(f"""<table>
<tr><td>Max Partition Bytes:</td><td><b>$defaultMaxPartitionBytes</b> Bytes or <b>${defaultMaxPartitionBytes/1024/1024}</b> MB</td></tr>
<tr><td>Open Cost In Bytes: </td><td><b>$openCostInBytes</b> Bytes or <b>${openCostInBytes/1024/1024}</b> MB</td></tr>
</table>""")
// COMMAND ----------
sc.setJobDescription("Step A-2: Utility Function")
def predictNumPartitions(files: Seq[com.databricks.backend.daemon.dbutils.FileInfo]) = {
  val openCost: Long = spark.conf.get("spark.sql.files.openCostInBytes").toLong
  val maxPartitionBytes: Long = spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b", "").toLong

  val actualBytes = files.map(_.size).sum                          // Total size of the dataset on disk
  val paddedBytes = actualBytes + (files.length * openCost)        // Final size with padding from openCost
  val bytesPerCore: Long = (paddedBytes / sc.defaultParallelism).toLong // The number of bytes per core
  val maxOfCostBPC: Long = Math.max(openCost, bytesPerCore)        // Larger of openCost and bytesPerCore
  val targetSize: Long = Math.min(maxPartitionBytes, maxOfCostBPC) // Smaller of maxPartitionBytes and maxOfCostBPC
  val partitions = paddedBytes.toDouble / targetSize.toDouble      // The final number of partitions (needs to be rounded up)

  // Utility function to style each row
  def row(label: String, value: Long, extra: String = ""): String =
    f"""<tr><td>$label:</td><td style="text-align:right; font-weight:bold">$value%,d</td><td style="padding-left:1em">$extra</td></tr>"""

  displayHTML("<table>" +
    row("File Count", files.size) +
    row("Actual Bytes", actualBytes) +
    row("Padded Bytes", paddedBytes, "Actual_Bytes + (File_Count * Open_Cost)") +
    row("Average Size", (paddedBytes / files.size).toLong) +
    """<tr><td colspan="2" style="border-top:1px solid black"> </td></tr>""" +
    row("Open Cost", openCost, "spark.sql.files.openCostInBytes") +
    row("Bytes-Per-Core", bytesPerCore) +
    row("Max Cost", maxOfCostBPC, "(max of Open_Cost & Bytes-Per-Core)") +
    """<tr><td colspan="2" style="border-top:1px solid black"> </td></tr>""" +
    row("Max Partition Bytes", maxPartitionBytes, "spark.sql.files.maxPartitionBytes") +
    row("Target Size", targetSize, "(min of Max_Cost & Max_Partition_Bytes)") +
    """<tr><td colspan="2" style="border-top:1px solid black"> </td></tr>""" +
    row("Number of Partitions", Math.ceil(partitions).toLong, f"($partitions from Padded_Bytes / Target_Size)") +
    "</table>")
}
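// COMMAND ----------
// MAGIC %md
// MAGIC For reference, the prediction above mirrors the sizing estimate Spark uses when planning file-based scans (an approximation, restated with the same names as the table it prints):
// MAGIC * `Padded_Bytes = Actual_Bytes + (File_Count * Open_Cost)`
// MAGIC * `Target_Size = min(Max_Partition_Bytes, max(Open_Cost, Padded_Bytes / Default_Parallelism))`
// MAGIC * `Number_of_Partitions ≈ ceil(Padded_Bytes / Target_Size)`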
// COMMAND ----------
// MAGIC %md # Test with different maxPartitionBytes
// COMMAND ----------
sc.setJobDescription("Step B: List Files")
// Source directory for this experiment's dataset
val trxPath = s"dbfs:/mnt/training/global-sales/solutions/1990-to-2009.parquet"
// Providing the schema precludes side effects that would otherwise skew benchmarks
val trxSchema = "transacted_at timestamp, trx_id integer, retailer_id integer, description string, amount decimal(38,2), city_id integer, new_at timestamp"
// All the parquet files in this dataset
val trxFiles = dbutils.fs.ls(trxPath).filter(_.name.endsWith(".parquet"))
display(trxFiles)
// COMMAND ----------
sc.setJobDescription("Step C: Read at 1x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 1}b" // ~128 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step D: Read at 2x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 2}b" // ~256 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step E: Read at 4x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 4}b" // ~512 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step F: Read at 8x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 8}b" // ~1 GB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step G: Read at 16x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 16}b" // ~2 GB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step H: Read at 32x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 32}b" // ~4 GB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
sc.setJobDescription("Step I: Read at 64x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 64}b" // ~8 GB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath) // Load the transactions table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
// MAGIC %md # One Partition Per File
// COMMAND ----------
sc.setJobDescription("Step J-1: List One-To-One Files")
// Source directory for this experiment's dataset
val otoPath = "dbfs:/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet"
// The list of files and schema for this dataset
val otoFiles = dbutils.fs.ls(otoPath).filter(_.name.endsWith(".parquet"))
// Providing the schema precludes side effects that would otherwise skew benchmarks
val otoSchema = "timestamp string, site string, requests integer"
display(otoFiles)
// COMMAND ----------
sc.setJobDescription("Step J-2: Read One-To-One at 1x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 1}b" // ~128 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(otoFiles)
spark.read.schema(otoSchema).parquet(otoPath) // Load the pageviews table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
// MAGIC %md
// MAGIC # Multiple Files to One Partition
// COMMAND ----------
sc.setJobDescription("Step K-1: List Many-To-One Files")
// Source directory for this experiment's dataset
val mtoPath = "dbfs:/mnt/training/crime-data-2016/Crime-Data-Philadelphia-2016.parquet"
// All the parquet files in this dataset
val mtoFiles = dbutils.fs.ls(mtoPath).filter(_.name.endsWith(".parquet"))
// Providing the schema precludes side effects that would otherwise skew benchmarks
val mtoSchema = "district integer, dispatch_date_time timestamp, dispatch_date timestamp, dispatch_time string, hour integer, unique_id long, location_block string, ucr_general integer, text_general_code string, point_x double, point_y double, lat double, lng double, ucr_general_description string"
display(mtoFiles)
// COMMAND ----------
sc.setJobDescription("Step K-2: Read Many-To-One at 1x")
val maxPartitionBytesConf = f"${defaultMaxPartitionBytes * 1}b" // ~128 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", maxPartitionBytesConf)
predictNumPartitions(mtoFiles)
spark.read.schema(mtoSchema).parquet(mtoPath) // Load the crime-data table
.write.format("noop").mode("overwrite").save() // Test with a noop write
// COMMAND ----------
// MAGIC %md
// MAGIC # Auto-tune maxPartitionBytes
// COMMAND ----------
sc.setJobDescription("Step L-1: Autotune maxPartitionBytes Function")
def autoTuneMaxPartitionBytes(format: String, path: String, schema: String, maxSteps: Int, startingBytes: Long = 134217728): Long = {
  val cores = sc.defaultParallelism
  var maxPartitionBytes: Long = startingBytes
  val originalMaxPartitionBytes = spark.conf.get("spark.sql.files.maxPartitionBytes")

  for (step <- 0 to maxSteps) {
    maxPartitionBytes = maxPartitionBytes + (step * 1024 * 1024) // Grow the candidate by step MB each pass
    val maxPartitionMB = maxPartitionBytes / 1024 / 1024

    spark.conf.set("spark.sql.files.maxPartitionBytes", f"${maxPartitionBytes}b")
    val partitions = spark.read.format(format).schema(schema).load(path).rdd.getNumPartitions

    if (partitions % cores == 0) {
      // The partition count divides evenly across the cluster's cores;
      // leave the tuned value in place in the session config and return it.
      println("*** Found it! ***")
      println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions / cores.toDouble}")
      return maxPartitionBytes
    } else {
      println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions / cores.toDouble}")
    }
  }

  // No suitable value found within maxSteps: restore the original setting before failing.
  spark.conf.set("spark.sql.files.maxPartitionBytes", originalMaxPartitionBytes)
  throw new IllegalArgumentException("An appropriate maxPartitionBytes was not found")
}
// COMMAND ----------
sc.setJobDescription("Step L-2: Autotuned maxPartitionBytes")
val maxPartitionBytes = autoTuneMaxPartitionBytes("parquet", trxPath, trxSchema, 100)
println("-"*80)
println(f"Final Answer: $maxPartitionBytes%,d bytes or ${maxPartitionBytes/1024/1024}%,d MB")
println("-"*80)