From 586f007799c4008aa610c7522790a08a15d57921 Mon Sep 17 00:00:00 2001 From: tomer-mamia <125267619+tomerm-iguazio@users.noreply.github.com> Date: Wed, 14 Jun 2023 10:55:14 +0300 Subject: [PATCH 1/2] ML-3895: Fix date parser (#445) * update test_csv_source_with_none_values test to parse dates * update test_csv_source_with_none_values test to check not a time (NaT) * change condition in _datetime_from_timestamp * lint * fix test "test_csv_source_with_none_values" datetime * change assert to support pd.NaT --- storey/sources.py | 2 +- tests/test_flow.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 92d29173..45a4e6ac 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -793,7 +793,7 @@ def _init(self): self._dfs.append(df) def _datetime_from_timestamp(self, timestamp): - if timestamp == "" or timestamp is None: + if timestamp == "" or pd.isna(timestamp): return None if self._timestamp_format: return pandas.to_datetime(timestamp, format=self._timestamp_format).floor("u").to_pydatetime() diff --git a/tests/test_flow.py b/tests/test_flow.py index 0eeb394c..774c95de 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -3473,7 +3473,7 @@ def test_query_by_key_non_aggregate(): def test_csv_source_with_none_values(): controller = build_flow( [ - CSVSource("tests/test-with-none-values.csv", key_field="string"), + CSVSource("tests/test-with-none-values.csv", key_field="string", parse_dates="date_with_none"), Reduce([], append_and_return, full_event=True), ] ).run() @@ -3488,10 +3488,10 @@ def test_csv_source_with_none_values(): False, 1, 2.3, - "2021-04-21 15:56:53.385444", + pd.to_datetime("2021-04-21 15:56:53.385444"), ] assert termination_result[1].key == "b" - excepted_result = ["b", True, math.nan, math.nan, math.nan, math.nan] + excepted_result = ["b", True, math.nan, math.nan, math.nan, pd.NaT] assert len(termination_result[1].body) == len(excepted_result) for x, y in zip(termination_result[1].body, excepted_result): if isinstance(x, float): @@ -3500,6 +3500,8 @@ def test_csv_source_with_none_values(): assert math.isnan(y) else: assert x == y + elif isinstance(x, type(pd.NaT)): + assert isinstance(y, type(pd.NaT)) else: assert x == y From 12921e5383a6fd73937dfe4d511d1381fd7d82d4 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Mon, 19 Jun 2023 12:30:30 +0800 Subject: [PATCH 2/2] ML-3806: Log errors raised during batch flush (#446) Co-authored-by: Gal Topper --- storey/flow.py | 12 +++++------- storey/targets.py | 2 +- tests/test_flow.py | 24 ++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/storey/flow.py b/storey/flow.py index 55877aa4..a1ad0abf 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -953,7 +953,7 @@ class _Batching(Flow): def __init__( self, max_events: Optional[int] = None, - flush_after_seconds: Optional[int] = None, + flush_after_seconds: Union[int, float, None] = None, key_field: Optional[Union[str, Callable[[Event], str]]] = None, **kwargs, ): @@ -980,7 +980,6 @@ def _init(self): self._batch_last_event_time: Dict[Optional[str], datetime.datetime] = {} self._batch_start_time: Dict[Optional[str], float] = {} self._timeout_task: Optional[Task] = None - self._timeout_task_ex: Optional[Exception] = None @staticmethod def _create_key_extractor(key_field) -> Callable: @@ -1022,9 +1021,6 @@ async def _do(self, event): elif self._batch_last_event_time[key] < event_time: self._batch_last_event_time[key] = event_time - if self._timeout_task_ex: - raise self._timeout_task_ex - if self._flush_after_seconds is not None and self._timeout_task is None: self._timeout_task = asyncio.get_running_loop().create_task(self._sleep_and_emit()) @@ -1044,8 +1040,10 @@ async def _sleep_and_emit(self): if delta_seconds < self._flush_after_seconds: await asyncio.sleep(self._flush_after_seconds - delta_seconds) await self._emit_batch(key) - except Exception as ex: - self._timeout_task_ex = ex + except Exception: + message = traceback.format_exc() + if self.logger: + self.logger.error(f"Failed to flush batch in step '{self.name}':\n{message}") self._timeout_task = None diff --git a/storey/targets.py b/storey/targets.py index 9fc16a99..02dc04ed 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -524,7 +524,7 @@ def __init__( time_format: Optional[str] = None, infer_columns_from_data: Optional[bool] = None, max_events: Optional[int] = None, - flush_after_seconds: Optional[int] = None, + flush_after_seconds: Union[int, float, None] = None, **kwargs, ): self._single_file_mode = False diff --git a/tests/test_flow.py b/tests/test_flow.py index 774c95de..06d28875 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -783,6 +783,30 @@ def test_write_parquet_flush(tmpdir): asyncio.run(async_test_write_parquet_flush(tmpdir)) +def test_parquet_flush_with_inconsistent_schema_logs_error(tmpdir): + out_dir = f"{tmpdir}/test_parquet_flush_with_inconsistent_schema_logs_error/{uuid.uuid4().hex}/" + + logger = MockLogger() + context = MockContext(logger, False) + + columns = [("my_int_or_string", "int"), ("my_string", "str")] + target = ParquetTarget( + out_dir, + columns=columns, + partition_cols=[], + flush_after_seconds=0.5, + context=context, + ) + controller = build_flow([SyncEmitSource(), target]).run() + controller.emit(["it is actually a string", "abc"]) + time.sleep(1) + + assert logger.logs[0][1][0].startswith("Failed to flush batch in step 'ParquetTarget':") + + controller.terminate() + controller.await_termination() + + def test_error_flow(): controller = build_flow( [