diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 8f7b514ccf71..db5d4502cfec 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
 };
 use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
+use crate::file::metadata::{KeyValue, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -489,11 +489,6 @@ impl PageWriter for ArrowPageWriter {
         Ok(spec)
     }
 
-    fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
-        // Skip writing metadata as won't be copied anyway
-        Ok(())
-    }
-
     fn close(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 947a633f48a2..585d1951c14a 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
 
 use crate::basic::{Encoding, PageType};
 use crate::errors::{ParquetError, Result};
-use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::file::statistics::Statistics;
 use crate::format::PageHeader;
 
 /// Parquet Page definition.
@@ -350,12 +350,6 @@ pub trait PageWriter: Send {
     /// either data page or dictionary page.
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
 
-    /// Writes column chunk metadata into the output stream/sink.
-    ///
-    /// This method is called once before page writer is closed, normally when writes are
-    /// finalised in column writer.
-    fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;
-
     /// Closes resources and flushes underlying sink.
     /// Page writer should not be used after this method is called.
     fn close(&mut self) -> Result<()>;
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 54d8fd3cc13e..519a219f943d 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -579,7 +579,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             self.write_dictionary_page()?;
         }
         self.flush_data_pages()?;
-        let metadata = self.write_column_metadata()?;
+        let metadata = self.build_column_metadata()?;
         self.page_writer.close()?;
 
         let boundary_order = match (
@@ -1041,8 +1041,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(())
     }
 
-    /// Assembles and writes column chunk metadata.
-    fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
+    /// Assembles column chunk metadata.
+    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
         let total_compressed_size = self.column_metrics.total_compressed_size as i64;
         let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
         let num_values = self.column_metrics.total_num_values as i64;
@@ -1050,15 +1050,9 @@
         // If data page offset is not set, then no pages have been written
         let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
 
-        let file_offset = match dict_page_offset {
-            Some(dict_offset) => dict_offset + total_compressed_size,
-            None => data_page_offset + total_compressed_size,
-        };
-
         let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
             .set_encodings(self.encodings.iter().cloned().collect())
-            .set_file_offset(file_offset)
             .set_total_compressed_size(total_compressed_size)
             .set_total_uncompressed_size(total_uncompressed_size)
             .set_num_values(num_values)
@@ -1138,8 +1132,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         }
 
         let metadata = builder.build()?;
-        self.page_writer.write_metadata(&metadata)?;
-
         Ok(metadata)
     }
 
@@ -3589,10 +3581,6 @@ mod tests {
             Ok(res)
         }
 
-        fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
-            Ok(())
-        }
-
        fn close(&mut self) -> Result<()> {
            Ok(())
        }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index cd3555de828c..48bc97bb6efc 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -682,7 +682,12 @@ impl ColumnChunkMetaData {
         self.file_path.as_deref()
     }
 
-    /// Byte offset in `file_path()`.
+    /// Byte offset of `ColumnMetaData` in `file_path()`.
+    ///
+    /// Note that the meaning of this field has been inconsistent between implementations,
+    /// so its use has since been deprecated in the Parquet specification. Modern implementations
+    /// will set this to `0` to indicate that the `ColumnMetaData` is solely contained in the
+    /// `ColumnChunk` struct.
     pub fn file_offset(&self) -> i64 {
         self.file_offset
     }
@@ -1040,6 +1045,14 @@ impl ColumnChunkMetaDataBuilder {
     }
 
     /// Sets file offset in bytes.
+    ///
+    /// This field was meant to provide an alternative to storing `ColumnMetadata` directly in
+    /// the `ColumnChunkMetadata`. However, most Parquet readers assume the `ColumnMetadata`
+    /// is stored inline and ignore this field.
+    #[deprecated(
+        since = "53.0.0",
+        note = "The Parquet specification requires this field to be 0"
+    )]
     pub fn set_file_offset(mut self, value: i64) -> Self {
         self.0.file_offset = value;
         self
@@ -1453,7 +1466,6 @@ mod tests {
         let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
             .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
             .set_file_path("file_path".to_owned())
-            .set_file_offset(100)
             .set_num_values(1000)
             .set_compression(Compression::SNAPPY)
             .set_total_compressed_size(2000)
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index f2e8f74a378c..f6d8910cfc7f 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -649,13 +649,10 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             ));
         }
 
-        let file_offset = self.buf.bytes_written() as i64;
-
         let map_offset = |x| x - src_offset + write_offset as i64;
         let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
             .set_compression(metadata.compression())
             .set_encodings(metadata.encodings().clone())
-            .set_file_offset(file_offset)
             .set_total_compressed_size(metadata.compressed_size())
             .set_total_uncompressed_size(metadata.uncompressed_size())
             .set_num_values(metadata.num_values())
@@ -680,7 +677,6 @@
             }
         }
 
-        SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
         let (_, on_close) = self.get_on_close();
         on_close(close)
     }
@@ -808,14 +804,6 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
-    fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
-        let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
-        metadata
-            .to_column_metadata_thrift()
-            .write_to_out_protocol(&mut protocol)?;
-        Ok(())
-    }
-
     fn close(&mut self) -> Result<()> {
         self.sink.flush()?;
         Ok(())
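
Note for implementors: the user-facing consequence of this patch is that custom `PageWriter` implementations no longer provide `write_metadata`; column chunk metadata is assembled by the column writer (`build_column_metadata`) and returned through `close` instead of being pushed through the page writer. The sketch below shows the trimmed trait surface after this change; the `NoopPageWriter` name and its discard-everything behaviour are hypothetical, not part of this diff.

```rust
use parquet::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use parquet::errors::Result;

/// Hypothetical page writer that discards pages; it exists only to
/// illustrate the `PageWriter` trait surface after this change.
struct NoopPageWriter;

impl PageWriter for NoopPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        // A real implementation would write `page.data()` to its sink and
        // record the byte offset; here we only fill in the bookkeeping.
        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size();
        spec.compressed_size = page.compressed_size();
        Ok(spec)
    }

    // Note: there is no `write_metadata` method to implement any more.

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
```

Separately, code that still calls `ColumnChunkMetaDataBuilder::set_file_offset` will compile but now emits a deprecation warning; per the Parquet specification the field should be left at `0`, indicating that the `ColumnMetaData` is stored inline in the `ColumnChunk` struct.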