From 1b85485a843953e5df66d53717d6a410c90a34af Mon Sep 17 00:00:00 2001 From: GueroudjiAmal Date: Thu, 21 Dec 2023 09:57:52 -0600 Subject: [PATCH] stimulus external task --- distributed/bridge.py | 4 +++- distributed/scheduler.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/distributed/bridge.py b/distributed/bridge.py index a180fb47a18..fe83d0aaebc 100644 --- a/distributed/bridge.py +++ b/distributed/bridge.py @@ -30,9 +30,11 @@ from distributed import preloading from distributed.core import ConnectionPool from distributed.security import Security -from distributed.utils import LoopRunner, SyncMethodMixin, no_default, sync +from distributed.utils import LoopRunner, SyncMethodMixin, sync from distributed.utils_comm import gather_from_workers, scatter_to_workers +from dask.typing import no_default + logger = logging.getLogger(__name__) def _handle_print(event): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f1fb6183ce6..80ab9335fda 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4998,7 +4998,10 @@ def stimulus_external_task_finished( ws: WorkerState = self.workers[worker] ts: TaskState = self.tasks.get(key) if ts.state == "external": - ts.metadata.update(kwargs["metadata"]) + if kwargs["metadata"]: + if ts.metadata is None: + ts.metadata = dict() + ts.metadata.update(kwargs["metadata"]) r: tuple = self._transition( key, "memory", stimulus_id, worker=worker, **kwargs )