Skip to content

Commit

Permalink
Handle delete events from change log where there is no after (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tankilevitch authored Aug 16, 2023
1 parent 0b08440 commit a8dfb17
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog/2.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move Resource Config Selector class to public
1 change: 1 addition & 0 deletions changelog/3.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle delete events from change log where there is no after
13 changes: 8 additions & 5 deletions port_ocean/core/event_listener/kafka/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ async def _get_kafka_config(self) -> KafkaConsumerConfig:

return KafkaConsumerConfig.parse_obj(self.event_listener_config.dict())

def should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool:
integration_identifier = (
msg_value.get("diff", {}).get("after", {}).get("identifier")
)
def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool:
after = msg_value.get("diff", {}).get("after", {})
# handles delete events from change log where there is no after
if after is None:
return False

integration_identifier = after.get("identifier")
if integration_identifier == self.integration_identifier and (
"change.log" in topic
):
Expand All @@ -64,7 +67,7 @@ def should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool:
return False

async def _handle_message(self, message: dict[Any, Any], topic: str) -> None:
if not self.should_be_processed(message, topic):
if not self._should_be_processed(message, topic):
return

if "change.log" in topic and message is not None:
Expand Down

0 comments on commit a8dfb17

Please sign in to comment.