Skip to content

Commit

Permalink
Use dumps_msgpack and loads_msgpack when packing high level graphs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk authored Jan 7, 2021
1 parent 5812314 commit f33edf8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 19 deletions.
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2585,11 +2585,11 @@ def _graph_to_futures(
if not isinstance(dsk, HighLevelGraph):
dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())

dsk = highlevelgraph_pack(dsk, self, keyset)

if isinstance(retries, Number) and retries > 0:
retries = {k: retries for k in dsk}

dsk = highlevelgraph_pack(dsk, self, keyset)

# Create futures before sending graph (helps avoid contention)
futures = {key: Future(key, self, inform=False) for key in keyset}
self._send_to_scheduler(
Expand Down
25 changes: 8 additions & 17 deletions distributed/protocol/highlevelgraph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import msgpack
from tlz import valmap

from dask.core import keys_in_tasks
Expand All @@ -10,12 +9,8 @@

from ..utils import CancelledError

from .utils import msgpack_opts
from .serialize import (
import_allowed_module,
msgpack_encode_default,
msgpack_decode_default,
)
from .core import dumps_msgpack, loads_msgpack
from .serialize import import_allowed_module


def _materialized_layer_pack(
Expand Down Expand Up @@ -93,8 +88,8 @@ def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys):
Returns
-------
data: bytes
Packed high level graph serialized by msgpack
data: list of header and payload
Packed high level graph serialized by dumps_msgpack
"""
layers = []

Expand Down Expand Up @@ -126,8 +121,7 @@ def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys):
),
}
)

return msgpack.dumps({"layers": layers}, default=msgpack_encode_default)
return dumps_msgpack({"layers": layers})


def _materialized_layer_unpack(state, dsk, dependencies, annotations):
Expand All @@ -151,8 +145,8 @@ def highlevelgraph_unpack(dumped_hlg):
Parameters
----------
dumped_hlg: bytes
Packed high level graph serialized by msgpack
dumped_hlg: list of header and payload
Packed high level graph serialized by dumps_msgpack
Returns
-------
Expand All @@ -164,10 +158,7 @@ def highlevelgraph_unpack(dumped_hlg):
Annotations for `dsk`
"""

# Notice, we set `use_list=False`, which makes msgpack convert lists to tuples
hlg = msgpack.loads(
dumped_hlg, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts
)
hlg = loads_msgpack(*dumped_hlg)

dsk = {}
deps = {}
Expand Down

0 comments on commit f33edf8

Please sign in to comment.