From df7f3b03cdc8a2e789354711dd3e1caf4284472c Mon Sep 17 00:00:00 2001 From: FrankK-1234 Date: Thu, 5 Sep 2024 13:46:04 +0200 Subject: [PATCH] Revert: allow users to disable broker heartbeats by not providing a timeout (#2097, #2016) --- kombu/mixins.py | 9 ++++----- t/unit/test_mixins.py | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/kombu/mixins.py b/kombu/mixins.py index 92acee980..14c1c1b9f 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -195,11 +195,10 @@ def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): try: conn.drain_events(timeout=safety_interval) except socket.timeout: - if timeout: - conn.heartbeat_check() - elapsed += safety_interval - if elapsed >= timeout: - raise + conn.heartbeat_check() + elapsed += safety_interval + if timeout and elapsed >= timeout: + raise except OSError: if not self.should_stop: raise diff --git a/t/unit/test_mixins.py b/t/unit/test_mixins.py index a33412eb9..3ef4b9584 100644 --- a/t/unit/test_mixins.py +++ b/t/unit/test_mixins.py @@ -91,7 +91,7 @@ def se(*args, **kwargs): next(it) c.connection.heartbeat_check.assert_called() - def test_consume_drain_no_heartbeat_check(self): + def test_consume_drain_heartbeat_check_no_timeout(self): c, Acons, Bcons = self._context() c.should_stop = False it = c.consume(no_ack=True, timeout=None) @@ -102,13 +102,13 @@ def se(*args, **kwargs): c.connection.drain_events.side_effect = se with pytest.raises(StopIteration): next(it) - c.connection.heartbeat_check.assert_not_called() + c.connection.heartbeat_check.assert_called() it = c.consume(no_ack=True, timeout=0) c.connection.drain_events.side_effect = se with pytest.raises(StopIteration): next(it) - c.connection.heartbeat_check.assert_not_called() + c.connection.heartbeat_check.assert_called() def test_Consumer_context(self): c, Acons, Bcons = self._context()