Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single pass serialization #4699

Closed
wants to merge 25 commits into from
Closed

Conversation

madsbk
Copy link
Contributor

@madsbk madsbk commented Apr 13, 2021

This PR streamlines the serialization in Distributed by relying on msgpack and only refer to the serialize()/deserialize() infrastructure when encountering objects not supported by msgpack.

  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

@madsbk madsbk marked this pull request as ready for review April 16, 2021 15:06
@madsbk
Copy link
Contributor Author

madsbk commented Apr 16, 2021

@jrbourbeau @mrocklin @jakirkham, this is ready for the first round of reviews :)

try:
with cache_dumps_lock:
result = cache_dumps[func]
except KeyError:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't have a good feeling on what timescales where currently optimizing or whether this is a particularly performance critical section. Therefore, this comment might be irrelevant.
However, exception handling is relatively expensive and if we encounter a lot of cache misses a isin should be faster. That's ns level optimization. I could imagine the pickling is usually an order of magnitude slower and this doesn't matter at all

with cache_dumps_lock:
result = cache_dumps[func]
except KeyError:
result = pickle.dumps(func, protocol=4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why protocol=4 is hard coded?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious about this

Copy link
Contributor Author

@madsbk madsbk Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious too :)
This is taken directly from

result = pickle.dumps(func, protocol=4)

@jakirkham do you know?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That line comes from @mrocklin's PR ( #4019 ), which allowed connections to dynamically determine what compression and pickle protocols are supported and then use them in communication. In a few places I think Matt found it easier to simply force pickle protocol 4 than allow it to be configurable. So if this is coming from that worker code, that is the history

result = cache_dumps[func]
except KeyError:
result = pickle.dumps(func, protocol=4)
if len(result) < 100000:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can be a bit more generous with the cache size. currently we're at 100 (LRU maxsize) * 100_000 B (result) ~ 1MB. Considering how much stuff we're logging without taking size into account too much, I would suggest to be more generous with this upper limit since large results are the juicy cache hits.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related: in loads_function, what if we used hash(bytes_object) as the key instead of bytes_object itself? Then we wouldn't have to hang onto references to those large bytestrings that we won't look at again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like that was just copied and moved over from here

if len(result) < 100000:

Perhaps we can make a new issue and revisit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can make a new issue and revisit?

My plan is to remove worker.dumps_function() completely, it shouldn't be required to call it explicitly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok in that case I don't think the protocol=4 bit above will be needed


def loads_function(bytes_object):
""" Load a function from bytes, cache bytes """
if len(bytes_object) < 100000:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal preference: I would put the size of the cache into a constant s.t. the two function don't drift apart

@madsbk madsbk changed the title [WIP] Single pass serialization Single pass serialization Apr 22, 2021
@madsbk
Copy link
Contributor Author

madsbk commented Apr 22, 2021

@fjetter good points! When we settle on an overall design I will incorporate you suggestions.

Right now I am waiting on @jrbourbeau @mrocklin to review the overall design before continuing :)

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does seem a bit cleaner and simpler to me, without being a fundamental change, which is nice. I haven't thought more carefully about the implications yet though.

result = cache_dumps[func]
except KeyError:
result = pickle.dumps(func, protocol=4)
if len(result) < 100000:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related: in loads_function, what if we used hash(bytes_object) as the key instead of bytes_object itself? Then we wouldn't have to hang onto references to those large bytestrings that we won't look at again.

@@ -20,6 +21,8 @@
dask_deserialize = dask.utils.Dispatch("dask_deserialize")

_cached_allowed_modules = {}
non_list_collection_types = (tuple, set, frozenset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are set and frozenset necessary here, since they can't contain lists or dicts, even recursively within tuples?

>>> {([])}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'

Comment on lines +303 to +306
and (
"pickle" not in serializers
or serializers.index("pickle") > serializers.index("msgpack")
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still care about whether pickle is used or not, now that we have msgpack_persist_lists?

Related: what happens if a MsgpackList gets pickled? Won't it be passed on (in a task, say) as a MsgpackList, not a plain list? Whereas msgpack_decode_default returns them as plain lists.

return {"__Set__": True, "as-tuple": tuple(obj)}

if typ is MsgpackList:
return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)}
return {"__MsgpackList__": True, "as-tuple": obj.data}

What would happen if we did this instead? obj.data should already be a list, so I'm wondering if the extra copy to a tuple is necessary.

Copy link
Member

@mrocklin mrocklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general things here seem ok. There are issues around passing through the list of serializers. We need to make sure that we can turn pickle off.

Comment on lines +1784 to +1785
# With <https://github.com/dask/distributed/pull/4699>,
# deserialization is done as part of communication.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrbourbeau I think that you might want to be aware of this change

if typ in (Serialized, SerializedCallable):
sub_header, sub_frames = obj.header, obj.frames
elif callable(obj):
sub_header, sub_frames = {"callable": dumps_function(obj)}, []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functions can be quite large sometimes, for example if users close over large variables out of function scope. Msgpack may not handle this well in some cases

x = np.arange(1000000000)

def f(y):
    return y + x.sum()

Obviously users shouldn't do this, but they will.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're bypassing the list of serializers here. This allows users to get past configurations where users specifically turn off pickle.

sub_header, sub_frames = serialize_and_split(
obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inplace stuff always makes me uncomfortable. Thoughts on making new header/frames dict/lists here instead?

For reference, it was these sorts of inplace operations that previously caused us to run into the msgpack tuple vs list difference. I think that avoiding them when we can is useful, unless there is a large performance boost (which I wouldn't expect here).

Comment on lines +150 to +151
if deserialize == "delay-exception":
return DelayedExceptionRaise(e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused about when this is necessary and why it wasn't before. I'm wary of creating new systems like this if we can avoid it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand this now that I've seen the c.submit(identity, Foo()) test below

# `__MsgpackList__`, we decode it here explicitly. This way
# we can delay the convertion to a regular `list` until it
# gets to a worker.
if "__MsgpackList__" in obj:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the type of obj here? Is in the right test here, or is this special value in a more specific place?

header, frames = serialize([[[x]]])
assert "dask" in str(header)
assert len(frames) == 1
assert x.data == np.frombuffer(frames[0]).data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, why did we drop this test?

@@ -4628,8 +4624,6 @@ async def test_recreate_error_futures(c, s, a, b):

function, args, kwargs = await c._recreate_error_locally(f)
assert f.status == "error"
assert function.__name__ == "div"
assert args == (1, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, what happened here?

assert results == list(map(inc, range(10)))
assert a.data and b.data
assert results == list(map(inc, range(10)))
assert a.data and b.data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, you mentioned this in meeting a couple of weeks ago. I see now how this is unfortunate.

I would expect this test to now be written as

with pytest.raises(CancelledError):
    await c.submit(identity, Foo())

I wouldn't expect the other lines here to be indented. In general seeing assert statements under a raises context manager is a sign that something is unclean :)

Copy link
Contributor Author

@madsbk madsbk Apr 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it to make it more clear what is going on:

    # Notice, because serialization is delayed until `distributed.batched`
    # we don't get an exception immediately. The exception is raised and logged
    # when the ongoing communication between the client the scheduler encounters
    # the `Foo` class. Before <https://github.com/dask/distributed/pull/4699>
    # the serialization happened immediately in `submit()`, which would raise the
    # `MyException`.
    with captured_logger("distributed") as caplog:
        future = c.submit(identity, Foo())
        # We sleep to make sure that a `BatchedSend.interval` has passed.
        await asyncio.sleep(c.scheduler_comm.interval)
    # Check that the serialization error was logged
    assert "Failed to serialize" in caplog.getvalue()

I'm curious, what do you think of the approach? We cannot easily catch the exception because it happens as part of the ongoing communication and not in the submit() call, but at least we log the exception.

with pytest.raises(TypeError):
await c.run_on_scheduler(lambda: inc)
await c.run(lambda: inc)
await c.run_on_scheduler(lambda: inc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user has specified that they don't want to allow serialization with pickle then these should continue to fail. Probably we need to feed the list of serializers down wherever serialize is beting called. I expect that this might be awkward to do when going through msgpack machinery. Maybe there is some global that we can misuse?

@@ -771,8 +771,7 @@ async def f():
await server.listen("tcp://")

async with rpc(server.address, serializers=["msgpack"]) as r:
with pytest.raises(TypeError):
await r.echo(x=to_serialize(inc))
await r.echo(x=to_serialize(inc))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These sorts of changes are probably not ok. They fundamentally change the intent of the test, which is to ensure that things like this can be disallowed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been fixed

Copy link
Member

@mrocklin mrocklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general things here seem ok. There are issues around passing through the list of serializers. We need to make sure that we can turn pickle off.

with cache_dumps_lock:
result = cache_dumps[func]
except KeyError:
result = pickle.dumps(func, protocol=4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious about this

if typ in (Serialized, SerializedCallable):
sub_header, sub_frames = obj.header, obj.frames
elif callable(obj):
sub_header, sub_frames = {"callable": dumps_function(obj)}, []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're bypassing the list of serializers here. This allows users to get past configurations where users specifically turn off pickle.

Comment on lines +24 to +27
cache_dumps = LRU(maxsize=100)
cache_loads = LRU(maxsize=100)
cache_dumps_lock = threading.Lock()
cache_loads_lock = threading.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are discussing getting rid of dumps_function, will these still be needed or will they go away as well?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants