Serialize High Level Layers with Pickle #5581

Closed
mrocklin opened this issue Dec 9, 2021 · 30 comments
Labels: discussion (Discussing a topic with no specific actions yet)

@mrocklin (Member) commented Dec 9, 2021

Right now, serialization of high level graphs is a bit of a mess. It evolved organically and without much architectural thought. It seems to be a source of technical debt.

A lot of the challenge in serializing high level graphs comes from keeping the scheduler protected in a few ways:

  1. We don't want to unpickle client code in a potentially protected environment
  2. We don't want to couple the scheduler software version to the client and worker versions
  3. We want to allow the scheduler and client and workers to be written in different languages in the future

I like these constraints, but perhaps they're causing more havoc than they're worth. It might be time to reconsider these constraints and instead allow the client and scheduler to communicate by pickle. I think that this would allow us to remove a lot of currently painful code, and accelerate high level graph work in the future.

cc @quasiben and team

@quasiben (Member) commented Dec 9, 2021

I believe @madsbk and @jakirkham did a fair amount of work around serialization generally, and specifically for HLGs. If we relaxed, say, requirement 1), how would this make things easier? I suppose we could remove the __Serialized__ checking in various places -- this is probably the discussion we need to have.

As an aside, have you heard folks express any security concerns? I have not.

@jakirkham (Member)

Are these causing havoc? Do you have some examples you can share?

At least the way this was implemented, Dask just creates dicts and lists in __dask_distributed_pack__, for example. We call to_serialize on objects that need further serialization by Distributed, so the content created for serialization is pretty simple and doesn't leave a lot of room for things to fall down.

That said, given we are thinking about how to make things easier, I wonder if it makes sense to have some very simple barebones framework for serialization in Dask that Distributed then extends. One advantage is that we wouldn't need to import things like to_serialize. Another is that other local schedulers (using concurrent.futures-based APIs) could potentially benefit from serialization. Just a thought 🙂
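
As a very rough sketch of what such a barebones framework could look like (entirely hypothetical names, and the (header, frames) return convention is my assumption; nothing here is an existing Dask API): a single-dispatch registry with a pickle fallback that Distributed could extend.

import pickle
from functools import singledispatch

@singledispatch
def serialize(obj):
    """Fallback: anything without a registered handler is pickled."""
    return {"serializer": "pickle"}, [pickle.dumps(obj, protocol=5)]

def register_serializer(cls, func):
    """Hook for downstream projects (e.g. Distributed) to plug in
    richer handlers, for example zero-copy handling of array types."""
    serialize.register(cls, func)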

@madsbk (Contributor) commented Dec 10, 2021

I like these constraints, but perhaps they're causing more havoc than they're worth. It might be time to reconsider these constraints and instead allow the client and scheduler to communicate by pickle. I think that this would allow us to remove a lot of currently painful code, and accelerate high level graph work in the future.

I agree, this would remove a lot of complexity. Most importantly, it would remove all the code surrounding the partial deserialization of the HLG.
I think the protocol could be boiled down to the following (a sketch follows the list):

  1. pickle HLG while keeping host/device buffers out-of-band
  2. unpickle HLG
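
A minimal sketch of those two steps using pickle protocol 5, which already supports out-of-band buffers (the helper names are mine, and treating the buffer_callback output as the host/device buffers is an assumption for illustration):

import pickle

def pack_hlg(hlg):
    """Pickle a HighLevelGraph; protocol 5 hands large buffers to
    buffer_callback instead of copying them into the pickle stream."""
    frames = []
    header = pickle.dumps(hlg, protocol=5, buffer_callback=frames.append)
    return header, frames

def unpack_hlg(header, frames):
    """Reconstruct the HighLevelGraph, reattaching the out-of-band buffers."""
    return pickle.loads(header, buffers=frames)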

@jrbourbeau added the discussion label Dec 10, 2021
@gjoseph92 (Collaborator)

I think pickle is a red herring. We could allow the scheduler to un-pickle things and it wouldn't solve our problem—it would just allow us to ignore the problem by being sloppier.

Generating tasks usually requires inlining user data or arbitrary functions into the task-tuples.

This is trivial on the client; you insert the thing into the tuple, then eventually pickle the whole tuple and send it to the scheduler. It remains an opaque binary blob on the scheduler, and eventually gets un-pickled wholesale on the worker.

But if you're generating the tasks on the scheduler, you need the client to send you those user-defined things to get inserted into the graph.
Currently, HLG layers tend to wrap these things in to_serialize and return them in the ambiguously-defined __dask_distributed_pack__ interface. When received on the scheduler, they're Serialized objects—opaque binary blobs.
The problem (illustrated in detail here) is that when you stick Serialized objects into arbitrarily-nested tuples, and then serialize the whole thing using a different serialization method (worker.dumps_task) or odd conditional logic, and then deserialize the whole thing in the worker comms, and then _maybe_deserialize_task that, very ambiguous behavior occurs. In some cases, the nested Serialized objects come out as actual objects; in other cases, they remain Serialized objects. Since it's not written down anywhere what the expected behavior is, you don't know whether you're dealing with a bug or user error, which makes developing against the API very unproductive.

With this proposal, we'd instead just un-pickle the user-defined things on the scheduler, stick them into the tasks, and then re-serialize the entire task. It would be just like on the client. It certainly would make the logic easier!

But that's just sloppy! We'd be de-serializing things just in order to immediately re-serialize them! We already do sloppy things like this today when transferring spilled-to-disk data (see end of #4424 (comment)) and should move away from it.
Sure, you might say "we can assume the data being inserted into the graph is small". That may often be true. But it's user input, and users will do things wrong. If a user passes too large an argument, or an argument with some oddly slow custom pickle hooks that make an authenticated network call to your database, is it acceptable that they could lock up the scheduler and destabilize the cluster? "You shouldn't do that" isn't a good answer for stability problems—you just shouldn't be able to do that.

So I don't think the problem has much to do with pickle. The problem has to do with:

  1. Poorly-defined serialization interfaces
  2. Multiple layers of serialization (comm serialization vs worker dumps_task and _deserialize)
  3. Poorly-defined behavior for nested Serialize and Serialized objects
  4. Inconsistent representations for a task (is it a function, args, kwargs? a tuple with a callable as the first element? can they be nested or not? where is which representation accepted?)
  5. Poorly-defined __dask_distributed_pack__ and __dask_distributed_unpack__ interfaces. Why are they not inverses of each other (makes it harder to test)? What are they required or allowed to return? Who is responsible for serialization, and where does it happen?
  6. Serialization logic leaking into graph-construction logic in conditional branches (different ways of defining tasks, wrapping things in to_serialize or CallableLazyImports, even ending up with pickle.loads inside tasks because figuring out an alternative is just too annoying)

I think the currently-painful code could be resolved by

  1. A well-defined serialization protocol with robust support for sticking Serialized-like objects into other objects. Something in the same spirit as pickle5 (perhaps even just pickle5), where there are two streams (the pickled stuff, and the stuff you encountered while pickling that should be handled out-of-band), and the ability for the first stream to reference things in the second stream by pointers. In our case, it might be a stream of "things to unpack on the scheduler", and "things to not unpack on the scheduler". I actually have a feeling we could leverage pickle5 to do this "partial deserialization" if we wanted (see the sketch after this list).
  2. Consistent representation of a task, and eliminating specialized task serialization-deserialization code
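
As a rough illustration of item 1 (everything below is hypothetical, including the SchedulerOpaque name; it is not an existing distributed API): objects the scheduler should never deserialize could expose their payloads to pickle 5 as out-of-band buffers, so the scheduler can load the structural stream while carrying the opaque payloads along untouched.

import pickle

class SchedulerOpaque:
    """Hypothetical wrapper for payloads the scheduler must not deserialize;
    the payload travels out-of-band and is only unpacked on a worker."""
    def __init__(self, payload):
        self.payload = payload  # any buffer-like object (bytes, memoryview, ...)

    def __reduce_ex__(self, protocol):
        # exposing the payload as a PickleBuffer makes protocol 5 emit it
        # out-of-band instead of copying it into the main stream
        return SchedulerOpaque, (pickle.PickleBuffer(self.payload),)

def client_dumps(msg):
    buffers = []
    stream = pickle.dumps(msg, protocol=5, buffer_callback=buffers.append)
    return stream, buffers  # stream: unpack on the scheduler; buffers: pass through

def scheduler_loads(stream, buffers):
    # the structure is materialized, but each opaque payload stays a buffer
    # reference inside its SchedulerOpaque wrapper, untouched by the scheduler
    return pickle.loads(stream, buffers=buffers)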

I personally don't care much about the security, versioning, or rewriting arguments. I just care about stability and performance.

I could certainly be convinced that deserializing things on the scheduler only to immediately re-serialize them is very low overhead, and worth it for the simplification of the developer experience.

I just think it's possible to have both a simple developer experience and no unnecessary deserialization if we take the time to define the interfaces and protocols well. But I'm fine if we decide that's not worth the time.

@gjoseph92 (Collaborator)

The problem is actually less about nested serialization, and more just about the representation of a task, and the fact that tasks have special serialization logic that's disjoint from normal comms serialization.

Really the core pain point is that HLG code has to emit two different representations of its tasks, depending on whether it's running locally or on the scheduler.

  • Locally, it should emit normal tuples like (callable, "keyref", literal, (subtask_callable, ["keyref"], ...))
  • On the scheduler, it should emit dicts like {"function": pickled_bytes, "args": pickled_bytes, "kwargs": pickled_bytes} and explicitly list the dependencies of the tasks.

To make things worse, the pickled_bytes shouldn't contain Serialize/Serialized objects (or any form of nested serialization), because the un-pickling code path doesn't know about them. But the canonical way to get data from the client to the scheduler involves it arriving in a Serialized. So there's pretty much no clean way to insert already-serialized data into the worker's representation of a task, besides pre-pickling the bytes on the client in your __dask_distributed_pack__ method and basically handling serialization yourself.

And if you want to use the normal-tuple representation instead (which is also supported on the scheduler; it just gets wrapped in to_serialize), you have to contend with the fact that any nested Serialized objects within the task may or may not get unpacked properly when they reach the worker, depending on the level of nesting, order in the collection, etc. And even if you do get this to work, the serialized functions aren't cached like they are in dict-format, so it's less efficient.

@mrocklin (Member, Author) commented Jan 4, 2022

Jim, Mads, James and I met this morning. We listed a few requirements:

Requirements

  • Support various existing comm systems, TCP, TLS, UCX, Inproc, (ws?)
  • Support fast sending of many system messages
  • Support bulk data transfer with minimal copies
  • Avoid hacky "is this key in the dict?" checks
  • Support sending high level layers to scheduler and constructing graphs there without a complex serialization protocol native to layers
  • Send system/structured data and user data in the same message
  • Ability to combine system message objects (not byte buffers) into larger messages in a clean way while keeping zero-copy properties of large buffers in sub-objects (think a list of objects, each of which may contain a large buffer).
  • Messages accurately roundtrip through encode/decode
  • Performant serialization/deserialization
    • In particular we found that passing over data structures repeatedly was costly
  • User data should skip deserialization on the scheduler
    • We could do this with to_serialize
    • We could do this with structured message types
  • Don’t deserialize with pickle on the scheduler
  • Don’t deserialize large/expensive objects on the scheduler
  • Ability to store user-data on disk in same format as wire format
  • Simple and easy to understand

Maybe-requirements

  • Enforce stricter typing on messages
  • Version checking built-in
  • GPU data on client and worker without a GPU on the scheduler

Open questions

  • How much do we care about security on the scheduler
  • How much structure / strictness do we want on messages

These requirements could be cleaned up. A lot of them are of the form "we shouldn't be doing this thing that we were doing" instead of saying "the solution should have such and such a property".

I think that the next step here is to see some design proposals on what we could do. Right now, most conversation is of the form "we're currently doing X, which is bad because of Y and Z". I think that these conversations are helpful in highlighting problems. We've also had them for a while now and it's probably time to move on towards constructive plans.

I'll provide a base plan, which I hope others can supplant with better plans.

Base plan

We continue using non-strict messages. We use msgpack to traverse them. We label known user data with to_serialize and handle that with msgpack handlers. We accept other kinds of unknown data types with pickle (this is new!) and handle those with msgpack handlers.

This is exactly the same as the current system, except that now we allow for arbitrary objects with pickle. It's fast in the common case, easy to implement (it's mostly what we have today), and allows us to avoid all of the crap that evolved out of HighLevelLayers (which seemed to cause a cascade of serialized data within serialized data, which is unpleasant).

On the open questions: it does not provide scheduler security, and it chooses loose over strict specification.

I think that it covers all of the requirements listed above. I think that it allows us to clean up a bunch of crap that evolved out of HLGs. It doesn't achieve the goals of better specification, but will, I think, result in a net improvement in simplicity.

I'm not pushing for this approach, but I think that it suits our needs.

Example

So if there was a message, like the following:

msg = {
    "op": "foo",
    "data": [to_serialize(my_numpy_array)],
    "object": my_object,
}

msgpack would go through this and pack up the entire thing as it does today, but specially handle both msg["data"][0] with dask serialization (as it does today) and msg["object"] with pickle serialization. Maybe these live in a separate list of buffers (as happens today), or maybe they're in-lined for pickle.
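
A rough sketch of what that could look like as a msgpack default hook (the Marked wrapper, the frame-index header, and the caller-supplied dask_serialize function are illustrative assumptions, not the existing distributed.protocol code):

import pickle
import msgpack

class Marked:
    """Stand-in for to_serialize(...): known user data that should go
    through dask serialization rather than plain pickle."""
    def __init__(self, obj):
        self.obj = obj

def dumps(msg, dask_serialize):
    """Pack a message with msgpack, special-casing marked user data and
    (the new part) falling back to pickle for any other unknown object."""
    frames = []

    def handle_unknown(obj):
        if isinstance(obj, Marked):
            header, obj_frames = dask_serialize(obj.obj)
            start = len(frames)
            frames.extend(obj_frames)
            return {"__serialized__": header, "frames": [start, len(frames)]}
        return {"__pickled__": pickle.dumps(obj, protocol=5)}

    packed = msgpack.packb(msg, default=handle_unknown, use_bin_type=True)
    return [packed, *frames]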

Layers (maybe counter-example?)

So let's say that the user creates a layer

layer = MyLayer("metadata", 123, my_numpy_array)

msg = {
    "op": "update-single-layer", 
    "layers": [layer],
}

We're not trying to call this thing user data, so the scheduler just gets a complete copy of it unpickled locally. This includes the numpy array, which was handled with pickle rather than Dask serialization (we don't try to traverse and blend between pickle and dask serialization).

This is a little bit dangerous because, as Gabe points out, my_numpy_array could be big or awkward in some way. My guess is that it's also not possible to both protect the scheduler from user data, and also manage graph construction on the scheduler. My guess is that we'll need user data in order to construct graphs in many cases (or not, if folks can come up with a counter-proposal).

@mrocklin (Member, Author) commented Jan 4, 2022

I want to reiterate that I'm not pushing for the plan above. I would like to see designs that are better than that plan. If no designs materialize in, say, a month, then I would recommend that we empower people to go ahead and implement the plan above. (it should be, I think, mostly a net improvement on what we have today).

@jakirkham (Member)

cc @Kobzol @spirali (in case you have thoughts here given we are starting to rethink serialization a bit)

@gjoseph92 (Collaborator)

It seems this issue is no longer just about HLG layer serialization getting to use pickle, or even about HLG serialization specifically, but about the serialization system within distributed in general. Should we update the title and description accordingly? Or maybe make #5581 (comment) into a new issue?

@Kobzol (Contributor) commented Jan 5, 2022

I don't have enough knowledge about the inner workings of the current Dask version to talk about details, but in general, when we were working on the Dask scheduler written in Rust, there were two broad problems with the communication protocol and serialization that we faced:

  1. The serialization/protocol was too "polymorphic". I suppose that this was caused by "organic growth" of the (de)serialization code over time, which is natural. Tasks were represented in multiple ways, which we mostly discovered through new Dask client code crashing our scheduler. It was sometimes difficult to work with and understand things that were serialized in a nested way. It was difficult to reconstruct the messages, since they contained instructions like "put this object into the first item of a list contained in a field named foo contained in...", which was nearly impossible to do in a statically typed language, and was also why we had to slightly modify the serialization interface for our needs.

    It would be nice if things were represented in a unified way (if possible), without a lot of edge cases. That said, I haven't looked at the Dask implementation for some time, so it's possible that some of these points are no longer true.

  2. The serialization/protocol was mostly undocumented. This is mostly an orthogonal issue to a specific serialization implementation, but if it will be rewritten in a fundamental way, it might be worth it to properly document its interface and potential gotchas to simplify future efforts of cross-language Dask components and also to make it easier to understand.

@mrocklin (Member, Author) commented Jan 7, 2022

In a call @jcrist also mentioned an alternate solution which was "just use pickle everywhere", both for system messages (currently using msgpack) as well as user data (currently using Dask's serialization).

Short term, one way to work towards this would be to switch out Dask serialization to use Pickle everywhere with some custom registered pickle handlers.

@mrocklin (Member, Author) commented Jan 7, 2022

From a call today it seemed like a path forward might be the following phases:

  1. Add pickle as a fallback to the normal serialization. This should allow us to remove a lot of the cruft that has accumulated around HLGs
  2. Try replacing Dask serialization with pickle, see how this works
  3. Try replacing everything with pickle and see if that works.

@mrocklin (Member, Author) commented Jan 7, 2022

Additionally, @jcrist (I think) mentioned that we might want to opt in to pickle, similar to how we opt in to dask serialization with the to_serialize function. There might be a to_pickle function as well. This would help us have things fail quickly and add more structure.
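
A sketch of what that opt-in could look like (the wrapper below is hypothetical and only marks intent; the comm layer would have to recognize it, and my_highlevelgraph is a placeholder):

class ToPickle:
    """Opt-in marker: this object is explicitly allowed to be pickled
    end-to-end, mirroring how to_serialize opts into dask serialization."""
    def __init__(self, data):
        self.data = data

def to_pickle(data):
    return ToPickle(data)

# anything not wrapped in to_pickle/to_serialize would fail fast on the comm
msg = {"op": "update-graph", "hlg": to_pickle(my_highlevelgraph)}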

@mrocklin (Member, Author) commented Jan 7, 2022

I'll also say that there was increased comfort with the proposal above on the call, at least along the lines of "well it seems easy and it's clearly better than what we have now".

There was still significant interest in having something more rigid/predictable, but that seemed like it would likely take several months to design and build.

We're still open to proposals here, but right now the msgpack+(dask-serialization, pickle) approach is looking more likely.

@jakirkham (Member)

Would it make sense to move some of the serialization logic to Dask itself? This might make it easier to handle things with msgpack + pickle there. If we need more customization, we could allow for the serialization functionality in Dask to be stubbed out and leverage that in Distributed

@rjzamora (Member)

Basic question: Is the current plan to allow pickle.loads to be called on the scheduler (or at least consider such behavior)? I don’t personally have a strong stance on this, but I don’t quite understand the consensus on this detail.

@jakirkham (Member)

Basic question: Is the current plan to allow pickle.loads to be called on the scheduler (or at least consider such behavior)? I don’t personally have a strong stance on this, but I don’t quite understand the consensus on this detail.

That's my understanding from @mrocklin's comments above, but perhaps he can clarify

@rjzamora (Member) commented Feb 2, 2022

That's my understanding from @mrocklin's comments above, but perhaps he can clarify

Sorry - I should have followed up on this. Matt confirmed offline that the plan is indeed to consider allowing pickle.loads on the scheduler (if it gains us enough simplification in HLGs or otherwise). Mads' PR demonstrates one possible solution, and I intend to explore how such a design can simplify HLG packing/unpacking logic.

@rjzamora (Member) commented Feb 7, 2022

Note that dask#8672 demonstrates how using pickle (via #5728) would certainly help simplify the existing Layer definitions in dask/dask (current PR removes ~350 lines of code, and we can certainly simplify further).

@naveenfaclon

2024-02-28 09:25:22,819 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f934c1a4eb0>
0. generate_answer-7a2386c0-ac88-4dc3-bf86-a3caf2a20b76

.
Traceback (most recent call last):
File "/home/ubuntu/Mistral/new/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function generate_answer at 0x7f9528e1c820>: it's not the same object as utility.generate_answer

I am facing the above error.

@hinfsynz commented Mar 6, 2024

2024-02-28 09:25:22,819 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers. <dask.highlevelgraph.HighLevelGraph object at 0x7f934c1a4eb0> 0. generate_answer-7a2386c0-ac88-4dc3-bf86-a3caf2a20b76

.
Traceback (most recent call last):
File "/home/ubuntu/Mistral/new/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function generate_answer at 0x7f9528e1c820>: it's not the same object as utility.generate_answer

i am facing above error

Same issue. Did you get it resolved?

@naveenfaclon

No, I didn't find a solution yet.

@efocht commented Mar 21, 2024

Seeing this, too:

  File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 379, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f017fb5b790>\n 0. 139644510442944\n>')

dask/distributed versions are 2024.2.0

@efocht commented Mar 28, 2024

My problem was solved by downgrading dask/distributed to version 2023.3.2, the version before the big change in how HighLevelGraphs are serialized. All versions after and including 2023.4.0 fail when facing HighLevelGraphs which serialize to objects larger than roughly 2GB. The failure is reflected by an error message like this:

Traceback (most recent call last):
  File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/comm/utils.py", line 34, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/scratch/develop/venv/lib/python3.9/site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large

The problem seems to be that msgpack is asked to serialize a huge object in distributed/protocol/core.py:dumps(); that function handles the TypeError case by trying other serializers, but cannot deal with the object-too-large ValueError.

@fjetter (Member) commented Mar 28, 2024

The too large payload problem should've been addressed by #8507 which has been released in 2024.2.1

@efocht commented Mar 28, 2024

Sorry, I still see the error, even with dask/distributed 2024.2.1

2024-03-28 22:15:16,208 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/scratch/focht/Issue_xgb_task_graph/venv-dasknew/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/scratch/focht/Issue_xgb_task_graph/venv-dasknew/lib/python3.10/site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large

The relevant code in msgpack is here; it fails because the bytes object is larger than ITEM_LIMIT = 2**32 - 1. The test for your fix #8507 only tests a size of 2**31 + 1.
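
For reference, the limit can be reproduced with msgpack alone (this allocates roughly 4 GiB, so it is only an illustration):

import msgpack

# msgpack's bin format stores lengths in 32 bits, so any bytes payload
# larger than 2**32 - 1 bytes cannot be packed
msgpack.packb(b"x" * 2**32)  # ValueError: bytes object is too large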

@yoninachmany commented Jun 10, 2024

I'm getting a similar pickling issue for a HighLevelGraph with 20 layers and 21890 keys (created from an xarray apply_ufunc) - dask/distributed version 2024.5.2

TypeError                                 Traceback (most recent call last)
Cell In[10], line 4
      1 from dask.diagnostics import ProgressBar
      3 with ProgressBar():
----> 4     out_ds = out_ds.compute(scheduler='processes')
      6 out_ds

File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataarray.py:1178, in DataArray.compute(self, **kwargs)
   1153 """Manually trigger loading of this array's data from disk or a
   1154 remote source into memory and return a new array.
   1155 
   (...)
   1175 dask.compute
   1176 """
   1177 new = self.copy(deep=False)
-> 1178 return new.load(**kwargs)

File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataarray.py:1146, in DataArray.load(self, **kwargs)
   1126 def load(self, **kwargs) -> Self:
   1127     """Manually trigger loading of this array's data from disk or a
   1128     remote source into memory and return this array.
   1129 
   (...)
   1144     dask.compute
   1145     """
-> 1146     ds = self._to_temp_dataset().load(**kwargs)
   1147     new = self._from_temp_dataset(ds)
   1148     self._variable = new._variable

File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataset.py:862, in Dataset.load(self, **kwargs)
    859 chunkmanager = get_chunked_array_type(*lazy_data.values())
    861 # evaluate all the chunked arrays simultaneously
--> 862 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
    863     *lazy_data.values(), **kwargs
    864 )
    866 for k, data in zip(lazy_data, evaluated_data):
    867     self.variables[k].data = data

File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/namedarray/daskmanager.py:86, in DaskManager.compute(self, *data, **kwargs)
     81 def compute(
     82     self, *data: Any, **kwargs: Any
     83 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
     84     from dask.array import compute
---> 86     return compute(*data, **kwargs)

File ~/.conda/envs/zeus/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.conda/envs/zeus/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
   1477 with io.BytesIO() as file:
   1478     cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479     cp.dump(obj)
   1480     return file.getvalue()

File ~/.conda/envs/zeus/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
   1243 def dump(self, obj):
   1244     try:
-> 1245         return super().dump(obj)
   1246     except RuntimeError as e:
   1247         if len(e.args) > 0 and "recursion" in e.args[0]:

TypeError: cannot pickle 'weakref.ReferenceType' object

@fjetter (Member) commented Jun 10, 2024

@yoninachmany I assume you should receive this error message with pretty much every version of dask. At least in your example, you are using processes to schedule your computation. In this case, we've always had to serialize the graph to send it to the subprocess.
If this did work for you on another version, please open a new ticket, ideally with a reproducer.

About what I'm seeing: it looks like something in your code is using weakrefs, which cannot be serialized. I suspect this is some custom code on your end. You can probably figure out which layer is the culprit by doing something like

from distributed.protocol import dumps
for name, layer in out_ds.dask.layers.items():
    try:
        dumps(layer)
    except:
        print(f"{name} is the culprit")

@yoninachmany commented Jun 10, 2024

Thanks for the pointer. FYI, msgpack fails to serialize the 'Blockwise'/'MaterializedLayer' objects in the layer that vectorizes the function from xr.apply_ufunc, and the fallback, cloudpickle, can't pickle a 'weakref.ReferenceType' object.

Related to the use of metpy/pint units

(screenshot of the serialization error omitted)

@fjetter (Member) commented Jun 10, 2024

cloudpickle, can't pickle a 'weakref.ReferenceType' object

Related to the use of metpy/pint units

The pickle vs cloudpickle thing here is not that important. I suggest inspecting whatever function you're putting into your ufunc and making sure that the objects don't use weakref. Maybe this is a pint thing; I haven't worked much with it.


@efocht sorry for the late reply

Sorry, I still see the error, even with dask/distributed 2024.2.1

I might have a fix for this in #8684 but I still need to run perf tests.


I'm going to close this issue now since this was actually completed more than a year ago in #7564

I understand that there may be follow-up issues with this (as we saw above), but I would ask anybody who is encountering issues to open a new one (and link back to this if you like)
