perf: Improve parallelism in writing hive parquet #17512
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##             main   #17512      +/-   ##
==========================================
+ Coverage   80.48%   80.51%   +0.03%
==========================================
  Files        1483     1483
  Lines      195118   195234     +116
  Branches     2778     2778
==========================================
+ Hits       157039   157193     +154
+ Misses      37568    37530      -38
  Partials      511      511

☔ View full report in Codecov by Sentry.
I think we should maybe add a semaphore to ensure we don't open too many files at once.
POOL.install(|| match groups {
    GroupsProxy::Idx(idx) => idx
        .all()
        .chunks(MAX_OPEN_FILES)
We can't do any simple semaphore logic because of Rayon's work-stealing, so I've just applied the limit by batching the groups instead.
I tried other ways, but it was very rough 🥲. It's basically impossible to apply backpressure with Rayon - you can't really tell it not to steal/execute certain tasks 🤕.
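A minimal sketch of the batching idea, using illustrative names (`MAX_OPEN_FILES`, `write_group`) rather than the PR's actual code: each batch runs in parallel on the Rayon pool, and a new batch only starts once the previous one has finished, which caps the number of simultaneously open files without needing a semaphore.

```rust
use rayon::prelude::*;

// Hypothetical cap and helper; names are illustrative, not the PR's exact API.
const MAX_OPEN_FILES: usize = 128;

fn write_group(group: &[u32]) {
    // ... open one output file and write the rows for this group ...
}

fn write_all_groups(groups: &[Vec<u32>]) {
    // Process the groups in batches of MAX_OPEN_FILES. Within a batch the
    // writes run in parallel on the Rayon pool, but a new batch only starts
    // once the previous one has finished, so at most MAX_OPEN_FILES files
    // are open at any time.
    for batch in groups.chunks(MAX_OPEN_FILES) {
        batch.par_iter().for_each(|group| write_group(group));
    }
}
```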
@@ -722,7 +722,8 @@ impl BatchedParquetReader {
                use_statistics,
                hive_partition_columns.as_deref(),
            );
            tx.send((dfs, rows_read, limit)).unwrap();
            // Don't unwrap, async task could be cancelled.
Drive-by: not related to this PR, but I got a CI failure from this.
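A simplified illustration of why the unwrap can panic here, using std channels and a thread instead of the actual async machinery: once the receiving side has been dropped (for example because the consuming task was cancelled), `send` returns an error, so the worker should ignore it and shut down rather than panic.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Simulate the consumer being cancelled: the receiver is dropped early.
    drop(rx);

    thread::spawn(move || {
        // `send` returns Err once the receiving side is gone. Unwrapping here
        // would panic the worker; ignoring the error lets it stop cleanly.
        if tx.send(42).is_err() {
            // Receiver was dropped (e.g. the consuming task was cancelled).
        }
    })
    .join()
    .unwrap();
}
```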
let cols = df.get_columns();
let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
    let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
Reduced the max number of files per directory from 4_503_599_627_370_495 to 4_294_967_295, I don't think we needed that many 😅. We now use 8 characters instead of 13 for the file name, which should help with sorting.
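A rough sketch of how the new bound and the shorter file names fit together, with assumed helper names and an assumed zero-padded hex naming scheme (the PR's exact scheme may differ): capping the per-partition file count at `0xffff_ffff` means an 8-character fixed-width index always suffices, and the fixed width keeps lexicographic order equal to numeric order.

```rust
// Assumed naming scheme: zero-padded 8-character hex index per partition file.
fn file_name(index: u32) -> String {
    // 0xffff_ffff is the largest possible index, so 8 hex digits always fit,
    // and the fixed width makes lexicographic order match numeric order.
    format!("{index:08x}.parquet")
}

// Assumed helper mirroring the clamp in the diff above: one file per
// `chunk_size` bytes of the partition, capped at 4_294_967_295 files
// instead of the old 2^52 - 1 bound.
fn n_files_for_partition(estimated_size: usize, chunk_size: usize) -> usize {
    (estimated_size / chunk_size).clamp(1, 0xffff_ffff)
}

fn main() {
    assert_eq!(file_name(0), "00000000.parquet");
    assert_eq!(file_name(u32::MAX), "ffffffff.parquet");
    assert_eq!(n_files_for_partition(10 * 1024 * 1024, 4 * 1024 * 1024), 2);
}
```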
Nah, I doubt we'll hit that in our lifetime. :')
force-pushed from d2d746e to c13d673
@@ -9,5 +9,7 @@ pub use crate::json::*;
pub use crate::ndjson::core::*;
#[cfg(feature = "parquet")]
pub use crate::parquet::{metadata::*, read::*, write::*};
#[cfg(feature = "parquet")]
pub use crate::partition::write_partitioned_dataset;
Drive-by: forgot to re-export this for Rust users.
force-pushed from c52207a to 2c70546
Is hive writing exposed on the Python side somewhere, or still only Rust?
It is in the Python writers. See
No description provided.