From 676d75226c3f517083baef5dab4a52e5cde0f1a0 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 21 Sep 2022 14:29:37 -0700 Subject: [PATCH] Changing to async produces a timeout error instead of stuck in infinite loop. --- sdks/python/apache_beam/runners/dask/dask_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 74915c4e254c..ed711ae00657 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -88,11 +88,13 @@ def wait_until_finish(self, duration=None) -> PipelineState: # Convert milliseconds to seconds duration /= 1000 self.client.wait_for_workers(timeout=duration) - self.client.gather(self.futures, errors='raise') + self.client.gather(self.futures, errors='raise', asynchronous=True) self._state = PipelineState.DONE except: # pylint: disable=broad-except self._state = PipelineState.FAILED raise + # finally: + # self.client.close(timeout=duration) return self._state def cancel(self) -> PipelineState: