Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add func to remove one dead job #30

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions brq/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ async def count_dead_messages(self, function_name: str) -> int:
return 0
return await self.redis.zcard(dead_key)

async def remove_dead_message(self, function_name: str, job: Job):
dead_key = self.get_dead_message_key(function_name)
await self.redis.zrem(dead_key, job.to_redis())

async def emit_deferred_job(self, function_name: str, defer_until: int, job: Job):
defer_key = self.get_deferred_key(function_name)
await self.redis.zadd(defer_key, {job.to_redis(): defer_until})
Expand Down
38 changes: 38 additions & 0 deletions tests/test_brq.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ async def mock_consume_raise_exception(echo_str: str):
called = False


async def mock_always_consume_raise_exception(echo_str: str):
raise Exception("raise exception")


@pytest.mark.parametrize("run_parallel", [False, True])
async def test_consume_function(async_redis_client, capfd, run_parallel):
producer = Producer(async_redis_client)
Expand Down Expand Up @@ -195,6 +199,40 @@ async def test_process_dead_jobs(async_redis_client, capfd, enable_dead_queue):
await consumer.cleanup()


@pytest.mark.parametrize("enable_dead_queue", [True, False])
async def test_delete_dead_jobs(async_redis_client, capfd, enable_dead_queue):
producer = Producer(async_redis_client)
consumer = Consumer(
async_redis_client,
mock_always_consume_raise_exception,
expire_time=0.001,
enable_dead_queue=enable_dead_queue,
)
browser = Browser(async_redis_client)
assert await producer.count_dead_messages("mock_always_consume_raise_exception") == 0

await browser.status()
await producer.run_job("mock_always_consume_raise_exception", ["hello"])
await browser.status()
await consumer.initialize()
await browser.status()
await consumer.run()
await browser.status()
out, err = capfd.readouterr()
assert "hello" not in out
await asyncio.sleep(0.1)
await consumer._move_expired_jobs()
if enable_dead_queue:
assert await producer.count_dead_messages("mock_always_consume_raise_exception") == 1
dead_messages = await producer.get_dead_messages("mock_always_consume_raise_exception")
for job in dead_messages.values():
await producer.remove_dead_message("mock_always_consume_raise_exception", job)
assert await producer.count_dead_messages("mock_always_consume_raise_exception") == 0
else:
assert await producer.count_dead_messages("mock_always_consume_raise_exception") == 0
await consumer.cleanup()


async def test_deferred_job(async_redis_client, capfd):
producer = Producer(async_redis_client)
consumer = Consumer(async_redis_client, mock_consume)
Expand Down
Loading