
[Data] ParquetDatasink fails with tensor data and partition_cols #50506

Closed
schmidt-ai opened this issue Feb 13, 2025 · 1 comment · Fixed by #50591
Assignees
Labels
bug Something that is supposed to be working; but isn't P0 Issues that should be fixed in short order

Comments


schmidt-ai commented Feb 13, 2025

What happened + What you expected to happen

Writing data containing a tensor column to ParquetDatasink with partition column(s) fails because there is no pyarrow kernel for hash_list when it is invoked on the tensor column during the partition split.

This issue is solved by #50502.

Versions / Dependencies

macOS 15.3
Python 3.10.16
ray==2.42.1
pyarrow==19.0.0

Reproduction script

from tempfile import TemporaryDirectory

import numpy as np
import ray.data

ctx = ray.data.DataContext.get_current()
ctx.use_arrow_tensor_v2 = False  # also fails if True

d = ray.data.from_items([{"partition_col": "a", "tensor": np.random.rand(5, 6)} for _ in range(10)])

with TemporaryDirectory() as tmpdir:
    d.write_parquet(tmpdir, partition_cols=["partition_col"])

pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<ray.data.arrow_tensor<ArrowTensorType>>, uint32)

This also fails if we explicitly cast to pyarrow.FixedShapeTensorArray:

from tempfile import TemporaryDirectory

import numpy as np
import pyarrow as pa
import ray.data


def to_arrow(batch):
    batch["tensor"] = pa.FixedShapeTensorArray.from_numpy_ndarray(batch["tensor"])
    return pa.Table.from_pydict(batch)


d = ray.data.from_items([{"partition_col": "a", "tensor": np.random.rand(5, 6).astype(np.float32)} for _ in range(10)])
d = d.map_batches(to_arrow)

with TemporaryDirectory() as tmpdir:
    d.write_parquet(tmpdir, partition_cols=["partition_col"])

pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<arrow.fixed_shape_tensor[value_type=float, shape=[5,6], permutation=[0,1]]>, uint32)

Implementing the idea from #50502 fixes things:

from typing import Any, Iterable, Optional

import pyarrow as pa
from pyarrow.parquet import write_to_dataset

# Internal Ray Data APIs; exact module paths may differ between Ray versions.
from ray.data.block import Block
from ray.data.datasource.file_datasink import _FileDatasink
from ray.data._internal.execution.interfaces import TaskContext


class PartitionedParquetDatasink(_FileDatasink):
    def __init__(
        self,
        *args,
        partition_cols: list[str],
        schema: pa.Schema | None = None,
        write_to_dataset_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.partition_cols = partition_cols
        self.schema = schema
        self.write_to_dataset_kwargs = write_to_dataset_kwargs or {}

    def write(self, blocks: Iterable[Block], ctx: TaskContext) -> None:
        write_to_dataset(
            table=pa.concat_tables(blocks),
            root_path=self.path,
            partition_cols=self.partition_cols,
            schema=self.schema,
            filesystem=self.filesystem,
            **self.write_to_dataset_kwargs,
        )

# dataset `d` from the previous snippet

with TemporaryDirectory() as tmpdir:
    d.write_datasink(PartitionedParquetDatasink(tmpdir, partition_cols=["partition_col"]))
    loaded = ray.data.read_parquet(tmpdir)
    loaded.show()

Issue Severity

Low: It annoys or frustrates me. I have a workaround using my implementation of PartitionedParquetDatasink.

@schmidt-ai schmidt-ai added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Feb 13, 2025
@schmidt-ai schmidt-ai changed the title [Data] ParquetDatasink fails with tensor data and partition_cols set [Data] ParquetDatasink fails with tensor data and partition_cols Feb 13, 2025
@gvspraveen gvspraveen added P0 Issues that should be fixed in short order and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Feb 13, 2025
@gvspraveen gvspraveen self-assigned this Feb 13, 2025

gvspraveen commented Feb 13, 2025

Thanks for reaching out. We are actively working on this issue.

Root cause: the underlying pyarrow group_by aggregation doesn't support tensor types.

import numpy as np
import pyarrow as pa

x = {"category": ["a", "b"] * 10, "tensor": list(np.random.random((20, 128)))}
schema = pa.schema(
    [
        ("category", pa.dictionary(pa.int32(), pa.string())),
        ("tensor", pa.fixed_shape_tensor(value_type=pa.float32(), shape=(128,))),
    ]
)
t = pa.Table.from_pydict(x, schema=schema)
# Raises ArrowNotImplementedError: no 'hash_list' kernel for the tensor type.
t.group_by("category").aggregate([("tensor", "list")])

israbbani pushed a commit that referenced this issue Feb 25, 2025

## Why are these changes needed?

Writing data containing a tensor column to `ParquetDatasink` with
partition column(s) fails because there is no pyarrow kernel for
`hash_list`.
This is because the current implementation uses pyarrow `group_by().aggregate()`, which doesn't have kernels for tensor types (see snippet below).

This PR rewrites the implementation without aggregation on non-partition columns, thus avoiding this issue.

```
import numpy as np
import pyarrow as pa

x = {"category": ["a", "b"] * 10, "tensor": list(np.random.random((20, 128)))}
schema = pa.schema(
    [
        ("category", pa.dictionary(pa.int32(), pa.string())),
        ("tensor", pa.fixed_shape_tensor(value_type=pa.float32(), shape=(128,))),
    ]
)
t = pa.Table.from_pydict(x, schema=schema)
t.group_by("category").aggregate([("tensor", "list")])
Traceback (most recent call last):
  File "/Users/praveengorthy/anyscale/rayturbo/python/ray/data/test_dataset.py", line 63, in <module>
    t.group_by("category").aggregate([("tensor", "list")])
  File "pyarrow/table.pxi", line 5562, in pyarrow.lib.TableGroupBy.aggregate
  File "/opt/miniconda3/lib/python3.9/site-packages/pyarrow/acero.py", line 308, in _group_by
    return decl.to_table(use_threads=use_threads)
  File "pyarrow/_acero.pyx", line 511, in pyarrow._acero.Declaration.to_table
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Function 'hash_list' has no kernel matching input types (extension<arrow.fixed_shape_tensor[value_type=float, shape=[128]]>, uint32)
```

## Related issue number

Closes #50506


## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Praveen Gorthy <praveeng@anyscale.com>
xsuler pushed a commit to antgroup/ant-ray that referenced this issue Mar 4, 2025
xsuler pushed a commit to antgroup/ant-ray that referenced this issue Mar 4, 2025