From 6862ed36027c9935e2d4ef8d0804118f1ad4e376 Mon Sep 17 00:00:00 2001 From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com> Date: Fri, 24 May 2024 10:21:19 +0200 Subject: [PATCH] fix: event listener (#2119) * fix: commit transaction after taking event * feat: allow to reconnect to postgres for event listener * chore: log sync events pending to process to metrics * fix: make dead_letter runner able to process events without needing to have lock on the event * chore: close Session after reconnect * refactor: make EventSource emit only events that can be processed --- app/models.py | 3 +++ events/event_source.py | 29 +++++++++++++++++++++++++++-- events/runner.py | 40 +++++++++++++++++----------------------- monitoring.py | 12 ++++++++++++ 4 files changed, 59 insertions(+), 25 deletions(-) diff --git a/app/models.py b/app/models.py index 9783312b2..fe5d557f2 100644 --- a/app/models.py +++ b/app/models.py @@ -3699,7 +3699,10 @@ def mark_as_taken(self) -> bool: AND taken_time IS NULL """ args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id} + res = Session.execute(sql, args) + Session.commit() + return res.rowcount > 0 @classmethod diff --git a/events/event_source.py b/events/event_source.py index f23ea3f32..f4f893729 100644 --- a/events/event_source.py +++ b/events/event_source.py @@ -13,6 +13,8 @@ _DEAD_LETTER_THRESHOLD_MINUTES = 10 _DEAD_LETTER_INTERVAL_SECONDS = 30 +_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5 + class EventSource(ABC): @abstractmethod @@ -22,9 +24,19 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]): class PostgresEventSource(EventSource): def __init__(self, connection_string: str): - self.__connection = psycopg2.connect(connection_string) + self.__connection_string = connection_string + self.__connect() def run(self, on_event: Callable[[SyncEvent], NoReturn]): + while True: + try: + self.__listen(on_event) + except Exception as e: + LOG.warn(f"Error listening to events: {e}") + sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS) + self.__connect() + + def __listen(self, on_event: Callable[[SyncEvent], NoReturn]): self.__connection.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT ) @@ -44,12 +56,24 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]): webhook_id = int(notify.payload) event = SyncEvent.get_by(id=webhook_id) if event is not None: - on_event(event) + if event.mark_as_taken(): + on_event(event) + else: + LOG.info( + f"Event {event.id} was handled by another runner" + ) else: LOG.info(f"Could not find event with id={notify.payload}") except Exception as e: LOG.warn(f"Error getting event: {e}") + def __connect(self): + self.__connection = psycopg2.connect(self.__connection_string) + + from app.db import Session + + Session.close() + class DeadLetterEventSource(EventSource): @newrelic.agent.background_task() @@ -73,3 +97,4 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]): sleep(_DEAD_LETTER_INTERVAL_SECONDS) except Exception as e: LOG.warn(f"Error getting dead letter event: {e}") + sleep(_DEAD_LETTER_INTERVAL_SECONDS) diff --git a/events/runner.py b/events/runner.py index e28ff1528..d6f9c2e02 100644 --- a/events/runner.py +++ b/events/runner.py @@ -18,31 +18,25 @@ def run(self): @newrelic.agent.background_task() def __on_event(self, event: SyncEvent): try: - can_process = event.mark_as_taken() - if can_process: - event_created_at = event.created_at - start_time = arrow.now() - success = self.__sink.process(event) - if success: - event_id = event.id - SyncEvent.delete(event.id, commit=True) - LOG.info(f"Marked {event_id} as done") + event_created_at = event.created_at + start_time = arrow.now() + success = self.__sink.process(event) + if success: + event_id = event.id + SyncEvent.delete(event.id, commit=True) + LOG.info(f"Marked {event_id} as done") - end_time = arrow.now() - start_time - time_between_taken_and_created = start_time - event_created_at + end_time = arrow.now() - start_time + time_between_taken_and_created = start_time - event_created_at - newrelic.agent.record_custom_metric( - "Custom/sync_event_processed", 1 - ) - newrelic.agent.record_custom_metric( - "Custom/sync_event_process_time", end_time.total_seconds() - ) - newrelic.agent.record_custom_metric( - "Custom/sync_event_elapsed_time", - time_between_taken_and_created.total_seconds(), - ) - else: - LOG.info(f"{event.id} was handled by another runner") + newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1) + newrelic.agent.record_custom_metric( + "Custom/sync_event_process_time", end_time.total_seconds() + ) + newrelic.agent.record_custom_metric( + "Custom/sync_event_elapsed_time", + time_between_taken_and_created.total_seconds(), + ) except Exception as e: LOG.warn(f"Exception processing event [id={event.id}]: {e}") newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1) diff --git a/monitoring.py b/monitoring.py index 29c88f17f..c9d228ccc 100644 --- a/monitoring.py +++ b/monitoring.py @@ -93,11 +93,23 @@ def log_nb_db_connection(): newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection) +@newrelic.agent.background_task() +def log_pending_to_process_events(): + r = Session.execute("select count(*) from sync_events WHERE taken_time IS NULL;") + events_pending = list(r)[0][0] + + LOG.d("number of events pending to process %s", events_pending) + newrelic.agent.record_custom_metric( + "Custom/sync_events_pending_to_process", events_pending + ) + + if __name__ == "__main__": exporter = MetricExporter(get_newrelic_license()) while True: log_postfix_metrics() log_nb_db_connection() + log_pending_to_process_events() Session.close() exporter.run()