Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dask/distributed into to_pickle
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Feb 10, 2022
2 parents 5e1f02c + 8a5e014 commit 3ae389e
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 33 deletions.
9 changes: 7 additions & 2 deletions continuous_integration/scripts/test_report.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import html
import io
import os
import re
Expand Down Expand Up @@ -137,7 +138,7 @@ def dataframe_from_jxml(run):
else:
s = "x"
status.append(s)
message.append(m)
message.append(html.escape(m))
df = pandas.DataFrame(
{"file": fname, "test": tname, "status": status, "message": message}
)
Expand Down Expand Up @@ -165,7 +166,7 @@ def dedup(group):
print("Getting all recent workflows...")
workflows = get_workflow_listing()

# Filter the workflows listing to be in the last month,
# Filter the workflows listing to be in the retention period,
# and only be test runs (i.e., no linting) that completed.
workflows = [
w
Expand All @@ -177,6 +178,10 @@ def dedup(group):
and w["name"].lower() == "tests"
)
]
# Each workflow processed takes ~10-15 API requests. To avoid being
# rate limited by GitHub (1000 requests per hour) we choose just the
# most recent N runs. This also keeps the viz size from blowing up.
workflows = sorted(workflows, key=lambda w: w["created_at"])[-50:]

print("Getting the artifact listing for each workflow...")
for w in workflows:
Expand Down
8 changes: 5 additions & 3 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,11 @@ def time_left():
except (asyncio.TimeoutError, OSError) as exc:
active_exception = exc

# The intermediate capping is mostly relevant for the initial
# connect. Afterwards we should be more forgiving
intermediate_cap = intermediate_cap * 1.5
# As descibed above, the intermediate timeout is used to distributed
# initial, bulk connect attempts homogeneously. In particular with
# the jitter upon retries we should not be worred about overloading
# any more DNS servers
intermediate_cap = timeout
# FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

upper_cap = min(time_left(), backoff_base * (2 ** attempt))
Expand Down
4 changes: 2 additions & 2 deletions distributed/diagnostics/tests/test_worker_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ def teardown(self, worker):

await c.register_worker_plugin(ReleaseKeyDeprecated())

with pytest.deprecated_call(
match="The `WorkerPlugin.release_key` hook is depreacted"
with pytest.warns(
FutureWarning, match="The `WorkerPlugin.release_key` hook is deprecated"
):
assert await c.submit(inc, 1, key="x") == 2
while "x" in a.tasks:
Expand Down
5 changes: 0 additions & 5 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ def __init__(
scheduler_file=None,
worker_port=0,
nthreads=None,
ncores=None,
loop=None,
local_dir=None,
local_directory=None,
Expand Down Expand Up @@ -172,10 +171,6 @@ def __init__(
if len(protocol_address) == 2:
protocol = protocol_address[0]

if ncores is not None:
warnings.warn("the ncores= parameter has moved to nthreads=")
nthreads = ncores

self._given_worker_port = worker_port
self.nthreads = nthreads or CPU_COUNT
self.reconnect = reconnect
Expand Down
5 changes: 0 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,11 +787,6 @@ def identity(self) -> dict:
**self._extra,
}

@property
def ncores(self):
warnings.warn("WorkerState.ncores has moved to WorkerState.nthreads")
return self._nthreads


@final
@cclass
Expand Down
1 change: 0 additions & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3461,7 +3461,6 @@ async def test_Worker__to_dict(c, s, a):
"id",
"scheduler",
"nthreads",
"ncores",
"memory_limit",
"address",
"status",
Expand Down
2 changes: 1 addition & 1 deletion distributed/utils_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ async def retry(
Returns
-------
Any
Whatever `await `coro()` returned
Whatever `await coro()` returned
"""
# this loop is a no-op in case max_retries<=0
for i_try in range(count):
Expand Down
5 changes: 0 additions & 5 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import tempfile
import threading
import uuid
import warnings
import weakref
from collections import defaultdict
from collections.abc import Callable
Expand Down Expand Up @@ -883,7 +882,6 @@ def gen_cluster(
("127.0.0.1", 1),
("127.0.0.1", 2),
],
ncores: None = None, # deprecated
scheduler="127.0.0.1",
timeout: float = _TEST_TIMEOUT,
security: Security | dict[str, Any] | None = None,
Expand Down Expand Up @@ -923,9 +921,6 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
"timeout should always be set and it should be smaller than the global one from"
"pytest-timeout"
)
if ncores is not None:
warnings.warn("ncores= has moved to nthreads=", stacklevel=2)
nthreads = ncores

scheduler_kwargs = merge(
{"dashboard": False, "dashboard_address": ":0"}, scheduler_kwargs
Expand Down
12 changes: 3 additions & 9 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ def __init__(
scheduler_port: int | None = None,
*,
scheduler_file: str | None = None,
ncores: None = None, # Deprecated, use nthreads instead
nthreads: int | None = None,
loop: IOLoop | None = None,
local_dir: None = None, # Deprecated, use local_directory instead
Expand Down Expand Up @@ -837,10 +836,6 @@ def __init__(
self._interface = interface
self._protocol = protocol

if ncores is not None:
warnings.warn("the ncores= parameter has moved to nthreads=")
nthreads = ncores

self.nthreads = nthreads or CPU_COUNT
if resources is None:
resources = dask.config.get("distributed.worker.resources", None)
Expand Down Expand Up @@ -1184,7 +1179,6 @@ def identity(self, comm=None):
"id": self.id,
"scheduler": self.scheduler.address,
"nthreads": self.nthreads,
"ncores": self.nthreads, # backwards compatibility
"memory_limit": self.memory_limit,
}

Expand Down Expand Up @@ -2909,7 +2903,7 @@ def total_comm_bytes(self):
warnings.warn(
"The attribute `Worker.total_comm_bytes` has been renamed to `comm_threshold_bytes`. "
"Future versions will only support the new name.",
DeprecationWarning,
FutureWarning,
)
return self.comm_threshold_bytes

Expand Down Expand Up @@ -3880,11 +3874,11 @@ def _notify_plugins(self, method_name, *args, **kwargs):
if hasattr(plugin, method_name):
if method_name == "release_key":
warnings.warn(
"The `WorkerPlugin.release_key` hook is depreacted and will be "
"The `WorkerPlugin.release_key` hook is deprecated and will be "
"removed in a future version. A similar event can now be "
"caught by filtering for a `finish=='released'` event in the "
"`WorkerPlugin.transition` hook.",
DeprecationWarning,
FutureWarning,
)

try:
Expand Down

0 comments on commit 3ae389e

Please sign in to comment.