Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Avoid py4j for python-backend interactions #13797

Merged
merged 19 commits into from
Oct 20, 2023

Conversation

daniel-goldstein
Copy link
Contributor

@daniel-goldstein daniel-goldstein commented Oct 11, 2023

CHANGELOG: Fixes #13756: operations that collect large results such as to_pandas may require up to 3x less memory.

This turns all "actions", i.e. backend methods supported by QoB into HTTP endpoints on the spark and local backends. This intentionally avoids py4j because py4j was really designed to pass function names and references around and does not handle large payloads well (such as results from a collect). Specifically, py4j uses a text-based protocol on top of TCP that substantially inflates the memory requirement for communicating large byte arrays. On the Java side, py4j serializes every binary payload as a Base64-encoded java.lang.String, which between the Base64 encoding and String's use of UTF-16 results in a memory footprint of the String being 4/3 * 2 = 8/3 nearly three times the size of the byte array on either side of the py4j pipe. py4j also appears to do an entire copy of this payload, which means nearly a 6x memory requirement for sending back bytes. Using our own socket means we can directly send back the response bytes to python without any of this overhead, even going so far as to encode results directly into the TCP output stream. Formalizing the API between python and java also allows us to reuse the same payload schema across all three backends.

@danking
Copy link
Contributor

danking commented Oct 16, 2023

I can start looking at this as soon as you give me the OK

@daniel-goldstein
Copy link
Contributor Author

daniel-goldstein commented Oct 17, 2023

@danking Looks like I'm still failing to configure a couple of settings related to references on the ServiceBackend but you can feel free to start looking. You'll notice that I made quite a substantial refactor in ServiceBackend.scala in an attempt to harmonize the scala backends a bit more. The rationale behind the refactor is I was having a hard time working with the various thunks passed around there. I saw them as a bit of poor-man's-object way to capture some state from the input file while keeping the ServiceBackend stateless. IMO there's no harm in keeping the ServiceBackend just as stateful as the other backends since it is single use. So I lifted a lot of that state into backend-creation time and created a harder delineation between which part of the input is for configuring the backend and which part is for the action being performed. This made it easier to reuse a couple of methods like tableType and such.

I'm happy to take suggestions on ways to trim down this PR, but I thought you'd want to take a look at the whole thing given the time-sensitivity.

@daniel-goldstein daniel-goldstein marked this pull request as ready for review October 17, 2023 14:26
@danking
Copy link
Contributor

danking commented Oct 17, 2023

No worries, I'll review this as is!

Copy link
Contributor

@danking danking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few thoughts; I'll have to do another closer read but this looks awesome so far. I'm really glad we moved away from my silly binary protocol to a JSON based one.

hail/python/hail/backend/backend.py Show resolved Hide resolved
@@ -268,7 +273,18 @@ def hail_package(self):
def utils_package_object(self):
return self._utils_package_object

def _rpc(self, action, payload) -> Tuple[bytes, str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can live on Py4JBackend, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you depend on _backend_server. You could pass the _jbackend into Py4JBackend's __init__ and then you can construct _backend_server in Py4JBackend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I took it several steps further and also lifted a bunch of other duplication into Py4JBackend. Should be limited to just commit aa3ffcc if you want to look at that separately.

from .backend import ActionTag, Backend, fatal_error_from_java_error_triplet

import http.client
http.client._MAXLINE = 2 ** 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this about?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment explaining, and here's an SO post. I'm not really sure what to do here. It felt reasonable to send back timings in a response header instead of smushing it into the response body, as it is metadata, but I was left with a decision of how to raise the max header size and I went with something overly generous. AFAIK there is no actual limit in HTTP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to say I was worried about proxies but I remember this entire connection is controlled by us. Yeah this seems fine.

result_bytes = await retry_transient_errors(self._read_output, ir, iodir + '/out', iodir + '/in')
return result_bytes, timings
result_bytes = await retry_transient_errors(self._read_output, iodir + '/out', iodir + '/in')
return result_bytes, str(timings.to_dict())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the str(...to_dict())?

Copy link
Contributor Author

@daniel-goldstein daniel-goldstein Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local/spark backends return str for their timings, so I was adhering to that signature. Admittedly, I cannot find anywhere in the codebase where timings is actually used, so maybe I should have instead gone the other direction and instantiated a Timings from the java JSON. I can add a test to that effect if you have an opinion on one or the other, or should we just scrap the timings altogether?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Let's not touch in this PR. I don't really recall how timings works.


async def _async_rpc(self, action: ActionTag, payload: ActionPayload):
if action == ActionTag.EXECUTE:
assert isinstance(payload, ExecutePayload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of pattern makes me wonder if the tag should just hang off of the payload e.g. .tag?

Copy link
Contributor Author

@daniel-goldstein daniel-goldstein Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya I kind of agree, except on the server I switch on the tag to determine how I should deserialize the payload, so it didn't make sense to me for that to be part of the payload. Also, the local/spark backends don't use the tag, they use a URL route, so it wouldn't be helpful to also send the tag. Open to alternative suggestions.

Copy link
Contributor Author

@daniel-goldstein daniel-goldstein Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose an alternative that side-steps this whole thing could be: move the IR functions and idempotency token out of the execute payload and make them fields of the ServiceBackend config. Then we don't need to downcast the payload but can still choose only to send IR functions on ActionTag.EXECUTE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I leave your comment up to you. I'm OK with this all as it is and don't have my head deep enough in this to have strong opinions about whether ir functions and tokens are in execute or outside.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll leave as is now and we can revisit in a separate PR

hail/src/main/scala/is/hail/backend/Backend.scala Outdated Show resolved Hide resolved
hail/src/main/scala/is/hail/backend/Backend.scala Outdated Show resolved Hide resolved
hail/src/main/scala/is/hail/backend/Backend.scala Outdated Show resolved Hide resolved
return_type: String,
rendered_body: String,
)

class ServiceBackendSocketAPI2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should give this a better name now. It's not a socket api at all, it's just an API I guess?

Copy link
Contributor Author

@daniel-goldstein daniel-goldstein Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to ServiceBackendAPI. I could be convinced to not have this class at all and just put the main method on the object ServiceBackend

Copy link
Contributor

@danking danking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor things

hail/python/hail/backend/local_backend.py Outdated Show resolved Hide resolved
hail/python/hail/ir/ir.py Outdated Show resolved Hide resolved
hail/src/main/scala/is/hail/HailContext.scala Outdated Show resolved Hide resolved
@@ -158,11 +158,11 @@ object Worker {
timer.start("executeFunction")

if (HailContext.isInitialized) {
HailContext.get.backend = new ServiceBackend(null, null, new HailClassLoader(getClass().getClassLoader()), null, None)
HailContext.get.backend = new ServiceBackend(null, null, new HailClassLoader(getClass().getClassLoader()), null, None, null, null, null, null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is feeling increasingly wrong. I think after this PR lands I'll PR something that creates a QoBWorkerBackend which is just the class loader and raises UnsupportedOperationException for everything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be great

@@ -146,13 +146,10 @@ def validate_file(self, uri: str) -> None:
validate_file(uri, self._router_async_fs)

def stop(self):
super().stop()
self._jbackend.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopping the hail context calls this, so you can just remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's idempotent so you don't need to. I'll approve but let's nix it.

@danking danking merged commit c73386f into hail-is:main Oct 20, 2023
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants