feat: Support writing hive partitioned parquet #17324
@@ -9,6 +9,7 @@ use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;
@@ -127,3 +128,111 @@ where
    }
    path
}

pub fn write_partitioned_dataset<S>(
    df: &DataFrame,
    path: &Path,
    partition_by: &[S],
    file_write_options: &ParquetWriteOptions,
    chunk_size: usize,
) -> PolarsResult<()>
where
    S: AsRef<str>,
{
    let base_path = path;

    for (path_part, part_df) in get_hive_partitions_iter(df, partition_by)? {
        let dir = base_path.join(path_part);
        std::fs::create_dir_all(&dir)?;

        let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xf_ffff_ffff_ffff);
        let rows_per_file = (df.height() / n_files).saturating_add(1);

        fn get_path_for_index(i: usize) -> String {
            // Use a fixed-width file name so that it sorts properly.
            format!("{:013x}.parquet", i)
        }

        for (i, slice_start) in (0..part_df.height()).step_by(rows_per_file).enumerate() {
            let f = std::fs::File::create(dir.join(get_path_for_index(i)))?;

            file_write_options
                .to_writer(f)
                .finish(&mut part_df.slice(slice_start as i64, rows_per_file))?;
        }
    }

    Ok(())
}

Review comment (on the per-partition write loop): For a future PR we can see if we can speed this up by enabling parallelism/async here.
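To make the sizing heuristic concrete, here is a small standalone sketch with made-up numbers (a ~4 GiB partition, a 512 MiB chunk_size, 10 million rows); none of these values come from the PR:

```rust
fn main() {
    // Hypothetical inputs, not from the PR: what estimated_size(), chunk_size
    // and height might look like for one partition.
    let estimated_size: usize = 4 * 1024 * 1024 * 1024; // ~4 GiB
    let chunk_size: usize = 512 * 1024 * 1024; // ~512 MiB target per file
    let height: usize = 10_000_000;

    // Same arithmetic as in write_partitioned_dataset above.
    let n_files = (estimated_size / chunk_size).clamp(1, 0xf_ffff_ffff_ffff);
    let rows_per_file = (height / n_files).saturating_add(1);

    // 8 output files of roughly 1.25 million rows (~512 MiB) each.
    assert_eq!(n_files, 8);
    assert_eq!(rows_per_file, 1_250_001);
}
```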
/// Creates an iterator of (hive partition path, DataFrame) pairs, e.g.:
/// ("a=1/b=1", DataFrame)
fn get_hive_partitions_iter<'a, S>(
    df: &'a DataFrame,
    partition_by: &'a [S],
) -> PolarsResult<Box<dyn Iterator<Item = (String, DataFrame)> + 'a>>
where
    S: AsRef<str>,
{
    let schema = df.schema();

    let partition_by_col_idx = partition_by
        .iter()
        .map(|x| {
            let Some(i) = schema.index_of(x.as_ref()) else {
                polars_bail!(ColumnNotFound: "{}", x.as_ref())
            };
            Ok(i)
        })
        .collect::<PolarsResult<Vec<_>>>()?;

    let get_hive_path_part = move |df: &DataFrame| {
        const CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
            .add(b'/')
            .add(b'=')
            .add(b':')
            .add(b' ');

        let cols = df.get_columns();

        partition_by_col_idx
            .iter()
            .map(|&i| {
                let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();

                format!(
                    "{}={}",
                    s.name(),
                    percent_encoding::percent_encode(
                        s.str()
                            .unwrap()
                            .get(0)
                            .unwrap_or("__HIVE_DEFAULT_PARTITION__")
                            .as_bytes(),
                        CHAR_SET
                    )
                )
            })
            .collect::<Vec<_>>()
            .join("/")
    };

    let groups = df.group_by(partition_by)?;
    let groups = groups.take_groups();

    let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
        GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
            let part_df =
                unsafe { df._take_unchecked_slice_sorted(&group, false, IsSorted::Ascending) };
            (get_hive_path_part(&part_df), part_df)
        })),
        GroupsProxy::Slice { groups, .. } => {
            Box::new(groups.into_iter().map(move |[offset, len]| {
                let part_df = df.slice(offset as i64, len as usize);
                (get_hive_path_part(&part_df), part_df)
            }))
        },
    };

    Ok(out)
}

Review discussion (on how the output file size is chosen):
- I think we should iterate over […]. I am not entirely sure how other tools determine the size of the parquet. We could split by […]
- I set it to 1 million rows per file for now.
- If you know the schema at this point (or rather, the number of cols), it's better to target a given number of elements (rows x cols), as "rows" by itself is not a useful metric. 1 million rows with 1 col is a full three orders of magnitude removed from 1 million rows with 1000 cols 😆 Somewhere between 10-25 million elements is probably going to be a more consistent target 🤔 (and using estimated size is even more helpful to avoid edge-cases like large binary blobs).
- I see, I've changed it to slice the df into chunks of a target size.
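As a follow-up to that discussion, here is a rough sketch of what the suggested elements-based target could look like; the helper name and numbers are hypothetical, and this is not what the PR implements (the PR uses estimated_size with a byte chunk_size):

```rust
// Hypothetical helper illustrating the reviewer's "target rows x cols" idea;
// not part of the PR.
fn rows_per_file_for_target(n_rows: usize, n_cols: usize, target_elements: usize) -> usize {
    // Budget roughly `target_elements` elements (rows x cols) per output file.
    (target_elements / n_cols.max(1)).clamp(1, n_rows.max(1))
}

fn main() {
    // 1 million rows with 1 col vs. 1000 cols differ by three orders of
    // magnitude in element count, so the per-file row budget differs too.
    assert_eq!(rows_per_file_for_target(1_000_000, 1, 25_000_000), 1_000_000);
    assert_eq!(rows_per_file_for_target(1_000_000, 1000, 25_000_000), 25_000);
}
```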
A second file in the diff changes how hive partition values are parsed when reading:
@@ -1,7 +1,5 @@
use std::path::{Path, PathBuf};

use percent_encoding::percent_decode;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
@@ -68,26 +66,22 @@ pub fn hive_partitions_from_paths(
    reader_schema: &Schema,
    try_parse_dates: bool,
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
-    let paths = paths
-        .iter()
-        .map(|x| {
-            Ok(PathBuf::from(
-                percent_decode(x.to_str().unwrap().as_bytes())
-                    .decode_utf8()
-                    .map_err(to_compute_err)?
-                    .as_ref(),
-            ))
-        })
-        .collect::<PolarsResult<Vec<PathBuf>>>()?;
-    let paths = paths.as_slice();

    let Some(path) = paths.first() else {
        return Ok(None);
    };

    let sep = separator(path);
    let path_string = path.to_str().unwrap();

+    fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
+        let (k, v) = parse_hive_string(part)?;
+        let v = percent_encoding::percent_decode(v.as_bytes())
+            .decode_utf8()
+            .ok()?;
+
+        Some((k, v))
+    }

    macro_rules! get_hive_parts_iter {
        ($e:expr) => {{
            let path_parts = $e[hive_start_idx..].split(sep);

@@ -97,7 +91,8 @@ pub fn hive_partitions_from_paths(
            if index == file_index {
                return None;
            }
-            parse_hive_string(part)
+            parse_hive_string_and_decode(part)
        })
    }};
}

Review comment (on parse_hive_string_and_decode): drive-by - decode after splitting by […]
@@ -158,7 +153,7 @@
                continue;
            }

-            entry.insert(infer_field_schema(value, try_parse_dates, false));
+            entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
        }
    }

@@ -264,7 +259,7 @@ fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
    // Files are not Hive partitions, so globs are not valid.
    if value.contains('*') {
        return None;
-    }
+    };

    Some((name, value))
}
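To see how the writing and reading sides fit together, here is a small round-trip sketch using the same percent_encoding calls as get_hive_path_part (first file) and parse_hive_string_and_decode (above); the "city"/"New York" values are made up for illustration:

```rust
use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};

// The same character set escaped by get_hive_path_part in the first file.
const CHAR_SET: &AsciiSet = &CONTROLS.add(b'/').add(b'=').add(b':').add(b' ');

fn main() {
    // Writing side: a value containing a space becomes a filesystem-safe directory name.
    let dir = format!("city={}", percent_encode("New York".as_bytes(), CHAR_SET));
    assert_eq!(dir, "city=New%20York");

    // Reading side: split on '=' first, then percent-decode the value,
    // mirroring parse_hive_string_and_decode.
    let (key, value) = dir.split_once('=').unwrap();
    let value = percent_decode(value.as_bytes()).decode_utf8().unwrap();
    assert_eq!((key, value.as_ref()), ("city", "New York"));
}
```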
PR author's comment: Created `write_partitioned_dataset` here in `polars-io`. I was considering putting a `fn write_parquet_partitioned` into `impl DataFrame`, but I notice that on the Rust side we don't have e.g. `DataFrame::write_parquet` and others, so I just made it a function like this.
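For reference, a rough sketch of how the free function might be called; the exact import paths, the availability of ParquetWriteOptions::default(), and the reading of chunk_size as a byte target are assumptions, not taken verbatim from the PR:

```rust
use std::path::Path;

use polars_core::prelude::*;
use polars_io::partition::write_partitioned_dataset; // assumed module path
use polars_io::prelude::ParquetWriteOptions; // assumed re-export

fn main() -> PolarsResult<()> {
    let df = DataFrame::new(vec![
        Series::new("year", &[2023i32, 2023, 2024]),
        Series::new("value", &[1i64, 2, 3]),
    ])?;

    // Writes files under out/year=2023/... and out/year=2024/...
    write_partitioned_dataset(
        &df,
        Path::new("out"),
        &["year"],
        &ParquetWriteOptions::default(), // assumes Default is implemented
        4 * 1024 * 1024, // chunk_size, assumed to be a target size in bytes
    )?;
    Ok(())
}
```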