feat: Support writing hive partitioned parquet #17324

Merged: 1 commit merged into pola-rs:main on Jul 6, 2024

Conversation

@nameexhaustion (Collaborator) commented Jul 1, 2024

from datetime import date, datetime

import polars as pl

df = pl.DataFrame(
    {
        "date1" : [    date(2024, 1, 1),     date(2024, 1, 2)],
        "date2" : [datetime(2024, 1, 1), datetime(2024, 1, 1)],
        "x"     : [                   1,                    2],
    }
)

df.write_parquet(".env/hive_parquet/", partition_by=["date1", "date2"])

# .env/hive_parquet/
# ├── date1=2024-01-01
# │   └── date2=2024-01-01%2000%3A00%3A00.000000
# │       └── 00000000.parquet
# └── date1=2024-01-02
#     └── date2=2024-01-01%2000%3A00%3A00.000000
#         └── 00000000.parquet
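
For context, a hedged sketch of reading such a dataset back (this read path already exists and is not part of this PR; the glob pattern and hive_partitioning flag below are assumptions about how the output would typically be consumed):

import polars as pl

# The hive-style key=value directories are decoded back into columns on scan.
lf = pl.scan_parquet(".env/hive_parquet/**/*.parquet", hive_partitioning=True)
print(lf.collect())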

@github-actions bot added the enhancement, python, and rust labels on Jul 1, 2024
@codecov bot commented Jul 1, 2024

Codecov Report

Attention: Patch coverage is 95.00000% with 8 lines in your changes missing coverage. Please review.

Project coverage is 80.60%. Comparing base (27ac6cc) to head (df69033).

Files Patch % Lines
py-polars/polars/dataframe/frame.py 54.54% 4 Missing and 1 partial ⚠️
crates/polars-io/src/partition.rs 98.86% 1 Missing ⚠️
.../polars-pipe/src/executors/sinks/output/parquet.rs 50.00% 1 Missing ⚠️
py-polars/src/dataframe/io.rs 97.05% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #17324      +/-   ##
==========================================
+ Coverage   80.58%   80.60%   +0.02%     
==========================================
  Files        1480     1480              
  Lines      193682   193822     +140     
  Branches     2765     2769       +4     
==========================================
+ Hits       156071   156224     +153     
+ Misses      37103    37089      -14     
- Partials      508      509       +1     


@alexander-beedie (Collaborator) commented:

Looking forward to this one! ✌️

@@ -438,6 +438,51 @@ impl PyDataFrame {
Ok(())
}

#[cfg(feature = "parquet")]
#[pyo3(signature = (py_f, partition_by, compression, compression_level, statistics, row_group_size, data_page_size))]
pub fn write_parquet_partitioned(

Review comment (Member):

Can we create the full write_partitioned_dataset in polars-io? Python should not have more functionality than Rust, so we should lower this and just dispatch here.

Reply (@nameexhaustion, Collaborator Author):

Created it; I made a comment below.

where
S: AsRef<str>,
{
for x in partition_by.iter() {

Review comment (Member):

If the partition length is large, we should first collect the schema; otherwise we have quadratic performance here.

Reply (@nameexhaustion, Collaborator Author):

Yes, I didn't realize that indexing the DataFrame by column name has linear access time.
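
A minimal Python sketch of that idea, assuming the fix amounts to resolving the partition columns against a schema built once up front (the helper name is hypothetical, not the PR's Rust code):

import polars as pl

# Build the schema (a name -> dtype mapping) once; each partition-column check
# is then a constant-time lookup instead of a linear scan over the columns,
# which becomes quadratic when repeated for every partition key.
def check_partition_columns(df: pl.DataFrame, partition_by: list[str]) -> None:
    schema = df.schema  # collected once
    missing = [name for name in partition_by if name not in schema]
    if missing:
        raise ValueError(f"partition columns not found in DataFrame: {missing}")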

py-polars/polars/dataframe/frame.py (resolved review thread)

return

assert pl.thread_pool_size() == 1

Review comment (@nameexhaustion, Collaborator Author):

This stopped working and I'm not sure why, but the test is valid as long as the prefetch size is set to 1.


let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
let part_df =

Review comment (Member):

I think we should iterate over DataFrames of a certain size, so that we don't write a single file per folder but instead write many smaller parquet files for large partitions.

I am not entirely sure how other tools determine the size of the parquet files. We could split by n_rows, using the estimated_size as a hint?

Reply (@nameexhaustion, Collaborator Author):

I set it to 1 million rows per file for now.

Reply (@alexander-beedie, Collaborator, Jul 3, 2024):

If you know the schema at this point (or rather, the number of cols) it's better to target a given number of elements (rows x cols), as "rows" by itself is not a useful metric.

1 million rows with 1 col is a full three orders of magnitude removed from 1 million rows with 1000 cols 😆

Somewhere between 10-25 million elements is probably going to be a more consistent target 🤔 (and using estimated size is even more helpful to avoid edge-cases like large binary blobs).

Reply (@nameexhaustion, Collaborator Author):

I see; I've changed it to slice the df into chunks of a target size, roughly as in the sketch below.
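
A minimal Python sketch of that chunking approach, assuming a byte-size target derived from the frame's estimated in-memory size (the target value, helper name, and output paths are illustrative, not the PR's exact Rust implementation):

import polars as pl

def write_partition_chunks(
    part_df: pl.DataFrame, out_dir: str, target_bytes: int = 512 * 1024 * 1024
) -> None:
    # Estimate bytes per row, then choose how many rows fit in one target-sized file.
    bytes_per_row = max(1, part_df.estimated_size() // max(1, part_df.height))
    rows_per_file = max(1, target_bytes // bytes_per_row)
    for i, offset in enumerate(range(0, part_df.height, rows_per_file)):
        # slice() clamps the length at the end of the frame, so the last file is smaller.
        part_df.slice(offset, rows_per_file).write_parquet(f"{out_dir}/{i:013x}.parquet")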

let Some(path) = paths.first() else {
return Ok(None);
};

let sep = separator(path);
let path_string = path.to_str().unwrap();

fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
let (k, v) = parse_hive_string(part)?;
let v = percent_encoding::percent_decode(v.as_bytes())

Review comment (@nameexhaustion, Collaborator Author):

Drive-by fix: decode after splitting on '=', otherwise we break when the value contains an encoded '/' or '='.
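
A small Python sketch of the ordering this describes (the helper name is hypothetical; the real implementation is the Rust parse_hive_string_and_decode shown above):

from __future__ import annotations

from urllib.parse import unquote

def parse_hive_segment(segment: str) -> tuple[str, str] | None:
    # Split the "key=value" directory segment first, then percent-decode the
    # value, so an encoded '/' or '=' inside the value cannot corrupt the split.
    key, sep, value = segment.partition("=")
    if not sep:
        return None
    return key, unquote(value)

# e.g. the datetime partition directory from the example in the PR description
assert parse_hive_segment("date2=2024-01-01%2000%3A00%3A00.000000") == (
    "date2",
    "2024-01-01 00:00:00.000000",
)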

@@ -127,3 +128,107 @@ where
}
path
}

pub fn write_partitioned_dataset<S>(

Review comment (@nameexhaustion, Collaborator Author):

Created write_partitioned_dataset here in polars-io.

I was considering putting a fn write_parquet_partitioned into impl DataFrame, but I noticed that on the Rust side we don't have e.g. DataFrame::write_parquet and the like, so I just made it a free function like this.

@nameexhaustion marked this pull request as ready for review on July 3, 2024 at 14:51
format!("{:013x}.parquet", i)
}

for (i, slice_start) in (0..part_df.height()).step_by(rows_per_file).enumerate() {

Review comment (Member):

For a future PR we can see if we can speed this up by enabling parallelism/async here.

Review (@ritchie46, Member):

Nice, great addition. I think we can experiment with making this fast, but let's first get the core functionality in. 👍

@ritchie46 merged commit f0a82d9 into pola-rs:main on Jul 6, 2024
27 checks passed
@c-peters added the accepted (Ready for implementation) label on Jul 8, 2024
@nameexhaustion deleted the hive-write branch on July 8, 2024 at 12:40
Labels
accepted (Ready for implementation), enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars), rust (Related to Rust Polars)