Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing ak._v2.to_parquet. #1440

Merged
merged 16 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
autograd
flake8
fsspec;python_version > "3.6" and sys_platform != "win32"
jax>=0.2.7;sys_platform != "win32"
jaxlib>=0.1.57,!=0.1.68;sys_platform != "win32"
numba>=0.50.0
numexpr
pandas>=0.24.0
pyarrow>=6.0.0;sys_platform != "win32"
pyarrow>=7.0.0;python_version > "3.6" and sys_platform != "win32"
PyYAML
206 changes: 125 additions & 81 deletions src/awkward/_v2/_connect/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

else:
if ak._v2._util.parse_version(pyarrow.__version__) < ak._v2._util.parse_version(
"6.0.0"
"7.0.0"
):
pyarrow = None
error_message = "pyarrow 6.0.0 or later required for {0}"
error_message = "pyarrow 7.0.0 or later required for {0}"


def import_pyarrow(name):
Expand Down Expand Up @@ -904,6 +904,7 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False):
buffers = obj.buffers()

awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(obj.type)

out = popbuffers(
obj, awkwardarrow_type, storage_type, buffers, generate_bitmasks
)
Expand All @@ -924,65 +925,94 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False):
return ak._v2.operations.structure.concatenate(layouts, highlevel=False)

elif isinstance(obj, pyarrow.lib.RecordBatch):
child_array = []
for i in range(obj.num_columns):
layout = handle_arrow(obj.column(i), generate_bitmasks)
if not obj.schema.field(i).nullable:
child_array.append(remove_optiontype(layout))
else:
child_array.append(layout)

if pass_empty_field and list(obj.schema.names) == [""]:
return child_array[0]
layout = handle_arrow(obj.column(0), generate_bitmasks)
if not obj.schema.field(0).nullable:
return remove_optiontype(layout)
else:
return layout
else:
return ak._v2.contents.RecordArray(
child_array, obj.schema.names, length=len(obj)
record_is_optiontype = False
optiontype_fields = []
optiontype_parameters = None
recordtype_parameters = None
if (
obj.schema.metadata is not None
and b"ak:parameters" in obj.schema.metadata
):
for x in json.loads(obj.schema.metadata[b"ak:parameters"]):
(key,) = x.keys()
(value,) = x.values()
if key == "optiontype_fields":
optiontype_fields = value
elif key in (
"UnmaskedArray",
"BitMaskedArray",
"ByteMaskedArray",
"IndexedOptionArray",
):
record_is_optiontype = True
optiontype_parameters = value
elif key == "RecordArray":
recordtype_parameters = value

record_mask = None
contents = []
for i in range(obj.num_columns):
field = obj.schema.field(i)
layout = handle_arrow(obj.column(i), generate_bitmasks)
if record_is_optiontype:
if record_mask is None:
record_mask = layout.mask_as_bool(valid_when=False)
else:
record_mask &= layout.mask_as_bool(valid_when=False)
if (
record_is_optiontype and field.name not in optiontype_fields
) or not field.nullable:
contents.append(remove_optiontype(layout))
else:
contents.append(layout)

out = ak._v2.contents.RecordArray(
contents,
obj.schema.names,
length=len(obj),
parameters=recordtype_parameters,
)

if record_is_optiontype and record_mask is None and generate_bitmasks:
record_mask = numpy.zeros(len(out), dtype=np.bool_)

if record_is_optiontype and record_mask is None:
return ak._v2.contents.UnmaskedArray(
out, parameters=optiontype_parameters
)

elif record_is_optiontype:
return ak._v2.contents.ByteMaskedArray(
ak._v2.index.Index8(record_mask),
out,
valid_when=False,
parameters=optiontype_parameters,
)

else:
return out

elif isinstance(obj, pyarrow.lib.Table):
batches = obj.combine_chunks().to_batches()
if len(batches) == 0:
# zero-length array with the right type
# FIXME: create a zero-length array with the right type
raise ak._v2._util.error(NotImplementedError)
elif len(batches) == 1:
out = handle_arrow(batches[0], generate_bitmasks, pass_empty_field)
return handle_arrow(batches[0], generate_bitmasks, pass_empty_field)
else:
arrays = [
handle_arrow(batch, generate_bitmasks, pass_empty_field)
for batch in batches
if len(batch) > 0
]
raise ak._v2._util.error(
NotImplementedError(
"FIXME: need ak._v2.operations.structure.concatenate"
)
)
out = ak._v2.operations.structure.concatenate(arrays, highlevel=False)

if obj.schema.metadata is not None and b"ak:parameters" in obj.schema.metadata:
optiontype, recordtype = None, None
if out.is_OptionType:
optiontype = out
if out.is_RecordType:
recordtype = out
elif out.content.is_RecordType:
recordtype = out.content

parameters = json.loads(obj.schema.metadata[b"ak:parameters"])
for x in parameters:
(key,) = x.keys()
(value,) = x.values()
if optiontype is not None and key in (
"UnmaskedArray",
"BitMaskedArray",
"ByteMaskedArray",
"IndexedOptionArray",
):
optiontype._parameters = value
elif recordtype is not None and key == "RecordArray":
recordtype._parameters = value

return out
return ak._v2.operations.structure.concatenate(arrays, highlevel=False)

elif (
isinstance(obj, Iterable)
Expand Down Expand Up @@ -1014,43 +1044,57 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False):


def form_handle_arrow(schema, pass_empty_field=False):
forms = []
for i, arrowtype in enumerate(schema.types):
awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(arrowtype)
if pass_empty_field and list(schema.names) == [""]:
awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(schema.types[0])
akform = form_popbuffers(awkwardarrow_type, storage_type)

if not schema.field(i).nullable:
forms.append(form_remove_optiontype(akform))
if not schema.field(0).nullable:
return form_remove_optiontype(akform)
else:
forms.append(akform)
return akform

if pass_empty_field and list(schema.names) == [""]:
assert len(forms) == 1
out = forms[0]
else:
out = ak._v2.forms.RecordForm(forms, list(schema.names))

if schema.metadata is not None and b"ak:parameters" in schema.metadata:
optiontype, recordtype = None, None
if out.is_OptionType:
optiontype = out
if out.is_RecordType:
recordtype = out
elif out.content.is_RecordType:
recordtype = out.content

parameters = json.loads(schema.metadata[b"ak:parameters"])
for x in parameters:
(key,) = x.keys()
(value,) = x.values()
if optiontype is not None and key in (
"UnmaskedArray",
"BitMaskedArray",
"ByteMaskedArray",
"IndexedOptionArray",
):
optiontype._parameters = value
elif recordtype is not None and key == "RecordArray":
recordtype._parameters = value
record_is_optiontype = False
optiontype_fields = []
optiontype_parameters = None
recordtype_parameters = None
if schema.metadata is not None and b"ak:parameters" in schema.metadata:
for x in json.loads(schema.metadata[b"ak:parameters"]):
(key,) = x.keys()
(value,) = x.values()
if key == "optiontype_fields":
optiontype_fields = value
elif key in (
"UnmaskedArray",
"BitMaskedArray",
"ByteMaskedArray",
"IndexedOptionArray",
):
record_is_optiontype = True
optiontype_parameters = value
elif key == "RecordArray":
recordtype_parameters = value

return out
forms = []
for i, arrowtype in enumerate(schema.types):
field = schema.field(i)
awkwardarrow_type, storage_type = to_awkwardarrow_storage_types(arrowtype)
akform = form_popbuffers(awkwardarrow_type, storage_type)

if (
record_is_optiontype and field.name not in optiontype_fields
) or not field.nullable:
forms.append(form_remove_optiontype(akform))
else:
forms.append(akform)

out = ak._v2.forms.RecordForm(
forms, list(schema.names), parameters=recordtype_parameters
)

if record_is_optiontype:
return ak._v2.forms.ByteMaskedForm(
"i8", out, valid_when=False, parameters=optiontype_parameters
)

else:
return out
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/bitmaskedform.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Return the column types reported by this form's content.

    This node adds nothing of its own; the result is exactly what
    ``self._content._column_types()`` returns.
    """
    wrapped = self._content
    return wrapped._column_types()
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/bytemaskedform.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Forward the column-type query to the content of this form.

    Returns whatever ``self._content._column_types()`` yields, unchanged.
    """
    content = self._content
    return content._column_types()
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/emptyform.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,6 @@ def _select_columns(self, index, specifier, matches, output):
if any(match and index >= len(item) for item, match in zip(specifier, matches)):
output.append(None)
return self

def _column_types(self):
    """Return the fixed one-element tuple ``("empty",)``.

    No attribute of ``self`` is consulted.
    """
    marker = "empty"
    return (marker,)
7 changes: 5 additions & 2 deletions src/awkward/_v2/forms/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ def simplify_optiontype(self):
def simplify_uniontype(self, merge=True, mergebool=False):
return self

def columns(self, list_indicator=None):
def columns(self, list_indicator=None, column_prefix=()):
output = []
self._columns((), output, list_indicator)
self._columns(column_prefix, output, list_indicator)
return output

def select_columns(self, specifier, expand_braces=True):
Expand All @@ -342,3 +342,6 @@ def select_columns(self, specifier, expand_braces=True):

output = []
return self._select_columns(0, specifier, matches, output)

def column_types(self):
    """Public entry point that returns ``self._column_types()`` unchanged."""
    result = self._column_types()
    return result
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/indexedform.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Return the column types of the content this form points at.

    The call is passed straight through to
    ``self._content._column_types()``.
    """
    target = self._content
    return target._column_types()
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/indexedoptionform.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Delegate to the content: return ``self._content._column_types()``."""
    inner_form = self._content
    return inner_form._column_types()
6 changes: 6 additions & 0 deletions src/awkward/_v2/forms/listform.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,9 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Return ``("string",)`` for string-like lists, else the content's types.

    A form whose ``"__array__"`` parameter is ``"string"`` or
    ``"bytestring"`` is reported as a single ``"string"`` entry, and its
    content is not consulted. Any other form returns
    ``self._content._column_types()``.
    """
    is_stringlike = self.parameter("__array__") in ("string", "bytestring")
    if not is_stringlike:
        return self._content._column_types()
    return ("string",)
6 changes: 6 additions & 0 deletions src/awkward/_v2/forms/listoffsetform.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,9 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Report a string-like list as ``("string",)``; otherwise ask the content.

    The ``"__array__"`` parameter decides the branch: ``"string"`` and
    ``"bytestring"`` both yield ``("string",)``. Every other value,
    including a missing parameter, yields
    ``self._content._column_types()``.
    """
    array_kind = self.parameter("__array__")
    if array_kind not in ("string", "bytestring"):
        return self._content._column_types()
    return ("string",)
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/numpyform.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,6 @@ def _select_columns(self, index, specifier, matches, output):
if any(match and index >= len(item) for item, match in zip(specifier, matches)):
output.append(None)
return self

def _column_types(self):
    """Return a one-element tuple describing this form's primitive.

    The element is the result of
    ``ak._v2.types.numpytype.primitive_to_dtype(self._primitive)``.
    """
    converted = ak._v2.types.numpytype.primitive_to_dtype(self._primitive)
    return (converted,)
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/recordform.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Concatenate the column types of every entry in ``self._contents``.

    The results are joined in ``self._contents`` order, starting from an
    empty tuple, so a form with no contents returns ``()``.
    """
    collected = ()
    for child in self._contents:
        collected = collected + child._column_types()
    return collected
6 changes: 6 additions & 0 deletions src/awkward/_v2/forms/regularform.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,9 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Return ``("string",)`` when flagged string-like, else the content's types.

    The flag is the ``"__array__"`` parameter being ``"string"`` or
    ``"bytestring"``. Without it the call falls through to
    ``self._content._column_types()``.
    """
    stringlike = ("string", "bytestring")
    if self.parameter("__array__") in stringlike:
        return ("string",)
    return self._content._column_types()
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/unionform.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Join the column types of all ``self._contents`` entries into one tuple.

    Entries are visited in order and their results appended to an
    initially empty tuple.
    """
    merged = ()
    for alternative in self._contents:
        merged = merged + alternative._column_types()
    return merged
3 changes: 3 additions & 0 deletions src/awkward/_v2/forms/unmaskedform.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,6 @@ def _select_columns(self, index, specifier, matches, output):
self._parameters,
self._form_key,
)

def _column_types(self):
    """Pass the query through: return ``self._content._column_types()``."""
    underlying = self._content
    return underlying._column_types()
Loading