Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Avoid py4j for python-backend interactions #13797

Merged
merged 19 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions hail/python/hail/backend/local_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def __init__(self, tmpdir, log, quiet, append, branching_factor,
jhc = hail_package.HailContext.apply(
jbackend,
branching_factor,
optimizer_iterations,
True
optimizer_iterations
)

super(LocalBackend, self).__init__(self._gateway.jvm, jbackend, jhc)
Expand Down Expand Up @@ -102,11 +101,8 @@ def register_ir_function(self,
jbody)

def stop(self):
self._backend_server.stop()
self._jhc.stop()
self._jhc = None
super().stop()
self._gateway.shutdown()
self._registered_ir_function_names = set()
uninstall_exception_handler()

@property
Expand Down
7 changes: 7 additions & 0 deletions hail/python/hail/backend/py4j_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,10 @@ def _to_java_value_ir(self, ir):

def _to_java_blockmatrix_ir(self, ir):
return self._to_java_ir(ir, self._parse_blockmatrix_ir)

def stop(self):
self._backend_server.stop()
self._jhc.stop()
self._jhc = None
self._registered_ir_function_names = set()
uninstall_exception_handler()
9 changes: 3 additions & 6 deletions hail/python/hail/backend/spark_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from hailtop.aiotools.router_fs import RouterAsyncFS
from hailtop.aiotools.validators import validate_file

from .py4j_backend import Py4JBackend, uninstall_exception_handler
from .py4j_backend import Py4JBackend
from .backend import local_jar_information


Expand Down Expand Up @@ -114,7 +114,7 @@ def __init__(self, idempotent, sc, spark_conf, app_name, master,
jsc, app_name, master, local, log, True, append, skip_logging_configuration, min_block_size, tmpdir, local_tmpdir,
gcs_requester_pays_project, gcs_requester_pays_buckets)
jhc = hail_package.HailContext.apply(
jbackend, branching_factor, optimizer_iterations, True)
jbackend, branching_factor, optimizer_iterations)

self._jsc = jbackend.sc()
if sc:
Expand Down Expand Up @@ -146,13 +146,10 @@ def validate_file(self, uri: str) -> None:
validate_file(uri, self._router_async_fs)

def stop(self):
super().stop()
self._jbackend.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopping the hail context calls this, so you can just remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's idempotent so you don't need to. I'll approve but let's nix it.

self._jhc.stop()
self._jhc = None
self.sc.stop()
self.sc = None
self._registered_ir_function_names = set()
uninstall_exception_handler()

@property
def fs(self):
Expand Down
6 changes: 3 additions & 3 deletions hail/python/hail/ir/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -3718,10 +3718,10 @@ def __del__(self):


class JavaIR(IR):
def __init__(self, hail_type, ir_id, ref: Optional[JavaIRSharedReference] = None):
def __init__(self, hail_type: HailType, ir_id: int, ref: Optional[JavaIRSharedReference] = None):
super(JavaIR, self).__init__()
self._type: HailType = hail_type
self._id: int = ir_id
self._type = hail_type
self._id = ir_id
self._ref = ref or JavaIRSharedReference(ir_id)

def copy(self):
Expand Down
8 changes: 1 addition & 7 deletions hail/src/main/scala/is/hail/HailContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ object HailContext {

def apply(backend: Backend,
branchingFactor: Int = 50,
optimizerIterations: Int = 3,
addDefaultReferences: Boolean = true): HailContext = synchronized {
optimizerIterations: Int = 3): HailContext = synchronized {
require(theContext == null)
checkJavaVersion()

Expand All @@ -124,11 +123,6 @@ object HailContext {

info(s"Running Hail version ${ theContext.version }")

// needs to be after `theContext` is set, since this creates broadcasts
if (addDefaultReferences) {
backend.addDefaultReferences()
}

theContext
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ object LocalBackend {
gcsRequesterPaysProject,
gcsRequesterPaysBuckets
)
theLocalBackend.addDefaultReferences()
theLocalBackend
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ class ServiceBackend(
elementType.virtualType,
BufferSpec.parseOrDefault(bufferSpecString)
)
assert(pt.isFieldDefined(off, 0))
codec.encode(ctx, elementType, pt.loadField(off, 0))
daniel-goldstein marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down Expand Up @@ -449,7 +450,7 @@ object ServiceBackendAPI {
HailContext.get.backend = backend
log.info("Default references added to already initialized HailContexet.")
} else {
HailContext(backend, 50, 3, false)
HailContext(backend, 50, 3)
log.info("HailContexet initialized.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ object SparkBackend {
sc1.uiWebUrl.foreach(ui => info(s"SparkUI: $ui"))

theSparkBackend = new SparkBackend(tmpdir, localTmpdir, sc1, gcsRequesterPaysProject, gcsRequesterPaysBuckets)
theSparkBackend.addDefaultReferences()
theSparkBackend
}

Expand Down