Excessive memory usage of Delta writers #1225

Open
gruuya opened this issue Mar 16, 2023 · 12 comments
Labels
bug Something isn't working

Comments

@gruuya
Contributor

gruuya commented Mar 16, 2023

Environment

Delta-rs version: 0.8.0

Binding: Rust

Environment:

  • Cloud provider: N/A
  • OS: Any (Ubuntu)
  • Other:

Bug

What happened:

Writing to Delta tables from DataFusion scans of relatively modest Parquet files (~150MB) makes memory usage explode (peak >1GB).

What you expected to happen:

The memory usage shouldn't peak so drastically.

How to reproduce it:

  1. Grab bytehound and build it; it is used to record the memory profile.
  2. Fetch https://seafowl-public.s3.eu-west-1.amazonaws.com/tutorial/trase-supply-chains.parquet and store it somewhere:
    $ du -h ~/supply-chains.parquet
    146M	/home/ubuntu/supply-chains.parquet
  3. Create a minimal example:
    use datafusion::prelude::SessionContext;
    use deltalake::{operations::write::WriteBuilder, storage::DeltaObjectStore};
    use tempfile::TempDir;
    use std::{path::Path, collections::HashMap, sync::Arc};
    use url::Url;
    use log::info;
    
    
    #[tokio::main(flavor = "current_thread")]
    async fn main() -> Result<(), deltalake::DeltaTableError> {
        env_logger::init();
        let ctx = SessionContext::new();
        let state = ctx.state();
    
        // Create the external table pointing to the downloaded Parquet file
        ctx.sql("CREATE EXTERNAL TABLE supply_chains STORED AS PARQUET LOCATION '/home/ubuntu/supply-chains.parquet'")
            .await?;
    
        let scan = ctx.table_provider("supply_chains").await?.scan(&state, None, &[], None).await?;
    
        let tmp_dir = TempDir::new()?;
        let canonical_path = Path::new(&tmp_dir.path().display().to_string()).canonicalize()?;
        let url = Url::from_directory_path(canonical_path).unwrap();
        let object_store = Arc::from(DeltaObjectStore::try_new(url, HashMap::default())?);
    
        info!("Starting the write");
        let table = WriteBuilder::new()
            .with_input_execution_plan(scan)
            .with_input_session_state(state)
            .with_object_store(object_store)
            .await?;
    
        info!("Created table {table}");
    
        Ok(())
    }
  I used the following Cargo.toml:
    [package]
    name = "write-profiling"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    datafusion = "19.0.0"
    deltalake = { version = "0.8.0", features = ["datafusion"] }
    env_logger = "0.10.0"
    log = "0.4"
    tempfile = "3.4.0"
    tokio = { version = "1.0", features = ["macros", "rt"] }
    url = "2.3"
    
    [[bin]]
    name = "write_profiling"
    path = "main.rs"
  4. Run the profiler
    ~/write-profiling$ LD_PRELOAD=~/bytehound/target/release/libbytehound.so RUST_LOG=info ./target/debug/write_profiling
    [2023-03-16T15:48:02Z INFO  write_profiling] Starting the write
    [2023-03-16T15:52:15Z INFO  write_profiling] Created table DeltaTable(/tmp/.tmpCqMx0c/)
    	    version: 0
    	    metadata: GUID=2d7d73c4-c11e-451a-a09e-88509fb9b166, name=None, description=None, partitionColumns=[], createdTime=Some(1678981682662), configuration={}
    	    min_version: read=1, write=1
    	    files count: 2
  5. Run the bytehound server to take a look at the results: ~/bytehound/target/release/bytehound server -p 8888 memory-profiling_*.dat

The results I get:
[image]

If I group by backtrace I get the two biggest offenders:
[image]
(I suspect the first one could be misleading)

More details:

This is something we've previously had to deal with in Seafowl:

I suspect the biggest contributor to this is that pending object store uploads are buffered in memory, whereas the optimal solution is to first persist the record batch outputs of a DataFusion plan to temporary Parquet files. These files should then be uploaded by reading them chunk by chunk and using ObjectStore::put_multipart on each chunk, so that the entire file is never loaded into memory.
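
For illustration, the chunk-by-chunk upload idea could look roughly like the sketch below. It uses only the standard library; `upload_in_parts` and the `upload_part` callback are hypothetical names standing in for the real multipart upload call, and are not part of delta-rs or object_store. The point is only that a single part-sized buffer is reused, so memory use is bounded by the part size rather than the file size.

```rust
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// Streams the file at `path` to `upload_part` in chunks of at most
/// `part_size` bytes, reusing one buffer so that only a single part is
/// held in memory at a time. Returns the number of parts handed over.
///
/// `upload_part` is a hypothetical callback (part index, part bytes); in a
/// real writer it would forward the bytes to a multipart upload.
fn upload_in_parts<F>(path: &Path, part_size: usize, mut upload_part: F) -> io::Result<usize>
where
    F: FnMut(usize, &[u8]) -> io::Result<()>,
{
    let mut file = File::open(path)?;
    let mut buf = vec![0u8; part_size];
    let mut part_no = 0;

    loop {
        // `read` may return fewer bytes than requested, so keep reading
        // until the buffer is full or the end of the file is reached.
        let mut filled = 0;
        while filled < part_size {
            let n = file.read(&mut buf[filled..])?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        if filled == 0 {
            break; // nothing left to upload
        }

        upload_part(part_no, &buf[..filled])?;
        part_no += 1;

        if filled < part_size {
            break; // a short part means the end of the file was reached
        }
    }

    Ok(part_no)
}
```

Calling it with a part size of 5 * 1024 * 1024 bytes and a callback that pushes each slice into the upload would keep roughly one part resident in memory regardless of how large the temporary file is.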

gruuya added the bug label Mar 16, 2023
@rtyler
Member

rtyler commented Mar 16, 2023

Very interesting! Thanks for the detailed bug report @gruuya

@wjones127
Collaborator

I'd definitely be in favor of writing incrementally with ObjectStore::put_multipart. I'd add that I've been meaning to benchmark and optimize that method; I naively set some parameters, like the chunk size and number of threads per upload, and they could likely be substantially improved.

@roeap
Collaborator

roeap commented Mar 16, 2023

Once the optimistic commit PR has been merged, I planned to focus some attention on the write paths as well. There are some changes on the way, or already integrated, in DataFusion around how writes are handled. Essentially, so far there was no concept of a sink within the logical / physical plans, and that has either changed or is about to. My hope was to integrate with that... The above analysis is a great help in knowing what to look out for...

@gruuya
Contributor Author

gruuya commented Mar 18, 2023

Fwiw, I've revised the existing Seafowl Parquet writing logic (which was already optimized) to perform Delta writes instead, so here's a data point relevant to this issue. Trying it out on the above example (which is actually just a step in our tutorial) shows a 10x decrease in peak memory consumption (from above 1GB to always below 100MB):
[image]

To recap, the optimizations that permit this are:

  • Writing the incoming record batches from the plan execution directly to temp parquet files on disk. These are rotated after 1M rows (perhaps not ideal, but done for historical reasons).
  • Doing multipart upload from these temp parquet files for cloud object stores, with part size being ~5MB.
  • For local FS object stores skip put/put_multipart altogether, and simply do a rename of the temp files, moving them to the table's root directory.
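
As a rough, standard-library-only sketch of the first and third points (rotation by row count, and publishing via rename on a local filesystem): `RotatingWriter` and `publish_local` are made-up names for this illustration, rows are plain text lines rather than record batches, and the output files are not Parquet. This is not Seafowl's actual plan_to_object_store code.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};

/// Writes rows to numbered temp files, starting a new file once the
/// current one holds `max_rows` rows. (Hypothetical helper; a real writer
/// would encode record batches as Parquet instead of text lines.)
struct RotatingWriter {
    dir: PathBuf,
    max_rows: usize,
    rows_in_current: usize,
    current: Option<BufWriter<File>>,
    files: Vec<PathBuf>,
}

impl RotatingWriter {
    fn new(dir: &Path, max_rows: usize) -> Self {
        RotatingWriter {
            dir: dir.to_path_buf(),
            max_rows,
            rows_in_current: 0,
            current: None,
            files: Vec::new(),
        }
    }

    fn write_row(&mut self, row: &str) -> io::Result<()> {
        // Lazily open the next temp file, so no empty files are created.
        if self.current.is_none() {
            let path = self.dir.join(format!("part-{}.tmp", self.files.len()));
            self.current = Some(BufWriter::new(File::create(&path)?));
            self.files.push(path);
        }
        if let Some(w) = self.current.as_mut() {
            writeln!(w, "{}", row)?;
        }
        self.rows_in_current += 1;
        if self.rows_in_current >= self.max_rows {
            self.rotate()?;
        }
        Ok(())
    }

    /// Flushes and closes the current file, if any.
    fn rotate(&mut self) -> io::Result<()> {
        if let Some(mut w) = self.current.take() {
            w.flush()?;
        }
        self.rows_in_current = 0;
        Ok(())
    }

    /// Closes the last file and returns the paths of all temp files.
    fn finish(mut self) -> io::Result<Vec<PathBuf>> {
        self.rotate()?;
        Ok(self.files)
    }
}

/// Local-filesystem shortcut: move the temp files into the table directory
/// with a rename instead of copying bytes through put/put_multipart.
/// Note that `fs::rename` fails if source and destination are on different
/// filesystems, so the temp dir has to live on the same mount.
fn publish_local(files: &[PathBuf], table_dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut published = Vec::new();
    for (i, src) in files.iter().enumerate() {
        let dest = table_dir.join(format!("part-{}.data", i));
        fs::rename(src, &dest)?;
        published.push(dest);
    }
    Ok(published)
}
```

With `max_rows` set to 1_000_000 this mirrors the rotation threshold mentioned above; the rename path never reads the file contents back into memory at all.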

In fact, the main logic found in plan_to_object_store could be a drop-in replacement for the Delta's parquet write/upload phase, save for the fact that we don't yet support partition columns there (which made this pivot a lot easier).

@roeap
Collaborator

roeap commented May 28, 2023

#1397 should hopefully make delta-rs a lot less memory hungry when writing. @gruuya, would it be possible to run this again once that PR has landed?

@gruuya
Contributor Author

gruuya commented May 29, 2023

Certainly! Will do some profiling and post the result here later on.

@gruuya
Contributor Author

gruuya commented May 29, 2023

Unfortunately, that didn't make much difference:
[image]

To streamline the memory profiling operation I've created a new repository that allows reproducing the profiling in three lines:

  1. git clone https://github.com/splitgraph/delta-rs-write-profiling.git
  2. curl -o supply-chains.parquet https://seafowl-public.s3.eu-west-1.amazonaws.com/tutorial/trase-supply-chains.parquet
  3. ./profile.sh /home/ubuntu/supply-chains.parquet

@gruuya
Contributor Author

gruuya commented May 29, 2023

@roeap @wjones127 if it doesn't interfere with your plans I could try and re-purpose Seafowl's writing logic and open a delta-rs PR with that, as I have some free time atm?

This would basically mean re-factoring delta-rs write_execution_plan such that it incorporates all the optimizations from Seafowl's plan_to_object_store method.

@wjones127
Collaborator

I think we'll have new optimizations soon, once some upstream changes are made in the object-store and parquet crates. In object store, the next release will add support for multi-part uploads for Cloudflare R2, which is the only thing blocking us from switching over. In parquet, the next release will have a method for getting the size of the currently buffered row group, which will let us control the size of row groups (and in turn the memory overhead). That also won't require any temporary files.

> Unfortunately, that didn't make much difference:

That's not unexpected. The changes should make it faster, not necessarily use less total memory. We'll need to do the changes described above to use less memory.

Are there optimizations beyond the ones described you think you can add?

@gruuya
Contributor Author

gruuya commented May 29, 2023

> In object store, the next release will add support for multi-part uploads for Cloudflare R2, which is the only thing blocking us from switching over.

Oh I didn't realize that, thanks!

> That also won't require any temporary files.

Interesting, I'm quite curious how that would work; does this mean:

  • Uploading parts containing resulting row groups/record batches as they are generated from the plan? But then surely these need to be flushed to an actual Parquet file first, so that e.g. appropriate metadata is written upon closing the writer.
  • The other alternative is buffering the rows in memory first (as now) but that defeats the purpose of multipart upload?

> Are there optimizations beyond the ones described you think you can add?

Apart from the above point not really.

@wjones127
Collaborator

You can flush the row group to storage but the writer keeps the metadata in memory. At the end, the metadata is written when closing the file. So parquet files can be written out in a streaming fashion like this. They just can’t be appended to once they are closed.
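
To make the streaming idea concrete, here is a toy sketch using only the standard library. It is not the parquet crate's writer and does not produce Parquet; `StreamingWriter`, `RowGroupMeta`, and the text footer are invented for this illustration. It only shows the shape of the approach: rows are buffered until a byte threshold is reached, each "row group" is flushed to the sink right away, the per-group metadata stays in memory, and the footer is written on close. Since `close` consumes the writer, nothing can be appended afterwards.

```rust
use std::io::{self, Write};

/// Metadata kept in memory for every flushed row group.
struct RowGroupMeta {
    offset: u64,
    len: u64,
    rows: usize,
}

/// Toy streaming writer: buffers rows up to `max_buffered_bytes`, flushes
/// them to `sink` as one row group, and writes a footer on close.
struct StreamingWriter<W: Write> {
    sink: W,
    buffer: Vec<u8>,
    buffered_rows: usize,
    max_buffered_bytes: usize,
    written: u64,
    row_groups: Vec<RowGroupMeta>,
}

impl<W: Write> StreamingWriter<W> {
    fn new(sink: W, max_buffered_bytes: usize) -> Self {
        StreamingWriter {
            sink,
            buffer: Vec::new(),
            buffered_rows: 0,
            max_buffered_bytes,
            written: 0,
            row_groups: Vec::new(),
        }
    }

    fn write_row(&mut self, row: &[u8]) -> io::Result<()> {
        self.buffer.extend_from_slice(row);
        self.buffered_rows += 1;
        // Flush as soon as the buffered size reaches the threshold, so the
        // in-memory buffer stays bounded by the threshold plus one row.
        if self.buffer.len() >= self.max_buffered_bytes {
            self.flush_row_group()?;
        }
        Ok(())
    }

    fn flush_row_group(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        self.sink.write_all(&self.buffer)?;
        self.row_groups.push(RowGroupMeta {
            offset: self.written,
            len: self.buffer.len() as u64,
            rows: self.buffered_rows,
        });
        self.written += self.buffer.len() as u64;
        self.buffer.clear();
        self.buffered_rows = 0;
        Ok(())
    }

    /// Flushes the last row group, writes the footer, and returns the sink
    /// together with the collected metadata.
    fn close(mut self) -> io::Result<(W, Vec<RowGroupMeta>)> {
        self.flush_row_group()?;
        for rg in &self.row_groups {
            writeln!(
                self.sink,
                "rg offset={} len={} rows={}",
                rg.offset, rg.len, rg.rows
            )?;
        }
        Ok((self.sink, self.row_groups))
    }
}
```

If the sink were a multipart upload rather than a local buffer, the data would go out as it is produced and only the small metadata list would accumulate, which is why no temporary files are needed in this scheme.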

@gruuya
Contributor Author

gruuya commented May 30, 2023

Nice, looking forward to that!
