Skip to content

Commit

Permalink
Fix for canceled from mode (#42)
Browse files Browse the repository at this point in the history
* Fix for canceled from mode

* Fix for failing to ack consumer messages
  • Loading branch information
patkivikram authored Nov 20, 2020
1 parent 69c5af6 commit 1131106
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion faust/_cython/streams.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ cdef class StreamIterator:
if self.acks_enabled_for(message.topic):
committed = consumer._committed_offset[tp]
try:
if committed is None or offset > committed:
if committed is None or offset >= committed:
acked_index = consumer._acked_index[tp]
if offset not in acked_index:
self.unacked.discard(message)
Expand Down
7 changes: 6 additions & 1 deletion faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,14 @@ async def _execute_actor(self, coro: Awaitable, aref: ActorRefT) -> None:
_current_agent.set(self)
try:
await coro
except asyncio.CancelledError:
except asyncio.CancelledError as exc:
if self.should_stop:
raise
else:
self.log.info("Restarting on rebalance")
await aref.crash(exc)
self.supervisor.wakeup()

except Exception as exc:
if self._on_error is not None:
await self._on_error(self, exc)
Expand Down
9 changes: 6 additions & 3 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ async def _fetcher(self) -> None:
try:
consumer = cast(Consumer, self.app.consumer)
await consumer._drain_messages(self)
except asyncio.CancelledError:
pass
except asyncio.CancelledError as exc:
if self.app.rebalancing:
self.log.info("Restarting on rebalance")
await self.crash(exc)
self.supervisor.wakeup()
finally:
self.set_shutdown()

Expand Down Expand Up @@ -1025,7 +1028,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
# find first list of consecutive numbers
batch = next(consecutive_numbers(acked))
# remove them from the list to clean up.
acked[: len(batch) - 1] = []
acked[: len(batch)] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1] + 1
Expand Down
9 changes: 9 additions & 0 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,12 +526,21 @@ async def _commit(self, offsets: Mapping[TP, int]) -> bool:
return False
self.log.exception("Committing raised exception: %r", exc)
await self.crash(exc)
self.supervisor.wakeup()
return False
except IllegalStateError as exc:
self.log.exception(
"Got exception: %r\nCurrent assignment: %r", exc, self.assignment()
)
await self.crash(exc)
self.supervisor.wakeup()
return False
except Exception as exc:
self.log.exception(
"Got exception: %r\nCurrent assignment: %r", exc, self.assignment()
)
await self.crash(exc)
self.supervisor.wakeup()
return False
return True

Expand Down

0 comments on commit 1131106

Please sign in to comment.