diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 74915c4e254c..ed711ae00657 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -88,11 +88,13 @@ def wait_until_finish(self, duration=None) -> PipelineState: # Convert milliseconds to seconds duration /= 1000 self.client.wait_for_workers(timeout=duration) - self.client.gather(self.futures, errors='raise') + self.client.gather(self.futures, errors='raise', asynchronous=True) self._state = PipelineState.DONE except: # pylint: disable=broad-except self._state = PipelineState.FAILED raise + # finally: + # self.client.close(timeout=duration) return self._state def cancel(self) -> PipelineState: