[Data] ParquetDatasink fails with tensor data and partition_cols #50506
Comments
Thanks for reaching out. We are actively working on this issue. The root cause is the underlying pyarrow group-by aggregation, which doesn't support tensor types (see the snippet in the referenced PR below).
israbbani pushed a commit that referenced this issue on Feb 25, 2025:
## Why are these changes needed?

Writing data containing a tensor column to `ParquetDatasink` with partition column(s) fails because there is no pyarrow kernel for `hash_list`. The current implementation uses pyarrow's `groupby.aggregate`, and aggregation doesn't have kernels for tensor types (see the snippet below). This PR rewrites the implementation without aggregating the non-partition columns, thus avoiding the issue.

```python
import numpy as np
import pyarrow as pa

x = {"category": ["a", "b"] * 10, "tensor": list(np.random.random((20, 128)))}
schema = pa.schema(
    [
        ("category", pa.dictionary(pa.int32(), pa.string())),
        ("tensor", pa.fixed_shape_tensor(value_type=pa.float32(), shape=(128,))),
    ]
)
t = pa.Table.from_pydict(x, schema=schema)
t.group_by("category").aggregate([("tensor", "list")])
```

```
Traceback (most recent call last):
  File "/Users/praveengorthy/anyscale/rayturbo/python/ray/data/test_dataset.py", line 63, in <module>
    t.group_by("category").aggregate([("tensor", "list")])
  File "pyarrow/table.pxi", line 5562, in pyarrow.lib.TableGroupBy.aggregate
  File "/opt/miniconda3/lib/python3.9/site-packages/pyarrow/acero.py", line 308, in _group_by
    return decl.to_table(use_threads=use_threads)
  File "pyarrow/_acero.pyx", line 511, in pyarrow._acero.Declaration.to_table
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<arrow.fixed_shape_tensor[value_type=float, shape=[128]]>, uint32)
```

## Related issue number

Closes #50506

## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: Praveen Gorthy <praveeng@anyscale.com>
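For illustration, here is a minimal sketch of the aggregation-free approach described above: take the distinct values of the partition column and filter the table once per value, so tensor columns never pass through a hash aggregation kernel. This is only a sketch of the technique, not the code in the PR; the table construction mirrors the snippet above.

```python
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

# Same shape of data as in the snippet above: one partition column, one tensor column.
x = {
    "category": ["a", "b"] * 10,
    "tensor": list(np.random.random((20, 128)).astype(np.float32)),
}
schema = pa.schema(
    [
        ("category", pa.string()),
        ("tensor", pa.fixed_shape_tensor(value_type=pa.float32(), shape=(128,))),
    ]
)
t = pa.Table.from_pydict(x, schema=schema)

# Split by partition value without aggregating the tensor column: distinct
# values come from the partition column alone, and each group is a plain
# filter, so no 'hash_list' kernel is invoked on the tensor extension type.
for value in pc.unique(t.column("category")):
    group = t.filter(pc.equal(t.column("category"), value))
    # Each `group` table can now be written to its own partition directory,
    # e.g. category=<value>/part.parquet, with the partition column dropped.
    print(value.as_py(), group.num_rows)
```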
xsuler pushed a commit to antgroup/ant-ray that referenced this issue on Mar 4, 2025 (same change, ray-project#50591).
What happened + What you expected to happen
Writing data containing a tensor column to `ParquetDatasink` with partition column(s) fails because there is no pyarrow kernel for `hash_list` when called here. This issue is solved by #50502.
Versions / Dependencies
macOS 15.3
Python 3.10.16
ray==2.42.1
pyarrow==19.0.0
Reproduction script
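The following is a hypothetical minimal reproduction (a sketch; the dataset contents and output path are assumptions, not the original script) that writes a tensor column through `write_parquet` with `partition_cols` and hits the error shown below:

```python
import numpy as np
import ray

# Sketch: a dataset with a (5, 6) tensor column plus a string partition column.
ds = ray.data.from_items(
    [
        {"category": "a" if i % 2 == 0 else "b", "tensor": np.random.random((5, 6))}
        for i in range(20)
    ]
)

# Writing with partition_cols goes through ParquetDatasink's per-partition
# grouping, which raises the ArrowNotImplementedError shown below.
ds.write_parquet("/tmp/partitioned_parquet", partition_cols=["category"])
```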
pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<ray.data.arrow_tensor<ArrowTensorType>>, uint32)
This also fails if we explicitly cast to `pyarrow.FixedShapeTensorArray`:

pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<arrow.fixed_shape_tensor[value_type=float, shape=[5,6], permutation=[0,1]]>, uint32)
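For reference, a sketch of what such an explicit cast looks like (variable names are assumptions; this is not the original snippet from the issue):

```python
import numpy as np
import pyarrow as pa

# Convert a batch of (5, 6) float32 tensors into a FixedShapeTensorArray.
tensors = np.random.random((20, 5, 6)).astype(np.float32)
fixed = pa.FixedShapeTensorArray.from_numpy_ndarray(tensors)

table = pa.table({"category": ["a", "b"] * 10, "tensor": fixed})
# Grouping still fails: there is no 'hash_list' kernel for the
# fixed_shape_tensor extension type either.
table.group_by("category").aggregate([("tensor", "list")])
```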
Implementing the idea from #50502 fixes things.
Issue Severity

Low: It annoys or frustrates me. I have a workaround using my implementation of PartitionedParquetDatasink.