diff --git a/brq/operator.py b/brq/operator.py index 40e810b..13efc39 100644 --- a/brq/operator.py +++ b/brq/operator.py @@ -209,7 +209,9 @@ async def count_unprocessed_jobs( gropu_infos = await self.redis.xinfo_groups(stream_name) for group_info in gropu_infos: if group_info["name"] == group_name: - return group_info["pending"] + group_info.get("lag", 0) + lag = group_info.get("lag") or 0 + pending = group_info.get("pending") or 0 + return lag + pending return await self.count_stream(function_name) async def count_dead_messages(self, function_name: str) -> int: