
[SPARK-26762][SQL][R] Arrow optimization for conversion from Spark DataFrame to R DataFrame #23760

Closed
wants to merge 5 commits into apache:master from HyukjinKwon:SPARK-26762

Conversation

@HyukjinKwon (Member) commented Feb 12, 2019

What changes were proposed in this pull request?

This PR adds support for Arrow optimization in the conversion from a Spark DataFrame to an R DataFrame.
Like the PySpark side, it falls back to the non-optimized code path when Arrow optimization cannot be used.
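As a minimal sketch of that fallback decision (hypothetical shape, not the actual SparkR internals), the optimized path runs only when the conf is enabled and the arrow R package loads:

```r
# Hedged sketch, reusing the conf name from this PR; not the real code path.
useArrow <- identical(sparkR.conf("spark.sql.execution.arrow.enabled", "false")[[1]], "true") &&
  requireNamespace("arrow", quietly = TRUE)
if (useArrow) {
  # read Arrow record batches from the JVM side and bind them into one data.frame
} else {
  # fall back to the existing row-by-row SerDe collection path
}
```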

This can be tested as below:

```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
collect(createDataFrame(mtcars))
```

Requirements

  • R 3.5.x
  • Arrow package 0.12+, installable from GitHub:

```bash
Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'
```

Note: currently, the Arrow R package is not on CRAN. Please take a look at ARROW-3204.
Note: currently, the Arrow R package does not seem to support Windows. Please take a look at ARROW-3204.

Benchmarks

Shell (each run is preceded by `sync && sudo purge` to flush the OS filesystem cache; `purge` is a macOS command)

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```

R code

```r
df <- cache(createDataFrame(read.csv("500000.csv")))
count(df)  # materialize the cache before timing

test <- function() {
  options(digits.secs = 6)  # print fractional seconds
  start.time <- Sys.time()
  collect(df)
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()
```

Data (350 MB):

```r
object.size(read.csv("500000.csv"))
# 350379504 bytes
```

"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

Results

Without Arrow optimization:

```
Time difference of 221.32014 secs
```

With Arrow optimization:

```
Time difference of 15.51145 secs
```

The performance improvement was around 1426% (221.32 s / 15.51 s ≈ 14.3×).

Limitations:

  • For now, Arrow optimization with R does not support data containing a raw column, or a schema where the user explicitly gives a float type; both produce corrupt values. In these cases we fall back to the non-optimized code path (see the sketch after this list).

  • Due to ARROW-4512, batches cannot be sent and received one by one; all batches have to be sent at once in the Arrow stream format. This needs improvement later.
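For illustration, a hedged example of the float-type case (illustrative only; the exact fallback trigger lives in the SparkR internals):

```r
# An explicit float type in the schema: per the limitation above, this
# should fall back to the non-optimized path even with Arrow enabled.
schema <- structType(structField("x", "float"))
df <- createDataFrame(data.frame(x = c(1.0, 2.0)), schema = schema)
collect(df)  # collected via the row-based SerDe, not Arrow
```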

How was this patch tested?

Existing tests related to Arrow optimization cover this change. It was also manually tested.

@HyukjinKwon (Member, Author):

cc @BryanCutler, @viirya, @felixcheung, @icexelloss, @rxin, @gatorsmile, @shivaram, @falaki, @yanboliang

Looks like the previous collect code wasn't performant enough. This optimization applies to head and take as well; see the example below.
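For example (hedged; these are the standard SparkR APIs, which go through the same collection path as collect):

```r
# With spark.sql.execution.arrow.enabled=true, these should also benefit:
df <- createDataFrame(mtcars)
head(df, 5)  # first 5 rows, returned as an R data.frame
take(df, 5)  # same rows via take
```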

@HyukjinKwon (Member, Author):

I am going to update SQLConf and the documentation later, when this job is finished. I also need to deduplicate some logic across the R Arrow code paths once that is done.

@HyukjinKwon reopened this Feb 12, 2019

@SparkQA commented Feb 12, 2019

Test build #102242 has finished for PR 23760 at commit 10c3f11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102249 has finished for PR 23760 at commit 8c84556.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a review comment:

LGTM, I just skimmed the R code but the rest seems reasonable and the speedup looks awesome!

@BryanCutler (Member):

It might be a good idea to add a test that forces a delay on one partition's execution, so you can verify that the R side receives the batches in the correct order. This was discussed here and added in bf2feec.
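A hedged sketch of such a test in SparkR (illustrative only; dapply does not expose the partition index, so the delay is keyed off the data instead):

```r
# Force the partition holding part == 1 to finish last, so any
# out-of-order batch handling on the R side would become visible.
df <- createDataFrame(data.frame(part = rep(1:2, each = 100)))
df <- repartition(df, 2L, df$part)
delayed <- dapply(df, function(rdf) {
  if (nrow(rdf) > 0 && rdf$part[1] == 1) Sys.sleep(3)
  rdf
}, schema(df))
collected <- collect(delayed)  # rows should still arrive in partition order
```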

@HyukjinKwon (Member, Author):

BTW, I am speeding this up and planning to write a blog post on the Apache Arrow site like https://arrow.apache.org/blog/2019/01/25/r-spark-improvements/ (thanks for letting me know, @felixcheung).

Looks like sparklyr added Arrow optimization already.

@HyukjinKwon (Member, Author):

re: #23760 (comment)

Thing is, nowadays SparkR doesn't have RDD APIs, and they are in transition to being removed completely. Maybe I can try to test with dapply, but I think it's difficult to get the partition index. It should be okay since the code was restored from the previous code almost as-is.


@falaki (Contributor) left a review comment:

Thanks for doing this. I did a first pass.

@HyukjinKwon (Member, Author) commented Feb 15, 2019

I hope we can go ahead as-is if there are no notable comments, to avoid conflict hell.

Currently, I have intentionally not started on some items at #23787 (comment) to avoid conflicts, but I am trying to complete them all as soon as possible.

FWIW, sparklyr already added Arrow optimization; see sparklyr#1611 and https://arrow.apache.org/blog/2019/01/25/r-spark-improvements/

@HyukjinKwon (Member, Author):

gentle ping. Would you guys mind if I go ahead?

@felixcheung (Member):

Looks like it would help to break off the remaining tasks in JIRA? #23787 (comment)

@felixcheung reopened this Feb 19, 2019
@felixcheung (Member):

Sorry, clicked the wrong button.

@HyukjinKwon (Member, Author):

Yup, will add the obvious ones first.

@felixcheung (Member):

Pending follow-ups, it's OK with me to merge this first; it's getting hard to track what should be done and what has changed.

@HyukjinKwon (Member, Author):

I have added the test, manually ran the tests, and created JIRAs under https://issues.apache.org/jira/browse/SPARK-26759 for follow-ups. I will get this in soon if there are no further comments.

@SparkQA commented Feb 19, 2019

Test build #102502 has finished for PR 23760 at commit cfe947c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):

Merged to master.

Thank you all, @BryanCutler, @vanzin, @felixcheung, @viirya, @falaki

Arnoldosmium pushed a commit to palantir/spark that referenced this pull request Apr 10, 2019
vinooganesh pushed a commit to palantir/spark that referenced this pull request Jun 10, 2019
@HyukjinKwon deleted the SPARK-26762 branch March 3, 2020 01:19