From 12f45ba393e076461648781145ac65896bdcdf2f Mon Sep 17 00:00:00 2001
From: mwish
Date: Tue, 11 Jul 2023 00:53:53 +0800
Subject: [PATCH] GH-36284: [Python][Parquet] Support write page index in
 Python API (#36290)

### Rationale for this change

Support `write_page_index` in the Parquet Python API.

### What changes are included in this PR?

Support `write_page_index` in the writer properties, and expose
`has_column_index` / `has_offset_index` on `ColumnChunkMetaData`.

### Are these changes tested?

Yes, a test is added in `python/pyarrow/tests/parquet/test_metadata.py`.

### Are there any user-facing changes?

Users can now generate a page index by passing `write_page_index=True`
when writing Parquet files.

* Closes: #36284

Lead-authored-by: mwish
Co-authored-by: Antoine Pitrou
Co-authored-by: mwish <1506118561@qq.com>
Co-authored-by: Alenka Frim
Signed-off-by: Antoine Pitrou
---
 python/pyarrow/_parquet.pxd                   | 11 ++++++-
 python/pyarrow/_parquet.pyx                   | 30 +++++++++++++++----
 python/pyarrow/parquet/core.py                | 11 +++++++
 python/pyarrow/tests/parquet/test_metadata.py | 15 ++++++++++
 4 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index a3db7ab8d3001..39cdcc063b503 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -300,6 +300,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         c_bool encrypted_with_footer_key() const
         const c_string& key_metadata() const
 
+    cdef cppclass ParquetIndexLocation" parquet::IndexLocation":
+        int64_t offset
+        int32_t length
+
     cdef cppclass CColumnChunkMetaData" parquet::ColumnChunkMetaData":
         int64_t file_offset() const
         const c_string& file_path() const
@@ -321,6 +325,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         int64_t total_compressed_size() const
         int64_t total_uncompressed_size() const
         unique_ptr[CColumnCryptoMetaData] crypto_metadata() const
+        optional[ParquetIndexLocation] GetColumnIndexLocation() const
+        optional[ParquetIndexLocation] GetOffsetIndexLocation() const
 
     cdef cppclass CRowGroupMetaData" parquet::RowGroupMetaData":
         c_bool Equals(const CRowGroupMetaData&) const
@@ -420,6 +426,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* max_row_group_length(int64_t size)
             Builder* write_batch_size(int64_t batch_size)
             Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
+            Builder* enable_write_page_index()
+            Builder* disable_write_page_index()
             shared_ptr[WriterProperties] build()
 
     cdef cppclass ArrowWriterProperties:
@@ -567,7 +575,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     data_page_version=*,
     FileEncryptionProperties encryption_properties=*,
     write_batch_size=*,
-    dictionary_pagesize_limit=*) except *
+    dictionary_pagesize_limit=*,
+    write_page_index=*) except *
 
 
 cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 2f53d5fbbaa34..70bbccb10e0a9 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -462,7 +462,7 @@ cdef class ColumnChunkMetaData(_Weakrefable):
 
     @property
     def dictionary_page_offset(self):
-        """Offset of dictionary page reglative to column chunk offset (int)."""
+        """Offset of dictionary page relative to column chunk offset (int)."""
         if self.has_dictionary_page:
             return self.metadata.dictionary_page_offset()
         else:
@@ -470,7 +470,7 @@ cdef class ColumnChunkMetaData(_Weakrefable):
 
     @property
     def data_page_offset(self):
-        """Offset of data page reglative to column chunk offset (int)."""
+        """Offset of data page relative to column chunk offset (int)."""
         return self.metadata.data_page_offset()
 
     @property
@@ -493,6 +493,16 @@ cdef class ColumnChunkMetaData(_Weakrefable):
         """Uncompressed size in bytes (int)."""
         return self.metadata.total_uncompressed_size()
 
+    @property
+    def has_offset_index(self):
+        """Whether the column chunk has an offset index"""
+        return self.metadata.GetOffsetIndexLocation().has_value()
+
+    @property
+    def has_column_index(self):
+        """Whether the column chunk has a column index"""
+        return self.metadata.GetColumnIndexLocation().has_value()
+
 
 cdef class RowGroupMetaData(_Weakrefable):
     """Metadata for a single row group."""
@@ -1455,7 +1465,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         data_page_version=None,
        FileEncryptionProperties encryption_properties=None,
        write_batch_size=None,
-        dictionary_pagesize_limit=None) except *:
+        dictionary_pagesize_limit=None,
+        write_page_index=False) except *:
    """General writer properties"""
    cdef:
        shared_ptr[WriterProperties] properties
@@ -1599,6 +1610,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     # a size larger than this then it will be latched to this value.
     props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
 
+    # page index
+
+    if write_page_index:
+        props.enable_write_page_index()
+    else:
+        props.disable_write_page_index()
+
     properties = props.build()
 
     return properties
@@ -1710,7 +1728,8 @@ cdef class ParquetWriter(_Weakrefable):
                   encryption_properties=None,
                   write_batch_size=None,
                   dictionary_pagesize_limit=None,
-                  store_schema=True):
+                  store_schema=True,
+                  write_page_index=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1740,7 +1759,8 @@ cdef class ParquetWriter(_Weakrefable):
             data_page_version=data_page_version,
             encryption_properties=encryption_properties,
             write_batch_size=write_batch_size,
-            dictionary_pagesize_limit=dictionary_pagesize_limit
+            dictionary_pagesize_limit=dictionary_pagesize_limit,
+            write_page_index=write_page_index
         )
         arrow_properties = _create_arrow_writer_properties(
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index bf5fc6b24f649..e0cdfee62ef4b 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -874,6 +874,13 @@ def _sanitize_table(table, new_schema, flavor):
     it will restore the timezone (Parquet only stores the UTC values without
     timezone), or columns with duration type will be restored from the int64
     Parquet column.
+write_page_index : bool, default False
+    Whether to write a page index for all columns.
+    Writing statistics to the page index disables the old method of writing
+    statistics to each data page header. The page index makes statistics-based
+    filtering more efficient than the page header, as it gathers all the
+    statistics for a Parquet file in a single place, avoiding scattered I/O.
+    Note that the page index is not yet used on the read side by PyArrow.
""" _parquet_writer_example_doc = """\ @@ -966,6 +973,7 @@ def __init__(self, where, schema, filesystem=None, write_batch_size=None, dictionary_pagesize_limit=None, store_schema=True, + write_page_index=False, **options): if use_deprecated_int96_timestamps is None: # Use int96 timestamps for Spark @@ -1022,6 +1030,7 @@ def __init__(self, where, schema, filesystem=None, write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, + write_page_index=write_page_index, **options) self.is_open = True @@ -3084,6 +3093,7 @@ def write_table(table, where, row_group_size=None, version='2.6', write_batch_size=None, dictionary_pagesize_limit=None, store_schema=True, + write_page_index=False, **kwargs): # Implementor's note: when adding keywords here / updating defaults, also # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions @@ -3111,6 +3121,7 @@ def write_table(table, where, row_group_size=None, version='2.6', write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, + write_page_index=write_page_index, **kwargs) as writer: writer.write_table(table, row_group_size=row_group_size) except Exception: diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 342fdb21aed56..a5f9f7e57b6f5 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -357,6 +357,21 @@ def test_field_id_metadata(): assert schema[5].metadata[field_id] == b'-1000' +def test_parquet_file_page_index(): + for write_page_index in (False, True): + table = pa.table({'a': [1, 2, 3]}) + + writer = pa.BufferOutputStream() + _write_table(table, writer, write_page_index=write_page_index) + reader = pa.BufferReader(writer.getvalue()) + + # Can retrieve sorting columns from metadata + metadata = pq.read_metadata(reader) + cc = metadata.row_group(0).column(0) + assert cc.has_offset_index is write_page_index + assert cc.has_column_index is write_page_index + + @pytest.mark.pandas def test_multi_dataset_metadata(tempdir): filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]