split up functions #1397

Merged · 11 commits · Apr 19, 2022
src/awkward/_v2/operations/convert/ak_from_parquet.py (286 changes: 162 additions, 124 deletions)
@@ -2,9 +2,6 @@

import awkward as ak

np = ak.nplike.NumpyMetadata.instance()
numpy = ak.nplike.Numpy.instance()


def from_parquet(
path,
@@ -63,38 +60,37 @@ def from_parquet(
behavior=behavior,
),
):
return _impl(
import awkward._v2._connect.pyarrow # noqa: F401

parquet_columns, subform, actual_paths, fs, subrg, meta = _metadata(
path,
columns,
row_groups,
storage_options,
row_groups,
columns,
max_gap,
max_block,
footer_sample_size,
)
return _load(
actual_paths,
parquet_columns,
subrg,
max_gap,
max_block,
footer_sample_size,
conservative_optiontype,
subform,
highlevel,
behavior,
fs,
meta,
)
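
For orientation, the refactored entry point above now delegates to _metadata (schema and row-group resolution) and then _load (actually reading the files). A minimal usage sketch of the public function follows; the file path and column names are hypothetical placeholders, and the keyword names are taken from the signature shown in this diff.

import awkward as ak

# Hypothetical example: select two columns and the first row group of a
# local Parquet file.  "events.parquet", "x", and "y" are placeholder names.
array = ak._v2.from_parquet(
    "events.parquet",
    columns=["x", "y"],   # resolved against the Arrow schema by _metadata
    row_groups=[0],       # mapped onto per-file row-group lists by _metadata
)
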


def _impl(
path,
columns,
row_groups,
storage_options,
max_gap,
max_block,
footer_sample_size,
conservative_optiontype,
highlevel,
behavior,
def _metadata(
path, storage_options, row_groups, columns, max_gap, max_block, footer_sample_size
):
import awkward._v2._connect.pyarrow # noqa: F401

name = "ak._v2.from_parquet"
pyarrow_parquet = ak._v2._connect.pyarrow.import_pyarrow_parquet(name)
fsspec = ak._v2._connect.pyarrow.import_fsspec(name)

import pyarrow.parquet as pyarrow_parquet
import fsspec.parquet

if row_groups is not None:
@@ -106,129 +102,133 @@ def _impl(
fs, _, paths = fsspec.get_fs_token_paths(
path, mode="rb", storage_options=storage_options
)

all_paths, path_for_metadata = _all_and_metadata_paths(path, fs, paths)

parquet_columns = None
subform = None
subrg = [None] * len(all_paths)
actual_paths = all_paths
if columns is not None or row_groups is not None:
with fsspec.parquet.open_parquet_file(
path_for_metadata,
fs=fs,
engine="pyarrow",
row_groups=[],
storage_options=storage_options,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
) as file_for_metadata:
parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata)

if columns is not None:
# FIXME: get this from parquetfile_for_metadata
list_indicator = "list.item"

form = ak._v2._connect.pyarrow.form_handle_arrow(
parquetfile_for_metadata.schema_arrow, pass_empty_field=True
)
subform = form.select_columns(columns)
parquet_columns = subform.columns(list_indicator=list_indicator)

if row_groups is not None:
eoln = "\n "
metadata = parquetfile_for_metadata.metadata
if any(not 0 <= rg < metadata.num_row_groups for rg in row_groups):
raise ak._v2._util.error(
ValueError(
f"one of the requested row_groups is out of range (must be less than {metadata.num_row_groups})"
)
with fsspec.parquet.open_parquet_file(
path_for_metadata,
fs=fs,
engine="pyarrow",
row_groups=[],
storage_options=storage_options,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
) as file_for_metadata:
parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata)

if columns is not None:
list_indicator = "list.item"

form = ak._v2._connect.pyarrow.form_handle_arrow(
parquetfile_for_metadata.schema_arrow, pass_empty_field=True
)
subform = form.select_columns(columns)
parquet_columns = subform.columns(list_indicator=list_indicator)

metadata = parquetfile_for_metadata.metadata
if row_groups is not None:
eoln = "\n "
if any(not 0 <= rg < metadata.num_row_groups for rg in row_groups):
raise ak._v2._util.error(
ValueError(
f"one of the requested row_groups is out of range "
f"(must be less than {metadata.num_row_groups})"
)
)

split_paths = [p.split("/") for p in all_paths]
prev_index = None
prev_i = 0
actual_paths = []
subrg = []
for i in range(metadata.num_row_groups):
unsplit_path = metadata.row_group(i).column(0).file_path
if unsplit_path == "":
if len(all_paths) == 1:
index = 0
else:
raise ak._v2._util.error(
LookupError(
f"""path from metadata is {unsplit_path!r} but more than one path matches:

{eoln.join(all_paths)}"""
)
)

split_paths = [p.split("/") for p in all_paths]
prev_index = None
prev_i = 0
actual_paths = []
subrg = []
for i in range(metadata.num_row_groups):
unsplit_path = metadata.row_group(i).column(0).file_path
if unsplit_path == "":
if len(all_paths) == 1:
index = 0
else:
split_path = unsplit_path.split("/")
index = None
for j, compare in enumerate(split_paths):
if split_path == compare[-len(split_path) :]:
index = j
break
if index is None:
raise ak._v2._util.error(
LookupError(
f"""path {'/'.join(split_path)!r} from metadata not found in path matches:

{eoln.join(all_paths)}"""
)
raise ak._v2._util.error(
LookupError(
f"""path from metadata is {unsplit_path!r} but more
than one path matches:

{eoln.join(all_paths)}"""
)
)

if prev_index != index:
prev_index = index
prev_i = i
actual_paths.append(all_paths[index])
subrg.append([])
else:
split_path = unsplit_path.split("/")
index = None
for j, compare in enumerate(split_paths):
if split_path == compare[-len(split_path) :]:
index = j
break
if index is None:
raise ak._v2._util.error(
LookupError(
f"""path {'/'.join(split_path)!r} from metadata not found
in path matches:

{eoln.join(all_paths)}"""
)
)

if i in row_groups:
subrg[-1].append(i - prev_i)
if prev_index != index:
prev_index = index
prev_i = i
actual_paths.append(all_paths[index])
subrg.append([])

if i in row_groups:
subrg[-1].append(i - prev_i)

for k in range(len(subrg) - 1, -1, -1):
if len(subrg[k]) == 0:
del actual_paths[k]
del subrg[k]
if subform is None:
subform = ak._v2._connect.pyarrow.form_handle_arrow(
parquetfile_for_metadata.schema_arrow, pass_empty_field=True
)
return parquet_columns, subform, actual_paths, fs, subrg, metadata

for k in range(len(subrg) - 1, -1, -1):
if len(subrg[k]) == 0:
del actual_paths[k]
del subrg[k]
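
The loop above assigns each row group to a file by matching the file_path recorded in the Parquet footer against the tail of each discovered path. A standalone sketch of that suffix comparison, using hypothetical paths, is shown below.

# Minimal sketch of the suffix match used to map a metadata file_path onto
# one of the discovered paths; the paths here are hypothetical.
all_paths = ["/data/set/part-0.parquet", "/data/set/part-1.parquet"]
split_paths = [p.split("/") for p in all_paths]

unsplit_path = "set/part-1.parquet"   # as recorded in the _metadata footer
split_path = unsplit_path.split("/")

index = None
for j, compare in enumerate(split_paths):
    if split_path == compare[-len(split_path):]:  # compare against the path tail
        index = j
        break

assert index == 1  # row groups with this file_path belong to part-1.parquet
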

def _load(
actual_paths,
parquet_columns,
subrg,
max_gap,
max_block,
footer_sample_size,
conservative_optiontype,
subform,
highlevel,
behavior,
fs,
meta,
):
arrays = []
for i, p in enumerate(actual_paths):
with fsspec.parquet.open_parquet_file(
p,
fs=fs,
engine="pyarrow",
columns=parquet_columns,
row_groups=subrg[i],
storage_options=storage_options,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
) as file:
parquetfile = pyarrow_parquet.ParquetFile(file)

if subform is None:
subform = ak._v2._connect.pyarrow.form_handle_arrow(
parquetfile.schema_arrow, pass_empty_field=True
)

if row_groups is None:
arrow_table = parquetfile.read(parquet_columns)
else:
arrow_table = parquetfile.read_row_groups(subrg[i], parquet_columns)

arrays.append(
ak._v2._connect.pyarrow.handle_arrow(
arrow_table,
_read_parquet_file(
p,
fs=fs,
parquet_columns=parquet_columns,
row_groups=subrg[i],
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
conservative_optiontype=conservative_optiontype,
pass_empty_field=True,
metadata=meta,
)
)

if len(arrays) == 0:
numpy = ak.nplike.Numpy.instance()
return ak._v2.operations.convert.ak_from_buffers._impl(
subform, 0, _DictOfEmptyBuffers(), "", numpy, highlevel, behavior
)
@@ -238,6 +238,44 @@ def _impl(
)


def _read_parquet_file(
path,
fs,
parquet_columns,
row_groups,
footer_sample_size,
max_gap,
max_block,
metadata,
conservative_optiontype,
):
import fsspec.parquet
import pyarrow.parquet as pyarrow_parquet

with fsspec.parquet.open_parquet_file(
path,
fs=fs,
engine="pyarrow",
columns=parquet_columns,
row_groups=row_groups,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
) as file:
parquetfile = pyarrow_parquet.ParquetFile(file)

if row_groups is None:
arrow_table = parquetfile.read(parquet_columns)
else:
arrow_table = parquetfile.read_row_groups(row_groups, parquet_columns)

return ak._v2._connect.pyarrow.handle_arrow(
arrow_table,
conservative_optiontype=conservative_optiontype,
pass_empty_field=True,
)


class _DictOfEmptyBuffers:
def __getitem__(self, where):
return b"\x00\x00\x00\x00\x00\x00\x00\x00"