
[feature] Streaming data allocation #3995

Closed
AlbertoEAF opened this issue Feb 17, 2021 · 10 comments

@AlbertoEAF (Contributor) commented Feb 17, 2021

Motivation

Although LightGBM supports creating a dataset from an array of matrices through LGBM_DatasetCreateFromMats, there is no support for loading data from a data stream in a single pass without knowing its size beforehand. For bigger datasets in distributed systems this can be a performance limitation.

This feature adds support for it in SWIG, although it is not Java-specific and could be exposed in the rest of the code too.

Proposed Solution

Problem:
Assume the input dataset size is not known (it arrives as a stream) and you want to copy the data in a single pass.

Solution:
Such a feature can be implemented in C++ with a dynamic "chunked array" - i.e., an array of arrays (chunks), where all chunks have a fixed size. The sequence is the following:

  1. Initialize ChunkedArray with one chunk.
  2. As each value is read from the stream, it is back-inserted in the last chunk.
  3. When the chunk is full, a new chunk is automatically allocated before back-insertion.
  4. Repeat from 2 until the input data is exhausted.

After reading the dataset, its size can be computed in O(1):

size = (num_chunks-1) * chunk_size + num_elements_in_last_chunk

This array of arrays can now be used in the LGBM_DatasetCreateFromMats call.
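The steps above can be sketched in a few lines of C++. This is only a minimal illustration, not LightGBM's actual ChunkedArray implementation; the class and method names here are hypothetical:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Minimal sketch of a chunked array (hypothetical; the real ChunkedArray
// API may differ). Chunks have a fixed capacity; add() back-inserts into
// the last chunk and allocates a new chunk when the current one is full.
template <typename T>
class ChunkedArraySketch {
 public:
  explicit ChunkedArraySketch(std::size_t chunk_size) : chunk_size_(chunk_size) {
    chunks_.emplace_back();               // step 1: start with one chunk
    chunks_.back().reserve(chunk_size_);
  }

  void add(T value) {
    if (chunks_.back().size() == chunk_size_) {  // step 3: chunk is full,
      chunks_.emplace_back();                    // allocate a new one
      chunks_.back().reserve(chunk_size_);
    }
    chunks_.back().push_back(value);      // step 2: back-insert the value
  }

  // O(1) size: all chunks except the last are full.
  std::size_t size() const {
    return (chunks_.size() - 1) * chunk_size_ + chunks_.back().size();
  }

 private:
  std::size_t chunk_size_;
  std::vector<std::vector<T>> chunks_;
};
```

Note that no reallocation or copying of already-inserted data ever happens: growing the structure only appends a new fixed-size chunk.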

Patch notes

  • This patch is required to support streaming in SWIG.
  • The ChunkedArray class is only exposed in SWIG but could be exposed for users of other languages too as it's a regular C++ class.

Example usage:

In C++:

#include "ChunkedArray.hpp"

auto data = ChunkedArray<double>(100000); // chunk_size = 100000

// Automatically managed memory & no need to specify addresses:
while (!stream.empty()) {
  data.add(stream.pop()); 
}

LGBM_DatasetCreateFromMats(..., data.data_as_void(), ...); 

This was properly wrapped in SWIG, so it works as a regular Java class too.

Here's the patch diff. I'd really like to see LGBM gain this support; can I submit an MR with it?

Special request for input @imatiach-msft @guolinke @StrikerRUS :)

@AlbertoEAF (Contributor, Author) commented Feb 17, 2021

Also, I'd like to add automatic C++ CI tests for this.

Do we already have C++ tests in the rest of the code? There are some low-level parts, such as this one, which would be useful to unit-test directly in C++.

I was looking into it and, even in 2021, Google Test seems like one of the best solutions. The other alternative was Catch2, but even though it makes testing easier, it apparently limits debugger usage and has slower compile times.

Any comments/preferences?

@AlbertoEAF AlbertoEAF changed the title [feature] (SWIG) Streaming data allocation [feature] Streaming data allocation Feb 17, 2021
@StrikerRUS (Collaborator) commented

I'm definitely +1 for this feature! The described use case seems reasonable to me. As a "feature request for the feature request", I'd like to see this in the C API, so everyone will be able to use it in codebases with LightGBM 🙂 .

Speaking about C++ tests, unfortunately we don't have any right now 😞 . There is only a very old feature request for them: #261 (with #3841 as a sub-issue), and a very promising ongoing pull request which should help enable Google Test for the LightGBM repo on our CI services: #3555.

@imatiach-msft (Contributor) commented

+1, this would be very useful for lightgbm in the mmlspark codebase, which currently aggregates all of the streaming data into a large Java array, converts that to a large native array, and passes it into the lightgbm native codebase. I think this would reduce memory usage by 1/3 (I think 1/3 is used by Java, 1/3 by the temporary native array created from Java, and another 1/3 by the dataset in lightgbm).

@imatiach-msft (Contributor) commented Feb 17, 2021

Hmm, I'm actually not entirely sure how much memory is used on the Java side in mmlspark. It might actually reduce memory usage by 50%, since I think we make a deep copy of the Java data when reading it from the DataFrame in the mapPartitions function: 1/4 of the data is loaded by the user in Spark, 1/4 is currently aggregated and turned into a native array, 1/4 of memory is taken up by that native array, and finally another 1/4 is taken by the lightgbm dataset (I think it makes a copy of the native array passed in). Streaming, if implemented correctly, would cut out the two quarters in the middle. In any case, this enhancement would definitely reduce memory usage substantially.

@AlbertoEAF (Contributor, Author) commented Feb 17, 2021

Great analysis @imatiach-msft :) In our case with the Java provider, the input Java Dataset was taking ~70% of the total memory (due to the overhead of the per-value Java pointers, which don't exist at all in C++).
The remaining 30% of the memory was sufficient to hold 2 copies of the train data in C++:

  1. C++ buffer - used to create the LightGBM dataset.
  2. LightGBM Dataset - after creating it, one can delete the C++ buffer.

Now we stream the data into the ChunkedArray and need only 30% of the memory. Some jobs used to require >120GB of executor memory, so it's a big difference.

It's great to see we're all on the same page, and thanks for the input!
I'll open a PR to add support for SWIG, and later in another PR we can expose this for other languages too, as @StrikerRUS suggested ;)

@imatiach-msft (Contributor) commented

@AlbertoEAF the PR looks amazing! Do you have plans to add sparse support as well? LGBM_DatasetCreateFromMats only covers the dense case. We really need sparse support for mmlspark, as it's used in many use cases as well.

@AlbertoEAF (Contributor, Author) commented Feb 19, 2021

Thanks @imatiach-msft :D

Well, I didn't, but as soon as ChunkedArray is merged it should be trivial to support collecting data in a streaming fashion for the sparse case as well from MMLSpark's code.

There are two parts to it:

1. You use a ChunkedArray<double> for the data array and two ChunkedArray<size_t> for the column and row indices. This way you can already collect input data from a CSR/CSC stream in your Java/Scala MMLSpark code.

2. Unlike the dense case, where LGBM_DatasetCreateFromMats complements LGBM_DatasetCreateFromMat, there is no way to pass partitioned sparse data for the time being, so you have to coalesce those 3 chunked arrays first. After that you can drop the chunked arrays, and calling LGBM_DatasetCreateFromCSR is business as usual.
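As a rough illustration of part 1 (all names here are hypothetical, and plain std::vectors stand in for the chunked arrays), streaming CSR input just means appending to three growable arrays as nonzeros arrive:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical CSR streaming sketch: plain std::vectors stand in for
// ChunkedArray<double> / ChunkedArray<size_t>. Each incoming nonzero
// appends its value and column index; finishing a row records the
// current offset in indptr (the standard CSR row-pointer array).
struct CsrStreamSketch {
  std::vector<double> values;          // nonzero values, in row order
  std::vector<std::size_t> indices;    // column index of each value
  std::vector<std::size_t> indptr{0};  // row start offsets

  void add_nonzero(std::size_t col, double v) {
    indices.push_back(col);
    values.push_back(v);
  }

  void end_row() { indptr.push_back(values.size()); }
};
```

The three arrays grow independently and in a single pass, which is exactly the access pattern ChunkedArray supports.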

About Coalescing

As each ChunkedArray knows its size, you just create an array of that size and call:

val coalescedArray = lightgbmlib.new_intArray(chunkedArray.get_add_count());

// Copy data to the coalescedArray:
chunkedArray.coalesce_to(coalescedArray);  // Memory spike: 2 copies of data in C++.

chunkedArray.release(); // Spike gone.
...
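For reference, the coalescing step itself amounts to a single pass over the chunks into one contiguous buffer. A minimal C++ sketch, assuming a vector-of-vectors chunk layout (not LightGBM's actual internals):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Sketch of coalescing (hypothetical layout; the real ChunkedArray
// differs). All chunks except possibly the last are full, so one pass
// copies every chunk into a preallocated contiguous output buffer.
template <typename T>
void coalesce_to(const std::vector<std::vector<T>>& chunks, T* out) {
  std::size_t offset = 0;
  for (const auto& chunk : chunks) {
    std::copy(chunk.begin(), chunk.end(), out + offset);
    offset += chunk.size();
  }
}
```

While the copy is in flight both representations exist, which is the memory spike mentioned above; releasing the chunks immediately afterwards removes it.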

Getting rid of all memory spikes

Ideally, in the future a LGBM_DatasetCreateFromCSRs could be added to LightGBM so that coalescing the arrays (step 2) is not needed. That would remove the temporary memory spike of holding chunked and coalesced data at the same time, a cost you already pay in Java when you aggregate the data, from what I understood:

1/4 of the data is loaded by the user in spark, 1/4 is currently aggregated and turned into native array, 1/4 of memory is taken up by that native array

It's important to remember here that the memory usage of a dataset copy is much higher in Java than in C++, as Java's memory representation is less efficient.

Summary

Once ChunkedArray is merged as-is, you can already support streaming with sparse data in MMLSpark too.

What do you think @imatiach-msft ?

AlbertoEAF added a commit to feedzai/LightGBM that referenced this issue Feb 23, 2021
Through the new ChunkedArray class it is possible to collect data in Java in a streaming fashion.

Rationale described at microsoft#3995.
@AlbertoEAF (Contributor, Author) commented

gentle ping @imatiach-msft :)

@imatiach-msft (Contributor) commented

@AlbertoEAF yes, sure, sounds great to me. I guess I will just need to add sparse support sometime in the future. Maybe for now, if I upgrade, I can use the optimized new method for dense data but keep the memory-intensive code path for sparse data.

@StrikerRUS (Collaborator) commented

Closing in favor of #2302. We decided to keep all feature requests in one place.

Contributions of this feature are welcome! Please re-open this issue (or post a comment if you are not the topic starter) if you are actively working on implementing it.
