-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add possibility to manually perform the column projection #565
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,8 @@ def attrs(self) -> dict | None: ... | |
|
||
def project_columns(self: T, columns: frozenset[str]) -> T: ... | ||
|
||
def project_manually(self: T, columns: frozenset[str]) -> ImplementsIOFunction: ... | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is fully specified in the implementation of ColumnProjectionMixin, right? So does the protocol definition here serve any purpose? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may be able to remove it without mypy complaining, I'll check. Apart from that, I was adding it here as other ones were defined for this protocol aswell, e.g. |
||
|
||
def __call__(self, *args, **kwargs): ... | ||
|
||
|
||
|
@@ -176,3 +178,6 @@ def project( | |
return self | ||
|
||
return self.project_columns(self.necessary_columns(report, state)) | ||
|
||
def project_manually(self: S, columns: frozenset[str]) -> ImplementsIOFunction: | ||
return self.project_columns(columns) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from dask_awkward.manual.column_optimization import optimize_columns |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
from __future__ import annotations | ||
|
||
from typing import cast | ||
|
||
from dask.highlevelgraph import HighLevelGraph | ||
|
||
from dask_awkward.layers.layers import AwkwardInputLayer | ||
from dask_awkward.lib.core import Array | ||
|
||
|
||
def optimize_columns(array: Array, columns: dict[str, frozenset[str]]) -> Array: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to think of way to ensure that the resulting output is not optimised again at compute time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's why I set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hah, OK, I can see that works. We should see what it does with multiple input layers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 It should work as follows when calling |
||
""" | ||
Manually updates the AwkwardInputLayer(s) with the specified columns. This is useful | ||
for tracing the necessary buffers for a given computation once, and then reusing the | ||
typetracer reports to touch only the necessary columns for other datasets. | ||
|
||
Calling this function will update the `AwkwardInputLayer`'s `necessary_columns` attribute, | ||
i.e. pruning the columns that are not wanted. This replaces the automatic column optimization, | ||
which is why one should be careful when using this function combined with `.compute(optimize_graph=True)`. | ||
|
||
|
||
Parameters | ||
---------- | ||
array : Array | ||
The dask-awkward array to be optimized. | ||
columns : dict[str, frozenset[str]] | ||
The columns to be touched. | ||
|
||
Returns | ||
------- | ||
Array | ||
A new Dask-Awkward array with only the specified columns. | ||
""" | ||
if not isinstance(array, Array): | ||
raise TypeError( | ||
f"Expected `dak_array` to be of type `dask_awkward.Array`, got {type(array)}" | ||
) | ||
|
||
dsk = array.dask | ||
layers = dict(dsk.layers) | ||
deps = dict(dsk.dependencies) | ||
|
||
for name, cols in columns.items(): | ||
io_layer = cast(AwkwardInputLayer, layers[name]) | ||
if not isinstance(io_layer, AwkwardInputLayer): | ||
raise TypeError( | ||
f"Expected layer {name} to be of type `dask_awkward.layers.AwkwardInputLayer`, got {type(io_layer)}" | ||
) | ||
projected_layer = io_layer.project_manually(columns=cols) | ||
|
||
# explicitely disable 'project-ability' now, since we did this manually just now | ||
# Is there a better way to do this? Because this disables the possibility to chain call `dak.manual.optimize_columns` | ||
projected_layer.is_projectable = False | ||
|
||
layers[name] = projected_layer | ||
|
||
new_dsk = HighLevelGraph(layers, deps) | ||
return array._rebuild(dsk=new_dsk) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from __future__ import annotations | ||
|
||
import awkward as ak | ||
import numpy as np | ||
import pytest | ||
|
||
import dask_awkward as dak | ||
|
||
|
||
def test_optimize_columns(): | ||
pytest.importorskip("pyarrow") | ||
pytest.importorskip("requests") | ||
pytest.importorskip("aiohttp") | ||
|
||
array = dak.from_parquet( | ||
"https://github.com/scikit-hep/awkward/raw/main/tests/samples/nullable-record-primitives-simple.parquet" | ||
) | ||
|
||
needs = dak.inspect.report_necessary_columns(array.u4) | ||
only_u4_array = dak.manual.optimize_columns(array, needs) | ||
|
||
assert only_u4_array.fields == ["u4", "u8"] | ||
|
||
materialized_only_u4_array = only_u4_array.compute() | ||
|
||
# u4 is materialized, u8 is not | ||
assert isinstance( | ||
materialized_only_u4_array.layout.content("u4").content.data, np.ndarray | ||
) | ||
assert isinstance( | ||
materialized_only_u4_array.layout.content("u8").content.data, | ||
ak._nplikes.placeholder.PlaceholderArray, | ||
) | ||
|
||
# now again, but we add 'u8' by hand to the columns | ||
key, cols = needs.popitem() | ||
cols |= {"u8"} | ||
|
||
needs = {key: cols} | ||
|
||
u4_and_u8_array = dak.manual.optimize_columns(array, needs) | ||
|
||
assert u4_and_u8_array.fields == ["u4", "u8"] | ||
|
||
materialized_u4_and_u8_array = u4_and_u8_array.compute() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With or without optimise? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would result in the same output, because currently |
||
|
||
# now u4 and u8 are materialized | ||
assert isinstance( | ||
materialized_u4_and_u8_array.layout.content("u4").content.data, np.ndarray | ||
) | ||
assert isinstance( | ||
materialized_u4_and_u8_array.layout.content("u8").content.data, np.ndarray | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I am not a fan of all the cruft is takes to make mypy happy - I'd rather do with simpler or missing types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was following the same implementations as we have here for
project
andnecessary_columns
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to remove it, but unfortunately it is not that easy to appease mypy otherwise (except for putting
# type: ignore
at multiple places - but what's the point of type checking then?).The reason for the current implementation is that we support a protocol that all projectable IO layers need to adhere to (i.e.
ImplementsProjection
). By making sure thatself.is_projectable
is true, we know that our IO layer has the methods of theImplementsProjection
protocol available. Unfortunately, mypy seems to not recognize this correctly, becauseself.io_func
may be a more general implementation that only needs to implement theImplementsIOFunction
protocol.ImplementsProjection
is a specification of that, and we can safely "cast" it here after making sure thatself.is_projectable
is True.It's not something that I invented/added new here. I'm following the existing protocols for the IO functions.
(The same argument goes for the comment below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. It is mostly removed in the one-pass PR, and I would say nothing useful got lost :)