Skip to content

Commit

Permalink
Merge branch 'development' into ML-1325
Browse files Browse the repository at this point in the history
  • Loading branch information
Gal Topper committed Jul 9, 2023
2 parents 4d1619d + 12921e5 commit fa9adff
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
12 changes: 5 additions & 7 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ class _Batching(Flow):
def __init__(
self,
max_events: Optional[int] = None,
flush_after_seconds: Optional[int] = None,
flush_after_seconds: Union[int, float, None] = None,
key_field: Optional[Union[str, Callable[[Event], str]]] = None,
**kwargs,
):
Expand Down Expand Up @@ -986,7 +986,6 @@ def _init(self):
self._batch_last_event_time: Dict[Optional[str], datetime.datetime] = {}
self._batch_start_time: Dict[Optional[str], float] = {}
self._timeout_task: Optional[Task] = None
self._timeout_task_ex: Optional[Exception] = None

@staticmethod
def _create_key_extractor(key_field) -> Callable:
Expand Down Expand Up @@ -1028,9 +1027,6 @@ async def _do(self, event):
elif self._batch_last_event_time[key] < event_time:
self._batch_last_event_time[key] = event_time

if self._timeout_task_ex:
raise self._timeout_task_ex

if self._flush_after_seconds is not None and self._timeout_task is None:
self._timeout_task = asyncio.get_running_loop().create_task(self._sleep_and_emit())

Expand All @@ -1051,8 +1047,10 @@ async def _sleep_and_emit(self):
if delta_seconds < self._flush_after_seconds:
await asyncio.sleep(self._flush_after_seconds - delta_seconds)
await self._emit_batch(key)
except Exception as ex:
self._timeout_task_ex = ex
except Exception:
message = traceback.format_exc()
if self.logger:
self.logger.error(f"Failed to flush batch in step '{self.name}':\n{message}")

self._timeout_task = None

Expand Down
2 changes: 1 addition & 1 deletion storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ def _init(self):
self._dfs.append(df)

def _datetime_from_timestamp(self, timestamp):
if timestamp == "" or timestamp is None:
if timestamp == "" or pd.isna(timestamp):
return None
if self._timestamp_format:
return pandas.to_datetime(timestamp, format=self._timestamp_format).floor("u").to_pydatetime()
Expand Down
2 changes: 1 addition & 1 deletion storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def __init__(
time_format: Optional[str] = None,
infer_columns_from_data: Optional[bool] = None,
max_events: Optional[int] = None,
flush_after_seconds: Optional[int] = None,
flush_after_seconds: Union[int, float, None] = None,
**kwargs,
):
self._single_file_mode = False
Expand Down
32 changes: 29 additions & 3 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,30 @@ def test_write_parquet_flush(tmpdir):
asyncio.run(async_test_write_parquet_flush(tmpdir))


def test_parquet_flush_with_inconsistent_schema_logs_error(tmpdir):
    """Emit a row whose first value violates the declared 'int' column type,
    wait for the time-based flush to fire, and verify that the flush failure
    is logged as an error (rather than crashing the flow)."""
    output_path = f"{tmpdir}/test_parquet_flush_with_inconsistent_schema_logs_error/{uuid.uuid4().hex}/"

    mock_logger = MockLogger()
    mock_context = MockContext(mock_logger, False)

    # Declared schema: first column is an int, second a string.
    declared_columns = [("my_int_or_string", "int"), ("my_string", "str")]
    parquet_target = ParquetTarget(
        output_path,
        columns=declared_columns,
        partition_cols=[],
        flush_after_seconds=0.5,
        context=mock_context,
    )

    flow_controller = build_flow([SyncEmitSource(), parquet_target]).run()
    # The first field is a string where an int is expected — the flush should fail.
    flow_controller.emit(["it is actually a string", "abc"])
    # Give the 0.5s flush timer time to fire and the error to be logged.
    time.sleep(1)

    first_log_message = mock_logger.logs[0][1][0]
    assert first_log_message.startswith("Failed to flush batch in step 'ParquetTarget':")

    flow_controller.terminate()
    flow_controller.await_termination()


def test_error_flow():
controller = build_flow(
[
Expand Down Expand Up @@ -3690,7 +3714,7 @@ def test_query_by_key_non_aggregate():
def test_csv_source_with_none_values():
controller = build_flow(
[
CSVSource("tests/test-with-none-values.csv", key_field="string"),
CSVSource("tests/test-with-none-values.csv", key_field="string", parse_dates="date_with_none"),
Reduce([], append_and_return, full_event=True),
]
).run()
Expand All @@ -3705,10 +3729,10 @@ def test_csv_source_with_none_values():
False,
1,
2.3,
"2021-04-21 15:56:53.385444",
pd.to_datetime("2021-04-21 15:56:53.385444"),
]
assert termination_result[1].key == "b"
excepted_result = ["b", True, math.nan, math.nan, math.nan, math.nan]
excepted_result = ["b", True, math.nan, math.nan, math.nan, pd.NaT]
assert len(termination_result[1].body) == len(excepted_result)
for x, y in zip(termination_result[1].body, excepted_result):
if isinstance(x, float):
Expand All @@ -3717,6 +3741,8 @@ def test_csv_source_with_none_values():
assert math.isnan(y)
else:
assert x == y
elif isinstance(x, type(pd.NaT)):
assert isinstance(y, type(pd.NaT))
else:
assert x == y

Expand Down

0 comments on commit fa9adff

Please sign in to comment.