[C++] OrderBy with spillover #35268
Thanks for the detailed writeup and investigation. I agree this is a top priority for Acero, although I have unfortunately been very busy these past few months and I don't know if that will change, so I can't really promise anything. There are a few approaches that can be taken here, and https://en.wikipedia.org/wiki/External_sorting provides a good summary / starting point for an investigation. Some of these approaches require a merge step. So some kind of …
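For context, the merge step that external sorting requires can be sketched as a k-way merge over sorted runs. This is an illustrative, Arrow-free sketch (plain `int` vectors stand in for spilled runs); it is not Acero code:

```cpp
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Merge k sorted runs into one sorted output (the merge step of an
// external merge sort). Runs would normally live on disk; here they are
// in-memory vectors to keep the sketch self-contained.
std::vector<int> MergeRuns(const std::vector<std::vector<int>>& runs) {
  // (value, run index, offset within run); a min-heap keyed on value.
  using Entry = std::tuple<int, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (size_t i = 0; i < runs.size(); ++i) {
    if (!runs[i].empty()) heap.emplace(runs[i][0], i, 0);
  }
  std::vector<int> out;
  while (!heap.empty()) {
    auto [value, run, pos] = heap.top();
    heap.pop();
    out.push_back(value);
    // Refill the heap from the run we just consumed.
    if (pos + 1 < runs[run].size()) {
      heap.emplace(runs[run][pos + 1], run, pos + 1);
    }
  }
  return out;
}
```

Each run contributes at most one entry to the heap at a time, so the merge only needs one batch per run resident in memory.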
Hi, I want to take this issue, but I am busy this month. If no other contributors are working on this, I would like to take it and contribute the feature at the end of this month.
Great! You might take a look at #35320 as you're getting started. It gives an overview of Acero in general.
take |
I am stuck on how to create an appropriate design for external sort in Acero. I have two simple plans in mind, and I need some suggestions before I implement the code for external sorting.
I agree that compute kernels are not the best fit for some of this.
This can be a kernel (this already exists with SortIndices).
Kernels should not write to disk. I think we should create a separate abstraction, a spilling accumulation queue, that writes to disk. For example, it could be something like...
The n-way merge can then call …. Keep in mind that there is another approach which does not require an n-way merge (external distribution sort). That approach may be simpler to implement, but I don't know.
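To make the "spilling accumulation queue" idea concrete, here is a hypothetical sketch: batches accumulate in memory, and once a row budget is exceeded the buffer is sorted and written to a temp file as one run. All names (`SpillingQueue`, `max_rows_`) and the raw binary file format are illustrative assumptions, not an existing Acero abstraction:

```cpp
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical sketch of a spilling accumulation queue. Values stand in
// for rows; a real version would hold ExecBatches and a byte budget.
class SpillingQueue {
 public:
  explicit SpillingQueue(size_t max_rows) : max_rows_(max_rows) {}

  void Insert(const std::vector<int>& batch) {
    buffer_.insert(buffer_.end(), batch.begin(), batch.end());
    if (buffer_.size() >= max_rows_) Spill();
  }

  // Finish input; returns one sorted run per spill (read back from disk),
  // ready to be handed to an n-way merge.
  std::vector<std::vector<int>> FinishRuns() {
    if (!buffer_.empty()) Spill();
    std::vector<std::vector<int>> runs;
    for (const auto& path : run_files_) {
      std::ifstream in(path, std::ios::binary);
      std::vector<int> run;
      int v;
      while (in.read(reinterpret_cast<char*>(&v), sizeof v)) run.push_back(v);
      runs.push_back(std::move(run));
      std::remove(path.c_str());  // clean up the temp file
    }
    return runs;
  }

 private:
  void Spill() {
    // Sort the in-memory buffer and write it out as one sorted run.
    std::sort(buffer_.begin(), buffer_.end());
    std::string path = "run_" + std::to_string(run_files_.size()) + ".bin";
    std::ofstream out(path, std::ios::binary);
    out.write(reinterpret_cast<const char*>(buffer_.data()),
              buffer_.size() * sizeof(int));
    run_files_.push_back(path);
    buffer_.clear();
  }

  size_t max_rows_;
  std::vector<int> buffer_;
  std::vector<std::string> run_files_;
};
```

The key property is that memory usage is bounded by `max_rows_` regardless of total input size; everything beyond the budget lives in run files until the merge.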
Helpful suggestions! It is indeed challenging to implement external merge sort performantly.
I have roughly investigated external distribution sort; maybe we shouldn't choose it.
Hi @westonpace, I am stuck on reading back data. I need to read back data at a specific offset and batch size from the spillover files. Currently, my offset/batch size counts rows, and I use the Parquet format for the temporary spill files. Maybe AsyncGenerator is a good choice for utilizing RecordBatchReader, but I can't find examples of reading a Parquet file; its test cases are all in-memory operations. Could AsyncGenerator resolve the problem?
Are you trying to read a single row, or a whole batch of rows? If you need random access to individual rows then Parquet is not going to be a good fit; we might want to investigate some kind of row-major format. If you only need to load specific batches of data, could you create a row group for each batch, or a separate file for each batch? If you need random access to batches of data (e.g. you don't know the row group boundaries at write time, but it isn't random access to rows), then we could maybe use the row-skip feature that was recently added to Parquet (I don't think it has been exposed yet).
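As a rough illustration of the row-major alternative: with fixed-width rows, a flat spill file gives random access to any batch with a single seek, so batch boundaries need not be known at write time. The `int64_t` payload, `kRowWidth`, and the file layout are assumptions for the sketch, not a proposal for Arrow's actual spill format:

```cpp
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <string>
#include <vector>

// With fixed-width rows, batch i of size n starts at byte offset
// i * n * kRowWidth, so any (offset, batch_size) pair chosen at read
// time maps to a single seek. Payload type is illustrative.
constexpr size_t kRowWidth = sizeof(int64_t);

void WriteRows(const std::string& path, const std::vector<int64_t>& rows) {
  std::ofstream out(path, std::ios::binary);
  out.write(reinterpret_cast<const char*>(rows.data()),
            rows.size() * kRowWidth);
}

std::vector<int64_t> ReadBatch(const std::string& path, size_t offset_rows,
                               size_t batch_rows) {
  std::ifstream in(path, std::ios::binary);
  in.seekg(offset_rows * kRowWidth);
  std::vector<int64_t> batch(batch_rows);
  in.read(reinterpret_cast<char*>(batch.data()), batch_rows * kRowWidth);
  batch.resize(in.gcount() / kRowWidth);  // trim a short final batch
  return batch;
}
```

This sidesteps Parquet's row-group granularity entirely, at the cost of giving up Parquet's compression and columnar encoding.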
I can't know the batch_size for reading when I write to disk, so I can't create a row group/file with a suitable size. Since the read batch_size is decided by spill_over_count and buffer_size (like buffer_size / spill_over_count), the spill_over_count can't be determined until all the inputs are finished.
Sorry for my confusing description. The real problem is that I want to make this …. The skip feature can solve my problem more easily.
Yes, I think that would solve your problem. For example, is this similar to how the …?
Yes, it's similar. I am close to finishing the first draft for this issue, and I have run into another problem that may need your guidance: how can I compare two values from two arrays at runtime in an Acero node? I know the current solution in …. To solve my problem, do I need to create a Function and add corresponding comparison kernels to it? Thanks.
Yes. But, luckily, in this case the functions should already exist: https://gist.github.com/westonpace/a45738e5a356324d410cba2c2713b1fd |
No, compare Functions currently don't support comparing single elements between arrays, like …. If we need to add a Function for comparing elements at specific indices in an array, I plan to submit another PR to implement that Function first. Actually, I think we could reach higher performance if we implement a special external-merge Function like …
It seems that I can use Scalar to obtain a single element and compare it with the Function you mentioned, so the problem can be solved with low engineering cost.
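For illustration, the comparison the merge needs (row i of one batch versus row j of another, across several sort keys that may each be ascending or descending) can be sketched without Arrow at all. Plain vectors stand in for arrays here; in Acero this would map onto the Scalar extraction plus existing compare kernels discussed above, and `SortKey`/`RowLess` are hypothetical names:

```cpp
#include <cstddef>
#include <vector>

enum class Order { Ascending, Descending };

// One sort-key column of a batch, with its requested ordering.
struct SortKey {
  std::vector<int> values;
  Order order;
};

// Returns true if row i of `left` sorts before row j of `right`,
// comparing key by key and falling through on ties.
bool RowLess(const std::vector<SortKey>& left, size_t i,
             const std::vector<SortKey>& right, size_t j) {
  for (size_t k = 0; k < left.size(); ++k) {
    int a = left[k].values[i];
    int b = right[k].values[j];
    if (a == b) continue;  // tie on this key: consult the next one
    bool less = a < b;
    return left[k].order == Order::Ascending ? less : !less;
  }
  return false;  // all keys equal
}
```

Null handling (nulls-first/nulls-last) would add one more branch per key but doesn't change the shape of the loop.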
Hi @R-JunmingChen, are you still working on this issue?
Hi @egillax, sorry for replying so late. I am stuck on the implementation of the sort; the performance of my external merge sort implementation is not good. I plan to continue this PR in late November.
Describe the enhancement requested
Hi everyone,
I'm representing a group of researchers working with observational health data. We have an ecosystem of packages, mostly in R, and have been exploring using `arrow` as a backend for working with our data instead of `sqlite`. We've been impressed by the speed improvements and were almost ready to make the switch, but we've hit a roadblock.

The current sorting in arrow (using `dplyr::arrange`) is taking too much memory. Looking at it further, I see this operation is a *pipeline breaker* and seems to accumulate everything in memory before sorting with a single thread. I also see mentioned in many places that the plan is to improve this and add spillover mechanisms to the sort and other *pipeline breakers*.

I did a small comparison between `arrow`, our current solution, `duckdb`, and `dplyr`. I measured time and max memory with `gnu time`. As you can see, our current solution is slow but will never run out of memory.

It would be very nice if spillover were added to the sort in arrow, so we could specify a memory limit to ensure we don't run out of memory, and sort larger-than-memory data. I hope you would even consider this feature in the near future (even for arrow `13.0.0`).

I just wanted to make this issue to make you aware this is a blocker for us at the moment. We don't have the C++ knowledge to contribute to a solution for this, but we would be glad to help if changes to the R bindings were needed, and of course with testing.
Component(s)
C++