Skip to content

Commit

Permalink
Fix parallel workers blocking when tasks raise exception.
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasAubry committed Aug 15, 2017
1 parent 088c6a6 commit 53c4abf
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
2 changes: 2 additions & 0 deletions openwebvulndb/common/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ async def consume(self, n):
self._handle_task_timeout(task)
else:
await coroutine(*args, **kwargs)
except Exception as e:
logger.warn("Unexpected exception in {} {}: {}".format(self.name, n, repr(e)))
finally:
self.queue.task_done()

Expand Down
1 change: 0 additions & 1 deletion openwebvulndb/wordpress/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ async def fetch(self, url, parser):
try:
async with self.session.get(url) as response:
data = await response.text()
response.close()
return parser.parse(data)
except SoftwareNotFound:
raise
Expand Down
18 changes: 17 additions & 1 deletion tests/common_test/parallel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

from unittest import TestCase
from fixtures import async_test
import async_timeout
from aiohttp.test_utils import make_mocked_coro

from openwebvulndb.common.parallel import BackgroundRunner
from openwebvulndb.common.parallel import BackgroundRunner, ParallelWorker


class ParallelTest(TestCase):
Expand All @@ -43,3 +45,17 @@ async def test_configured_runner(self, loop):
def test_no_loop_uses_default(self):
runner = BackgroundRunner(None)
self.assertIs(runner.run, BackgroundRunner.default)

@async_test()
async def test_consume_do_not_block_on_exception(self, loop):
async def coro0():
raise Exception()

coro = make_mocked_coro()
worker = ParallelWorker(1, loop=loop)
await worker.request(coro0)
await worker.request(coro)
with async_timeout.timeout(timeout=0.01, loop=loop):
await worker.wait()

coro.assert_called_once_with()

0 comments on commit 53c4abf

Please sign in to comment.