Add offload support to context_meter.add_callback (#8360)
crusaderky authored Nov 17, 2023
Parent: 412a0a9 · Commit: a923909
Showing 3 changed files with 46 additions and 6 deletions.
distributed/core.py (7 changes: 2 additions & 5 deletions)
@@ -1105,16 +1105,13 @@ def context_meter_to_server_digest(digest_tag: str) -> Callable:
     def decorator(func: Callable) -> Callable:
         @wraps(func)
         async def wrapper(self: Server, *args: Any, **kwargs: Any) -> Any:
-            loop = asyncio.get_running_loop()
-
             def metrics_callback(label: Hashable, value: float, unit: str) -> None:
                 if not isinstance(label, tuple):
                     label = (label,)
                 name = (digest_tag, *label, unit)
-                # This callback could be called from another thread through offload()
-                loop.call_soon_threadsafe(self.digest_metric, name, value)
+                self.digest_metric(name, value)
 
-            with context_meter.add_callback(metrics_callback):
+            with context_meter.add_callback(metrics_callback, allow_offload=True):
                 return await func(self, *args, **kwargs)
 
         return wrapper
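
The change above removes the decorator's hand-rolled loop.call_soon_threadsafe dispatch: metrics_callback now calls self.digest_metric directly and relies on add_callback(..., allow_offload=True) (added in distributed/metrics.py below) to re-route calls arriving from the offload thread. What remains in the callback is only the digest-key construction, sketched standalone below; digest_name and the "get-data" tag are illustrative names, not part of distributed's API.

    from collections.abc import Hashable

    def digest_name(digest_tag: str, label: Hashable, unit: str) -> tuple:
        # Scalar labels are wrapped in a 1-tuple so they flatten uniformly
        if not isinstance(label, tuple):
            label = (label,)
        return (digest_tag, *label, unit)

    assert digest_name("get-data", "network", "seconds") == ("get-data", "network", "seconds")
    assert digest_name("get-data", ("deserialize", "cpu"), "seconds") == (
        "get-data", "deserialize", "cpu", "seconds",
    )
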
distributed/metrics.py (23 changes: 22 additions & 1 deletion)
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
+import asyncio
 import collections
+import threading
 import time as timemod
 from collections.abc import Callable, Hashable, Iterator
 from contextlib import contextmanager
@@ -208,6 +210,7 @@ def add_callback(
         callback: Callable[[Hashable, float, str], None],
         *,
         key: Hashable | None = None,
+        allow_offload: bool = False,
     ) -> Iterator[None]:
         """Add a callback when entering the context and remove it when exiting it.
         The callback must accept the same parameters as :meth:`digest_metric`.
@@ -219,12 +222,30 @@ def add_callback(
         key: Hashable, optional
             Unique key for the callback. If two nested calls to ``add_callback`` use the
            same key, suppress the outermost callback.
+        allow_offload: bool, optional
+            If set to True, this context must be executed inside a running asyncio
+            event loop. If a call to :meth:`digest_metric` is performed from a different
+            thread, e.g. from inside :func:`distributed.utils.offload`, ensure that
+            the callback is executed in the event loop's thread instead.
         """
+        if allow_offload:
+            loop = asyncio.get_running_loop()
+            tid = threading.get_ident()
+
+            def safe_cb(label: Hashable, value: float, unit: str, /) -> None:
+                if threading.get_ident() == tid:
+                    callback(label, value, unit)
+                else:  # We're inside offload()
+                    loop.call_soon_threadsafe(callback, label, value, unit)
+
+        else:
+            safe_cb = callback
+
         if key is None:
             key = object()
         cbs = self._callbacks.get()
         cbs = cbs.copy()
-        cbs[key] = callback
+        cbs[key] = safe_cb
         tok = self._callbacks.set(cbs)
         try:
             yield
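
The new safe_cb wrapper is the heart of the change: it records the running event loop and the registering thread's ident, runs the callback inline when digest_metric fires on that same thread, and otherwise (i.e. inside offload()) hands the call back to the loop with call_soon_threadsafe. A minimal standalone sketch of the same pattern, independent of distributed (make_thread_safe_callback is a hypothetical helper, not the library's API):

    import asyncio
    import threading

    def make_thread_safe_callback(callback):
        # Must be created on the event-loop thread, mirroring allow_offload=True
        loop = asyncio.get_running_loop()
        tid = threading.get_ident()

        def safe_cb(*args):
            if threading.get_ident() == tid:
                callback(*args)  # same thread: run inline
            else:
                # another thread: re-dispatch onto the event loop
                loop.call_soon_threadsafe(callback, *args)

        return safe_cb

    async def main():
        seen = []
        cb = make_thread_safe_callback(
            lambda label: seen.append((threading.get_ident(), label))
        )
        cb("inline")  # runs synchronously on the loop thread
        await asyncio.get_running_loop().run_in_executor(None, cb, "offloaded")
        await asyncio.sleep(0.1)  # let the loop run the re-dispatched callback
        # Both invocations were recorded on the event-loop thread
        assert [t for t, _ in seen] == [threading.get_ident()] * 2

    asyncio.run(main())
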
distributed/tests/test_metrics.py (22 changes: 22 additions & 0 deletions)
@@ -2,12 +2,15 @@
 
 import math
 import pickle
+import threading
 import time
 
 import pytest
 
 from distributed import metrics
 from distributed.compatibility import WINDOWS
+from distributed.utils import offload
+from distributed.utils_test import gen_test
 
 
 @pytest.mark.parametrize("name", ["time", "monotonic"])
@@ -223,6 +226,25 @@ def raises(*args):
     metrics.context_meter.digest_metric("foo", 1, "s")
 
 
+@gen_test()
+async def test_context_meter_allow_offload():
+    tid = threading.get_ident()
+    m = []
+
+    def cb(label, value, unit):
+        m.append((threading.get_ident(), label, value, unit))
+
+    with metrics.context_meter.add_callback(cb, allow_offload=True):
+        metrics.context_meter.digest_metric("foo", 1, "x")
+        await offload(metrics.context_meter.digest_metric, "bar", 1, "x")
+
+    assert m == [
+        (tid, "foo", 1, "x"),
+        (tid, "offload", m[1][2], "seconds"),
+        (tid, "bar", 1, "x"),
+    ]
+
+
 def test_delayed_metrics_ledger():
     it = iter([120, 130, 130, 130])
     ledger = metrics.DelayedMetricsLedger(func=lambda: next(it))
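
Note what the new test pins down: every metric, including "bar", which is emitted from inside offload() on a worker thread, reaches the callback tagged with the ident of the thread that entered add_callback (the event-loop thread). offload() itself also contributes an extra ("offload", ..., "seconds") timing metric, whose measured value the assertion reads back from m[1][2] rather than hard-coding.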
