feat: Support writing hive partitioned parquet #17324
Conversation
nameexhaustion commented Jul 1, 2024 (edited)
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##             main   #17324      +/-   ##
==========================================
+ Coverage   80.58%   80.60%   +0.02%
==========================================
  Files        1480     1480
  Lines      193682   193822     +140
  Branches     2765     2769       +4
==========================================
+ Hits       156071   156224     +153
+ Misses      37103    37089      -14
- Partials      508      509       +1

☔ View full report in Codecov by Sentry.
force-pushed from 86340fc to bd7a49a
Looking forward to this one! ✌️
force-pushed from bd7a49a to 05a3790
@@ -438,6 +438,51 @@ impl PyDataFrame {
        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, partition_by, compression, compression_level, statistics, row_group_size, data_page_size))]
    pub fn write_parquet_partitioned(
Can we create the full `write_partitioned_dataset` in `polars-io`? Python should not have more functionality than Rust, so we should lower this and just dispatch here.
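A minimal sketch of the layering being asked for, with illustrative names only (the `core_io` module stands in for `polars-io`, and `python_bindings` for the PyO3 wrapper; neither is the PR's actual code):

```rust
// Sketch only: the real implementation lives in the core crate, and the
// Python-facing wrapper has no logic of its own beyond argument passing.
mod core_io {
    pub fn write_partitioned_dataset(path: &str, partition_by: &[String]) -> Result<(), String> {
        // partitioning + parquet writing would live here
        println!("writing partitioned dataset to {path}, partitioned by {partition_by:?}");
        Ok(())
    }
}

mod python_bindings {
    // Stand-in for the #[pyo3] method: no logic, just dispatch into the core crate.
    pub fn write_parquet_partitioned(path: &str, partition_by: &[String]) -> Result<(), String> {
        super::core_io::write_partitioned_dataset(path, partition_by)
    }
}

fn main() {
    python_bindings::write_parquet_partitioned("out/dataset", &["year".to_string()]).unwrap();
}
```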
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created it - made a comment below
crates/polars-io/src/partition.rs
Outdated
where
    S: AsRef<str>,
{
    for x in partition_by.iter() {
If the partition length is large, we should first collect the schema, otherwise we have quadratic performance here.
Yes, I didn't realize before that indexing the DataFrame by column name was linear access time.
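A rough sketch of the idea, not the PR's actual code (plain slices stand in for the DataFrame schema): build the name-to-index lookup once, then resolve each partition column against it, instead of doing a linear column search per name.

```rust
use std::collections::HashMap;

// Build the lookup once (O(n_columns)), then resolve each partition key
// in O(1), so the whole resolution is linear instead of quadratic.
fn resolve_partition_columns(
    column_names: &[&str], // e.g. taken from the DataFrame's schema
    partition_by: &[&str],
) -> Result<Vec<usize>, String> {
    let lookup: HashMap<&str, usize> = column_names
        .iter()
        .enumerate()
        .map(|(i, name)| (*name, i))
        .collect();

    partition_by
        .iter()
        .map(|name| {
            lookup
                .get(name)
                .copied()
                .ok_or_else(|| format!("partition column not found: {name}"))
        })
        .collect()
}
```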
        return

    assert pl.thread_pool_size() == 1
This stopped working and I'm not sure why, but the test is valid as long as the prefetch size is set to 1.
let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
    GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
        let part_df =
I think we should iterate over `DataFrame`s of a certain size, so that we don't write a single file per folder but, for large partitions, many smaller parquet files. I am not entirely sure how other tools determine the size of the parquet files. We could split by `n_rows`, using `estimated_size` as a hint?
I set it to 1 million rows per file for now
If you know the schema at this point (or rather, the number of cols) it's better to target a given number of elements (rows x cols), as "rows" by itself is not a useful metric.
1 million rows with 1 col is a full three orders of magnitude removed from 1 million rows with 1000 cols 😆
Somewhere between 10-25 million elements is probably going to be a more consistent target 🤔 (and using estimated size is even more helpful to avoid edge-cases like large binary blobs).
I see, I've changed it to slice the df into chunks of a target size.
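One possible way to turn a size budget into a per-file row count, as a hedged sketch (the helper name and constants are illustrative, not what the PR settled on): use the partition's estimated in-memory size as a proxy for bytes per row.

```rust
// Derive how many rows go into each file from a target size budget,
// using estimated_size / height as the bytes-per-row proxy.
fn rows_per_file(height: usize, estimated_size: usize, target_bytes: usize) -> usize {
    if height == 0 || estimated_size == 0 {
        return height.max(1);
    }
    let bytes_per_row = (estimated_size / height).max(1);
    (target_bytes / bytes_per_row).clamp(1, height)
}

fn main() {
    // e.g. ~1M rows estimated at ~8 GiB total, targeting ~512 MiB per file
    let n = rows_per_file(1_000_000, 8 * 1024 * 1024 * 1024, 512 * 1024 * 1024);
    println!("rows per file: {n}");
}
```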
force-pushed from 29b61aa to 1517147
force-pushed from 1517147 to 8bd264b
let Some(path) = paths.first() else {
    return Ok(None);
};

let sep = separator(path);
let path_string = path.to_str().unwrap();

fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
    let (k, v) = parse_hive_string(part)?;
    let v = percent_encoding::percent_decode(v.as_bytes())
Drive-by: decode after splitting by `=`, otherwise we break when the value contains `/` or `=`.
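A small illustration of the ordering, as a sketch (assuming the `percent-encoding` crate; `parse_hive_part` is an illustrative stand-in, not the PR's `parse_hive_string_and_decode`): split the `key=value` segment first, then percent-decode the value, so an encoded `=` or `/` inside the value cannot be mistaken for a separator.

```rust
use std::borrow::Cow;

// Split first, decode second: "key=a%3Db" must yield ("key", "a=b").
// Decoding before the split would produce "key=a=b" and break parsing.
fn parse_hive_part(part: &str) -> Option<(&str, Cow<'_, str>)> {
    let (key, value) = part.split_once('=')?;
    let value = percent_encoding::percent_decode(value.as_bytes()).decode_utf8_lossy();
    Some((key, value))
}

fn main() {
    let (k, v) = parse_hive_part("key=a%3Db").unwrap();
    assert_eq!((k, v.as_ref()), ("key", "a=b"));
}
```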
@@ -127,3 +128,107 @@ where
    }
    path
}

pub fn write_partitioned_dataset<S>(
Created write_partitioned_dataset
here in polars-io
.
I was considering putting a fn write_parquet_partitioned
into impl DataFrame
, but I notice that on the rust side we don't have e.g. DataFrame::write_parquet
and others, so I just made it a function like this
    format!("{:013x}.parquet", i)
}

for (i, slice_start) in (0..part_df.height()).step_by(rows_per_file).enumerate() {
For a future PR we can see if we can speed this up by enabling parallelism/async here.
Nice, great addition. I think we can experiment with making this fast, but let's first get the core functionality in. 👍
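Purely speculative sketch of the kind of follow-up being discussed (the PR keeps the writes sequential): fanning the per-file writes out over a thread pool, here with `rayon`.

```rust
use rayon::prelude::*;

// Each (path, bytes) pair is one already-encoded parquet file; writing
// them out is embarrassingly parallel, so a parallel try_for_each over
// the pairs is enough to use all cores.
fn write_files(files: Vec<(std::path::PathBuf, Vec<u8>)>) -> std::io::Result<()> {
    files
        .into_par_iter()
        .try_for_each(|(path, bytes)| std::fs::write(path, bytes))
}
```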
force-pushed from 7f62043 to df69033