Skip to content

Commit

Permalink
[Core][deprecate run_on_all_workers 1/n] set worker's sys.path throug…
Browse files Browse the repository at this point in the history
…h JobConfig._py_driver_sys_path (ray-project#31383)

Why are these changes needed?
Today we use run_on_all_workers to set worker's system path, where the run_on_all_workers suffers from weak ordering guarantees and will be deprecated.

Instead, we should use JobConfig._py_driver_sys_path to set worker's system path, where the worker will add these paths into its sys.path on startup.

Note we have JobConfig.code_search_path which servers similar functionality, however it uses a different load_code_from_local code path and behaves differently and introduced bugs that failed tests (ray-project#17605 (comment)).
  • Loading branch information
scv119 authored Jan 6, 2023
1 parent 1012fbc commit 8228aeb
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 25 deletions.
46 changes: 23 additions & 23 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,29 @@ def connect(
runtime_env.pop("excludes", None)
job_config.set_runtime_env(runtime_env)

if mode == SCRIPT_MODE:
# Add the directory containing the script that is running to the Python
# paths of the workers. Also add the current directory. Note that this
# assumes that the directory structures on the machines in the clusters
# are the same.
# When using an interactive shell, there is no script directory.
code_paths = []
if not interactive_mode:
script_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
# If driver's sys.path doesn't include the script directory
# (e.g driver is started via `python -m`,
# see https://peps.python.org/pep-0338/),
# then we shouldn't add it to the workers.
if script_directory in sys.path:
code_paths.append(script_directory)
# In client mode, if we use runtime envs with "working_dir", then
# it'll be handled automatically. Otherwise, add the current dir.
if not job_config.client_job and not job_config.runtime_env_has_working_dir():
current_directory = os.path.abspath(os.path.curdir)
code_paths.append(current_directory)
if len(code_paths) != 0:
job_config.py_driver_sys_path.extend(code_paths)

serialized_job_config = job_config.serialize()
if not node.should_redirect_logs():
# Logging to stderr, so give core worker empty logs directory.
Expand Down Expand Up @@ -2097,29 +2120,6 @@ def connect(
worker.logger_thread.start()

if mode == SCRIPT_MODE:
# Add the directory containing the script that is running to the Python
# paths of the workers. Also add the current directory. Note that this
# assumes that the directory structures on the machines in the clusters
# are the same.
# When using an interactive shell, there is no script directory.
code_paths = []
if not interactive_mode:
script_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
# If driver's sys.path doesn't include the script directory
# (e.g driver is started via `python -m`,
# see https://peps.python.org/pep-0338/),
# then we shouldn't add it to the workers.
if script_directory in sys.path:
code_paths.append(script_directory)
# In client mode, if we use runtime envs with "working_dir", then
# it'll be handled automatically. Otherwise, add the current dir.
if not job_config.client_job and not job_config.runtime_env_has_working_dir():
current_directory = os.path.abspath(os.path.curdir)
code_paths.append(current_directory)
if len(code_paths) != 0:
worker.run_function_on_all_workers(
lambda worker_info: [sys.path.insert(1, path) for path in code_paths]
)
# TODO(rkn): Here we first export functions to run, then remote
# functions. The order matters. For example, one of the functions to
# run may set the Python path, which is needed to import a module used
Expand Down
6 changes: 6 additions & 0 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@
sys.path.insert(0, p)
ray._private.worker.global_worker.set_load_code_from_local(load_code_from_local)

# Add driver's system path to sys.path
py_driver_sys_path = core_worker.get_job_config().py_driver_sys_path
if py_driver_sys_path:
for p in py_driver_sys_path:
sys.path.insert(0, p)

# Setup log file.
out_file, err_file = node.get_log_file_handles(
get_worker_log_file_name(args.worker_type)
Expand Down
6 changes: 6 additions & 0 deletions python/ray/job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class JobConfig:
``runtime_env.py`` for detailed documentation).
client_job: A boolean represent the source of the job.
default_actor_lifetime: The default value of actor lifetime.
py_driver_sys_path: A list of directories that
specify the search path for python workers.
"""

def __init__(
Expand All @@ -29,6 +31,7 @@ def __init__(
metadata: Optional[dict] = None,
ray_namespace: Optional[str] = None,
default_actor_lifetime: str = "non_detached",
py_driver_sys_path: List[str] = None,
):
self.jvm_options = jvm_options or []
self.code_search_path = code_search_path or []
Expand All @@ -42,6 +45,7 @@ def __init__(
self.ray_namespace = ray_namespace
self.set_runtime_env(runtime_env)
self.set_default_actor_lifetime(default_actor_lifetime)
self.py_driver_sys_path = py_driver_sys_path or []

def set_metadata(self, key: str, value: str) -> None:
self.metadata[key] = value
Expand Down Expand Up @@ -108,6 +112,7 @@ def get_proto_job_config(self):
pb.ray_namespace = self.ray_namespace
pb.jvm_options.extend(self.jvm_options)
pb.code_search_path.extend(self.code_search_path)
pb.py_driver_sys_path.extend(self.py_driver_sys_path)
for k, v in self.metadata.items():
pb.metadata[k] = v

Expand Down Expand Up @@ -149,4 +154,5 @@ def from_json(cls, job_config_json):
client_job=job_config_json.get("client_job", False),
metadata=job_config_json.get("metadata", None),
ray_namespace=job_config_json.get("ray_namespace", None),
py_driver_sys_path=job_config_json.get("py_driver_sys_path", None),
)
3 changes: 1 addition & 2 deletions python/ray/tests/test_basic_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def get_kv_metrics():
# So far we have the following gets
"""
b'fun' b'IsolatedExports:01000000:\x00\x00\x00\x00\x00\x00\x00\x01'
b'fun' b'FunctionsToRun:01000000:\x12\x9b\xea\xa39\x01...'
b'fun' b'IsolatedExports:01000000:\x00\x00\x00\x00\x00\x00\x00\x02'
b'cluster' b'CLUSTER_METADATA'
b'fun' b'IsolatedExports:01000000:\x00\x00\x00\x00\x00\x00\x00\x01'
Expand All @@ -247,7 +246,7 @@ def get_kv_metrics():
???? # unknown
"""
# !!!If you want to increase this number, please let ray-core knows this!!!
assert freqs["internal_kv_get"] == 8
assert freqs["internal_kv_get"] == 5


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ message JobConfig {
// If the lifetime of an actor is not specified explicitly at runtime, this
// default value will be applied.
ActorLifetime default_actor_lifetime = 7;
// System paths of the driver scripts. Python workers need to search
// these paths to load modules.
repeated string py_driver_sys_path = 8;
}

message JobTableData {
Expand Down

0 comments on commit 8228aeb

Please sign in to comment.