Skip to content

Commit

Permalink
Cache URL encoding of worker addresses in dashboard (#8725)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Jun 24, 2024
1 parent 33a281f commit 5b2e2b4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
2 changes: 2 additions & 0 deletions distributed/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import functools
import inspect
import logging
import math
Expand Down Expand Up @@ -121,6 +122,7 @@ def _raise(*args, **kwargs):
LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")


@functools.lru_cache
def _expects_comm(func: Callable) -> bool:
sig = inspect.signature(func)
params = list(sig.parameters)
Expand Down
5 changes: 2 additions & 3 deletions distributed/dashboard/components/nvml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
TapTool,
)
from bokeh.plotting import figure
from tornado import escape

from dask.utils import format_bytes

from distributed.dashboard.components import DashboardComponent, add_periodic_callback
from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024, env
from distributed.dashboard.utils import update
from distributed.utils import log_errors
from distributed.utils import log_errors, url_escape


class GPUCurrentLoad(DashboardComponent):
Expand Down Expand Up @@ -149,7 +148,7 @@ def update(self):
"worker": worker,
"gpu-index": gpu_index,
"y": y,
"escaped_worker": [escape.url_escape(w) for w in worker],
"escaped_worker": [url_escape(w) for w in worker],
}

self.memory_figure.title.text = "GPU Memory: {} / {}".format(
Expand Down
5 changes: 2 additions & 3 deletions distributed/dashboard/components/rmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
TapTool,
)
from bokeh.plotting import figure
from tornado import escape

from dask.utils import format_bytes

Expand All @@ -26,7 +25,7 @@
MemoryColor,
)
from distributed.dashboard.utils import update
from distributed.utils import log_errors
from distributed.utils import log_errors, url_escape

T = TypeVar("T")

Expand Down Expand Up @@ -191,7 +190,7 @@ def quadlist(i: Iterable[T]) -> list[T]:
"color": color,
"alpha": [1, 0.7, 0.4, 1] * len(workers),
"worker": quadlist(ws.address for ws in workers),
"escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers),
"escaped_worker": quadlist(url_escape(ws.address) for ws in workers),
"rmm_used": quadlist(rmm_used),
"rmm_total": quadlist(rmm_total),
"gpu_used": quadlist(gpu_used),
Expand Down
13 changes: 6 additions & 7 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from jinja2 import Environment, FileSystemLoader
from tlz import curry, pipe, second, valmap
from tlz.curried import concat, groupby, map
from tornado import escape

import dask
from dask import config
Expand Down Expand Up @@ -91,7 +90,7 @@
from distributed.metrics import time
from distributed.scheduler import Scheduler
from distributed.spans import SpansSchedulerExtension
from distributed.utils import Log, log_errors
from distributed.utils import Log, log_errors, url_escape

if dask.config.get("distributed.dashboard.export-tool"):
from distributed.dashboard.export_tool import ExportTool
Expand Down Expand Up @@ -199,7 +198,7 @@ def update(self):
"worker": [ws.address for ws in workers],
"ms": ms,
"color": color,
"escaped_worker": [escape.url_escape(ws.address) for ws in workers],
"escaped_worker": [url_escape(ws.address) for ws in workers],
"x": x,
"y": y,
}
Expand Down Expand Up @@ -581,7 +580,7 @@ def quadlist(i: Iterable[T]) -> list[T]:
"color": color,
"alpha": [1, 0.7, 0.4, 1] * len(workers),
"worker": quadlist(ws.address for ws in workers),
"escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers),
"escaped_worker": quadlist(url_escape(ws.address) for ws in workers),
"y": quadlist(range(len(workers))),
"proc_memory": quadlist(procmemory),
"managed": quadlist(managed),
Expand Down Expand Up @@ -732,7 +731,7 @@ def update(self):
ws.metrics["transfer"]["outgoing_bytes"] for ws in wss
]
workers = [ws.address for ws in wss]
escaped_workers = [escape.url_escape(worker) for worker in workers]
escaped_workers = [url_escape(worker) for worker in workers]

if wss:
x_limit = max(
Expand Down Expand Up @@ -1840,7 +1839,7 @@ def update(self):
"nprocessing-half": [np / 2 for np in nprocessing],
"nprocessing-color": nprocessing_color,
"worker": [ws.address for ws in workers],
"escaped_worker": [escape.url_escape(ws.address) for ws in workers],
"escaped_worker": [url_escape(ws.address) for ws in workers],
"y": list(range(len(workers))),
}

Expand Down Expand Up @@ -2381,7 +2380,7 @@ def add_new_nodes_edges(self, new, new_edges, update=False):
continue
xx = x[key]
yy = y[key]
node_key.append(escape.url_escape(str(key)))
node_key.append(url_escape(str(key)))
node_x.append(xx)
node_y.append(yy)
node_state.append(task.state)
Expand Down
3 changes: 1 addition & 2 deletions distributed/http/scheduler/tests/test_scheduler_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from unittest import mock

import pytest
from tornado.escape import url_escape
from tornado.httpclient import AsyncHTTPClient, HTTPClientError

import dask.config
Expand All @@ -16,7 +15,7 @@
from distributed import Event, Lock, Scheduler
from distributed.client import wait
from distributed.core import Status
from distributed.utils import is_valid_xml
from distributed.utils import is_valid_xml, url_escape
from distributed.utils_test import (
async_poll_for,
div,
Expand Down
9 changes: 9 additions & 0 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import click
import psutil
import tblib.pickling_support
from tornado import escape

from distributed.compatibility import asyncio_run
from distributed.config import get_loop_factory
Expand Down Expand Up @@ -1994,3 +1995,11 @@ def __eq__(self, other):

def __lt__(self, other):
return self.obj < other.obj


@functools.lru_cache
def url_escape(url, *args, **kwargs):
"""
Escape a URL path segment. Cache results for better performance.
"""
return escape.url_escape(url, *args, **kwargs)

0 comments on commit 5b2e2b4

Please sign in to comment.