[query] Avoid py4j for python-backend interactions #13797
Conversation
I can start looking at this as soon as you give me the OK

@danking Looks like I'm still failing to configure a couple of settings related to references on the … I'm happy to take suggestions on ways to trim down this PR, but I thought you'd want to take a look at the whole thing given the time-sensitivity.

No worries, I'll review this as is!
A few thoughts; I'll have to do another closer read but this looks awesome so far. I'm really glad we moved away from my silly binary protocol to a JSON based one.
```diff
@@ -268,7 +273,18 @@ def hail_package(self):
    def utils_package_object(self):
        return self._utils_package_object

    def _rpc(self, action, payload) -> Tuple[bytes, str]:
```
This can live on `Py4JBackend`, right?
I see that you depend on `_backend_server`. You could pass the `_jbackend` into `Py4JBackend`'s `__init__` and then construct `_backend_server` in `Py4JBackend`.
Good call. I took it several steps further and also lifted a bunch of other duplication into `Py4JBackend`. Should be limited to just commit aa3ffcc if you want to look at that separately.
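A rough sketch of that refactor, under stand-in names (`FakeJBackend` and `pyHttpServer` are made up for illustration; the real Hail classes differ): the superclass receives the JVM backend handle and builds the backend server once, so subclasses stop duplicating that setup.

```python
class FakeJBackend:
    """Stand-in for the py4j JavaObject handle; illustrative only."""
    def pyHttpServer(self):  # hypothetical method name
        return 'server'

class Py4JBackend:
    def __init__(self, jbackend):
        # _jbackend is passed in rather than created by each subclass...
        self._jbackend = jbackend
        # ...so _backend_server can be constructed once, here.
        self._backend_server = jbackend.pyHttpServer()

class SparkBackend(Py4JBackend):
    def __init__(self, jbackend):
        super().__init__(jbackend)

backend = SparkBackend(FakeJBackend())
```

The design choice is just inversion of construction: subclasses hand over the one object the superclass needs, and shared state lives in one place.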
```python
from .backend import ActionTag, Backend, fatal_error_from_java_error_triplet

import http.client
http.client._MAXLINE = 2 ** 20
```
what's this about?
I left a comment explaining, and here's an SO post. I'm not really sure what to do here. It felt reasonable to send back timings in a response header instead of smushing them into the response body, since they're metadata, but that left me with a decision about how to raise the max header size, and I went with something overly generous. AFAIK there is no actual limit on header size in HTTP.
I was going to say I was worried about proxies, but then I remembered this entire connection is controlled by us. Yeah, this seems fine.
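For context, the `_MAXLINE` tweak works around Python's per-line cap when reading HTTP response headers. A minimal, self-contained demonstration (the `X-Hail-Timings` header name here is made up for illustration, and the server is a throwaway stdlib one, not Hail's):

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class BigHeaderHandler(BaseHTTPRequestHandler):
    """Returns a response header far larger than http.client's default line cap."""
    def do_GET(self):
        self.send_response(200)
        self.send_header('X-Hail-Timings', 'x' * 100_000)  # ~100 KB header value
        self.send_header('Content-Length', '0')
        self.end_headers()
    def log_message(self, *args):
        pass  # keep the demo quiet

server = HTTPServer(('127.0.0.1', 0), BigHeaderHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

# With the default _MAXLINE (65536), parsing this header raises LineTooLong.
http.client._MAXLINE = 65536
conn = http.client.HTTPConnection('127.0.0.1', port)
conn.request('GET', '/')
try:
    conn.getresponse()
    raised = False
except http.client.LineTooLong:
    raised = True
conn.close()

# Raising the module-level cap lets the oversized header through.
http.client._MAXLINE = 2 ** 20
conn = http.client.HTTPConnection('127.0.0.1', port)
conn.request('GET', '/')
timings = conn.getresponse().getheader('X-Hail-Timings')
conn.close()
server.shutdown()
```

The monkeypatch works because `http.client` reads `_MAXLINE` as a module global at parse time, which is also why it applies process-wide.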
```diff
-result_bytes = await retry_transient_errors(self._read_output, ir, iodir + '/out', iodir + '/in')
-return result_bytes, timings
+result_bytes = await retry_transient_errors(self._read_output, iodir + '/out', iodir + '/in')
+return result_bytes, str(timings.to_dict())
```
why the `str(...to_dict())`?
The local/spark backends return `str` for their `timings`, so I was adhering to that signature. Admittedly, I cannot find anywhere in the codebase where `timings` is actually used, so maybe I should have instead gone the other direction and instantiated a `Timings` from the java JSON. I can add a test to that effect if you have an opinion one way or the other, or should we just scrap the timings altogether?
Hmm. Let's not touch it in this PR. I don't really recall how timings works.
```python
async def _async_rpc(self, action: ActionTag, payload: ActionPayload):
    if action == ActionTag.EXECUTE:
        assert isinstance(payload, ExecutePayload)
```
This kind of pattern makes me wonder if the tag should just hang off of the payload, e.g. `.tag`?
Ya, I kind of agree, except that on the server I switch on the tag to determine how to deserialize the payload, so it didn't make sense to me for the tag to be part of the payload. Also, the local/spark backends don't use the tag, they use a URL route, so it wouldn't be helpful to also send the tag. Open to alternative suggestions.
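For what it's worth, the server-side shape being described, switching on the tag to pick a deserializer so the tag stays outside the payload, might look roughly like this. All names here are illustrative, not Hail's actual classes:

```python
import json
from dataclasses import dataclass
from enum import Enum

class ActionTag(Enum):
    EXECUTE = 1
    VALUE_TYPE = 2

@dataclass
class ExecutePayload:
    ir: str
    token: str

@dataclass
class ValueTypePayload:
    ir: str

# The tag, sent alongside (not inside) the payload, selects the schema.
_DESERIALIZERS = {
    ActionTag.EXECUTE: lambda d: ExecutePayload(**d),
    ActionTag.VALUE_TYPE: lambda d: ValueTypePayload(**d),
}

def deserialize(tag: ActionTag, raw: bytes):
    return _DESERIALIZERS[tag](json.loads(raw))
```

Putting the tag on the payload instead would make the mapping self-describing, at the cost of having to peek into the JSON before knowing which schema to parse it with.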
I suppose an alternative that side-steps this whole thing could be: move the IR functions and idempotency token out of the execute payload and make them fields of the ServiceBackend config. Then we don't need to downcast the payload but can still choose only to send IR functions on `ActionTag.EXECUTE`.
Hmm. I leave your comment up to you. I'm OK with this all as it is and don't have my head deep enough in this to have strong opinions about whether IR functions and tokens live in execute or outside it.
I think I'll leave it as is for now; we can revisit in a separate PR.
```scala
    return_type: String,
    rendered_body: String,
)

class ServiceBackendSocketAPI2(
```
We should give this a better name now. It's not a socket API at all; it's just an API, I guess?
Renamed to `ServiceBackendAPI`. I could be convinced to not have this class at all and just put the `main` method on the object `ServiceBackend`.
minor things
```diff
@@ -158,11 +158,11 @@ object Worker {
   timer.start("executeFunction")

   if (HailContext.isInitialized) {
-    HailContext.get.backend = new ServiceBackend(null, null, new HailClassLoader(getClass().getClassLoader()), null, None)
+    HailContext.get.backend = new ServiceBackend(null, null, new HailClassLoader(getClass().getClassLoader()), null, None, null, null, null, null)
```
This is feeling increasingly wrong. I think after this PR lands I'll PR something that creates a `QoBWorkerBackend` which is just the class loader and raises UnsupportedOperationException for everything else.
that would be great
```diff
@@ -146,13 +146,10 @@ def validate_file(self, uri: str) -> None:
    validate_file(uri, self._router_async_fs)

    def stop(self):
        super().stop()
        self._jbackend.close()
```
stopping the hail context calls this, so you can just remove it.
But it's idempotent so you don't need to. I'll approve but let's nix it.
CHANGELOG: Fixes #13756: operations that collect large results, such as `to_pandas`, may require up to 3x less memory.

This turns all "actions", i.e. the backend methods supported by QoB, into HTTP endpoints on the spark and local backends. It intentionally avoids py4j, because py4j was really designed to pass function names and references around and does not handle large payloads well (such as the results of a `collect`). Specifically, py4j uses a text-based protocol on top of TCP that substantially inflates the memory required to communicate large byte arrays. On the Java side, py4j serializes every binary payload as a Base64-encoded `java.lang.String`; between the Base64 encoding and `String`'s use of UTF-16, the `String`'s memory footprint is `4/3 * 2 = 8/3`, nearly three times the size of the byte array, on either side of the py4j pipe. py4j also appears to make a full copy of this payload, which means nearly a 6x memory requirement for sending back bytes. Using our own socket means we can send the response bytes directly back to python without any of this overhead, even going so far as to encode results directly into the TCP output stream. Formalizing the API between python and java also allows us to reuse the same payload schema across all three backends.