diff --git a/distributed/core.py b/distributed/core.py index 66470ef224..d70ae3f383 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import functools import inspect import logging import math @@ -121,6 +122,7 @@ def _raise(*args, **kwargs): LOG_PDB = dask.config.get("distributed.admin.pdb-on-err") +@functools.lru_cache def _expects_comm(func: Callable) -> bool: sig = inspect.signature(func) params = list(sig.parameters) diff --git a/distributed/dashboard/components/nvml.py b/distributed/dashboard/components/nvml.py index 20b0f6856b..6a381005c5 100644 --- a/distributed/dashboard/components/nvml.py +++ b/distributed/dashboard/components/nvml.py @@ -12,14 +12,13 @@ TapTool, ) from bokeh.plotting import figure -from tornado import escape from dask.utils import format_bytes from distributed.dashboard.components import DashboardComponent, add_periodic_callback from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024, env from distributed.dashboard.utils import update -from distributed.utils import log_errors +from distributed.utils import log_errors, url_escape class GPUCurrentLoad(DashboardComponent): @@ -149,7 +148,7 @@ def update(self): "worker": worker, "gpu-index": gpu_index, "y": y, - "escaped_worker": [escape.url_escape(w) for w in worker], + "escaped_worker": [url_escape(w) for w in worker], } self.memory_figure.title.text = "GPU Memory: {} / {}".format( diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py index 7376476570..f37380f68f 100644 --- a/distributed/dashboard/components/rmm.py +++ b/distributed/dashboard/components/rmm.py @@ -14,7 +14,6 @@ TapTool, ) from bokeh.plotting import figure -from tornado import escape from dask.utils import format_bytes @@ -26,7 +25,7 @@ MemoryColor, ) from distributed.dashboard.utils import update -from distributed.utils import log_errors +from distributed.utils import log_errors, url_escape T = TypeVar("T") @@ -191,7 +190,7 @@ def quadlist(i: Iterable[T]) -> list[T]: "color": color, "alpha": [1, 0.7, 0.4, 1] * len(workers), "worker": quadlist(ws.address for ws in workers), - "escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers), + "escaped_worker": quadlist(url_escape(ws.address) for ws in workers), "rmm_used": quadlist(rmm_used), "rmm_total": quadlist(rmm_total), "gpu_used": quadlist(gpu_used), diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index af669c8c23..7f5b391ca0 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -53,7 +53,6 @@ from jinja2 import Environment, FileSystemLoader from tlz import curry, pipe, second, valmap from tlz.curried import concat, groupby, map -from tornado import escape import dask from dask import config @@ -91,7 +90,7 @@ from distributed.metrics import time from distributed.scheduler import Scheduler from distributed.spans import SpansSchedulerExtension -from distributed.utils import Log, log_errors +from distributed.utils import Log, log_errors, url_escape if dask.config.get("distributed.dashboard.export-tool"): from distributed.dashboard.export_tool import ExportTool @@ -199,7 +198,7 @@ def update(self): "worker": [ws.address for ws in workers], "ms": ms, "color": color, - "escaped_worker": [escape.url_escape(ws.address) for ws in workers], + "escaped_worker": [url_escape(ws.address) for ws in workers], "x": x, "y": y, } @@ -581,7 +580,7 @@ def quadlist(i: Iterable[T]) -> list[T]: "color": color, "alpha": [1, 0.7, 0.4, 1] * len(workers), "worker": quadlist(ws.address for ws in workers), - "escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers), + "escaped_worker": quadlist(url_escape(ws.address) for ws in workers), "y": quadlist(range(len(workers))), "proc_memory": quadlist(procmemory), "managed": quadlist(managed), @@ -732,7 +731,7 @@ def update(self): ws.metrics["transfer"]["outgoing_bytes"] for ws in wss ] workers = [ws.address for ws in wss] - escaped_workers = [escape.url_escape(worker) for worker in workers] + escaped_workers = [url_escape(worker) for worker in workers] if wss: x_limit = max( @@ -1840,7 +1839,7 @@ def update(self): "nprocessing-half": [np / 2 for np in nprocessing], "nprocessing-color": nprocessing_color, "worker": [ws.address for ws in workers], - "escaped_worker": [escape.url_escape(ws.address) for ws in workers], + "escaped_worker": [url_escape(ws.address) for ws in workers], "y": list(range(len(workers))), } @@ -2381,7 +2380,7 @@ def add_new_nodes_edges(self, new, new_edges, update=False): continue xx = x[key] yy = y[key] - node_key.append(escape.url_escape(str(key))) + node_key.append(url_escape(str(key))) node_x.append(xx) node_y.append(yy) node_state.append(task.state) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 78a9744ce1..4b647dc6f2 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -7,7 +7,6 @@ from unittest import mock import pytest -from tornado.escape import url_escape from tornado.httpclient import AsyncHTTPClient, HTTPClientError import dask.config @@ -16,7 +15,7 @@ from distributed import Event, Lock, Scheduler from distributed.client import wait from distributed.core import Status -from distributed.utils import is_valid_xml +from distributed.utils import is_valid_xml, url_escape from distributed.utils_test import ( async_poll_for, div, diff --git a/distributed/utils.py b/distributed/utils.py index 6fbbf0f224..9658dffa8a 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -52,6 +52,7 @@ import click import psutil import tblib.pickling_support +from tornado import escape from distributed.compatibility import asyncio_run from distributed.config import get_loop_factory @@ -1994,3 +1995,11 @@ def __eq__(self, other): def __lt__(self, other): return self.obj < other.obj + + +@functools.lru_cache +def url_escape(url, *args, **kwargs): + """ + Escape a URL path segment. Cache results for better performance. + """ + return escape.url_escape(url, *args, **kwargs)