From a8dfb171b89c560e1cf74fedfe5f68298eaf1b21 Mon Sep 17 00:00:00 2001 From: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com> Date: Wed, 16 Aug 2023 11:19:21 +0300 Subject: [PATCH] Handle delete events from change log where there is no after (#73) --- changelog/2.bugfix.md | 1 + changelog/3.bugfix.md | 1 + .../core/event_listener/kafka/event_listener.py | 13 ++++++++----- 3 files changed, 10 insertions(+), 5 deletions(-) create mode 100644 changelog/2.bugfix.md create mode 100644 changelog/3.bugfix.md diff --git a/changelog/2.bugfix.md b/changelog/2.bugfix.md new file mode 100644 index 0000000000..3c4187f3c1 --- /dev/null +++ b/changelog/2.bugfix.md @@ -0,0 +1 @@ +Move Resource Config Selector class to public \ No newline at end of file diff --git a/changelog/3.bugfix.md b/changelog/3.bugfix.md new file mode 100644 index 0000000000..8f2023b093 --- /dev/null +++ b/changelog/3.bugfix.md @@ -0,0 +1 @@ +Handle delete events from change log where there is no after \ No newline at end of file diff --git a/port_ocean/core/event_listener/kafka/event_listener.py b/port_ocean/core/event_listener/kafka/event_listener.py index f1baa65fbd..2bce4bd56f 100644 --- a/port_ocean/core/event_listener/kafka/event_listener.py +++ b/port_ocean/core/event_listener/kafka/event_listener.py @@ -52,10 +52,13 @@ async def _get_kafka_config(self) -> KafkaConsumerConfig: return KafkaConsumerConfig.parse_obj(self.event_listener_config.dict()) - def should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: - integration_identifier = ( - msg_value.get("diff", {}).get("after", {}).get("identifier") - ) + def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: + after = msg_value.get("diff", {}).get("after", {}) + # handles delete events from change log where there is no after + if after is None: + return False + + integration_identifier = after.get("identifier") if integration_identifier == self.integration_identifier and ( "change.log" in topic ): @@ -64,7 +67,7 @@ def should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: return False async def _handle_message(self, message: dict[Any, Any], topic: str) -> None: - if not self.should_be_processed(message, topic): + if not self._should_be_processed(message, topic): return if "change.log" in topic and message is not None: