[WIP][Core][Compiled Graph] Add API to Specify DAG Concurrence Group #48600

Closed

@xslingcn xslingcn commented Nov 6, 2024

Why are these changes needed?

As mentioned in #46336, the current implementation runs all aDAGs in a background concurrency group, _ray_system, while actor methods run in the actor's own default concurrency group. Because of this mismatch, a DAG cannot access thread-local state that the actor initialized on its default thread before the DAG started executing. For example, consider the following code:

import ray
import threading
from ray.dag import InputNode

@ray.remote
class MyActor:
    def __init__(self):
        # data local to actor default executor thread
        self.local_data = threading.local()
        self.local_data.seed = 42

    def compute(self, value):
        return value + self.local_data.seed

actor = MyActor.remote()

with InputNode() as inp:
    dag = actor.compute.bind(inp)

# The DAG runs on an executor in the _ray_system group, so it has no access to actor.local_data
compiled_dag = dag.experimental_compile()
print(ray.get(compiled_dag.execute(10)))

Running this raises an error:

Traceback (most recent call last):
...
ray.exceptions.RayTaskError(AttributeError): ray::MyActor.__ray_call__() (pid=3136275, ip=172.0.0.1)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Code/ray/run_thread_local_dag.py", line 15, in compute
    return value + self.local_data.seed
                   ^^^^^^^^^^^^^^^^^^^^
AttributeError: '_thread._local' object has no attribute 'seed'
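
The same behavior can be reproduced without Ray. The following is a minimal sketch using only the standard library; the names `read_seed` and `results` are illustrative and not part of this PR. It shows that an attribute set on a `threading.local` object in one thread is not visible from another thread, which is the situation the DAG executor thread is in above.

```python
import threading

local_data = threading.local()
local_data.seed = 42  # set on the main thread only

results = {}


def read_seed(label):
    # getattr with a default avoids the AttributeError shown in the traceback
    # and lets us record whether the attribute is visible on this thread.
    results[label] = getattr(local_data, "seed", None)


read_seed("main thread")

worker = threading.Thread(target=read_seed, args=("worker thread",))
worker.start()
worker.join()

print(results)  # {'main thread': 42, 'worker thread': None}
```

Reading `local_data.seed` directly on the worker thread, as `compute` does, would raise the `AttributeError` instead of returning `None`.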

In this PR, we add a new API for experimental_compile that allows users to optionally specify the DAG's concurrency group.

  • If concurrency_group is None, we acquire the actor's default executor for execution. This ensures that the DAG runs on the same thread as the actor, so the above example should now give the expected result by default.
  • If a concurrency_group: str is specified, we try to find the corresponding executor (ref) among the concurrency groups created when the actor was created. If it is not found, ray::core::ConcurrencyGroupManager raises a Check failed error. An example would be:
import ray
from ray.dag import InputNode

@ray.remote(concurrency_groups={"compute": 1})
class MyActor:
    def __init__(self):
        self.data = 42

    def compute(self, value):
        return value + self.data

actor = MyActor.remote()

with InputNode() as inp:
    dag = actor.compute.bind(inp)

compiled_dag = dag.experimental_compile(concurrency_group="compute")
print(ray.get(compiled_dag.execute(10)))  # OK

compiled_dag_bad = dag.experimental_compile(concurrency_group="nonexistent")
"""
...
(SimpleActor pid=3135979) [2024-11-06 11:48:04,241 C 3135979 3135979] concurrency_group_manager.cc:62:  
Check failed: it != name_to_executor_index_.end() 
Failed to look up the executor of the given concurrency group double_threaded . 
It might be that you didn't define the concurrency group double_threaded
...
"""
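
The lookup rule described above can be modeled in plain Python. This is only a sketch of the intended semantics, not the code in this PR: `resolve_concurrency_group` and `DEFAULT_GROUP` are hypothetical names, and the real lookup happens in ray::core::ConcurrencyGroupManager. It also illustrates one way a friendlier error could be raised on the Python side before the C++ check fails.

```python
from typing import Iterable, Optional

# Hypothetical placeholder standing in for the actor's default executor.
DEFAULT_GROUP = "<default>"


def resolve_concurrency_group(
    requested: Optional[str], declared: Iterable[str]
) -> str:
    """Return the group a DAG should run in, or raise if it is undeclared."""
    if requested is None:
        # No group given: fall back to the actor's default executor.
        return DEFAULT_GROUP
    declared_groups = sorted(declared)
    if requested not in declared_groups:
        raise ValueError(
            f"Concurrency group {requested!r} is not defined on this actor. "
            f"Declared groups: {declared_groups}"
        )
    return requested


print(resolve_concurrency_group(None, ["compute"]))
print(resolve_concurrency_group("compute", ["compute"]))
```

Calling `resolve_concurrency_group("nonexistent", ["compute"])` raises a `ValueError` that names both the missing group and the declared ones.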

Related issue number

Problem described in #46336.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(
  • Improve the error message shown when a concurrency group is not found; it is currently a little hard to parse.
