From 53c4abff798239c35f121b158f354607eb54085a Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 15 Aug 2017 10:41:13 -0400 Subject: [PATCH] Fix parallel workers blocking when tasks raise exception. --- openwebvulndb/common/parallel.py | 2 ++ openwebvulndb/wordpress/repository.py | 1 - tests/common_test/parallel_test.py | 18 +++++++++++++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/openwebvulndb/common/parallel.py b/openwebvulndb/common/parallel.py index 898d412..039e4c1 100644 --- a/openwebvulndb/common/parallel.py +++ b/openwebvulndb/common/parallel.py @@ -51,6 +51,8 @@ async def consume(self, n): self._handle_task_timeout(task) else: await coroutine(*args, **kwargs) + except Exception as e: + logger.warn("Unexpected exception in {} {}: {}".format(self.name, n, repr(e))) finally: self.queue.task_done() diff --git a/openwebvulndb/wordpress/repository.py b/openwebvulndb/wordpress/repository.py index 94fc49b..67d37de 100644 --- a/openwebvulndb/wordpress/repository.py +++ b/openwebvulndb/wordpress/repository.py @@ -60,7 +60,6 @@ async def fetch(self, url, parser): try: async with self.session.get(url) as response: data = await response.text() - response.close() return parser.parse(data) except SoftwareNotFound: raise diff --git a/tests/common_test/parallel_test.py b/tests/common_test/parallel_test.py index fe67899..5ddecb9 100644 --- a/tests/common_test/parallel_test.py +++ b/tests/common_test/parallel_test.py @@ -17,8 +17,10 @@ from unittest import TestCase from fixtures import async_test +import async_timeout +from aiohttp.test_utils import make_mocked_coro -from openwebvulndb.common.parallel import BackgroundRunner +from openwebvulndb.common.parallel import BackgroundRunner, ParallelWorker class ParallelTest(TestCase): @@ -43,3 +45,17 @@ async def test_configured_runner(self, loop): def test_no_loop_uses_default(self): runner = BackgroundRunner(None) self.assertIs(runner.run, BackgroundRunner.default) + + @async_test() + async def test_consume_do_not_block_on_exception(self, loop): + async def coro0(): + raise Exception() + + coro = make_mocked_coro() + worker = ParallelWorker(1, loop=loop) + await worker.request(coro0) + await worker.request(coro) + with async_timeout.timeout(timeout=0.01, loop=loop): + await worker.wait() + + coro.assert_called_once_with()