[SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance #22275
Conversation
if (partitionCount == numPartitions) {
  batchWriter.end()
  out.writeInt(batchOrder.length)
  // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
nit: Batch order indices are from 0 to N-1 batches, sorted by order they arrived. Re-sort indices to the correct order to build a table.
How about a slight change? // Re-order batches according to these indices to build a table.
How about something like // Sort by the output global batch indexes (partition index, partition batch index) tuple?
When I first read this code path I got confused myself, so I think we should spend a bit of time on the comment here.
yeah, sounds good
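Since the wording tripped people up, here is a tiny illustration of what that bookkeeping amounts to (plain Python used only as pseudocode for the JVM side; the data and names are made up):

```python
# Arrival order of batches at the driver, as (partition index, batch index within
# that partition). Here partition 1 happened to finish before partition 0.
batch_order = [(1, 0), (1, 1), (0, 0), (0, 1)]

# Sorting by the (partition index, partition batch index) tuple recovers the logical
# order; keeping each batch's arrival position gives the indices that get sent to
# Python so it can re-index the batches it already received.
indices_to_send = [pos for _, pos in sorted(zip(batch_order, range(len(batch_order))))]
assert indices_to_send == [2, 3, 0, 1]
```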
python/pyspark/serializers.py
Outdated
@@ -187,9 +187,15 @@ def loads(self, obj):

class ArrowStreamSerializer(Serializer):
    """
    Serializes Arrow record batches as a stream.
    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
This is optional. Do we have other usage of this ArrowStreamSerializer without the ordering?
Yeah, it's also used in the createDataFrame path, but that only uses dump_stream. Still, it seemed best to make this an optional feature of the serializer.
Test build #95441 has finished for PR 22275 at commit
Thanks @viirya! What are your thoughts @HyukjinKwon? I consolidated the batch order serializer from before into the ArrowStreamSerializer to simplify a little.
@holdenk I was wondering if you had any thoughts on this? Thanks!
Sure, I'll take a look on Friday if it's not urgent.
Generally, is this going to limit how much data we can pass along because of the bit length of the index?
So the index passed to Python is the RecordBatch index, not an element index, and it would limit the number of batches to Int.MAX. I wouldn't expect that to be likely, and you can always set the number of batches to 1 per partition, so that would be the limiting factor then. WDYT @felixcheung?
Got it. So the size of each batch could grow.
python/pyspark/sql/tests.py
Outdated
df = self.spark.range(64, numPartitions=8).toDF("a")
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
    pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    self.assertPandasEqual(pdf, pdf_arrow)
Hm, is this test case "enough" to trigger any possible problem just by randomness? Would increasing the number of batches or the number of records per batch increase the chance of a streaming order or concurrency issue, perhaps?
This looks pretty similar to the kind of test case we could verify with something like hypothesis. Integrating hypothesis is probably too much work, but we could at least explore num partitions space in a loop quickly here. Would that help do you think @felixcheung ?
that sounds good
Thanks for working on this, reducing memory pressure in the driver is always welcome :) I got a bit confused reading the PR the first time through, so I think it might make sense to look at how we can improve the readability a bit.
python/pyspark/serializers.py
Outdated
index = read_int(stream)
self.batch_order.append(index)

def get_batch_order_and_reset(self):
Looking at _load_from_socket I think I understand why this was done as a separate function here, but what if the serializer itself returned either a tuple or re-ordered the batches itself?
I'm just trying to get a better understanding, not saying those are better designs.
python/pyspark/serializers.py
Outdated
@@ -208,8 +214,26 @@ def load_stream(self, stream):
    for batch in reader:
        yield batch

    if self.load_batch_order:
        num = read_int(stream)
        self.batch_order = []
If we're going to have get_batch_order_and_reset as a separate function, could we verify batch_order is None before we reset and throw here if it's not? Just thinking of future folks who might have to debug something here.
// After last batch, end the stream
if (lastIndex == results.length) {
  batchWriter.end()
  arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
Could we call i something more descriptive, like partition_batch_num or similar?
yup!
Thanks for the review @holdenk! I haven't had time to follow up, but I'll take a look through this and see what I can do about making things clearer.
retest this please
Test build #98202 has finished for PR 22275 at commit
Force-pushed from d6fefee to 7d19977.
Apologies for the delay in circling back to this. I reorganized a little to simplify and expanded the comments to hopefully better describe the code. A quick summary of the changes: I changed the ArrowStreamSerializer to not have any state - that seemed to complicate things. So instead of saving the batch order indices, they are loaded on the last iteration of load_stream.
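For readers following along, here is a minimal sketch of that stateless design, assuming the wire format described in this PR (an Arrow stream of batches, then an int count, then one int per batch); the class name and details below are illustrative, not the exact code:

```python
import pyarrow as pa
from pyspark.serializers import Serializer, read_int


class UnorderedBatchSerializer(Serializer):
    """Illustrative sketch of a serializer with no ordering state."""

    def load_stream(self, stream):
        # Yield Arrow record batches in the order they arrived from the JVM,
        # which may not match the logical partition order.
        reader = pa.ipc.open_stream(stream)
        for batch in reader:
            yield batch

        # Once the Arrow stream ends, read the batch count followed by one
        # arrival-position index per batch, and yield that list as the final item.
        num = read_int(stream)
        yield [read_int(stream) for _ in range(num)]
```

The caller then treats the last yielded item as the ordering and re-indexes the batches, so no state needs to live on the serializer between calls.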
python/pyspark/sql/tests.py
Outdated
(64, 1, 64),  # Test single partition, single batch
(64, 1, 8),   # Test single partition, multiple batches
(30, 7, 2),   # Test different sized partitions
]
@holdenk and @felixcheung , I didn't do a loop but chose some different levels of partition numbers to be a bit more sure that partitions won't end up in order. I also added some other cases of different partition/batch ratios. Let me know if you think we need more to be sure here.
I don't see how we're guaranteeing out-of-order from the JVM. Could we delay on one of the early partitions to guarantee out of order?
Yeah it's not a guarantee, but with a large num of partitions, it's a pretty slim chance they will all be in order. I can also add a case with some delay. My only concern is how big to make the delay to be sure it's enough without adding wasted time to the tests.
How about we keep the case with a large number of partitions and add a case with 100ms delay on the first partition?
@holdenk , I updated the tests, please take another look when you get a chance. Thanks!
I like the new tests, I think 0.1 on one of the partitions is enough.
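For reference, a rough sketch of what such a parameterized test could look like; the helpers (`_toPandas_arrow_toggle`, `sql_conf`, `assertPandasEqual`) come from the snippets above, while the exact cases and structure here are illustrative rather than the final test:

```python
import time


def test_toPandas_batch_order(self):
    # Each case is (number of rows, number of partitions, maxRecordsPerBatch).
    cases = [
        (64, 1, 64),   # single partition, single batch
        (64, 1, 8),    # single partition, multiple batches
        (64, 64, 1),   # many partitions, so in-order arrival is very unlikely
        (30, 7, 2),    # different sized partitions
    ]

    def delay_first_part(partition_index, iterator):
        # Slow down partition 0 so later partitions are collected first.
        if partition_index == 0:
            time.sleep(0.1)
        return iterator

    for num_rows, num_parts, records_per_batch in cases:
        df = self.spark.range(num_rows, numPartitions=num_parts).toDF("a")
        df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
        conf = {"spark.sql.execution.arrow.maxRecordsPerBatch": records_per_batch}
        with self.sql_conf(conf):
            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
            self.assertPandasEqual(pdf, pdf_arrow)
```

Applying the delay to every case keeps the sketch simple; the real test can limit it to a single case to avoid adding wasted time to the suite.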
Test build #98284 has finished for PR 22275 at commit
Test build #98285 has finished for PR 22275 at commit
Test build #98538 has finished for PR 22275 at commit
Test build #98624 has finished for PR 22275 at commit
Test build #98630 has finished for PR 22275 at commit
Test build #98629 has finished for PR 22275 at commit
ping @HyukjinKwon and @viirya to maybe take another look at the recent changes to make this cleaner, if you are able to. Thanks!
python/pyspark/sql/tests.py
Outdated
def delay_first_part(partition_index, iterator):
    if partition_index == 0:
        time.sleep(0.1)
I like this :)
I like the change to the tests, thanks @BryanCutler
LGTM, the current change looks clearer. Thanks @BryanCutler
Thanks for asking me. Will take a look within a few days. Don't block on me for clarification. I can take a look even after it is merged, and we can make a follow-up change if there was an issue we missed.
…-batches-SPARK-25274
Test build #99743 has finished for PR 22275 at commit
merged to master, thanks @holdenk @viirya and @felixcheung!
Sorry @BryanCutler for my super super late input. LGTM to me as well :D.
Thanks @HyukjinKwon!
…ord batches to improve performance

## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out of order must be buffered before they can be sent to Python. This causes excess memory to be used in the driver JVM and increases the time it takes to complete, because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered in the JVM and there is no waiting on particular partitions, so performance is increased.

Followup to apache#21546

## How was this patch tested?

Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These verify that partitions are collected out of order and then put back in the correct order in Python.

## Performance Tests - toPandas

Tests were run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8; measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())

for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
------------------|--------------
5.1134364 | 4.3898886

Speedup of **1.164821449**

Closes apache#22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
… Arrow enabled

## What changes were proposed in this pull request?

apache#22275 introduced a performance improvement where we send partitions out of order to Python and then, as a last step, send the partition order as well. However, if there are no partitions we will never send the partition order, and we will get an "EofError" on the Python side. This PR fixes this by also sending the partition order if there are no partitions present.

## How was this patch tested?

New unit test added.

Closes apache#24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.

Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?
When executing toPandas with Arrow enabled, partitions that arrive in the JVM out of order must be buffered before they can be sent to Python. This causes excess memory to be used in the driver JVM and increases the time it takes to complete, because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered in the JVM and there is no waiting on particular partitions, so performance is increased.
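To make that concrete, here is a rough sketch of the driver-side reassembly, assuming a loader like the serializer sketch earlier in the thread, where the final yielded item is the list of ordering indices (function and variable names here are illustrative):

```python
import pyarrow as pa


def to_pandas_from_unordered(stream_iter):
    # stream_iter yields record batches in arrival order, then one final item
    # holding the ordering indices.
    results = list(stream_iter)
    batches, batch_order = results[:-1], results[-1]

    # Re-index the batches into logical partition order before building the table.
    ordered = [batches[i] for i in batch_order]
    return pa.Table.from_batches(ordered).to_pandas()
```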
Followup to #21546
How was this patch tested?
Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These verify that partitions are collected out of order and then put back in the correct order in Python.
Performance Tests - toPandas
Tests were run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8; measured wall clock time to execute toPandas() and took the average best time of 5 runs/5 loops each.

Test code
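The snippet below is reproduced from the squashed commit message above; it assumes `from pyspark.sql.functions import rand` and `import time`.

```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())

for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```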
Spark config
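The configuration and timing results, also from the commit message above:

```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
------------------|--------------
5.1134364 | 4.3898886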
Speedup of 1.164821449