Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask internal inherit config #4364

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ def main(

if "DASK_INTERNAL_INHERIT_CONFIG" in os.environ:
config = deserialize_for_cli(os.environ["DASK_INTERNAL_INHERIT_CONFIG"])
# Update the global config given priority to the existing global config
dask.config.update(dask.config.global_config, config, priority="old")
dask.config.update(dask.config.global_config, config)

if not host and (tls_ca_file or tls_cert or tls_key):
host = "tls://"
Expand Down
8 changes: 8 additions & 0 deletions distributed/cli/dask_spec.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import asyncio
import click
import json
import os
import sys
import yaml

import dask.config
from distributed.deploy.spec import run_spec
from distributed.utils import deserialize_for_cli


@click.command(context_settings=dict(ignore_unknown_options=True))
Expand All @@ -13,6 +16,11 @@
@click.option("--spec-file", type=str, default=None, help="")
@click.version_option()
def main(args, spec: str, spec_file: str):

if "DASK_INTERNAL_INHERIT_CONFIG" in os.environ:
config = deserialize_for_cli(os.environ["DASK_INTERNAL_INHERIT_CONFIG"])
dask.config.update(dask.config.global_config, config)

if spec and spec_file or not spec and not spec_file:
print("Must specify exactly one of --spec and --spec-file")
sys.exit(1)
Expand Down
3 changes: 1 addition & 2 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,7 @@ def del_pid_file():

if "DASK_INTERNAL_INHERIT_CONFIG" in os.environ:
config = deserialize_for_cli(os.environ["DASK_INTERNAL_INHERIT_CONFIG"])
# Update the global config given priority to the existing global config
dask.config.update(dask.config.global_config, config, priority="old")
dask.config.update(dask.config.global_config, config)

nannies = [
t(
Expand Down
18 changes: 18 additions & 0 deletions distributed/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
LRU,
offload,
TimeoutError,
deserialize_for_cli,
serialize_for_cli,
)
from distributed.utils_test import loop, loop_in_thread # noqa: F401
from distributed.utils_test import div, has_ipv6, inc, throws, gen_test, captured_logger
Expand Down Expand Up @@ -606,3 +608,19 @@ def test_lru():
async def test_offload():
assert (await offload(inc, 1)) == 2
assert (await offload(lambda x, y: x + y, 1, y=2)) == 3


def test_cli_serialization():
# Use context manager without changing the value to ensure test side effects are restored
with dask.config.set(
{
"distributed.comm.default-scheme": dask.config.get(
"distributed.comm.default-scheme"
)
}
):
config = deserialize_for_cli(
serialize_for_cli({"distributed": {"comm": {"default-scheme": "tls"}}})
) # Take a round trip through the serialization
dask.config.update(dask.config.global_config, config)
assert dask.config.get("distributed.comm.default-scheme") == "tls"