
No longer write Parquet column metadata after column chunks *and* in the footer #6117

Merged Aug 2, 2024 · 21 commits

Commits
741bbf6
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
BugenZhao Jul 16, 2024
8f76248
Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily acciden…
XiangpengHao Jul 16, 2024
bb5f12b
Make display of interval types more pretty (#6006)
Rachelint Jul 16, 2024
756b1fb
Update snafu (#5930)
Jesse-Bakker Jul 16, 2024
fe04e09
Update Parquet thrift generated structures (#6045)
etseidl Jul 16, 2024
2e7f7ef
Revert "Revert "Write Bloom filters between row groups instead of the…
alamb Jul 16, 2024
effccc1
Revert "Update snafu (#5930)" (#6069)
alamb Jul 16, 2024
649d09d
Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
crepererum Jul 17, 2024
05e681d
remove repeated codes to make the codes more concise. (#6080)
Rachelint Jul 18, 2024
e40b311
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
etseidl Jul 19, 2024
81c34ac
Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
dependabot[bot] Jul 23, 2024
3bc9987
Deprecate read_page_locations() and simplify offset index in `Parquet…
etseidl Jul 23, 2024
20e11ec
no longer write inline column metadata
etseidl Jul 25, 2024
095130f
Merge remote-tracking branch 'apache/master' into 53.0.0-dev
alamb Jul 25, 2024
a6353d1
Update parquet/src/column/writer/mod.rs
alamb Jul 25, 2024
d122b1f
Merge remote-tracking branch 'origin/53.0.0-dev' into no_column_meta
etseidl Jul 25, 2024
957499d
suggestion from review
etseidl Jul 26, 2024
a033e43
add some more documentation
etseidl Jul 26, 2024
571ce65
Merge remote-tracking branch 'origin/master' into no_column_meta
etseidl Jul 26, 2024
07f9a1d
Merge remote-tracking branch 'origin/master' into no_column_meta
etseidl Jul 26, 2024
444b14f
remove write_metadata from PageWriter
etseidl Jul 28, 2024
7 changes: 1 addition & 6 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
+use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -489,11 +489,6 @@ impl PageWriter for ArrowPageWriter {
Ok(spec)
}

-fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
-    // Skip writing metadata as won't be copied anyway
-    Ok(())
-}

fn close(&mut self) -> Result<()> {
Ok(())
}
8 changes: 1 addition & 7 deletions parquet/src/column/page.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;

use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
-use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::file::statistics::Statistics;
use crate::format::PageHeader;

/// Parquet Page definition.
@@ -350,12 +350,6 @@ pub trait PageWriter: Send {
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

-/// Writes column chunk metadata into the output stream/sink.
-///
-/// This method is called once before page writer is closed, normally when writes are
-/// finalised in column writer.
-fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;

/// Closes resources and flushes underlying sink.
/// Page writer should not be used after this method is called.
fn close(&mut self) -> Result<()>;
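With `write_metadata` gone, the `PageWriter` contract reduces to writing pages and closing the sink. A std-only sketch of the slimmed-down trait (`CompressedPage`, `PageWriteSpec`, and `SinkPageWriter` here are illustrative stand-ins, not the parquet crate's real definitions):

```rust
use std::io::Write;

// Illustrative stand-ins for the real parquet-rs types; fields are simplified.
struct CompressedPage {
    bytes: Vec<u8>,
}

struct PageWriteSpec {
    offset: u64,
    bytes_written: u64,
}

// The trait after this PR: only the two responsibilities a page writer needs.
trait PageWriter {
    fn write_page(&mut self, page: CompressedPage) -> std::io::Result<PageWriteSpec>;
    fn close(&mut self) -> std::io::Result<()>;
}

// A simple sink-backed implementation that tracks byte offsets.
struct SinkPageWriter<W: Write> {
    sink: W,
    pos: u64,
}

impl<W: Write> PageWriter for SinkPageWriter<W> {
    fn write_page(&mut self, page: CompressedPage) -> std::io::Result<PageWriteSpec> {
        let offset = self.pos;
        self.sink.write_all(&page.bytes)?;
        self.pos += page.bytes.len() as u64;
        Ok(PageWriteSpec { offset, bytes_written: page.bytes.len() as u64 })
    }

    fn close(&mut self) -> std::io::Result<()> {
        self.sink.flush()
    }
}

fn main() {
    let mut writer = SinkPageWriter { sink: Vec::new(), pos: 0 };
    let first = writer.write_page(CompressedPage { bytes: vec![0u8; 10] }).unwrap();
    let second = writer.write_page(CompressedPage { bytes: vec![0u8; 5] }).unwrap();
    writer.close().unwrap();
    assert_eq!((first.offset, first.bytes_written), (0, 10));
    assert_eq!((second.offset, second.bytes_written), (10, 5));
}
```

Note that no metadata ever reaches the data stream in this model; pages are the only payload, which is exactly the behavioral change this PR makes.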
18 changes: 3 additions & 15 deletions parquet/src/column/writer/mod.rs
@@ -579,7 +579,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.write_dictionary_page()?;
}
self.flush_data_pages()?;
-let metadata = self.write_column_metadata()?;
+let metadata = self.build_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
@@ -1041,24 +1041,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(())
}

-/// Assembles and writes column chunk metadata.
-fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
+/// Assembles column chunk metadata.
+fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
let total_compressed_size = self.column_metrics.total_compressed_size as i64;
let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
let num_values = self.column_metrics.total_num_values as i64;
let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
// If data page offset is not set, then no pages have been written
let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

-let file_offset = match dict_page_offset {
-    Some(dict_offset) => dict_offset + total_compressed_size,
-    None => data_page_offset + total_compressed_size,
-};

let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(self.encodings.iter().cloned().collect())
-.set_file_offset(file_offset)
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
.set_num_values(num_values)
@@ -1138,8 +1132,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

let metadata = builder.build()?;
-self.page_writer.write_metadata(&metadata)?;
Contributor: This is the major functional change, I think -- I expect it will result in slightly smaller Parquet files, as the per-column metadata is no longer written twice.

Member: @wgtmac, is there some implementation using this metadata? Would a flag be better here? 🤔

Member: I think this has been discussed on the mailing list; it should be safe to remove this.

Member: https://lists.apache.org/thread/r6r2cvzrdoorq6h6gqwh0b1hbfbhxv29
apache/parquet-format#440

Would you mind finding the detailed message for that? I'll link it to the C++ issues there.

Contributor (author): I think the best evidence that no readers use the copy outside the footer is this: https://lists.apache.org/thread/ob2szsb0cv6fpwmvknss9jot1oqnxzsp

If any readers were using the alternate metadata, they would have submitted bug reports by now 😉

Member: I've synced with Gang and learned that Parquet-Java doesn't write this and its implementation doesn't read it, so let's move forward.


Ok(metadata)
}
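For context, the match deleted above computed the legacy `file_offset` as the position just past the chunk's pages (first page offset plus total compressed size), i.e. where the duplicate `ColumnMetaData` used to be written. A std-only sketch of the removed logic, with free-standing parameters standing in for the writer's internal column metrics:

```rust
/// Sketch of the legacy file_offset computation that this PR removes.
/// The offset was the chunk's first page (dictionary page if present,
/// otherwise the first data page) plus the total compressed size of all
/// pages -- the byte position where the duplicated ColumnMetaData landed.
fn legacy_file_offset(
    dict_page_offset: Option<i64>,
    data_page_offset: i64,
    total_compressed_size: i64,
) -> i64 {
    match dict_page_offset {
        Some(dict_offset) => dict_offset + total_compressed_size,
        None => data_page_offset + total_compressed_size,
    }
}

fn main() {
    // Chunk starting with a dictionary page at byte 100, 50 compressed bytes of pages.
    assert_eq!(legacy_file_offset(Some(100), 116, 50), 150);
    // Chunk without a dictionary page, first data page at byte 100.
    assert_eq!(legacy_file_offset(None, 100, 50), 150);
}
```

With the duplicate no longer written, the field loses its meaning, which is why the builder call is dropped and the setter deprecated elsewhere in this PR.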

@@ -3589,10 +3581,6 @@ mod tests {
Ok(res)
}

-fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
-    Ok(())
-}

fn close(&mut self) -> Result<()> {
Ok(())
}
16 changes: 14 additions & 2 deletions parquet/src/file/metadata/mod.rs
@@ -682,7 +682,12 @@ impl ColumnChunkMetaData {
self.file_path.as_deref()
}

-/// Byte offset in `file_path()`.
+/// Byte offset of `ColumnMetaData` in `file_path()`.
+///
+/// Note that the meaning of this field has been inconsistent between implementations

+/// so its use has since been deprecated in the Parquet specification. Modern implementations
+/// will set this to `0` to indicate that the `ColumnMetaData` is solely contained in the
+/// `ColumnChunk` struct.
pub fn file_offset(&self) -> i64 {
self.file_offset
}
@@ -1040,6 +1045,14 @@ impl ColumnChunkMetaDataBuilder {
}

/// Sets file offset in bytes.
+///
+/// This field was meant to provide an alternate to storing `ColumnMetadata` directly in
+/// the `ColumnChunkMetadata`. However, most Parquet readers assume the `ColumnMetadata`
+/// is stored inline and ignore this field.
+#[deprecated(
+    since = "53.0.0",
+    note = "The Parquet specification requires this field to be 0"
+)]
pub fn set_file_offset(mut self, value: i64) -> Self {
self.0.file_offset = value;
self
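The hunk above follows the common Rust deprecation pattern: the setter survives for source compatibility while the struct defaults the field to the spec-mandated 0. A minimal illustrative sketch (the types are stand-ins, not the crate's actual definitions):

```rust
// Illustrative stand-in for ColumnChunkMetaData; only the field of interest.
#[derive(Default)]
struct ColumnChunkMeta {
    file_offset: i64, // per the Parquet spec, modern writers leave this at 0
}

struct ColumnChunkMetaBuilder(ColumnChunkMeta);

impl ColumnChunkMetaBuilder {
    fn new() -> Self {
        ColumnChunkMetaBuilder(ColumnChunkMeta::default())
    }

    // Kept only for backwards compatibility; callers get a warning, and new
    // code should simply never set the field.
    #[deprecated(note = "The Parquet specification requires this field to be 0")]
    #[allow(dead_code)]
    fn set_file_offset(mut self, value: i64) -> Self {
        self.0.file_offset = value;
        self
    }

    fn build(self) -> ColumnChunkMeta {
        self.0
    }
}

fn main() {
    // Without the deprecated setter, the field defaults to 0 as required.
    let meta = ColumnChunkMetaBuilder::new().build();
    assert_eq!(meta.file_offset, 0);
}
```

Deprecating rather than deleting the setter keeps downstream crates compiling across the 53.0.0 release while steering them toward the compliant default.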
@@ -1453,7 +1466,6 @@ mod tests {
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
.set_file_path("file_path".to_owned())
-.set_file_offset(100)
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
12 changes: 0 additions & 12 deletions parquet/src/file/writer.rs
@@ -649,13 +649,10 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
));
}

-let file_offset = self.buf.bytes_written() as i64;

let map_offset = |x| x - src_offset + write_offset as i64;
let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
.set_compression(metadata.compression())
.set_encodings(metadata.encodings().clone())
-.set_file_offset(file_offset)
Contributor: Below, in line 677, the metadata is still written to self.buf -- should that also be skipped? And maybe the whole PageWriter::write_metadata method could be removed or deprecated?

Contributor (author, @etseidl, Jul 26, 2024): Sorry, maybe this needs to be made clearer in the documentation. The spec still requires the field to be written so old readers don't break (i.e., it is still marked required in the thrift IDL). The correct behavior now is to write a 0 here.

Oh, I misunderstood your comment... the write below is correct, as it writes the copy of the ColumnMetaData in the footer. The change in this PR is to not also write a copy of ColumnMetaData at the end of each column chunk.

Contributor: > the write below is correct as this is writing the copy of the ColumnMetaData in the footer

The code is hard to follow, but my understanding is that this method copies an existing ColumnChunk from a ChunkReader. The buf seems to be the same one that page data is written to, and I assume that append_column could be called multiple times. The metadata also seems to get collected inside the on_close closure and will in the end be written by SerializedFileWriter::write_metadata.

Contributor: My understanding is that this code:

    close.metadata = builder.build()?;

sets the metadata that will eventually be written into the file footer.

This code, perhaps what @jhorstmann is referring to (the line numbers appear to have changed):

    SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;

also seems to write the contents of close.metadata into the page.

In this case, it does seem like it could be avoided too.

Contributor (author, @etseidl, Jul 27, 2024): Ah, ok, I had somehow convinced myself that this method was in the normal write path and that the write in question was necessary :( I think I need to add a check that file_offset is 0 to several tests, and also check for gaps between column chunks.

Contributor (author): Ok, that rippled a bit. 😅 Let me know if this is ok, or if write_metadata should be deprecated first.

Contributor: I think removing write_metadata looks good to me.

.set_total_compressed_size(metadata.compressed_size())
.set_total_uncompressed_size(metadata.uncompressed_size())
.set_num_values(metadata.num_values())
@@ -680,7 +677,6 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
}
}

-SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
let (_, on_close) = self.get_on_close();
on_close(close)
}
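The flow described in this thread -- each column writer hands its finished metadata to an on_close callback, and the file writer serializes the collection once, in the footer -- can be sketched with std-only types (the Rc/RefCell plumbing and type names are illustrative; the real code threads this through SerializedRowGroupWriter and SerializedFileWriter::write_metadata):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Illustrative stand-in for ColumnChunkMetaData.
#[derive(Clone, Debug, PartialEq)]
struct ColumnMeta {
    name: String,
    total_compressed_size: i64,
}

// When a column writer closes, it builds its metadata and hands it to the
// callback; nothing is written to the data stream at this point.
fn close_column(meta: ColumnMeta, on_close: impl FnOnce(ColumnMeta)) {
    on_close(meta);
}

fn main() {
    let collected: Rc<RefCell<Vec<ColumnMeta>>> = Rc::new(RefCell::new(Vec::new()));

    for (name, size) in [("a", 100), ("b", 250)] {
        let sink = Rc::clone(&collected);
        close_column(
            ColumnMeta { name: name.to_string(), total_compressed_size: size },
            move |m| sink.borrow_mut().push(m),
        );
    }

    // Only here -- the analogue of SerializedFileWriter::write_metadata --
    // does the collected metadata get serialized, once, into the footer.
    let footer = collected.borrow();
    assert_eq!(footer.len(), 2);
    assert_eq!(footer[0].name, "a");
}
```

Because the footer is the single point of serialization in this model, dropping the per-chunk copy cannot lose information; it only removes bytes no reader consults.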
@@ -808,14 +804,6 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
Ok(spec)
}

-fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
-    let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
-    metadata
-        .to_column_metadata_thrift()
-        .write_to_out_protocol(&mut protocol)?;
-    Ok(())
-}

fn close(&mut self) -> Result<()> {
self.sink.flush()?;
Ok(())