Skip to content

Commit

Permalink
Remove allow-pickle option
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Dec 13, 2023
1 parent 05ba316 commit d610122
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 58 deletions.
17 changes: 0 additions & 17 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,23 +402,6 @@ def start(self, scheduler):
assert s.foo == "bar"


@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
async def test_register_plugin_pickle_disabled(c, s, a, b):
class Dummy1(SchedulerPlugin):
def start(self, scheduler):
scheduler.foo = "bar"

n_plugins = len(s.plugins)
with pytest.raises(ValueError) as excinfo:
await c.register_plugin(Dummy1())

msg = str(excinfo.value)
assert "disallowed from deserializing" in msg
assert "distributed.scheduler.pickle" in msg

assert n_plugins == len(s.plugins)


@gen_cluster(nthreads=[])
async def test_unregister_scheduler_plugin(s):
class Plugin(SchedulerPlugin):
Expand Down
10 changes: 0 additions & 10 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ properties:
If we don't receive a heartbeat faster than this then we assume that the worker has died.
pickle:
type: boolean
description: |
Is the scheduler allowed to deserialize arbitrary bytestrings?
The scheduler almost never deserializes user data.
However there are some cases where the user can submit functions to run directly on the scheduler.
This can be convenient for debugging, but also introduces some security risk.
By setting this to false we ensure that the user is unable to run arbitrary code on the scheduler.
preload:
type: array
description: |
Expand Down
1 change: 0 additions & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ distributed:
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h")
Expand Down
12 changes: 1 addition & 11 deletions distributed/protocol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import msgpack

import dask.config

from distributed.protocol import pickle
from distributed.protocol.compression import decompress, maybe_compress
from distributed.protocol.serialize import (
Expand Down Expand Up @@ -117,8 +115,6 @@ def _encode_default(obj):
def loads(frames, deserialize=True, deserializers=None):
"""Transform bytestream back into Python value"""

allow_pickle = dask.config.get("distributed.scheduler.pickle")

try:

def _decode_default(obj):
Expand Down Expand Up @@ -148,13 +144,7 @@ def _decode_default(obj):
sub_frames = frames[offset : offset + sub_header["num-sub-frames"]]
if "compression" in sub_header:
sub_frames = decompress(sub_header, sub_frames)
if allow_pickle:
return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
else:
raise ValueError(
"Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`"
)

return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
return msgpack_decode_default(obj)

return msgpack.loads(
Expand Down
4 changes: 0 additions & 4 deletions distributed/protocol/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,6 @@ class ToPickle(Generic[T]):
Both the scheduler and workers with automatically unpickle this
object on arrival.
Notice, this requires that the scheduler is allowed to use pickle.
If the configuration option "distributed.scheduler.pickle" is set
to False, the scheduler will raise an exception instead.
"""

data: T
Expand Down
13 changes: 5 additions & 8 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3533,6 +3533,10 @@ def __init__(
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(

Check warning on line 3537 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L3537

Added line #L3537 was not covered by tests
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
Expand Down Expand Up @@ -5868,13 +5872,6 @@ async def register_scheduler_plugin(
idempotent: bool | None = None,
) -> None:
"""Register a plugin on the scheduler."""
if not dask.config.get("distributed.scheduler.pickle"):
raise ValueError(
"Cannot register a scheduler plugin as the scheduler "
"has been explicitly disallowed from deserializing "
"arbitrary bytestrings using pickle via the "
"'distributed.scheduler.pickle' configuration setting."
)
if idempotent is None:
warnings.warn(
"The signature of `Scheduler.register_scheduler_plugin` now requires "
Expand Down Expand Up @@ -6922,7 +6919,7 @@ def workers_to_close(

if key is None:
key = operator.attrgetter("address")
if isinstance(key, bytes) and dask.config.get("distributed.scheduler.pickle"):
if isinstance(key, bytes):
key = pickle.loads(key)

groups = groupby(key, self.workers.values())
Expand Down
12 changes: 5 additions & 7 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1799,13 +1799,11 @@ def f(dask_scheduler=None):
assert response == s.address


@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
async def test_run_on_scheduler_disabled(c, s, a, b):
def f(dask_scheduler=None):
return dask_scheduler.address

with pytest.raises(ValueError, match="disallowed from deserializing"):
await c._run_on_scheduler(f)
@gen_test()
async def test_allow_pickle_false():
with dask.config.set({"distributed.scheduler.pickle": False}):
with pytest.raises(RuntimeError, match="Pickling can no longer be disabled"):
await Scheduler()


@gen_cluster()
Expand Down

0 comments on commit d610122

Please sign in to comment.