ML-3721: Support pandas 2 (#459)
* ML-3721: Support pandas 2

* Fix requirement

---------

Co-authored-by: Gal Topper <galt@iguazio.com>
gtopper and Gal Topper authored Sep 21, 2023
1 parent dbaf8b6 commit 30abd95
Showing 4 changed files with 10 additions and 14 deletions.
integration/test_filesystems_integration.py (3 additions, 5 deletions)

@@ -254,16 +254,14 @@ def test_write_to_parquet_to_v3io(setup_teardown_test):
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected_in_pyarrow1 = pd.DataFrame(expected, columns=columns)
-    expected_in_pyarrow3 = expected_in_pyarrow1.copy()
-    expected_in_pyarrow1["my_int"] = expected_in_pyarrow1["my_int"].astype("int32")
-    expected_in_pyarrow3["my_int"] = expected_in_pyarrow3["my_int"].astype("category")
+    expected_df = pd.DataFrame(expected, columns=columns)
+    expected_df["my_int"] = expected_df["my_int"].astype("category")
 
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_dir, columns=columns)
-    assert read_back_df.equals(expected_in_pyarrow1) or read_back_df.equals(expected_in_pyarrow3)
+    pd.testing.assert_frame_equal(read_back_df, expected_df, check_categorical=False)
 
 
 def test_write_to_parquet_to_v3io_single_file_on_termination(setup_teardown_test):
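For context on the new assertion: pd.testing.assert_frame_equal reports a detailed diff on failure, and check_categorical=False relaxes the comparison of Categorical internals, which removes the need for separate pyarrow-version-specific expected frames. Below is a minimal, self-contained sketch of the same comparison pattern, assuming pyarrow is installed; the file name and column values are illustrative, not taken from the test.

```python
import pandas as pd

# Expected frame with a categorical column, mirroring the shape of the test data.
expected_df = pd.DataFrame({"my_int": [0, 1, 2], "my_string": ["a", "b", "c"]})
expected_df["my_int"] = expected_df["my_int"].astype("category")

# Round-trip through Parquet (requires pyarrow); "example.parquet" is illustrative.
expected_df.to_parquet("example.parquet")
read_back_df = pd.read_parquet("example.parquet", columns=["my_int", "my_string"])

# check_categorical=False skips strict comparison of the Categorical internals,
# which can vary with the pyarrow and pandas versions used for the round trip.
pd.testing.assert_frame_equal(read_back_df, expected_df, check_categorical=False)
print("frames match")
```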
requirements.txt (1 addition, 1 deletion)

@@ -2,7 +2,7 @@ aiohttp~=3.8
 v3io~=0.5.14
 # exclude pandas 1.5.0 due to https://github.com/pandas-dev/pandas/issues/48767
 # and 1.5.* due to https://github.com/pandas-dev/pandas/issues/49203
-pandas~=1.0,<1.5
+pandas>=1, !=1.5.*, <3
 numpy>=1.16.5,<1.23
 pyarrow>=1,<12
 v3io-frames~=0.10.3
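The widened pandas specifier keeps the 1.5.* exclusion noted in the comments while opening up the 2.x line. As a quick sanity check of which releases it admits, assuming the packaging library is available (it is not a storey dependency; the version strings are illustrative):

```python
from packaging.specifiers import SpecifierSet

# The new pandas constraint from requirements.txt.
spec = SpecifierSet(">=1, !=1.5.*, <3")

for version in ["1.4.4", "1.5.3", "2.0.3", "2.1.1", "3.0.0"]:
    print(version, version in spec)
# 1.x (except 1.5.*) and 2.x satisfy the specifier; 1.5.* and 3.x do not.
```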
storey/targets.py (1 addition, 1 deletion)

@@ -638,7 +638,7 @@ async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_tim
                     df[name] = pd.to_datetime(df[name], utc=True)
                 else:
                     raise ex
-        if pd.core.dtypes.common.is_datetime64_dtype(df.index) or pd.core.dtypes.common.is_datetime64tz_dtype(df.index):
+        if pd.core.dtypes.common.is_datetime64_dtype(df.index) or isinstance(df.index.dtype, pd.DatetimeTZDtype):
             df.index = df.index.floor("u")
         with self._file_system.open(file_path, "wb") as file:
             kwargs = {}
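The one-line change above is the pandas 2 motivation in the targets module: is_datetime64tz_dtype is deprecated in pandas 2.0 in favor of checking the dtype object directly, which works on both major versions. A minimal sketch of the equivalent check in isolation (the index values are illustrative):

```python
import pandas as pd

# A timezone-aware index with sub-microsecond precision, purely for illustration.
index = pd.DatetimeIndex(["2023-09-21 12:00:00.123456789"]).tz_localize("UTC")

# pandas 2 deprecates is_datetime64tz_dtype; testing the dtype object with
# isinstance works on both pandas 1 and 2 without a deprecation warning.
is_naive_datetime = pd.api.types.is_datetime64_dtype(index)
is_tz_aware_datetime = isinstance(index.dtype, pd.DatetimeTZDtype)

if is_naive_datetime or is_tz_aware_datetime:
    index = index.floor("u")  # truncate to microseconds, as the target does

print(index)  # DatetimeIndex(['2023-09-21 12:00:00.123456+00:00'], ...)
```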
tests/test_flow.py (5 additions, 7 deletions)

@@ -2453,15 +2453,13 @@ def test_write_to_parquet(tmpdir):
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected_in_pyarrow1 = pd.DataFrame(expected, columns=columns)
-    expected_in_pyarrow3 = expected_in_pyarrow1.copy()
-    expected_in_pyarrow1["my_int"] = expected_in_pyarrow1["my_int"].astype("int32")
-    expected_in_pyarrow3["my_int"] = expected_in_pyarrow3["my_int"].astype("category")
+    expected_df = pd.DataFrame(expected, columns=columns)
+    expected_df["my_int"] = expected_df["my_int"].astype("category")
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_dir, columns=columns)
-    assert read_back_df.equals(expected_in_pyarrow1) or read_back_df.equals(expected_in_pyarrow3)
+    pd.testing.assert_frame_equal(read_back_df, expected_df, check_categorical=False)
 
 
 # Regression test for ML-2510.
@@ -2517,14 +2515,14 @@ def test_write_to_parquet_string_as_datetime(tmpdir):
         expected.append([i, f"this is {i}", my_time.isoformat(sep=" ")])
     expected_df = pd.DataFrame(expected, columns=columns)
     expected_df["my_int"] = expected_df["my_int"].astype("int8")
-    expected_df["my_datetime"] = expected_df["my_datetime"].astype("datetime64[us]")
+    expected_df["my_datetime"] = expected_df["my_datetime"].astype("datetime64[ns]")
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_dir, columns=columns)
     read_back_df.sort_values("my_int", inplace=True)
     read_back_df.reset_index(drop=True, inplace=True)
-    assert read_back_df.equals(expected_df)
+    pd.testing.assert_frame_equal(read_back_df, expected_df)
 
 
 def test_write_sparse_data_to_parquet(tmpdir):
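The datetime64[us] to datetime64[ns] switch in the expected frame reflects pandas 2's support for non-nanosecond datetime resolutions: a column cast to microsecond resolution now keeps that resolution, so it no longer matches data read back at nanosecond resolution. A small sketch of that dtype difference, assuming a pandas 2 environment (the timestamp value is illustrative):

```python
import pandas as pd

# Start from a nanosecond-resolution datetime series (the pandas default).
ts_ns = pd.to_datetime(pd.Series(["2023-09-21 12:00:00.123456"]))
print(ts_ns.dtype)  # datetime64[ns]

# Under pandas 2, casting to microsecond resolution yields a distinct dtype
# instead of being coerced back to nanoseconds.
ts_us = ts_ns.astype("datetime64[us]")
print(ts_us.dtype)  # datetime64[us]

# assert_series_equal checks dtype by default, so an expected column pinned at
# microsecond resolution would not match nanosecond data from read_parquet.
pd.testing.assert_series_equal(ts_us.astype("datetime64[ns]"), ts_ns)  # passes
# pd.testing.assert_series_equal(ts_us, ts_ns)  # would raise: dtype mismatch
```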
