diff --git a/src/awkward/_v2/operations/convert/ak_from_parquet.py b/src/awkward/_v2/operations/convert/ak_from_parquet.py index c3591861b7..410c0474fb 100644 --- a/src/awkward/_v2/operations/convert/ak_from_parquet.py +++ b/src/awkward/_v2/operations/convert/ak_from_parquet.py @@ -2,9 +2,6 @@ import awkward as ak -np = ak.nplike.NumpyMetadata.instance() -numpy = ak.nplike.Numpy.instance() - def from_parquet( path, @@ -63,38 +60,37 @@ def from_parquet( behavior=behavior, ), ): - return _impl( + import awkward._v2._connect.pyarrow # noqa: F401 + + parquet_columns, subform, actual_paths, fs, subrg, meta = _metadata( path, - columns, - row_groups, storage_options, + row_groups, + columns, + max_gap, + max_block, + footer_sample_size, + ) + return _load( + actual_paths, + parquet_columns, + subrg, max_gap, max_block, footer_sample_size, conservative_optiontype, + subform, highlevel, behavior, + fs, + meta, ) -def _impl( - path, - columns, - row_groups, - storage_options, - max_gap, - max_block, - footer_sample_size, - conservative_optiontype, - highlevel, - behavior, +def _metadata( + path, storage_options, row_groups, columns, max_gap, max_block, footer_sample_size ): - import awkward._v2._connect.pyarrow # noqa: F401 - - name = "ak._v2.from_parquet" - pyarrow_parquet = ak._v2._connect.pyarrow.import_pyarrow_parquet(name) - fsspec = ak._v2._connect.pyarrow.import_fsspec(name) - + import pyarrow.parquet as pyarrow_parquet import fsspec.parquet if row_groups is not None: @@ -106,129 +102,133 @@ def _impl( fs, _, paths = fsspec.get_fs_token_paths( path, mode="rb", storage_options=storage_options ) - all_paths, path_for_metadata = _all_and_metadata_paths(path, fs, paths) parquet_columns = None subform = None subrg = [None] * len(all_paths) actual_paths = all_paths - if columns is not None or row_groups is not None: - with fsspec.parquet.open_parquet_file( - path_for_metadata, - fs=fs, - engine="pyarrow", - row_groups=[], - storage_options=storage_options, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, - ) as file_for_metadata: - parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata) - - if columns is not None: - # FIXME: get this from parquetfile_for_metadata - list_indicator = "list.item" - - form = ak._v2._connect.pyarrow.form_handle_arrow( - parquetfile_for_metadata.schema_arrow, pass_empty_field=True - ) - subform = form.select_columns(columns) - parquet_columns = subform.columns(list_indicator=list_indicator) - - if row_groups is not None: - eoln = "\n " - metadata = parquetfile_for_metadata.metadata - if any(not 0 <= rg < metadata.num_row_groups for rg in row_groups): - raise ak._v2._util.error( - ValueError( - f"one of the requested row_groups is out of range (must be less than {metadata.num_row_groups})" - ) + with fsspec.parquet.open_parquet_file( + path_for_metadata, + fs=fs, + engine="pyarrow", + row_groups=[], + storage_options=storage_options, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + ) as file_for_metadata: + parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata) + + if columns is not None: + list_indicator = "list.item" + + form = ak._v2._connect.pyarrow.form_handle_arrow( + parquetfile_for_metadata.schema_arrow, pass_empty_field=True + ) + subform = form.select_columns(columns) + parquet_columns = subform.columns(list_indicator=list_indicator) + + metadata = parquetfile_for_metadata.metadata + if row_groups is not None: + eoln = "\n " + if any(not 0 <= rg < metadata.num_row_groups for rg in row_groups): + raise ak._v2._util.error( + ValueError( + f"one of the requested row_groups is out of range " + f"(must be less than {metadata.num_row_groups})" ) + ) - split_paths = [p.split("/") for p in all_paths] - prev_index = None - prev_i = 0 - actual_paths = [] - subrg = [] - for i in range(metadata.num_row_groups): - unsplit_path = metadata.row_group(i).column(0).file_path - if unsplit_path == "": - if len(all_paths) == 1: - index = 0 - else: - raise ak._v2._util.error( - LookupError( - f"""path from metadata is {unsplit_path!r} but more than one path matches: - - {eoln.join(all_paths)}""" - ) - ) - + split_paths = [p.split("/") for p in all_paths] + prev_index = None + prev_i = 0 + actual_paths = [] + subrg = [] + for i in range(metadata.num_row_groups): + unsplit_path = metadata.row_group(i).column(0).file_path + if unsplit_path == "": + if len(all_paths) == 1: + index = 0 else: - split_path = unsplit_path.split("/") - index = None - for j, compare in enumerate(split_paths): - if split_path == compare[-len(split_path) :]: - index = j - break - if index is None: - raise ak._v2._util.error( - LookupError( - f"""path {'/'.join(split_path)!r} from metadata not found in path matches: - - {eoln.join(all_paths)}""" - ) + raise ak._v2._util.error( + LookupError( + f"""path from metadata is {unsplit_path!r} but more + than one path matches: + + {eoln.join(all_paths)}""" ) + ) - if prev_index != index: - prev_index = index - prev_i = i - actual_paths.append(all_paths[index]) - subrg.append([]) + else: + split_path = unsplit_path.split("/") + index = None + for j, compare in enumerate(split_paths): + if split_path == compare[-len(split_path) :]: + index = j + break + if index is None: + raise ak._v2._util.error( + LookupError( + f"""path {'/'.join(split_path)!r} from metadata not found + in path matches: + + {eoln.join(all_paths)}""" + ) + ) - if i in row_groups: - subrg[-1].append(i - prev_i) + if prev_index != index: + prev_index = index + prev_i = i + actual_paths.append(all_paths[index]) + subrg.append([]) + + if i in row_groups: + subrg[-1].append(i - prev_i) + + for k in range(len(subrg) - 1, -1, -1): + if len(subrg[k]) == 0: + del actual_paths[k] + del subrg[k] + if subform is None: + subform = ak._v2._connect.pyarrow.form_handle_arrow( + parquetfile_for_metadata.schema_arrow, pass_empty_field=True + ) + return parquet_columns, subform, actual_paths, fs, subrg, metadata - for k in range(len(subrg) - 1, -1, -1): - if len(subrg[k]) == 0: - del actual_paths[k] - del subrg[k] +def _load( + actual_paths, + parquet_columns, + subrg, + max_gap, + max_block, + footer_sample_size, + conservative_optiontype, + subform, + highlevel, + behavior, + fs, + meta, +): arrays = [] for i, p in enumerate(actual_paths): - with fsspec.parquet.open_parquet_file( - p, - fs=fs, - engine="pyarrow", - columns=parquet_columns, - row_groups=subrg[i], - storage_options=storage_options, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, - ) as file: - parquetfile = pyarrow_parquet.ParquetFile(file) - - if subform is None: - subform = ak._v2._connect.pyarrow.form_handle_arrow( - parquetfile.schema_arrow, pass_empty_field=True - ) - - if row_groups is None: - arrow_table = parquetfile.read(parquet_columns) - else: - arrow_table = parquetfile.read_row_groups(subrg[i], parquet_columns) - arrays.append( - ak._v2._connect.pyarrow.handle_arrow( - arrow_table, + _read_parquet_file( + p, + fs=fs, + parquet_columns=parquet_columns, + row_groups=subrg[i], + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, conservative_optiontype=conservative_optiontype, - pass_empty_field=True, + metadata=meta, ) ) if len(arrays) == 0: + numpy = ak.nplike.Numpy.instance() return ak._v2.operations.convert.ak_from_buffers._impl( subform, 0, _DictOfEmptyBuffers(), "", numpy, highlevel, behavior ) @@ -238,6 +238,44 @@ def _impl( ) +def _read_parquet_file( + path, + fs, + parquet_columns, + row_groups, + footer_sample_size, + max_gap, + max_block, + metadata, + conservative_optiontype, +): + import fsspec.parquet + import pyarrow.parquet as pyarrow_parquet + + with fsspec.parquet.open_parquet_file( + path, + fs=fs, + engine="pyarrow", + columns=parquet_columns, + row_groups=row_groups, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + ) as file: + parquetfile = pyarrow_parquet.ParquetFile(file) + + if row_groups is None: + arrow_table = parquetfile.read(parquet_columns) + else: + arrow_table = parquetfile.read_row_groups(row_groups, parquet_columns) + + return ak._v2._connect.pyarrow.handle_arrow( + arrow_table, + conservative_optiontype=conservative_optiontype, + pass_empty_field=True, + ) + + class _DictOfEmptyBuffers: def __getitem__(self, where): return b"\x00\x00\x00\x00\x00\x00\x00\x00"