diff --git a/integration/test_filesystems_integration.py b/integration/test_filesystems_integration.py
index 58f3b33d..afb1b60f 100644
--- a/integration/test_filesystems_integration.py
+++ b/integration/test_filesystems_integration.py
@@ -278,7 +278,7 @@ def test_write_to_parquet_to_v3io_single_file_on_termination(setup_teardown_test
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_file, columns=columns)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 # ML-775
@@ -304,7 +304,7 @@ def test_write_to_parquet_key_hash_partitioning(setup_teardown_test):
     read_back_df["hash4_key"] = read_back_df["hash4_key"].astype("int64")
     read_back_df.sort_values("my_int", inplace=True)
     read_back_df.reset_index(inplace=True, drop=True)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 # ML-701
@@ -318,12 +318,12 @@ def test_write_to_parquet_to_v3io_force_string_to_timestamp(setup_teardown_test)
         t = "2021-03-02T19:45:00"
         controller.emit([t])
         expected.append([datetime.datetime.fromisoformat(t)])
-    expected = pd.DataFrame(expected, columns=columns)
+    expected = pd.DataFrame(expected, columns=columns).astype("datetime64[us]")
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_file, columns=columns)
-    assert read_back_df.equals(expected)
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 def test_write_to_parquet_to_v3io_with_indices(setup_teardown_test):
@@ -346,7 +346,7 @@ def test_write_to_parquet_to_v3io_with_indices(setup_teardown_test):
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_file, columns=columns)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 # ML-602
diff --git a/storey/targets.py b/storey/targets.py
index 7590c5bf..491370d0 100644
--- a/storey/targets.py
+++ b/storey/targets.py
@@ -645,7 +645,8 @@ async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_tim
             kwargs = {}
             if self._schema is not None:
                 kwargs["schema"] = self._schema
-            df.to_parquet(path=file, index=bool(self._index_cols), **kwargs)
+            # version set for pyspark compatibility, and is needed as of pyarrow 13 due to timestamp incompatibility
+            df.to_parquet(path=file, index=bool(self._index_cols), version="2.4", **kwargs)
         if not self._last_written_event or last_event_time > self._last_written_event:
             self._last_written_event = last_event_time
 
diff --git a/tests/test_flow.py b/tests/test_flow.py
index 0a402d21..5844dd7f 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -2539,7 +2539,7 @@ def test_write_to_parquet_string_as_datetime(tmpdir):
         expected.append([i, f"this is {i}", my_time.isoformat(sep=" ")])
     expected_df = pd.DataFrame(expected, columns=columns)
     expected_df["my_int"] = expected_df["my_int"].astype("int8")
-    expected_df["my_datetime"] = expected_df["my_datetime"].astype("datetime64[ns]")
+    expected_df["my_datetime"] = expected_df["my_datetime"].astype("datetime64[us]")
     controller.terminate()
     controller.await_termination()
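
For context (a reviewer sketch, not part of the patch): Parquet format version
"2.4" predates nanosecond timestamps, so pyarrow coerces datetime64[ns] columns
to microseconds on write, and as of pyarrow 13 (with pandas 2.x) timestamps are
no longer coerced back to nanoseconds on read. That is why the test
expectations above move from datetime64[ns] to datetime64[us]. A minimal,
self-contained reproduction of the round-trip, assuming pandas 2.x and
pyarrow >= 13 (the file path is arbitrary):

    import pandas as pd

    df = pd.DataFrame({"ts": pd.to_datetime(["2021-03-02T19:45:00"])})
    assert str(df["ts"].dtype) == "datetime64[ns]"

    # Format version "2.4" has no nanosecond timestamp type, so pyarrow
    # writes the column with microsecond precision.
    df.to_parquet("/tmp/v24_demo.parquet", version="2.4")
    read_back = pd.read_parquet("/tmp/v24_demo.parquet")
    assert str(read_back["ts"].dtype) == "datetime64[us]"

    # assert_frame_equal (used throughout this diff) reports dtype and value
    # mismatches in detail, unlike the bare DataFrame.equals assertions it replaces.
    pd.testing.assert_frame_equal(read_back, df.astype({"ts": "datetime64[us]"}))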