[C++] OrderBy with spillover #35268

Open
egillax opened this issue Apr 21, 2023 · 18 comments · May be fixed by #35898
Comments

@egillax
Contributor

egillax commented Apr 21, 2023

Describe the enhancement requested

Hi everyone,

I'm representing a group of researchers working with observational health data. We have an ecosystem of packages, mostly in R, and have been exploring using arrow as a backend for working with our data instead of sqlite. We've been impressed by the speed improvements and were almost ready to make the switch, but we've hit a roadblock.

The current sorting in arrow (using dplyr::arrange) takes too much memory. Looking at it further, I see this operation is a pipeline breaker and seems to accumulate everything in memory before sorting on a single thread.

I have also seen it mentioned in many places that the plan is to improve this and add spillover mechanisms to the sort and other pipeline breakers.

I did a small comparison between arrow, our current solution, duckdb, and dplyr. I measured time and max memory with GNU time:

                                       Small         Medium
memory after dplyr::compute()          1.1 GB        5.1 GB
arrow (arrange, then write_dataset)
  memory                               3.1 GB        14.1 GB
  time                                 1 min 12 s    8 min 46 s
dplyr (collect, then arrange)
  memory                               3.6 GB        15.9 GB
  time                                 11 s          1 min
duckdb (from parquet files)
  memory                               4.3 GB        19.3 GB
  time                                 4 s           21 s
our current solution (uses sqlite)
  memory                               240 MB        260 MB
  time                                 2 min 30 s    13 min 22 s

As you can see, our current solution is slow but it never runs out of memory.

It would be very nice if spillover were added to the sort in arrow, so we could specify a memory limit, avoid running out of memory, and sort larger-than-memory data. I hope you would consider this feature for the near future (even for arrow 13.0.0).

I just wanted to open this issue to make you aware that this is a blocker for us at the moment. We don't have the C++ knowledge to contribute a solution, but we would be glad to help if changes to the R bindings are needed, and of course with testing.

Component(s)

C++

westonpace changed the title from "OrderBy with spillover" to "[C++] OrderBy with spillover" on Apr 21, 2023
@westonpace
Member

Thanks for the detailed writeup and investigation. I agree this is a top priority for Acero, although I have unfortunately been very busy these past few months and I don't know if that will change, so I can't really promise anything.

There are a few approaches that can be taken here, and https://en.wikipedia.org/wiki/External_sorting provides a good summary / starting point for an investigation. Some of these approaches require a merge step, so some kind of merge_indices / merge function (e.g. #33512) would be essential.
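
To make that building block concrete, here is a toy sketch (my own illustration, not the proposed kernel) of the kind of merge step a merge_indices function like the one discussed in #33512 would provide: given two already-sorted runs, emit the (run, position) order of the merged output.

// Toy merge step for external sorting: given two already-sorted runs of keys,
// produce the interleaving order as (run, position) pairs. A real merge_indices
// kernel would operate on Arrow arrays and sort keys; everything here is
// illustrative.
#include <cstdint>
#include <utility>
#include <vector>

std::vector<std::pair<int, std::size_t>> MergeIndices(
    const std::vector<int64_t>& run0, const std::vector<int64_t>& run1) {
  std::vector<std::pair<int, std::size_t>> order;
  order.reserve(run0.size() + run1.size());
  std::size_t i = 0, j = 0;
  while (i < run0.size() && j < run1.size()) {
    // Take the smaller head element from either run (stable: run0 wins ties).
    if (run0[i] <= run1[j]) {
      order.emplace_back(0, i++);
    } else {
      order.emplace_back(1, j++);
    }
  }
  // Append whatever remains in the run that was not exhausted.
  while (i < run0.size()) order.emplace_back(0, i++);
  while (j < run1.size()) order.emplace_back(1, j++);
  return order;
}

(std::merge from <algorithm> expresses the same value-level merge when take indices are not needed.)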

@R-JunmingChen
Contributor

R-JunmingChen commented May 4, 2023

Hi, I want to take this issue, but I am busy this month. If no other contributors are working on this, I would like to contribute the feature and take it at the end of this month.

@westonpace
Member

Great! You might take a look at #35320 as you're getting started. It helps give some overview of Acero in general.

@R-JunmingChen
Contributor

take

@R-JunmingChen
Contributor

R-JunmingChen commented May 13, 2023

Great! You might take a look at #35320 as you're getting started. It helps give some overview of Acero in general.

I am stuck on how to create an appropriate design for external sort in Acero.
Based on your doc, I think it would be good to first create an external sorting kernel and then use it in Acero.
But Acero needs separate parts of external sorting. To be precise:

  • In the InputReceived() of the exec node order_by_sink with spillover, we should read data into a buffer of size buffer_size. When the buffer is full, we should sort the buffer and write the sorted data to disk.
  • In the DoFinish() of the exec node order_by_sink with spillover, we should perform an n-way external merge sort.

So I have two simple plans:

  1. Implement an externalOrderByImpl for Acero and implement the above methods on its own. In this way, we do not implement anything in arrow::compute. Maybe we still need to call the sorting method from compute in InputReceived().
  2. Implement two kernels in arrow::compute: one does the spillover work and one does the n-way external merge sort. externalOrderByImpl just calls the two kernels in its InputReceived() and DoFinish(). It would be a little weird, because I think the two kernels don't conform to Arrow's design principles for kernels.

I need some suggestions before I implement code for external sorting.

@westonpace
Member

westonpace commented May 18, 2023

order_by_sink is the old way. We should be migrating to use order_by in order_by_node.cc. You should make these changes there.

It would be a little weird, because I think the two kernels don't conform to Arrow's design principles for kernels.

I agree that compute kernels are not the best fit for some of this.

we should sort the buffer and write the sorted data to disk.

This can be a kernel (this already exists with SortIndices).

Implement two kernels in arrow::compute: one does the spillover work

Kernels should not write to disk. I think we should create a separate abstraction, a spilling accumulation queue, that writes to disk. For example, it could be something like...

class OrderedSpillingAccumulationQueue {
public:
  
  OrderedSpillingAccumulationQueue(int64_t buffer_size);

  // Inserts a batch into the queue.  This may trigger a write to disk if enough data is accumulated
  // If it does, then SpillCount should be incremented before this method returns (but the write can
  // happen in the background, asynchronously)
  Status InsertBatch(ExecBatch batch);

  // The number of files that have been written to disk.  This should also include any data in memory
  // so it will be the number of files written to disk + 1 if there is in-memory data.
  int SpillCount();

  // This should only be called after all calls to InsertBatch have been completed.  This starts reading
  // the data that was spilled. It will grab the next batch of data from the given spilled file.  If spill_index
  // == SpillCount() - 1 then this might be data that is already in-memory.
  Future<std::optional<ExecBatch>> FetchNextBatch(int spill_index);
};

The n-way merge can then call FetchNextBatch appropriately. An n-way merge is going to be challenging to implement performantly because it is not a columnar algorithm. Thinking about this more, the n-way merge kernel will probably not be a compute function. You will probably want to use something like ExecBatchBuilder to accumulate the results.

Keep in mind that there is another approach which will not require an n-way merge (external distribution sort). This approach may be simpler to implement but I don't know.
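
For what it's worth, here is a minimal, self-contained sketch of that n-way merge, with plain sorted vectors of int64 keys standing in for the batches pulled via FetchNextBatch; all names here are illustrative rather than Acero APIs.

// Heap-based n-way merge over sorted runs of keys.
#include <cstddef>
#include <cstdint>
#include <queue>
#include <vector>

struct RunHead {
  int64_t key;      // current smallest key of the run
  int run;          // which sorted run it belongs to
  std::size_t pos;  // position of that key within the run
};

std::vector<int64_t> MergeRuns(const std::vector<std::vector<int64_t>>& runs) {
  // std::priority_queue is a max-heap, so invert the comparison to pop the
  // globally smallest key first.
  auto cmp = [](const RunHead& a, const RunHead& b) { return a.key > b.key; };
  std::priority_queue<RunHead, std::vector<RunHead>, decltype(cmp)> heap(cmp);
  for (int r = 0; r < static_cast<int>(runs.size()); ++r) {
    if (!runs[r].empty()) heap.push({runs[r][0], r, 0});
  }
  std::vector<int64_t> out;
  while (!heap.empty()) {
    RunHead head = heap.top();
    heap.pop();
    out.push_back(head.key);
    // Advance within the same run; this is where the real node would call
    // FetchNextBatch(head.run) once the current batch of that run is used up.
    if (head.pos + 1 < runs[head.run].size()) {
      heap.push({runs[head.run][head.pos + 1], head.run, head.pos + 1});
    }
  }
  return out;
}

In the real node, the payload columns would have to be materialized alongside the keys (e.g. with ExecBatchBuilder), and each run refilled asynchronously as its current batch is exhausted.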

@R-JunmingChen
Contributor

R-JunmingChen commented May 19, 2023

The n-way merge can then call FetchNextBatch appropriately. An n-way merge is going to be challenging to implement performantly because it is not a columnar algorithm. Thinking about this more, the n-way merge kernel will probably not be a compute function. You will probably want to use something like ExecBatchBuilder to accumulate the results.

Helpful suggestions. It is indeed challenging to implement an external merge sort performantly.
Draft plan:

  1. In InsertBatch, we do what your comment says.
  2. We do the n-way merge in the buffer. We could also compare only the columns needed for sorting and take the indices, whose format would be {batch_index}_{indice}, to materialize a result.

Keep in mind that there is another approach which will not require an n-way merge (external distribution sort). This approach may be simpler to implement but I don't know.

I have roughly investigated external distribution sort. Maybe we shouldn't choose it.
External distribution sort works best when the n pivots divide the entire dataset evenly. However, it's hard to pick perfect pivots in the first round of InsertBatch unless every batch is identically distributed. If the pivots can't divide the data evenly, the sort incurs extra I/O compared to external merge sort, recursively dividing the data until the smallest segment fits in the buffer.

github-actions bot linked a pull request Jun 3, 2023 that will close this issue
@R-JunmingChen
Contributor

R-JunmingChen commented Jun 5, 2023

Hi @westonpace, I am stuck on reading back the data. I need to read data back from the spillover files at a specific offset and batch size.

Currently, my offset/batch size is counted in rows and I use the Parquet format for the temporary spillover files.
However, our Parquet library doesn't support reading a file at a row offset (only byte offsets are supported). Also, the RecordBatchReader, which is a generator, supports a batch_size in rows but it doesn't support an offset.

Maybe an AsyncGenerator is a good way to use the RecordBatchReader, but I can't find examples of reading a Parquet file; its test cases are all in-memory operations. Could an AsyncGenerator resolve the problem?

@westonpace
Member

Are you trying to read a single row? Or a whole batch of rows?

If you need random access to individual rows then parquet is not going to be a good fit. We might want to investigate some kind of row-major format.

If you only need to load specific batches of data then could you create a row group for each batch? Or a separate file for each batch?

If you need random access to batches of data (e.g. you don't know the row group boundaries at write time but it isn't random access to rows) then we could maybe use the row skip feature that was recently added to parquet (I don't think it has been exposed yet).

@R-JunmingChen
Contributor

R-JunmingChen commented Jun 10, 2023

If you only need to load specific batches of data then could you create a row group for each batch? Or a separate file for each batch?

I can't know the read batch_size when I write the data to disk, so I can't create a row group/file of a suitable size. The read batch_size is decided by spill_over_count and buffer_size (roughly buffer_size / spill_over_count), and spill_over_count can't be determined until all the inputs are finished.

If you need random access to batches of data (e.g. you don't know the row group boundaries at write time but it isn't random access to rows) then we could maybe use the row skip feature that was recently added to parquet (I don't think it has been exposed yet).

Sorry for my confusing description. The real problem is that I want to make this Future<std::optional<ExecBatch>> FetchNextBatch(int spill_index); work. So, for a specific example_spill_over_file_one.parquet, I should first read at row offset batch_size * 0 with a batch size of batch_size, and when I have used up that data for comparing I should then read at row offset batch_size * 1 with a batch size of batch_size, and so on.

The skip feature could solve my problem more easily.
Currently, I have used an AsyncGenerator the way source_node.cc does to read back the data. I think that's enough to solve my problem?

@westonpace
Member

Yes, I think that would solve your problem. For example, is this similar to how the file_parquet.cc file uses parquet::arrow::FileReader::GetRecordBatchGenerator?
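
For reference, a rough sketch of that pattern, assuming one spill run per Parquet file; OpenSpillFile and the surrounding layout are my own illustration, not the code in the linked PR.

// Open a spill file and expose it as an async stream of record batches,
// roughly the way file_parquet.cc uses GetRecordBatchGenerator.
#include <memory>
#include <numeric>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <arrow/util/async_generator.h>
#include <parquet/arrow/reader.h>

arrow::Result<arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>>
OpenSpillFile(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));
  // Read all row groups and all columns; the generator yields one future per
  // batch, so the merge can pull batches lazily instead of seeking by row offset.
  std::vector<int> row_groups(reader->num_row_groups());
  std::iota(row_groups.begin(), row_groups.end(), 0);
  std::vector<int> columns(reader->parquet_reader()->metadata()->num_columns());
  std::iota(columns.begin(), columns.end(), 0);
  std::shared_ptr<parquet::arrow::FileReader> shared_reader = std::move(reader);
  return shared_reader->GetRecordBatchGenerator(shared_reader, row_groups, columns);
}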

@R-JunmingChen
Contributor

R-JunmingChen commented Jun 22, 2023

Yes, I think that would solve your problem. For example, is this similar to how the file_parquet.cc file uses parquet::arrow::FileReader::GetRecordBatchGenerator?

Yes, it's similar.

I am close to finishing the first draft for this issue, and I've run into another problem that may need your guidance: how can I compare two values from two arrays at run time in an Acero node?
To be specific, I can use the following code to compare:
a_array->data()->GetValues<CType>(a_indice) > b_array->data()->GetValues<CType>(b_indice).
To make this code work, CType needs to be known at compile time. However, it looks like I can only get the type at run time for an Array that comes from the input of an Acero node.

I know the current solution in arrow::compute materializes the kernel template for every arrow::DataType and adds the kernels to a Function, so at run time I can use the Function to handle the data whatever its arrow::DataType is.

To solve my problem, do I need to create a Function and add corresponding kernels to it for comparison?
Or, do we have any simpler method to solve the problem?

Thanks

@westonpace
Member

To solve my problem, do I need to create a Function and add corresponding kernels to it for comparison?
Or, do we have any simpler method to solve the problem?

Yes. But, luckily, in this case the functions should already exist: https://gist.github.com/westonpace/a45738e5a356324d410cba2c2713b1fd

@R-JunmingChen
Contributor

R-JunmingChen commented Jul 4, 2023

Yes. But, luckily, in this case the functions should already exist: https://gist.github.com/westonpace/a45738e5a356324d410cba2c2713b1fd

No, the compare Functions currently don't support comparing single elements of arrays, like a_array->data()->GetValues<CType>(a_indice) > b_array->data()->GetValues<CType>(b_indice). These Functions can be used to compare two arrays in their entirety, but the merge sort only needs to compare the min/max keys between different arrays; comparing the entire arrays is too costly.

If we need to add a Function that compares elements at specific indices in arrays, I plan to submit another PR to implement that Function first.

Actually, I think we could reach higher performance if we implement a special external merge function like
AsyncGenerator<RecordBatch> MergeFunction(Vector<AsyncGenerator<RecordBatch>> record_batches, const ExternalMergeOption& options, ExecContext* ctx)

@R-JunmingChen
Contributor

It seems that I can use Scalar to obtain a single element and compare it with the Function you mentioned. So the problem could be solved with low engineering cost.
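
For concreteness, a minimal sketch of that Scalar-based comparison (the helper name ElementGreater is illustrative, and null handling is omitted):

// Compare a[a_index] > b[b_index] without knowing the C type at compile time:
// GetScalar boxes the element and the "greater" compute function dispatches on
// the runtime type.
#include <cstdint>
#include <memory>

#include <arrow/api.h>
#include <arrow/compute/api.h>

arrow::Result<bool> ElementGreater(const arrow::Array& a, int64_t a_index,
                                   const arrow::Array& b, int64_t b_index) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Scalar> left, a.GetScalar(a_index));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Scalar> right, b.GetScalar(b_index));
  ARROW_ASSIGN_OR_RAISE(arrow::Datum result,
                        arrow::compute::CallFunction("greater", {left, right}));
  return result.scalar_as<arrow::BooleanScalar>().value;
}

Boxing each element into a Scalar does add per-comparison overhead, so it may matter in the inner loop of the merge.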

@egillax
Contributor Author

egillax commented Oct 13, 2023

Hi, @R-JunmingChen

Are you still working on this issue?

@R-JunmingChen
Contributor

Hi @egillax, sorry for replying so late. I am stuck on the implementation of the sorting; the performance of my external merge sort implementation is not good. I plan to continue the PR in late November.
