Skip to content

Commit

Permalink
fixed recovery hang and undefined set_close method in aiokafka
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Nov 8, 2020
1 parent 41e358c commit 06755bc
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def on_partitions_revoked(self, revoked: Set[TP]) -> None:
T = traced_from_parent_span()
T(self.flush_buffers)()
self.signal_recovery_reset.set()
self.signal_recovery_start.set()

async def on_rebalance(
self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]
Expand Down Expand Up @@ -294,7 +295,7 @@ async def on_rebalance(
child_of=rebalancing_span,
)
app._span_add_default_tags(self._recovery_span)
self.signal_recovery_reset.clear()
self.signal_recovery_reset.set()
self.signal_recovery_start.set()

async def _resume_streams(self) -> None:
Expand Down

0 comments on commit 06755bc

Please sign in to comment.