diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 6d1f12a5b..4452d85ff 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -435,7 +435,6 @@ class Consumer(Service, ConsumerT): flow_active: bool = True can_resume_flow: Event - suspend_flow: Event def __init__( self, @@ -476,7 +475,6 @@ def __init__( self._end_offset_monitor_interval = self.commit_interval * 2 self.randomly_assigned_topics = set() self.can_resume_flow = Event() - self.suspend_flow = Event() self._reset_state() super().__init__(loop=loop or self.transport.loop, **kwargs) self.transactions = self.transport.create_transaction_manager( @@ -499,7 +497,6 @@ def _reset_state(self) -> None: self._paused_partitions = set() self._buffered_partitions = set() self.can_resume_flow.clear() - self.suspend_flow.clear() self.flow_active = True self._time_start = monotonic() @@ -576,13 +573,11 @@ def stop_flow(self) -> None: """Block consumer from processing any more messages.""" self.flow_active = False self.can_resume_flow.clear() - self.suspend_flow.set() def resume_flow(self) -> None: """Allow consumer to process messages.""" self.flow_active = True self.can_resume_flow.set() - self.suspend_flow.clear() def pause_partitions(self, tps: Iterable[TP]) -> None: """Pause fetching from partitions.""" @@ -1125,9 +1120,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover if self._n_acked >= commit_every: self._n_acked = 0 await self.commit() - await self.wait_first( - callback(message), self.suspend_flow.wait() - ) + await callback(message) set_read_offset(tp, offset) else: self.log.dev(