
[SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance #22275

Conversation

@BryanCutler BryanCutler (Member) commented Aug 29, 2018

What changes were proposed in this pull request?

When executing toPandas with Arrow enabled, partitions that arrive in the JVM out-of-order must be buffered before they can be sent to Python. This causes excess memory use in the driver JVM and increases the time it takes to complete, because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered at the JVM and there is no waiting on particular partitions so performance will be increased.

Followup to #21546
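As an illustration, the Python-side reassembly can be sketched like this (hypothetical helper name; the actual code lives in the toPandas collection path):

```python
def reorder_batches(batches, batch_order):
    """Re-assemble record batches in their correct order.

    batches     -- record batches in the order they arrived from the JVM
    batch_order -- batch_order[k] is the arrival index of the batch that
                   belongs at position k in the correct partition order
    """
    return [batches[i] for i in batch_order]

# For example, if partition 1's batch arrived before partition 0's:
# reorder_batches(["p1", "p0"], [1, 0]) -> ["p0", "p1"]
```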

How was this patch tested?

Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These verify that partitions collected out-of-order are put back in the correct order in Python.

Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
measured wall clock time to execute toPandas() and took the average best time of 5 runs/5 loops each.

Test code

```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```

Spark config

```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```
Current Master w/ Arrow stream | This PR
-------------------------------|---------
5.16207  | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
-----------|------------
5.1134364  | 4.3898886

Speedup of 1.164821449

```scala
if (partitionCount == numPartitions) {
  batchWriter.end()
  out.writeInt(batchOrder.length)
  // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
```
Member
nit: Batch order indices are from 0 to N-1 batches, sorted by order they arrived. Re-sort indices to the correct order to build a table.

Member Author

How about a slight change? // Re-order batches according to these indices to build a table.

Contributor

How about something like // Sort by the (partition index, partition batch index) tuple to get the global output batch order?
When I first read this code path I got confused myself, so I think we should spend a bit of time on the comment here.

Member Author

yeah, sounds good
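The ordering discussed above can be sketched in Python (the real implementation is Scala on the JVM side; names here are illustrative): each batch is tagged with its (partition index, partition batch index) pair on arrival, and sorting those pairs yields the arrival indices in correct global order.

```python
def compute_batch_order(arrival_keys):
    """arrival_keys[i] is the (partition index, partition batch index)
    pair of the i-th batch in arrival order. Returns the arrival indices
    sorted into the correct global batch order."""
    return [i for _, i in sorted((key, i) for i, key in enumerate(arrival_keys))]

# Partition 1's batch arrived first, then partition 0's two batches:
# compute_batch_order([(1, 0), (0, 0), (0, 1)]) -> [1, 2, 0]
```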

```diff
@@ -187,9 +187,15 @@ def loads(self, obj):

 class ArrowStreamSerializer(Serializer):
     """
-    Serializes Arrow record batches as a stream.
+    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
```
Member

This is optional. Do we have other usage of this ArrowStreamSerializer without the ordering?

Member Author

Yeah, it's also used in the createDataFrame path, but that path only uses dump_stream. Still, it seemed best to make this an optional feature of the serializer.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95441 has finished for PR 22275 at commit d6fefee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrowStreamSerializer(Serializer):

@BryanCutler
Member Author

Thanks @viirya ! What are your thoughts @HyukjinKwon ? I consolidated the batch order serializer from before into the ArrowStreamSerializer to simplify a little.

@BryanCutler
Member Author

@holdenk I was wondering if you had any thoughts on this? Thanks!

@holdenk
Contributor

holdenk commented Sep 19, 2018

Sure, I'll take a look on Friday if it's not urgent

@felixcheung felixcheung (Member) left a comment

generally, is this going to limit how much data to pass along because of the bit length of the index?

@BryanCutler
Member Author

> generally, is this going to limit how much data to pass along because of the bit length of the index?

So the index passed to Python is the RecordBatch index, not an element index, and it would limit the number of batches to Int.MAX. I wouldn't expect hitting that to be likely, and you can always set the number of batches to 1 per partition, so the partition count would be the limiting factor then. WDYT @felixcheung ?

@felixcheung
Member

Got it. So the size of each batch could grow.

```python
df = self.spark.range(64, numPartitions=8).toDF("a")
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
    pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    self.assertPandasEqual(pdf, pdf_arrow)
```
Member

hm, is this test case "enough" to trigger any possible problem just by random chance? would increasing the number of batches or records per batch increase the chance of hitting a streaming-order or concurrency issue?

Contributor

This looks pretty similar to the kind of test case we could verify with something like hypothesis. Integrating hypothesis is probably too much work, but we could at least explore num partitions space in a loop quickly here. Would that help do you think @felixcheung ?

Member

that sounds good

@holdenk holdenk (Contributor) left a comment

Thanks for working on this, reducing memory pressure in the driver is always welcome :) I got a bit confused reading the PR the first time through, so I think it might make sense to look at how we can improve the readability a bit.

```python
        index = read_int(stream)
        self.batch_order.append(index)

    def get_batch_order_and_reset(self):
```
Contributor

Looking at _load_from_socket I think I understand why this was done as a separate function here, but what about if the serializer itself returned either a tuple or re-ordered the batches itself?

I'm just trying to get a better understanding, not saying those are better designs.

```diff
@@ -208,8 +214,26 @@ def load_stream(self, stream):
         for batch in reader:
             yield batch

+        if self.load_batch_order:
+            num = read_int(stream)
+            self.batch_order = []
```
Contributor

If we're going to have get_batch_order_and_reset as a separate function, could we verify batch_order is None before we reset and throw here if it's not? Just thinking of future folks who might have to debug something here.



```scala
// After last batch, end the stream
if (lastIndex == results.length) {
  batchWriter.end()
arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
```
Contributor

Could we call i something more descriptive like partition_batch_num or similar?

Member Author

yup!

@BryanCutler
Member Author

Thanks for the review @holdenk ! I haven't had time to follow up, but I'll take a look through this and see what I can do about making things clearer.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Oct 29, 2018

Test build #98202 has finished for PR 22275 at commit d6fefee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrowStreamSerializer(Serializer):

@BryanCutler BryanCutler force-pushed the arrow-toPandas-oo-batches-SPARK-25274 branch from d6fefee to 7d19977 Compare October 30, 2018 23:07
@BryanCutler
Member Author

Apologies for the delay in circling back to this. I reorganized a little to simplify and expanded the comments to hopefully better describe the code.

A quick summary of the changes: I changed the ArrowStreamSerializer to not have any state - that seemed to complicate things. So instead of saving the batch order indices, they are loaded on the last iteration of load_stream, and this was put in a special serializer ArrowCollectSerializer so that it is clear where it is used. I also consolidated all the batch ordering calls within _collectAsArrow so it is easier to follow the whole process.
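That stateless pattern can be sketched like this (simplified, hypothetical helpers; not the actual serializer code): load_stream yields every record batch and then yields the order list as its final item, so the caller peels it off rather than the serializer holding state.

```python
def load_stream(batches, order_indices):
    # Stateless sketch: yield the data batches first, the order list last
    for batch in batches:
        yield batch
    yield list(order_indices)

def collect_in_order(stream_items):
    # Caller splits off the trailing order list and re-sorts the batches
    *batches, batch_order = list(stream_items)
    return [batches[i] for i in batch_order]

# collect_in_order(load_stream(["p1", "p0"], [1, 0])) -> ["p0", "p1"]
```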

```python
    (64, 1, 64),  # Test single partition, single batch
    (64, 1, 8),   # Test single partition, multiple batches
    (30, 7, 2),   # Test different sized partitions
]
```
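Assuming the tuples above are (number of records, number of partitions, max records per batch) and that records split as evenly as possible across partitions, the total batch count each case produces can be worked out with a small sketch (an illustration under those assumptions, not code from the PR):

```python
import math

def expected_num_batches(num_records, num_parts, max_records_per_batch):
    # Assumes records are distributed as evenly as possible across partitions
    sizes = [num_records // num_parts + (1 if i < num_records % num_parts else 0)
             for i in range(num_parts)]
    return sum(math.ceil(s / max_records_per_batch) for s in sizes if s > 0)

# (64, 1, 64) -> 1 batch; (64, 1, 8) -> 8 batches
# (30, 7, 2)  -> partition sizes 5,5,4,4,4,4,4 -> 16 batches
```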
Member Author

@holdenk and @felixcheung , I didn't do a loop but chose some different levels of partition numbers to be a bit more sure that partitions won't end up in order. I also added some other cases of different partition/batch ratios. Let me know if you think we need more to be sure here.

Contributor

I don't see how we're guaranteeing out-of-order from the JVM. Could we delay on one of the early partitions to guarantee out of order?

@BryanCutler BryanCutler (Member Author) commented Nov 6, 2018

Yeah, it's not a guarantee, but with a large number of partitions there's a pretty slim chance they will all arrive in order. I can also add a case with some delay. My only concern is how big to make the delay to be sure it's enough, without adding wasted time to the tests.

How about we keep the case with a large number of partitions and add a case with 100ms delay on the first partition?

Member Author

@holdenk , I updated the tests, please take another look when you get a chance. Thanks!

Contributor

I like the new tests; I think 0.1s of delay on one of the partitions is enough.

@SparkQA
Copy link

SparkQA commented Oct 31, 2018

Test build #98284 has finished for PR 22275 at commit 7d19977.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 31, 2018

Test build #98285 has finished for PR 22275 at commit 6457e42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 7, 2018

Test build #98538 has finished for PR 22275 at commit bf2feec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler changed the title [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send out-of-order record batches to improve performance [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance Nov 8, 2018
@SparkQA
Copy link

SparkQA commented Nov 9, 2018

Test build #98624 has finished for PR 22275 at commit 725cd47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 9, 2018

Test build #98630 has finished for PR 22275 at commit 8045fac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 9, 2018

Test build #98629 has finished for PR 22275 at commit 7dc92c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

ping @HyukjinKwon and @viirya to maybe take another look at the recent changes to make this cleaner, if you are able to. Thanks!


```python
def delay_first_part(partition_index, iterator):
    if partition_index == 0:
        time.sleep(0.1)
```
Contributor

I like this :)
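For reference, a self-contained sketch of the delayed-partition trick; the return iterator line and the wiring below are assumptions, since only a fragment of the test appears above:

```python
import time

def delay_first_part(partition_index, iterator):
    # Hold back partition 0 so later partitions reach the driver first,
    # exercising the out-of-order collection path deterministically
    if partition_index == 0:
        time.sleep(0.1)
    return iterator

# Illustrative wiring (assumes a SparkSession named `spark`):
# df = spark.range(64, numPartitions=8).toDF("a")
# delayed = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
```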

@holdenk holdenk (Contributor) left a comment

I like the change to the tests, thanks @BryanCutler

@viirya
Member

viirya commented Nov 10, 2018

LGTM, the current change looks clearer. Thanks @BryanCutler

@HyukjinKwon
Member

HyukjinKwon commented Nov 11, 2018

Thanks for asking me. I will take a look within a few days. To be clear, don't block on me; I can take a look even after it gets merged, and we can make a followup change if there is an issue we missed.

@SparkQA
Copy link

SparkQA commented Dec 6, 2018

Test build #99743 has finished for PR 22275 at commit 00c7b8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in ecaa495 Dec 6, 2018
@BryanCutler
Member Author

merged to master, thanks @holdenk @viirya and @felixcheung !

@HyukjinKwon
Member

Sorry @BryanCutler for my super super late input. LGTM as well :D.

@BryanCutler BryanCutler deleted the arrow-toPandas-oo-batches-SPARK-25274 branch February 11, 2019 18:50
@BryanCutler
Member Author

Thanks @HyukjinKwon !

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ord batches to improve performance


Closes apache#22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
pull bot pushed a commit to Pandinosaurus/spark that referenced this pull request May 22, 2019
… Arrow enabled

## What changes were proposed in this pull request?
apache#22275 introduced a performance improvement where we send partitions out of order to python and then, as a last step, send the partition order as well.
However, if there are no partitions we will never send the partition order and we will get an "EofError" on the python side.
This PR fixes this by also sending the partition order if there are no partitions present.

## How was this patch tested?
New unit test added.

Closes apache#24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.

Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
bulldozer-bot bot pushed a commit to palantir/spark that referenced this pull request Nov 22, 2019
… Arrow enabled (#625)
