diff --git a/requirements-dev.txt b/requirements-dev.txt index 3e5826ec3d..9559af2d55 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,9 +1,10 @@ autograd flake8 +fsspec;python_version > "3.6" and sys_platform != "win32" jax>=0.2.7;sys_platform != "win32" jaxlib>=0.1.57,!=0.1.68;sys_platform != "win32" numba>=0.50.0 numexpr pandas>=0.24.0 -pyarrow>=6.0.0;sys_platform != "win32" +pyarrow>=7.0.0;python_version > "3.6" and sys_platform != "win32" PyYAML diff --git a/src/awkward/_v2/_connect/pyarrow.py b/src/awkward/_v2/_connect/pyarrow.py index 43f82a9b41..718e512705 100644 --- a/src/awkward/_v2/_connect/pyarrow.py +++ b/src/awkward/_v2/_connect/pyarrow.py @@ -27,10 +27,10 @@ else: if ak._v2._util.parse_version(pyarrow.__version__) < ak._v2._util.parse_version( - "6.0.0" + "7.0.0" ): pyarrow = None - error_message = "pyarrow 6.0.0 or later required for {0}" + error_message = "pyarrow 7.0.0 or later required for {0}" def import_pyarrow(name): @@ -904,6 +904,7 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): buffers = obj.buffers() awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(obj.type) + out = popbuffers( obj, awkwardarrow_type, storage_type, buffers, generate_bitmasks ) @@ -924,65 +925,94 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): return ak._v2.operations.structure.concatenate(layouts, highlevel=False) elif isinstance(obj, pyarrow.lib.RecordBatch): - child_array = [] - for i in range(obj.num_columns): - layout = handle_arrow(obj.column(i), generate_bitmasks) - if not obj.schema.field(i).nullable: - child_array.append(remove_optiontype(layout)) - else: - child_array.append(layout) - if pass_empty_field and list(obj.schema.names) == [""]: - return child_array[0] + layout = handle_arrow(obj.column(0), generate_bitmasks) + if not obj.schema.field(0).nullable: + return remove_optiontype(layout) + else: + return layout else: - return ak._v2.contents.RecordArray( - child_array, obj.schema.names, length=len(obj) + record_is_optiontype = False + optiontype_fields = [] + optiontype_parameters = None + recordtype_parameters = None + if ( + obj.schema.metadata is not None + and b"ak:parameters" in obj.schema.metadata + ): + for x in json.loads(obj.schema.metadata[b"ak:parameters"]): + (key,) = x.keys() + (value,) = x.values() + if key == "optiontype_fields": + optiontype_fields = value + elif key in ( + "UnmaskedArray", + "BitMaskedArray", + "ByteMaskedArray", + "IndexedOptionArray", + ): + record_is_optiontype = True + optiontype_parameters = value + elif key == "RecordArray": + recordtype_parameters = value + + record_mask = None + contents = [] + for i in range(obj.num_columns): + field = obj.schema.field(i) + layout = handle_arrow(obj.column(i), generate_bitmasks) + if record_is_optiontype: + if record_mask is None: + record_mask = layout.mask_as_bool(valid_when=False) + else: + record_mask &= layout.mask_as_bool(valid_when=False) + if ( + record_is_optiontype and field.name not in optiontype_fields + ) or not field.nullable: + contents.append(remove_optiontype(layout)) + else: + contents.append(layout) + + out = ak._v2.contents.RecordArray( + contents, + obj.schema.names, + length=len(obj), + parameters=recordtype_parameters, ) + if record_is_optiontype and record_mask is None and generate_bitmasks: + record_mask = numpy.zeros(len(out), dtype=np.bool_) + + if record_is_optiontype and record_mask is None: + return ak._v2.contents.UnmaskedArray( + out, parameters=optiontype_parameters + ) + + elif record_is_optiontype: + return ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(record_mask), + out, + valid_when=False, + parameters=optiontype_parameters, + ) + + else: + return out + elif isinstance(obj, pyarrow.lib.Table): batches = obj.combine_chunks().to_batches() if len(batches) == 0: - # zero-length array with the right type + # FIXME: create a zero-length array with the right type raise ak._v2._util.error(NotImplementedError) elif len(batches) == 1: - out = handle_arrow(batches[0], generate_bitmasks, pass_empty_field) + return handle_arrow(batches[0], generate_bitmasks, pass_empty_field) else: arrays = [ handle_arrow(batch, generate_bitmasks, pass_empty_field) for batch in batches if len(batch) > 0 ] - raise ak._v2._util.error( - NotImplementedError( - "FIXME: need ak._v2.operations.structure.concatenate" - ) - ) - out = ak._v2.operations.structure.concatenate(arrays, highlevel=False) - - if obj.schema.metadata is not None and b"ak:parameters" in obj.schema.metadata: - optiontype, recordtype = None, None - if out.is_OptionType: - optiontype = out - if out.is_RecordType: - recordtype = out - elif out.content.is_RecordType: - recordtype = out.content - - parameters = json.loads(obj.schema.metadata[b"ak:parameters"]) - for x in parameters: - (key,) = x.keys() - (value,) = x.values() - if optiontype is not None and key in ( - "UnmaskedArray", - "BitMaskedArray", - "ByteMaskedArray", - "IndexedOptionArray", - ): - optiontype._parameters = value - elif recordtype is not None and key == "RecordArray": - recordtype._parameters = value - - return out + return ak._v2.operations.structure.concatenate(arrays, highlevel=False) elif ( isinstance(obj, Iterable) @@ -1014,43 +1044,57 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): def form_handle_arrow(schema, pass_empty_field=False): - forms = [] - for i, arrowtype in enumerate(schema.types): - awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(arrowtype) + if pass_empty_field and list(schema.names) == [""]: + awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(schema.types[0]) akform = form_popbuffers(awkwardarrow_type, storage_type) - - if not schema.field(i).nullable: - forms.append(form_remove_optiontype(akform)) + if not schema.field(0).nullable: + return form_remove_optiontype(akform) else: - forms.append(akform) + return akform - if pass_empty_field and list(schema.names) == [""]: - assert len(forms) == 1 - out = forms[0] else: - out = ak._v2.forms.RecordForm(forms, list(schema.names)) - - if schema.metadata is not None and b"ak:parameters" in schema.metadata: - optiontype, recordtype = None, None - if out.is_OptionType: - optiontype = out - if out.is_RecordType: - recordtype = out - elif out.content.is_RecordType: - recordtype = out.content - - parameters = json.loads(schema.metadata[b"ak:parameters"]) - for x in parameters: - (key,) = x.keys() - (value,) = x.values() - if optiontype is not None and key in ( - "UnmaskedArray", - "BitMaskedArray", - "ByteMaskedArray", - "IndexedOptionArray", - ): - optiontype._parameters = value - elif recordtype is not None and key == "RecordArray": - recordtype._parameters = value + record_is_optiontype = False + optiontype_fields = [] + optiontype_parameters = None + recordtype_parameters = None + if schema.metadata is not None and b"ak:parameters" in schema.metadata: + for x in json.loads(schema.metadata[b"ak:parameters"]): + (key,) = x.keys() + (value,) = x.values() + if key == "optiontype_fields": + optiontype_fields = value + elif key in ( + "UnmaskedArray", + "BitMaskedArray", + "ByteMaskedArray", + "IndexedOptionArray", + ): + record_is_optiontype = True + optiontype_parameters = value + elif key == "RecordArray": + recordtype_parameters = value - return out + forms = [] + for i, arrowtype in enumerate(schema.types): + field = schema.field(i) + awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(arrowtype) + akform = form_popbuffers(awkwardarrow_type, storage_type) + + if ( + record_is_optiontype and field.name not in optiontype_fields + ) or not field.nullable: + forms.append(form_remove_optiontype(akform)) + else: + forms.append(akform) + + out = ak._v2.forms.RecordForm( + forms, list(schema.names), parameters=recordtype_parameters + ) + + if record_is_optiontype: + return ak._v2.forms.ByteMaskedForm( + "i8", out, valid_when=False, parameters=optiontype_parameters + ) + + else: + return out diff --git a/src/awkward/_v2/forms/bitmaskedform.py b/src/awkward/_v2/forms/bitmaskedform.py index 81be730be9..10f180793d 100644 --- a/src/awkward/_v2/forms/bitmaskedform.py +++ b/src/awkward/_v2/forms/bitmaskedform.py @@ -240,3 +240,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return self._content._column_types() diff --git a/src/awkward/_v2/forms/bytemaskedform.py b/src/awkward/_v2/forms/bytemaskedform.py index 36268048cf..a7f6b88fb4 100644 --- a/src/awkward/_v2/forms/bytemaskedform.py +++ b/src/awkward/_v2/forms/bytemaskedform.py @@ -218,3 +218,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return self._content._column_types() diff --git a/src/awkward/_v2/forms/emptyform.py b/src/awkward/_v2/forms/emptyform.py index dd25b2d64b..55774f5851 100644 --- a/src/awkward/_v2/forms/emptyform.py +++ b/src/awkward/_v2/forms/emptyform.py @@ -105,3 +105,6 @@ def _select_columns(self, index, specifier, matches, output): if any(match and index >= len(item) for item, match in zip(specifier, matches)): output.append(None) return self + + def _column_types(self): + return ("empty",) diff --git a/src/awkward/_v2/forms/form.py b/src/awkward/_v2/forms/form.py index ac85c18d5f..6cb1331f26 100644 --- a/src/awkward/_v2/forms/form.py +++ b/src/awkward/_v2/forms/form.py @@ -315,9 +315,9 @@ def simplify_optiontype(self): def simplify_uniontype(self, merge=True, mergebool=False): return self - def columns(self, list_indicator=None): + def columns(self, list_indicator=None, column_prefix=()): output = [] - self._columns((), output, list_indicator) + self._columns(column_prefix, output, list_indicator) return output def select_columns(self, specifier, expand_braces=True): @@ -342,3 +342,6 @@ def select_columns(self, specifier, expand_braces=True): output = [] return self._select_columns(0, specifier, matches, output) + + def column_types(self): + return self._column_types() diff --git a/src/awkward/_v2/forms/indexedform.py b/src/awkward/_v2/forms/indexedform.py index e117f1162b..f4aa8c5c74 100644 --- a/src/awkward/_v2/forms/indexedform.py +++ b/src/awkward/_v2/forms/indexedform.py @@ -182,3 +182,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return self._content._column_types() diff --git a/src/awkward/_v2/forms/indexedoptionform.py b/src/awkward/_v2/forms/indexedoptionform.py index 418fb10eee..ba1ba09312 100644 --- a/src/awkward/_v2/forms/indexedoptionform.py +++ b/src/awkward/_v2/forms/indexedoptionform.py @@ -199,3 +199,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return self._content._column_types() diff --git a/src/awkward/_v2/forms/listform.py b/src/awkward/_v2/forms/listform.py index c92eedba00..5cc790f14b 100644 --- a/src/awkward/_v2/forms/listform.py +++ b/src/awkward/_v2/forms/listform.py @@ -214,3 +214,9 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + if self.parameter("__array__") in ("string", "bytestring"): + return ("string",) + else: + return self._content._column_types() diff --git a/src/awkward/_v2/forms/listoffsetform.py b/src/awkward/_v2/forms/listoffsetform.py index 1ed268cd11..db04146394 100644 --- a/src/awkward/_v2/forms/listoffsetform.py +++ b/src/awkward/_v2/forms/listoffsetform.py @@ -180,3 +180,9 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + if self.parameter("__array__") in ("string", "bytestring"): + return ("string",) + else: + return self._content._column_types() diff --git a/src/awkward/_v2/forms/numpyform.py b/src/awkward/_v2/forms/numpyform.py index 612b533b56..a7e5eee99c 100644 --- a/src/awkward/_v2/forms/numpyform.py +++ b/src/awkward/_v2/forms/numpyform.py @@ -214,3 +214,6 @@ def _select_columns(self, index, specifier, matches, output): if any(match and index >= len(item) for item, match in zip(specifier, matches)): output.append(None) return self + + def _column_types(self): + return (ak._v2.types.numpytype.primitive_to_dtype(self._primitive),) diff --git a/src/awkward/_v2/forms/recordform.py b/src/awkward/_v2/forms/recordform.py index 33378665c9..42879ebad4 100644 --- a/src/awkward/_v2/forms/recordform.py +++ b/src/awkward/_v2/forms/recordform.py @@ -334,3 +334,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return sum((x._column_types() for x in self._contents), ()) diff --git a/src/awkward/_v2/forms/regularform.py b/src/awkward/_v2/forms/regularform.py index cfbc364875..93733f3aa6 100644 --- a/src/awkward/_v2/forms/regularform.py +++ b/src/awkward/_v2/forms/regularform.py @@ -185,3 +185,9 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + if self.parameter("__array__") in ("string", "bytestring"): + return ("string",) + else: + return self._content._column_types() diff --git a/src/awkward/_v2/forms/unionform.py b/src/awkward/_v2/forms/unionform.py index dfd63f51ef..1bc5dfae9d 100644 --- a/src/awkward/_v2/forms/unionform.py +++ b/src/awkward/_v2/forms/unionform.py @@ -272,3 +272,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return sum((x._column_types() for x in self._contents), ()) diff --git a/src/awkward/_v2/forms/unmaskedform.py b/src/awkward/_v2/forms/unmaskedform.py index 7b14e87ba0..8cf27d03f3 100644 --- a/src/awkward/_v2/forms/unmaskedform.py +++ b/src/awkward/_v2/forms/unmaskedform.py @@ -164,3 +164,6 @@ def _select_columns(self, index, specifier, matches, output): self._parameters, self._form_key, ) + + def _column_types(self): + return self._content._column_types() diff --git a/src/awkward/_v2/highlevel.py b/src/awkward/_v2/highlevel.py index d88fb90b16..2ceebe5aae 100644 --- a/src/awkward/_v2/highlevel.py +++ b/src/awkward/_v2/highlevel.py @@ -1151,12 +1151,17 @@ def _repr(self, limit_cols): if self._layout.nplike.known_shape and self._layout.nplike.known_data: typestr = repr(str(self.type))[1:-1] - strwidth = max( - 0, min(40, limit_cols - len(pytype) - len(" type='...'") - 3) - ) - if len(pytype) - len(" type=''") - len(typestr) - 3 < limit_cols: + if len(typestr) + len(pytype) + len(" type=''") + 3 < limit_cols // 2: + strwidth = limit_cols - ( + len(typestr) + len(pytype) + len(" type=''") + 3 + ) + else: strwidth = max( - 0, limit_cols - len(pytype) - len(" type=''") - len(typestr) - 3 + 0, + min( + limit_cols // 2, + limit_cols - len(pytype) - len(" type='...'") - 3, + ), ) valuestr = " " + awkward._v2._prettyprint.valuestr(self, 1, strwidth) @@ -1819,12 +1824,17 @@ def _repr(self, limit_cols): and self._layout.array.nplike.known_data ): typestr = repr(str(self.type))[1:-1] - strwidth = max( - 0, min(40, limit_cols - len(pytype) - len(" type='...'") - 3) - ) - if len(pytype) - len(" type=''") - len(typestr) - 3 < limit_cols: + if len(typestr) + len(pytype) + len(" type=''") + 3 < limit_cols // 2: + strwidth = limit_cols - ( + len(typestr) + len(pytype) + len(" type=''") + 3 + ) + else: strwidth = max( - 0, limit_cols - len(pytype) - len(" type=''") - len(typestr) - 3 + 0, + min( + limit_cols // 2, + limit_cols - len(pytype) - len(" type='...'") - 3, + ), ) valuestr = " " + awkward._v2._prettyprint.valuestr(self, 1, strwidth) diff --git a/src/awkward/_v2/operations/convert/ak_from_parquet.py b/src/awkward/_v2/operations/convert/ak_from_parquet.py index 8770da1e19..66e616b807 100644 --- a/src/awkward/_v2/operations/convert/ak_from_parquet.py +++ b/src/awkward/_v2/operations/convert/ak_from_parquet.py @@ -125,6 +125,13 @@ def _metadata( if columns is not None: list_indicator = "list.item" + for column_metadata in file_for_metadata.schema: + if ( + column_metadata.max_repetition_level > 0 + and ".list.element." in column_metadata.path + ): + list_indicator = "list.element" + break form = ak._v2._connect.pyarrow.form_handle_arrow( parquetfile_for_metadata.schema_arrow, pass_empty_field=True @@ -272,10 +279,11 @@ def _read_parquet_file( else: arrow_table = parquetfile.read_row_groups(row_groups, parquet_columns) - return ak._v2._connect.pyarrow.handle_arrow( + return ak._v2.operations.convert.ak_from_arrow._impl( arrow_table, - generate_bitmasks=generate_bitmasks, - pass_empty_field=True, + generate_bitmasks, + False, + None, ) diff --git a/src/awkward/_v2/operations/convert/ak_to_arrow_table.py b/src/awkward/_v2/operations/convert/ak_to_arrow_table.py index ff283cbb60..73319d25e0 100644 --- a/src/awkward/_v2/operations/convert/ak_to_arrow_table.py +++ b/src/awkward/_v2/operations/convert/ak_to_arrow_table.py @@ -110,6 +110,7 @@ def _impl( parameters = None paarrays, pafields = [], [] if check[-1].is_RecordType and not check[-1].is_tuple: + optiontype_fields = [] for name in check[-1].fields: paarrays.append( layout[name].to_arrow( @@ -127,7 +128,10 @@ def _impl( layout[name].is_OptionType ) ) - parameters = [] + if check[-1].contents[check[-1].field_to_index(name)].is_OptionType: + optiontype_fields.append(name) + + parameters = [{"optiontype_fields": optiontype_fields}] for x in check: parameters.append( {ak._v2._util.direct_Content_subclass(x).__name__: x._parameters} diff --git a/src/awkward/_v2/operations/convert/ak_to_parquet.py b/src/awkward/_v2/operations/convert/ak_to_parquet.py index b42f62466c..d5e470872b 100644 --- a/src/awkward/_v2/operations/convert/ak_to_parquet.py +++ b/src/awkward/_v2/operations/convert/ak_to_parquet.py @@ -1,7 +1,214 @@ # BSD 3-Clause License; see https://github.com/scikit-hep/awkward-1.0/blob/main/LICENSE +from collections.abc import Iterable, Sized, Mapping, Sequence + +import numpy as np + import awkward as ak -def to_parquet(): - raise ak._v2._util.error(NotImplementedError) +def to_parquet( + data, + destination, + list_to32=False, + string_to32=True, + bytestring_to32=True, + emptyarray_to=None, + categorical_as_dictionary=False, + extensionarray=True, + count_nulls=True, + compression="zstd", + compression_level=None, + row_group_size=64 * 1024 * 1024, + data_page_size=None, + parquet_flavor=None, + parquet_version="1.0", + parquet_page_version="1.0", + parquet_metadata_statistics=True, + parquet_dictionary_encoding=False, + parquet_byte_stream_split=False, + parquet_coerce_timestamps=None, + parquet_old_int96_timestamps=None, + parquet_compliant_nested=False, # https://issues.apache.org/jira/browse/ARROW-16348 + parquet_extra_options=None, + hook_after_write=None, +): + import awkward._v2._connect.pyarrow + + pyarrow_parquet = awkward._v2._connect.pyarrow.import_pyarrow_parquet( + "ak.to_parquet" + ) + fsspec = awkward._v2._connect.pyarrow.import_fsspec("ak.to_parquet") + + if isinstance(data, Iterable) and not isinstance(data, Sized): + iterator = iter(data) + elif isinstance(data, Iterable): + iterator = iter([data]) + else: + raise ak._v2._util.error( + TypeError( + "'data' must be an array (one row group) or iterable of arrays (row group per array)" + ) + ) + + row_group = 0 + array = next(iterator) + layout = ak._v2.operations.convert.ak_to_layout.to_layout( + array, allow_record=False, allow_other=False + ) + table = ak._v2.operations.convert.ak_to_arrow_table._impl( + layout, + list_to32, + string_to32, + bytestring_to32, + emptyarray_to, + categorical_as_dictionary, + extensionarray, + count_nulls, + ) + + if parquet_compliant_nested: + list_indicator = "list.element" + else: + list_indicator = "list.item" + + if table.column_names == [""]: + column_prefix = ("",) + else: + column_prefix = () + + form = layout.form + + def parquet_columns(specifier, only=None): + if specifier is None: + selected_form = form + else: + selected_form = form.select_columns(specifier) + + parquet_column_names = selected_form.columns( + list_indicator=list_indicator, column_prefix=column_prefix + ) + if only is not None: + column_types = selected_form.column_types() + assert len(parquet_column_names) == len(column_types) + if only == "string": + return [ + x + for x, y in zip(parquet_column_names, column_types) + if y == "string" + ] + elif only == "floating": + return [ + x + for x, y in zip(parquet_column_names, column_types) + if isinstance(y, np.dtype) and issubclass(y.type, np.floating) + ] + else: + return parquet_column_names + + if compression is True: + compression = "zstd" + elif compression is False or compression is None: + compression = "none" + elif isinstance(compression, Mapping): + replacement = {} + for specifier, value in compression.items(): + replacement.update({x: value for x in parquet_columns(specifier)}) + compression = replacement + + if isinstance(compression_level, Mapping): + replacement = {} + for specifier, value in compression_level.items(): + replacement.update({x: value for x in parquet_columns(specifier)}) + compression_level = replacement + + if parquet_metadata_statistics is True: + parquet_metadata_statistics = True + elif parquet_metadata_statistics is False or parquet_metadata_statistics is None: + parquet_metadata_statistics = False + elif isinstance(parquet_metadata_statistics, Mapping): + replacement = {} + for specifier, value in parquet_metadata_statistics.items(): + replacement.update({x: value for x in parquet_columns(specifier)}) + parquet_metadata_statistics = [x for x, value in replacement.items() if value] + elif isinstance(parquet_metadata_statistics, Sequence): + replacement = [] + for specifier in parquet_metadata_statistics: + replacement.extend([x for x in parquet_columns(specifier)]) + parquet_metadata_statistics = replacement + + if parquet_dictionary_encoding is True: + parquet_dictionary_encoding = parquet_columns(None, only="string") + elif parquet_dictionary_encoding is False or parquet_dictionary_encoding is None: + parquet_dictionary_encoding = False + elif isinstance(parquet_dictionary_encoding, Mapping): + replacement = {} + for specifier, value in parquet_dictionary_encoding.items(): + replacement.update( + {x: value for x in parquet_columns(specifier, only="string")} + ) + parquet_dictionary_encoding = [x for x, value in replacement.items() if value] + + if parquet_byte_stream_split is True: + parquet_byte_stream_split = parquet_columns(None, only="floating") + elif parquet_byte_stream_split is False or parquet_byte_stream_split is None: + parquet_byte_stream_split = False + elif isinstance(parquet_byte_stream_split, Mapping): + replacement = {} + for specifier, value in parquet_byte_stream_split.items(): + replacement.update( + {x: value for x in parquet_columns(specifier, only="floating")} + ) + parquet_byte_stream_split = [x for x, value in replacement.items() if value] + + if parquet_extra_options is None: + parquet_extra_options = {} + + with fsspec.open(destination, "wb") as file: + with pyarrow_parquet.ParquetWriter( + destination, + table.schema, + filesystem=file.fs, + flavor=parquet_flavor, + version=parquet_version, + use_dictionary=parquet_dictionary_encoding, + compression=compression, + write_statistics=parquet_metadata_statistics, + use_deprecated_int96_timestamps=parquet_old_int96_timestamps, + compression_level=compression_level, + use_byte_stream_split=parquet_byte_stream_split, + data_page_version=parquet_page_version, + use_compliant_nested_type=parquet_compliant_nested, + data_page_size=data_page_size, + coerce_timestamps=parquet_coerce_timestamps, + **parquet_extra_options, + ) as writer: + while True: + writer.write_table(table, row_group_size=row_group_size) + if hook_after_write is not None: + hook_after_write( + row_group=row_group, + array=array, + layout=layout, + table=table, + writer=writer, + ) + + row_group += 1 + try: + array = next(iterator) + except StopIteration: + break + layout = ak._v2.operations.convert.ak_to_layout.to_layout( + array, allow_record=False, allow_other=False + ) + table = ak._v2.operations.convert.ak_to_arrow_table._impl( + layout, + list_to32, + string_to32, + bytestring_to32, + emptyarray_to, + categorical_as_dictionary, + extensionarray, + count_nulls, + ) diff --git a/tests/v2/test_0028-add-dressed-types.py b/tests/v2/test_0028-add-dressed-types.py index a34e3a5c7c..52df25920f 100644 --- a/tests/v2/test_0028-add-dressed-types.py +++ b/tests/v2/test_0028-add-dressed-types.py @@ -39,7 +39,7 @@ def test_highlevel(): ) assert ( repr(c) - == "" + == "" ) assert str(c) == "[{one: 3.14, two: [1.1, 2.2]}, {one: 99.9, two: [-3.14]}]" diff --git a/tests/v2/test_1125-to-arrow-from-arrow.py b/tests/v2/test_1125-to-arrow-from-arrow.py index 83c4c6c4f9..7b79eeb917 100644 --- a/tests/v2/test_1125-to-arrow-from-arrow.py +++ b/tests/v2/test_1125-to-arrow-from-arrow.py @@ -239,21 +239,7 @@ def test_indexedoptionarray_emptyarray(tmp_path, extensionarray): ) paarray = akarray.to_arrow(extensionarray=extensionarray) arrow_round_trip(akarray, paarray, extensionarray) - - # https://issues.apache.org/jira/browse/ARROW-14522 - if extensionarray: - paarray = akarray.to_arrow(extensionarray=extensionarray, emptyarray_to="f8") - akarray2 = ak._v2.from_arrow(paarray, highlevel=False) - assert to_list(akarray2) == to_list(akarray) - - filename = os.path.join(tmp_path, "whatever.parquet") - pyarrow_parquet.write_table(pyarrow.table({"": paarray}), filename) - table = pyarrow_parquet.read_table(filename) - akarray4 = ak._v2.from_arrow(table[0].chunks[0], highlevel=False) - assert to_list(akarray4) == to_list(akarray) - - else: - parquet_round_trip(akarray, paarray, extensionarray, tmp_path) + parquet_round_trip(akarray, paarray, extensionarray, tmp_path) @pytest.mark.parametrize("categorical_as_dictionary", [False, True]) diff --git a/tests/v2/test_1294-to-and-from_parquet.py b/tests/v2/test_1294-to-and-from_parquet.py index 456c52cee4..7e491f8b6a 100644 --- a/tests/v2/test_1294-to-and-from_parquet.py +++ b/tests/v2/test_1294-to-and-from_parquet.py @@ -292,13 +292,11 @@ def test_indexedoptionarray_emptyarray(tmp_path, through, extensionarray): parameters={"which": "outer"}, ) - # https://issues.apache.org/jira/browse/ARROW-14522 - if through is through_arrow or not extensionarray: - schema_arrow, array_form = through(akarray, extensionarray, tmp_path) - predicted_form = ak._v2._connect.pyarrow.form_handle_arrow( - schema_arrow, pass_empty_field=True - ) - assert predicted_form == array_form + schema_arrow, array_form = through(akarray, extensionarray, tmp_path) + predicted_form = ak._v2._connect.pyarrow.form_handle_arrow( + schema_arrow, pass_empty_field=True + ) + assert predicted_form == array_form @pytest.mark.parametrize("categorical_as_dictionary", [False, True]) diff --git a/tests/v2/test_1440-start-v2-to_parquet.py b/tests/v2/test_1440-start-v2-to_parquet.py new file mode 100644 index 0000000000..118f370051 --- /dev/null +++ b/tests/v2/test_1440-start-v2-to_parquet.py @@ -0,0 +1,481 @@ +# BSD 3-Clause License; see https://github.com/scikit-hep/awkward-1.0/blob/main/LICENSE + +import os +import pytest # noqa: F401 +import numpy as np # noqa: F401 +import awkward as ak # noqa: F401 + +pyarrow = pytest.importorskip("pyarrow") +pyarrow_parquet = pytest.importorskip("pyarrow.parquet") + +to_list = ak._v2.operations.convert.to_list + + +def parquet_round_trip(akarray, extensionarray, tmp_path): + filename = os.path.join(tmp_path, "whatever.parquet") + ak._v2.to_parquet(akarray, filename, extensionarray=extensionarray) + akarray2 = ak._v2.from_parquet(filename) + + assert to_list(akarray2) == to_list(akarray) + if extensionarray: + print("read back") + akarray2.type.show() + print("original") + akarray.type.show() + + assert akarray2.type == akarray.type + + +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_numpyarray(tmp_path, extensionarray): + akarray = ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner"} + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([False, True, False])), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner"} + ), + valid_when=False, + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.NumpyArray( + np.arange(2 * 3 * 5).reshape(2, 3, 5), parameters={"which": "inner"} + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("dtype", [np.int32, np.uint32, np.int64]) +@pytest.mark.parametrize("list_to32", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_listoffsetarray_numpyarray(tmp_path, dtype, list_to32, extensionarray): + akarray = ak._v2.contents.ListOffsetArray( + ak._v2.index.Index(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + # akarray = ak._v2.contents.ListOffsetArray( + # ak._v2.index.Index(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + # ak._v2.contents.ByteMaskedArray( + # ak._v2.index.Index8( + # np.array( + # [False, True, False, True, True, False, True, True, False, False], + # dtype=np.int8, + # ) + # ), + # ak._v2.contents.NumpyArray( + # np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + # parameters={"which": "inner"}, + # ), + # valid_when=False, + # parameters={"which": "middle"}, + # ), + # parameters={"which": "outer"}, + # ) + # parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + # akarray = ak._v2.contents.ByteMaskedArray( + # ak._v2.index.Index8(np.array([True, False, True, True, True], dtype=np.int8)), + # ak._v2.contents.ListOffsetArray( + # ak._v2.index.Index(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + # ak._v2.contents.NumpyArray( + # np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + # parameters={"which": "inner"}, + # ), + # parameters={"which": "middle"}, + # ), + # valid_when=True, + # parameters={"which": "outer"}, + # ) + # parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + # akarray = ak._v2.contents.ByteMaskedArray( + # ak._v2.index.Index8(np.array([True, False, True, True, True], dtype=np.int8)), + # ak._v2.contents.ListOffsetArray( + # ak._v2.index.Index(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + # ak._v2.contents.ByteMaskedArray( + # ak._v2.index.Index8( + # np.array( + # [ + # False, + # True, + # False, + # True, + # True, + # False, + # True, + # True, + # False, + # False, + # ], + # dtype=np.int8, + # ) + # ), + # ak._v2.contents.NumpyArray( + # np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + # parameters={"which": "inner"}, + # ), + # valid_when=False, + # parameters={"which": "middle-1"}, + # ), + # parameters={"which": "middle-2"}, + # ), + # valid_when=True, + # parameters={"which": "outer"}, + # ) + # parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("dtype", [np.int32, np.uint32, np.int64]) +@pytest.mark.parametrize("list_to32", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_listoffsetarray_numpyarray_2(tmp_path, dtype, list_to32, extensionarray): + akarray = ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([True, False, True, False, True], dtype=np.int8)), + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + parameters={"which": "middle"}, + ), + valid_when=True, + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_numpyarray_bool(tmp_path, extensionarray): + akarray = ak._v2.contents.NumpyArray( + np.random.randint(0, 2, 14).astype(np.int8).view(np.bool_), + parameters={"which": "inner"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.random.randint(0, 2, 14).astype(np.int8).view(np.bool_)), + ak._v2.contents.NumpyArray( + np.random.randint(0, 2, 14).astype(np.int8).view(np.bool_), + parameters={"which": "inner"}, + ), + valid_when=False, + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_indexedoptionarray_numpyarray(tmp_path, extensionarray): + akarray = ak._v2.contents.IndexedOptionArray( + ak._v2.index.Index64(np.array([2, 0, 0, -1, 3, 1, 5, -1, 2], dtype=np.int64)), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5]), + parameters={"which": "inner"}, + ), + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.IndexedArray( + ak._v2.index.Index64(np.array([2, 0, 0, 3, 1, 5, 2], dtype=np.int64)), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5]), + parameters={"which": "inner"}, + ), + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_indexedoptionarray_emptyarray(tmp_path, extensionarray): + akarray = ak._v2.contents.IndexedOptionArray( + ak._v2.index.Index64(np.array([-1, -1, -1, -1, -1], dtype=np.int64)), + ak._v2.contents.EmptyArray(parameters={"which": "inner"}), + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("categorical_as_dictionary", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_dictionary_encoding(tmp_path, categorical_as_dictionary, extensionarray): + akarray = ak._v2.contents.IndexedArray( + ak._v2.index.Index64(np.array([3, 2, 2, 2, 0, 1, 3], dtype=np.uint64)), + ak._v2.contents.NumpyArray([0.0, 1.1, 2.2, 3.3], parameters={"which": "inner"}), + parameters={"__array__": "categorical", "which": "outer"}, + ) + + # https://issues.apache.org/jira/browse/ARROW-14525 + if not (extensionarray and categorical_as_dictionary): + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("string_to32", [False, True]) +@pytest.mark.parametrize("dtype", [np.int32, np.uint32, np.int64]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_listoffsetraray_string(tmp_path, dtype, string_to32, extensionarray): + akarray = ak._v2.contents.ListOffsetArray( + ak._v2.index.Index64(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + ak._v2.contents.NumpyArray( + np.arange(97, 107, dtype=np.uint8), parameters={"__array__": "char"} + ), + parameters={"__array__": "string", "something": "else"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("bytestring_to32", [False, True]) +@pytest.mark.parametrize("dtype", [np.int32, np.uint32, np.int64]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_listoffsetraray_bytestring(tmp_path, dtype, bytestring_to32, extensionarray): + akarray = ak._v2.contents.ListOffsetArray( + ak._v2.index.Index64(np.array([0, 3, 3, 5, 6, 10], dtype=dtype)), + ak._v2.contents.NumpyArray( + np.arange(97, 107, dtype=np.uint8), parameters={"__array__": "byte"} + ), + parameters={"__array__": "bytestring", "something": "else"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("size", [5]) +@pytest.mark.parametrize("list_to32", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_regulararray_numpyarray(tmp_path, size, list_to32, extensionarray): + akarray = ak._v2.contents.RegularArray( + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + size, + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.RegularArray( + ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8( + np.array( + [False, True, False, True, True, False, True, True, False, False], + dtype=np.int8, + ) + ), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + valid_when=False, + parameters={"which": "middle"}, + ), + size, + parameters={"which": "outer"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([True, False], dtype=np.int8)), + ak._v2.contents.RegularArray( + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + size, + parameters={"which": "middle"}, + ), + valid_when=True, + parameters={"which": "outer"}, + ) + + # https://issues.apache.org/jira/browse/ARROW-14547 + # parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([True, False], dtype=np.int8)), + ak._v2.contents.RegularArray( + ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8( + np.array( + [ + False, + True, + False, + True, + True, + False, + True, + True, + False, + False, + ], + dtype=np.int8, + ) + ), + ak._v2.contents.NumpyArray( + np.array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]), + parameters={"which": "inner"}, + ), + valid_when=False, + parameters={"which": "middle-1"}, + ), + size, + parameters={"which": "middle-2"}, + ), + valid_when=True, + parameters={"which": "outer"}, + ) + + # https://issues.apache.org/jira/browse/ARROW-14547 + # parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("size", [5]) +@pytest.mark.parametrize("bytestring_to32", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_regularaarray_bytestring(tmp_path, size, bytestring_to32, extensionarray): + akarray = ak._v2.contents.RegularArray( + ak._v2.contents.NumpyArray( + np.arange(97, 107, dtype=np.uint8), parameters={"__array__": "byte"} + ), + size, + parameters={"__array__": "bytestring", "something": "else"}, + ) + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_unmaskedarray_numpyarray(tmp_path, extensionarray): + akarray = ak._v2.contents.UnmaskedArray( + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner"} + ) + ) + parquet_round_trip(ak._v2.Array(akarray), True, tmp_path) + + +@pytest.mark.parametrize("is_tuple", [False, True]) +@pytest.mark.parametrize("extensionarray", [False, True]) +def test_recordarray(tmp_path, is_tuple, extensionarray): + akarray = ak._v2.contents.RecordArray( + [ + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner1"} + ), + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index32(np.array([0, 3, 3, 5], dtype=np.int32)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3, 4.4, 5.5]), parameters={"which": "inner2"} + ), + ), + ], + None if is_tuple else ["x", "y"], + parameters={"which": "outer"}, + ) + if not is_tuple or extensionarray: + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.RecordArray( + [ + ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([False, True, False]).view(np.int8)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner1"} + ), + valid_when=False, + ), + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index32(np.array([0, 3, 3, 5], dtype=np.int32)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3, 4.4, 5.5]), parameters={"which": "inner2"} + ), + ), + ], + None if is_tuple else ["x", "y"], + parameters={"which": "outer"}, + ) + if not is_tuple or extensionarray: + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.RecordArray( + [ + ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([False, True, False]).view(np.int8)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner1"} + ), + valid_when=False, + ), + ak._v2.contents.UnmaskedArray( + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index32(np.array([0, 3, 3, 5], dtype=np.int32)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3, 4.4, 5.5]), + parameters={"which": "inner2"}, + ), + ), + ), + ], + None if is_tuple else ["x", "y"], + parameters={"which": "outer"}, + ) + if not is_tuple or extensionarray: + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.IndexedOptionArray( + ak._v2.index.Index64(np.array([2, 0, -1, 0, 1], dtype=np.int64)), + ak._v2.contents.RecordArray( + [ + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner1"} + ), + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index32(np.array([0, 3, 3, 5], dtype=np.int32)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3, 4.4, 5.5]), + parameters={"which": "inner2"}, + ), + ), + ], + None if is_tuple else ["x", "y"], + parameters={"which": "outer"}, + ), + ) + if not is_tuple or extensionarray: + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path) + + akarray = ak._v2.contents.IndexedOptionArray( + ak._v2.index.Index64(np.array([2, 0, -1, 0, 1], dtype=np.int64)), + ak._v2.contents.RecordArray( + [ + ak._v2.contents.ByteMaskedArray( + ak._v2.index.Index8(np.array([False, True, False]).view(np.int8)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3]), parameters={"which": "inner1"} + ), + valid_when=False, + ), + ak._v2.contents.ListOffsetArray( + ak._v2.index.Index32(np.array([0, 3, 3, 5], dtype=np.int32)), + ak._v2.contents.NumpyArray( + np.array([1.1, 2.2, 3.3, 4.4, 5.5]), + parameters={"which": "inner2"}, + ), + ), + ], + None if is_tuple else ["x", "y"], + parameters={"which": "outer"}, + ), + ) + if not is_tuple or extensionarray: + parquet_round_trip(ak._v2.Array(akarray), extensionarray, tmp_path)