Skip to content

Commit

Permalink
[SPARK-43568][SPARK-43633][PS] Support Categorical APIs for pandas 2
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to support `Categorical` APIs for [pandas 2](https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html), and match the behavior.

### Why are the changes needed?

To support pandas API on Spark with pandas 2.0.0 and above.

### Does this PR introduce _any_ user-facing change?

The behavior is matched with pandas 2.0.0 and above. e.g.

```diff
>>> psser
0    1
1    2
2    3
3    1
4    2
5    3
Name: a, dtype: category
Categories (3, int64): [1, 2, 3]
>>> psser.cat.remove_categories([1, 2, 3])
0    NaN
1    NaN
2    NaN
3    NaN
4    NaN
5    NaN
Name: a, dtype: category
-  Categories (0, object): []
+  Categories (0, int64): []
```

### How was this patch tested?

Enabling the existing tests.

Closes apache#42273 from itholic/pandas_categorical.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
itholic authored and vpolet committed Aug 24, 2023
1 parent a6cbe6c commit c53d4e0
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 81 deletions.
1 change: 1 addition & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Upgrading from PySpark 3.5 to 4.0
* In Spark 4.0, ``DataFrame.mad`` has been removed from pandas API on Spark.
* In Spark 4.0, ``Series.mad`` has been removed from pandas API on Spark.
* In Spark 4.0, ``na_sentinel`` parameter from ``Index.factorize`` and ``Series.factorize`` has been removed from pandas API on Spark, use ``use_na_sentinel`` instead.
* In Spark 4.0, the ``inplace`` parameter has been removed from ``Categorical.add_categories``, ``Categorical.remove_categories``, ``Categorical.set_categories``, ``Categorical.rename_categories``, ``Categorical.reorder_categories``, ``Categorical.as_ordered`` and ``Categorical.as_unordered`` in pandas API on Spark.


Upgrading from PySpark 3.3 to 3.4
Expand Down
66 changes: 21 additions & 45 deletions python/pyspark/pandas/categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#
from typing import Any, Callable, List, Optional, Union, TYPE_CHECKING, cast
import warnings

import pandas as pd
from pandas.api.types import ( # type: ignore[attr-defined]
Expand Down Expand Up @@ -250,14 +249,11 @@ def add_categories(self, new_categories: Union[pd.Index, Any, List]) -> Optional
)
return DataFrame(internal)._psser_for(self._data._column_label).copy()

def _set_ordered(self, *, ordered: bool, inplace: bool) -> Optional["ps.Series"]:
def _set_ordered(self, *, ordered: bool) -> Optional["ps.Series"]:
from pyspark.pandas.frame import DataFrame

if self.ordered == ordered:
if inplace:
return None
else:
return self._data.copy()
return self._data.copy()
else:
internal = self._data._psdf._internal.with_new_spark_column(
self._data._column_label,
Expand All @@ -266,24 +262,12 @@ def _set_ordered(self, *, ordered: bool, inplace: bool) -> Optional["ps.Series"]
dtype=CategoricalDtype(categories=self.categories, ordered=ordered)
),
)
if inplace:
self._data._psdf._update_internal_frame(internal)
return None
else:
return DataFrame(internal)._psser_for(self._data._column_label).copy()
return DataFrame(internal)._psser_for(self._data._column_label).copy()

def as_ordered(self, inplace: bool = False) -> Optional["ps.Series"]:
def as_ordered(self) -> Optional["ps.Series"]:
"""
Set the Categorical to be ordered.
Parameters
----------
inplace : bool, default False
Whether or not to set the ordered attribute in-place or return
a copy of this categorical with ordered set to True.
.. deprecated:: 3.4.0
Returns
-------
Series or None
Expand Down Expand Up @@ -312,26 +296,12 @@ def as_ordered(self, inplace: bool = False) -> Optional["ps.Series"]:
dtype: category
Categories (3, object): ['a' < 'b' < 'c']
"""
if inplace:
warnings.warn(
"The `inplace` parameter in as_ordered is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
return self._set_ordered(ordered=True, inplace=inplace)
return self._set_ordered(ordered=True)

def as_unordered(self, inplace: bool = False) -> Optional["ps.Series"]:
def as_unordered(self) -> Optional["ps.Series"]:
"""
Set the Categorical to be unordered.
Parameters
----------
inplace : bool, default False
Whether or not to set the ordered attribute in-place or return
a copy of this categorical with ordered set to False.
.. deprecated:: 3.4.0
Returns
-------
Series or None
Expand Down Expand Up @@ -360,13 +330,7 @@ def as_unordered(self, inplace: bool = False) -> Optional["ps.Series"]:
dtype: category
Categories (3, object): ['a', 'b', 'c']
"""
if inplace:
warnings.warn(
"The `inplace` parameter in as_unordered is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
return self._set_ordered(ordered=False, inplace=inplace)
return self._set_ordered(ordered=False)

def remove_categories(self, removals: Union[pd.Index, Any, List]) -> Optional["ps.Series"]:
"""
Expand Down Expand Up @@ -441,8 +405,13 @@ def remove_categories(self, removals: Union[pd.Index, Any, List]) -> Optional["p
if len(categories) == 0:
return self._data.copy()
else:
data = [cat for cat in self.categories.sort_values() if cat not in categories]
if len(data) == 0:
# We should keep the original dtype even when removing all categories.
data = pd.Index(data, dtype=self.categories.dtype) # type: ignore[assignment]
dtype = CategoricalDtype(
[cat for cat in self.categories if cat not in categories], ordered=self.ordered
categories=data,
ordered=self.ordered,
)
return self._data.astype(dtype)

Expand Down Expand Up @@ -488,7 +457,14 @@ def remove_unused_categories(self) -> Optional["ps.Series"]:
"""
categories = set(self._data.drop_duplicates()._to_pandas())
removals = [cat for cat in self.categories if cat not in categories]
return self.remove_categories(removals=removals)
categories = [cat for cat in removals if cat is not None] # type: ignore[assignment]
if len(categories) == 0:
return self._data.copy()
else:
dtype = CategoricalDtype(
[cat for cat in self.categories if cat not in categories], ordered=self.ordered
)
return self._data.astype(dtype)

def rename_categories(
self, new_categories: Union[list, dict, Callable]
Expand Down
38 changes: 2 additions & 36 deletions python/pyspark/pandas/tests/test_categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,14 @@ def test_categorical_series(self):
with self.assertRaisesRegex(ValueError, "Cannot call CategoricalAccessor on type int64"):
ps.Series([1, 2, 3]).cat

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43566): Enable CategoricalTests.test_categories_setter for pandas 2.0.0.",
)
def test_categories_setter(self):
pdf, psdf = self.df_pair

pser = pdf.a
psser = psdf.a

pser.cat.categories = ["z", "y", "x"]
psser.cat.categories = ["z", "y", "x"]
if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
# Bug in pandas 1.3. dtype is not updated properly with `inplace` argument.
pser = pser.astype(CategoricalDtype(categories=["x", "y", "z"]))
pser = pser.cat.rename_categories(["z", "y", "x"])
psser = psser.cat.rename_categories(["z", "y", "x"])

self.assert_eq(pser, psser)
self.assert_eq(pdf, psdf)
Expand All @@ -103,10 +96,6 @@ def test_add_categories(self):
self.assertRaises(ValueError, lambda: psser.cat.add_categories(4))
self.assertRaises(ValueError, lambda: psser.cat.add_categories([5, 5]))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43605): Enable CategoricalTests.test_remove_categories for pandas 2.0.0.",
)
def test_remove_categories(self):
pdf, psdf = self.df_pair

Expand Down Expand Up @@ -168,10 +157,6 @@ def test_reorder_categories(self):
self.assertRaises(TypeError, lambda: psser.cat.reorder_categories(1))
self.assertRaises(TypeError, lambda: psdf.b.cat.reorder_categories("abcd"))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43565): Enable CategoricalTests.test_as_ordered_unordered for pandas 2.0.0.",
)
def test_as_ordered_unordered(self):
pdf, psdf = self.df_pair

Expand All @@ -181,28 +166,9 @@ def test_as_ordered_unordered(self):
# as_ordered
self.assert_eq(pser.cat.as_ordered(), psser.cat.as_ordered())

pser.cat.as_ordered(inplace=True)
psser.cat.as_ordered(inplace=True)
if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
# Bug in pandas 1.3. dtype is not updated properly with `inplace` argument.
pser = pser.astype(CategoricalDtype(categories=[1, 2, 3], ordered=True))

self.assert_eq(pser, psser)
self.assert_eq(pdf, psdf)

# as_unordered
self.assert_eq(pser.cat.as_unordered(), psser.cat.as_unordered())

pser.cat.as_unordered(inplace=True)
psser.cat.as_unordered(inplace=True)
if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
# Bug in pandas 1.3. dtype is not updated properly with `inplace` argument.
pser = pser.astype(CategoricalDtype(categories=[1, 2, 3], ordered=False))
pdf.a = pser

self.assert_eq(pser, psser)
self.assert_eq(pdf, psdf)

def test_astype(self):
pser = pd.Series(["a", "b", "c"])
psser = ps.from_pandas(pser)
Expand Down

0 comments on commit c53d4e0

Please sign in to comment.