
feat: Support writing hive partitioned parquet #17324

Merged (1 commit, Jul 6, 2024)
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/doc/lib.md
@@ -42,7 +42,7 @@ fn main() -> Result<()> {
write_statistics: true,
compression: CompressionOptions::Snappy,
version: Version::V1,
data_pagesize_limit: None,
data_page_size: None,
};

let row_groups = RowGroupIterator::try_new(
2 changes: 1 addition & 1 deletion crates/polars-io/src/lib.rs
@@ -19,7 +19,7 @@ pub mod ndjson;
mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
#[cfg(feature = "partition")]
#[cfg(feature = "parquet")]
pub mod partition;
#[cfg(feature = "async")]
pub mod pl_async;
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/options.rs
@@ -16,7 +16,7 @@ pub struct ParquetWriteOptions {
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
pub data_pagesize_limit: Option<usize>,
pub data_page_size: Option<usize>,
/// maintain the order the data was processed
pub maintain_order: bool,
}
16 changes: 15 additions & 1 deletion crates/polars-io/src/parquet/write/writer.rs
@@ -10,9 +10,23 @@ use polars_parquet::write::{

use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use super::ParquetWriteOptions;
use crate::prelude::chunk_df_for_writing;
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
where
F: Write,
{
ParquetWriter::new(f)
.with_compression(self.compression)
.with_statistics(self.statistics)
.with_row_group_size(self.row_group_size)
.with_data_page_size(self.data_page_size)
}
}

/// Write a DataFrame to Parquet format.
#[must_use]
pub struct ParquetWriter<W> {
@@ -103,7 +117,7 @@ where
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
data_page_size: self.data_page_size,
}
}

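The new ParquetWriteOptions::to_writer helper above turns the options struct into a configured ParquetWriter. A minimal usage sketch (the output path is illustrative, and it assumes ParquetWriteOptions implements Default and is reachable at polars_io::parquet::write):

use std::fs::File;

use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;

fn write_with_options(df: &mut DataFrame) -> PolarsResult<()> {
    // Hypothetical defaults; set compression, row_group_size, data_page_size, etc. as needed.
    let options = ParquetWriteOptions::default();
    let file = File::create("/tmp/example.parquet")?;
    // Build a ParquetWriter from the options and write the frame.
    options.to_writer(file).finish(df)?;
    Ok(())
}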
109 changes: 109 additions & 0 deletions crates/polars-io/src/partition.rs
@@ -9,6 +9,7 @@ use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;

@@ -127,3 +128,111 @@
}
path
}

pub fn write_partitioned_dataset<S>(
Review comment (author, Collaborator): Created write_partitioned_dataset here in polars-io.

I was considering putting a fn write_parquet_partitioned into impl DataFrame, but I noticed that on the Rust side we don't have e.g. DataFrame::write_parquet and friends, so I just made it a free function like this.

df: &DataFrame,
path: &Path,
partition_by: &[S],
file_write_options: &ParquetWriteOptions,
chunk_size: usize,
) -> PolarsResult<()>
where
S: AsRef<str>,
{
let base_path = path;

for (path_part, part_df) in get_hive_partitions_iter(df, partition_by)? {
let dir = base_path.join(path_part);
std::fs::create_dir_all(&dir)?;

let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xf_ffff_ffff_ffff);
let rows_per_file = (df.height() / n_files).saturating_add(1);

fn get_path_for_index(i: usize) -> String {
// Use a fixed-width file name so that it sorts properly.
format!("{:013x}.parquet", i)
}

for (i, slice_start) in (0..part_df.height()).step_by(rows_per_file).enumerate() {
Review comment (Member): For a future PR we can see if we can speed this up by enabling parallelism/async here.

let f = std::fs::File::create(dir.join(get_path_for_index(i)))?;

file_write_options
.to_writer(f)
.finish(&mut part_df.slice(slice_start as i64, rows_per_file))?;
}
}

Ok(())
}

/// Creates an iterator of (hive partition path, DataFrame) pairs, e.g.:
/// ("a=1/b=1", DataFrame)
fn get_hive_partitions_iter<'a, S>(
df: &'a DataFrame,
partition_by: &'a [S],
) -> PolarsResult<Box<dyn Iterator<Item = (String, DataFrame)> + 'a>>
where
S: AsRef<str>,
{
let schema = df.schema();

let partition_by_col_idx = partition_by
.iter()
.map(|x| {
let Some(i) = schema.index_of(x.as_ref()) else {
polars_bail!(ColumnNotFound: "{}", x.as_ref())
};
Ok(i)
})
.collect::<PolarsResult<Vec<_>>>()?;

let get_hive_path_part = move |df: &DataFrame| {
const CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
.add(b'/')
.add(b'=')
.add(b':')
.add(b' ');

let cols = df.get_columns();

partition_by_col_idx
.iter()
.map(|&i| {
let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();

format!(
"{}={}",
s.name(),
percent_encoding::percent_encode(
s.str()
.unwrap()
.get(0)
.unwrap_or("__HIVE_DEFAULT_PARTITION__")
.as_bytes(),
CHAR_SET
)
)
})
.collect::<Vec<_>>()
.join("/")
};

let groups = df.group_by(partition_by)?;
let groups = groups.take_groups();

let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
let part_df =
Review comment (Member): I think we should iterate over DataFrames of a certain size, so that we don't write a single file per folder but, for large partitions, many smaller Parquet files.

I am not entirely sure how other tools determine the size of the Parquet files. We could split by n_rows, using estimated_size as a hint?

Reply (author, Collaborator): I set it to 1 million rows per file for now.

Reply (@alexander-beedie, Collaborator, Jul 3, 2024): If you know the schema at this point (or rather, the number of cols) it's better to target a given number of elements (rows x cols), as "rows" by itself is not a useful metric.

1 million rows with 1 col is a full three orders of magnitude removed from 1 million rows with 1000 cols 😆

Somewhere between 10 and 25 million elements is probably going to be a more consistent target 🤔 (and using estimated size is even more helpful, to avoid edge cases like large binary blobs).

Reply (author, Collaborator): I see, I've changed it to slice the df into chunks of a target size (see the sizing sketch after this diff).

unsafe { df._take_unchecked_slice_sorted(&group, false, IsSorted::Ascending) };
(get_hive_path_part(&part_df), part_df)
})),
GroupsProxy::Slice { groups, .. } => {
Box::new(groups.into_iter().map(move |[offset, len]| {
let part_df = df.slice(offset as i64, len as usize);
(get_hive_path_part(&part_df), part_df)
}))
},
};

Ok(out)
}
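For reference, a minimal sketch of calling the new write_partitioned_dataset (the frame contents, output path, and the assumption that ParquetWriteOptions implements Default are illustrative, not taken from the PR):

use std::path::Path;

use polars_core::df;
use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

fn example() -> PolarsResult<()> {
    let df = df!(
        "region" => ["eu", "eu", "us"],
        "value" => [1i64, 2, 3],
    )?;

    write_partitioned_dataset(
        &df,
        Path::new("/tmp/dataset"),       // produces e.g. /tmp/dataset/region=eu/<13-hex-digit>.parquet
        &["region"],
        &ParquetWriteOptions::default(), // assumes a Default impl
        512 * 1024 * 1024,               // target roughly 512 MiB of estimated data per file
    )
}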
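And a worked example of the file-count arithmetic discussed in the review thread above, with made-up numbers (a partition estimated at 4 GiB, a chunk_size of 512 MiB, 80 million rows):

fn main() {
    let estimated_size: usize = 4 * 1024 * 1024 * 1024; // part_df.estimated_size()
    let chunk_size: usize = 512 * 1024 * 1024;
    let height: usize = 80_000_000;

    // Same arithmetic as in write_partitioned_dataset above.
    let n_files = (estimated_size / chunk_size).clamp(1, 0xf_ffff_ffff_ffff);
    let rows_per_file = (height / n_files).saturating_add(1);

    assert_eq!(n_files, 8);
    assert_eq!(rows_per_file, 10_000_001); // 8 files of roughly 10 million rows each
}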
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/mod.rs
@@ -81,7 +81,7 @@ pub struct WriteOptions {
/// The compression to apply to every page
pub compression: CompressionOptions,
/// The size to flush a page, defaults to 1024 * 1024 if None
pub data_pagesize_limit: Option<usize>,
pub data_page_size: Option<usize>,
}

use arrow::compute::aggregate::estimated_bytes_size;
@@ -298,7 +298,7 @@ pub fn array_to_pages(
let byte_size = estimated_bytes_size(primitive_array);

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
let bytes_per_row = if number_of_rows == 0 {
0
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -63,7 +63,7 @@ impl ParquetSink {
let file = std::fs::File::create(path)?;
let writer = ParquetWriter::new(file)
.with_compression(options.compression)
.with_data_page_size(options.data_pagesize_limit)
.with_data_page_size(options.data_page_size)
.with_statistics(options.statistics)
.with_row_group_size(options.row_group_size)
// This is important! Otherwise we will deadlock
@@ -154,7 +154,7 @@ impl ParquetCloudSink {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
.with_data_page_size(parquet_options.data_pagesize_limit)
.with_data_page_size(parquet_options.data_page_size)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
31 changes: 13 additions & 18 deletions crates/polars-plan/src/plans/hive.rs
@@ -1,7 +1,5 @@
use std::path::{Path, PathBuf};

use percent_encoding::percent_decode;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
@@ -68,26 +66,22 @@ pub fn hive_partitions_from_paths(
reader_schema: &Schema,
try_parse_dates: bool,
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
let paths = paths
.iter()
.map(|x| {
Ok(PathBuf::from(
percent_decode(x.to_str().unwrap().as_bytes())
.decode_utf8()
.map_err(to_compute_err)?
.as_ref(),
))
})
.collect::<PolarsResult<Vec<PathBuf>>>()?;
let paths = paths.as_slice();

let Some(path) = paths.first() else {
return Ok(None);
};

let sep = separator(path);
let path_string = path.to_str().unwrap();

fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
let (k, v) = parse_hive_string(part)?;
let v = percent_encoding::percent_decode(v.as_bytes())
Review comment (author, Collaborator): Drive-by: decode after splitting by =, otherwise we break when the value contains / or =. A small illustration follows this function.

.decode_utf8()
.ok()?;

Some((k, v))
}
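To illustrate the drive-by comment above, a small standalone sketch (using the percent_encoding crate as the diff does) of why splitting on = must happen before decoding:

use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};

fn main() {
    // Same character set as get_hive_path_part in polars-io/src/partition.rs.
    const CHAR_SET: &AsciiSet = &CONTROLS.add(b'/').add(b'=').add(b':').add(b' ');

    // A partition value that itself contains '/' and '='.
    let value = "a/b=c";
    let part = format!("key={}", percent_encode(value.as_bytes(), CHAR_SET));
    assert_eq!(part, "key=a%2Fb%3Dc");

    // Split on '=' first, then decode: the original value comes back intact.
    let (k, v) = part.split_once('=').unwrap();
    let v = percent_decode(v.as_bytes()).decode_utf8().unwrap();
    assert_eq!((k, v.as_ref()), ("key", "a/b=c"));

    // Decoding the whole segment first reintroduces the raw '/' and '=' and
    // makes the later path / key-value splits ambiguous.
    let decoded_first = percent_decode(part.as_bytes()).decode_utf8().unwrap();
    assert_eq!(decoded_first, "key=a/b=c");
}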

macro_rules! get_hive_parts_iter {
($e:expr) => {{
let path_parts = $e[hive_start_idx..].split(sep);
@@ -97,7 +91,8 @@ pub fn hive_partitions_from_paths(
if index == file_index {
return None;
}
parse_hive_string(part)

parse_hive_string_and_decode(part)
})
}};
}
@@ -158,7 +153,7 @@ pub fn hive_partitions_from_paths(
continue;
}

entry.insert(infer_field_schema(value, try_parse_dates, false));
entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
}
}

@@ -264,7 +259,7 @@ fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
// Files are not Hive partitions, so globs are not valid.
if value.contains('*') {
return None;
}
};

Some((name, value))
}
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/mod.rs
@@ -1260,7 +1260,7 @@ fn integration_write(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
data_page_size: None,
};

let encodings = schema
4 changes: 2 additions & 2 deletions crates/polars/tests/it/io/parquet/arrow/read_indexes.rs
@@ -32,7 +32,7 @@ fn pages(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
data_page_size: None,
};

let pages1 = [array11, array12, array13]
@@ -82,7 +82,7 @@ fn read_with_indexes(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
data_page_size: None,
};

let to_compressed = |pages: Vec<Page>| {
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/write.rs
@@ -48,7 +48,7 @@ fn round_trip_opt_stats(
statistics: StatisticsOptions::full(),
compression,
version,
data_pagesize_limit: None,
data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/roundtrip.rs
@@ -23,7 +23,7 @@ fn round_trip(
statistics: StatisticsOptions::full(),
compression,
version,
data_pagesize_limit: None,
data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];