From d5c620a20db2f1587be37797cb5be41c99e55034 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 10 May 2022 12:56:27 -0600 Subject: [PATCH 01/25] Add tests --- prefect_ray/task_runners.py | 14 +++---- setup.cfg | 4 ++ tests/test_task_runners.py | 82 ++++++++++++++++++++++++++++++++----- 3 files changed, 81 insertions(+), 19 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 226e220..db708b0 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -10,29 +10,24 @@ from prefect.states import exception_to_crashed_state from prefect.task_runners import BaseTaskRunner, R from prefect.utilities.asyncio import A, sync_compatible +from prefect.task_runners import TaskConcurrencyType class RayTaskRunner(BaseTaskRunner): """ A parallel task_runner that submits tasks to `ray`. - By default, a temporary Ray cluster is created for the duration of the flow run. - Alternatively, if you already have a `ray` instance running, you can provide the connection URL via the `address` kwarg. - Args: address (string, optional): Address of a currently running `ray` instance; if one is not provided, a temporary instance will be created. init_kwargs (dict, optional): Additional kwargs to use when calling `ray.init`. - Examples: - Using a temporary local ray cluster: >>> from prefect import flow >>> from prefect.task_runners import RayTaskRunner >>> @flow(task_runner=RayTaskRunner) - Connecting to an existing ray instance: >>> RayTaskRunner(address="ray://192.0.2.255:8786") """ @@ -54,6 +49,10 @@ def __init__( super().__init__() + @property + def concurrency_type(self) -> TaskConcurrencyType: + return TaskConcurrencyType.PARALLEL + async def submit( self, task_run: TaskRun, @@ -115,7 +114,6 @@ def _ray(self) -> "ray": async def _start(self, exit_stack: AsyncExitStack): """ Start the task runner and prep for context exit. - - Creates a cluster if an external address is not set. - Creates a client to connect to the cluster. - Pushes a call to wait for all running futures to complete on exit. @@ -165,4 +163,4 @@ def _get_ray_ref(self, prefect_future: PrefectFuture) -> "ray.ObjectRef": """ Retrieve the ray object reference corresponding to a prefect future. """ - return self._ray_refs[prefect_future.run_id] + return self._ray_refs[prefect_future.run_id] \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 48aadf6..6d7a80a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,3 +35,7 @@ fail_under = 80 [tool:pytest] asyncio_mode = auto + +markers = + service(arg): a service integration test. For example 'docker' + enable_orion_handler: by default, sending logs to the API is disabled. Tests marked with this use the handler. diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 189f07d..8050f6c 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -1,13 +1,26 @@ +import uuid import subprocess +import asyncio +import warnings import prefect import pytest import ray import ray.cluster_utils - +import tests +from prefect.testing.fixtures import use_hosted_orion, hosted_orion_api from prefect_ray import RayTaskRunner -from prefect.utilities.testing import TaskRunnerTests +from prefect.orion.schemas.core import TaskRun +from prefect.task_runners import TaskConcurrencyType +from prefect.testing.standard_test_suites import TaskRunnerStandardTestSuite + + +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() @pytest.fixture(scope="module") @@ -25,6 +38,18 @@ def machine_ray_instance(): subprocess.run(["ray", "stop"]) +@pytest.fixture +@pytest.mark.service("ray") +def default_ray_task_runner(): + with warnings.catch_warnings(): + # Ray does not properly close resources and we do not want their warnings to + # bubble into our test suite + # https://github.com/ray-project/ray/pull/22419 + warnings.simplefilter("ignore", ResourceWarning) + + yield RayTaskRunner() + + @pytest.fixture def ray_task_runner_with_existing_cluster( machine_ray_instance, use_hosted_orion, hosted_orion_api @@ -39,9 +64,9 @@ def ray_task_runner_with_existing_cluster( address=machine_ray_instance, init_kwargs={ "runtime_env": { - # Ship the 'prefect' module to the workers or they will not be able to + # Ship the 'tests' module to the workers or they will not be able to # deserialize test tasks / flows - "py_modules": [prefect] + "py_modules": [tests] } }, ) @@ -74,9 +99,9 @@ def ray_task_runner_with_inprocess_cluster( address=inprocess_ray_cluster.address, init_kwargs={ "runtime_env": { - # Ship the 'prefect' module to the workers or they will not be able to + # Ship the 'tests' module to the workers or they will not be able to # deserialize test tasks / flows - "py_modules": [prefect] + "py_modules": [tests] } }, ) @@ -93,23 +118,58 @@ def ray_task_runner_with_temporary_cluster(use_hosted_orion, hosted_orion_api): yield RayTaskRunner( init_kwargs={ "runtime_env": { - # Ship the 'prefect' module to the workers or they will not be able to + # Ship the 'tests' module to the workers or they will not be able to # deserialize test tasks / flows - "py_modules": [prefect] + "py_modules": [tests] } }, ) -class TestRayTaskRunner(TaskRunnerTests): +@pytest.mark.service("ray") +class TestRayTaskRunner(TaskRunnerStandardTestSuite): @pytest.fixture( params=[ - ray_task_runner_with_temporary_cluster, - ray_task_runner_with_inprocess_cluster, + default_ray_task_runner, ray_task_runner_with_existing_cluster, + ray_task_runner_with_inprocess_cluster, ] ) def task_runner(self, request): yield request.getfixturevalue( request.param._pytestfixturefunction.name or request.param.__name__ ) + + # Ray wraps the exception, interrupts will result in "Cancelled" tasks + # or "Killed" workers while normal errors will result in a "RayTaskError". + # We care more about the crash detection and + # lack of re-raise here than the equality of the exception. + @pytest.mark.parametrize("exception", [KeyboardInterrupt(), ValueError("test")]) + async def test_wait_captures_exceptions_as_crashed_state( + self, task_runner, exception + ): + """ + Ray wraps the exception, interrupts will result in "Cancelled" tasks + or "Killed" workers while normal errors will result in a "RayTaskError". + We care more about the crash detection and + lack of re-raise here than the equality of the exception. + """ + if task_runner.concurrency_type != TaskConcurrencyType.PARALLEL: + pytest.skip( + f"This will raise for {task_runner.concurrency_type} task runners." + ) + + task_run = TaskRun(flow_run_id=uuid4(), task_key="foo", dynamic_key="bar") + + async def fake_orchestrate_task_run(): + raise exception + + async with task_runner.start(): + future = await task_runner.submit( + task_run=task_run, run_fn=fake_orchestrate_task_run, run_kwargs={} + ) + + state = await task_runner.wait(future, 5) + assert state is not None, "wait timed out" + assert isinstance(state, State), "wait should return a state" + assert state.name == "Crashed" From e9d6f13e5daf3767c71d362cb62eb01a878d54c1 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 10 May 2022 13:48:24 -0600 Subject: [PATCH 02/25] Add new changes from fix-ray branch --- prefect_ray/task_runners.py | 26 ++++++++++++++------------ requirements.txt | 4 +++- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index db708b0..01b92d2 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -114,6 +114,7 @@ def _ray(self) -> "ray": async def _start(self, exit_stack: AsyncExitStack): """ Start the task runner and prep for context exit. + - Creates a cluster if an external address is not set. - Creates a client to connect to the cluster. - Pushes a call to wait for all running futures to complete on exit. @@ -127,22 +128,23 @@ async def _start(self, exit_stack: AsyncExitStack): self.logger.info("Creating a local Ray instance") init_args = () - # When connecting to an out-of-process cluster (e.g. ray://ip) this returns a - # `ClientContext` otherwise it returns a `dict`. + # In ray < 1.11.0, connecting to an out-of-process cluster (e.g. ray://ip) + # returns a `ClientContext` otherwise it returns a `dict`. + # In ray >= 1.11.0, a context is always returned. context_or_metadata = self._ray.init(*init_args, **self.init_kwargs) - if isinstance(context_or_metadata, dict): - metadata = context_or_metadata - context = None - else: - metadata = None # TODO: Some of this may be retrievable from the client ctx + if hasattr(context_or_metadata, "__enter__"): context = context_or_metadata + metadata = getattr(context, "address_info", {}) + dashboard_url = getattr(context, "dashboard_url", None) + else: + context = None + metadata = context_or_metadata + dashboard_url = metadata.get("webui_url") - # Shutdown differs depending on the connection type if context: - # Just disconnect the client exit_stack.push(context) else: - # Shutdown ray + # If not given a context, call shutdown manually at exit exit_stack.push_async_callback(self._shutdown_ray) # Display some information about the cluster @@ -150,9 +152,9 @@ async def _start(self, exit_stack: AsyncExitStack): living_nodes = [node for node in nodes if node.get("alive")] self.logger.info(f"Using Ray cluster with {len(living_nodes)} nodes.") - if metadata and metadata.get("webui_url"): + if dashboard_url: self.logger.info( - f"The Ray UI is available at {metadata['webui_url']}", + f"The Ray UI is available at {dashboard_url}", ) async def _shutdown_ray(self): diff --git a/requirements.txt b/requirements.txt index a89ae23..107ddf5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ prefect>=2.0a13 -ray[default] >= 1.9 \ No newline at end of file +ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" +ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" + From 9541c7b94ea3934ac7ce5fbb2d86b81dc7be8156 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 10 May 2022 15:42:07 -0600 Subject: [PATCH 03/25] Add init --- tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From cd8663aa6b0566f1e3e9a11afaa4a4903a3287b5 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 11 May 2022 15:00:59 -0600 Subject: [PATCH 04/25] Fix tests --- prefect_ray/task_runners.py | 62 ++++++++++++++++- tests/conftest.py | 124 +++++++++++++++++++++++++++++++++ tests/test_task_runners.py | 135 ++++-------------------------------- 3 files changed, 198 insertions(+), 123 deletions(-) create mode 100644 tests/conftest.py diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 01b92d2..8914de2 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -1,3 +1,45 @@ +""" +Interface and implementations of the Ray Task Runner. +[Task Runners](/concepts/task-runners/) in Prefect are +responsible for managing the execution of Prefect task runs. +Generally speaking, users are not expected to interact with +task runners outside of configuring and initializing them for a flow. + +Example: + >>> from prefect import flow, task + >>> from prefect.task_runners import SequentialTaskRunner + >>> from typing import List + >>> + >>> @task + >>> def say_hello(name): + ... print(f"hello {name}") + >>> + >>> @task + >>> def say_goodbye(name): + ... print(f"goodbye {name}") + >>> + >>> @flow(task_runner=SequentialTaskRunner()) + >>> def greetings(names: List[str]): + ... for name in names: + ... say_hello(name) + ... say_goodbye(name) + + Switching to a `RayTaskRunner`: + >>> from prefect.task_runners import RayTaskRunner + >>> flow.task_runner = DaskTaskRunner() + >>> greetings(["arthur", "trillian", "ford", "marvin"]) + hello arthur + goodbye arthur + hello trillian + hello ford + goodbye marvin + hello marvin + goodbye ford + goodbye trillian + +For usage details, see the [Task Runners](/concepts/task-runners/) documentation. +""" + from contextlib import AsyncExitStack from typing import Any, Awaitable, Callable, Dict, Optional from uuid import UUID @@ -8,9 +50,8 @@ from prefect.orion.schemas.core import TaskRun from prefect.orion.schemas.states import State from prefect.states import exception_to_crashed_state -from prefect.task_runners import BaseTaskRunner, R +from prefect.task_runners import BaseTaskRunner, R, TaskConcurrencyType from prefect.utilities.asyncio import A, sync_compatible -from prefect.task_runners import TaskConcurrencyType class RayTaskRunner(BaseTaskRunner): @@ -37,6 +78,9 @@ def __init__( address: str = None, init_kwargs: dict = None, ): + """ + Initialize keywords. + """ # Store settings self.address = address self.init_kwargs = init_kwargs.copy() if init_kwargs else {} @@ -51,6 +95,9 @@ def __init__( @property def concurrency_type(self) -> TaskConcurrencyType: + """ + Set the concurrency type; parallel for Ray. + """ return TaskConcurrencyType.PARALLEL async def submit( @@ -60,6 +107,9 @@ async def submit( run_kwargs: Dict[str, Any], asynchronous: A = True, ) -> PrefectFuture[R, A]: + """ + Submit task. + """ if not self._started: raise RuntimeError( "The task runner must be started before submitting work." @@ -79,6 +129,9 @@ async def wait( prefect_future: PrefectFuture, timeout: float = None, ) -> Optional[State]: + """ + Wait for task to complete. + """ ref = self._get_ray_ref(prefect_future) result = None @@ -158,6 +211,9 @@ async def _start(self, exit_stack: AsyncExitStack): ) async def _shutdown_ray(self): + """ + Shuts down the cluster. + """ self.logger.debug("Shutting down Ray cluster...") self._ray.shutdown() @@ -165,4 +221,4 @@ def _get_ray_ref(self, prefect_future: PrefectFuture) -> "ray.ObjectRef": """ Retrieve the ray object reference corresponding to a prefect future. """ - return self._ray_refs[prefect_future.run_id] \ No newline at end of file + return self._ray_refs[prefect_future.run_id] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e1e43aa --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,124 @@ +import asyncio +import subprocess +import warnings + +import prefect +import pytest +import ray +import ray.cluster_utils +from prefect.testing.fixtures import hosted_orion_api, use_hosted_orion # noqa: F401 + +import tests +from prefect_ray import RayTaskRunner + + +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="module") +def machine_ray_instance(): + """ + Starts a ray instance for the current machine + """ + subprocess.check_call( + ["ray", "start", "--head", "--include-dashboard", "False"], + cwd=str(prefect.__root_path__), + ) + try: + yield "ray://127.0.0.1:10001" + finally: + subprocess.run(["ray", "stop"]) + + +@pytest.fixture +@pytest.mark.service("ray") +def default_ray_task_runner(): + with warnings.catch_warnings(): + # Ray does not properly close resources and we do not want their warnings to + # bubble into our test suite + # https://github.com/ray-project/ray/pull/22419 + warnings.simplefilter("ignore", ResourceWarning) + + yield RayTaskRunner() + + +@pytest.fixture +def ray_task_runner_with_existing_cluster( + machine_ray_instance, use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that's connected to a ray instance running in a separate + process. + + This tests connection via `ray://` which is a client-based connection. + """ + yield RayTaskRunner( + address=machine_ray_instance, + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [prefect, tests] + } + }, + ) + + +@pytest.fixture(scope="module") +def inprocess_ray_cluster(): + """ + Starts a ray cluster in-process + """ + cluster = ray.cluster_utils.Cluster(initialize_head=True) + try: + cluster.add_node() # We need to add a second node for parallelism + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture +def ray_task_runner_with_inprocess_cluster( + inprocess_ray_cluster, use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that's connected to an in-process cluster. + + This tests connection via 'localhost' which is not a client-based connection. + """ + + yield RayTaskRunner( + address=inprocess_ray_cluster.address, + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [prefect, tests] + } + }, + ) + + +@pytest.fixture +def ray_task_runner_with_temporary_cluster( + use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that creates a temporary cluster. + + This tests connection via 'localhost' which is not a client-based connection. + """ + + yield RayTaskRunner( + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [prefect, tests] + } + }, + ) diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 8050f6c..8447692 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -1,129 +1,17 @@ -import uuid -import subprocess -import asyncio +from uuid import uuid4 -import warnings -import prefect import pytest -import ray -import ray.cluster_utils - -import tests -from prefect.testing.fixtures import use_hosted_orion, hosted_orion_api -from prefect_ray import RayTaskRunner from prefect.orion.schemas.core import TaskRun +from prefect.states import State from prefect.task_runners import TaskConcurrencyType from prefect.testing.standard_test_suites import TaskRunnerStandardTestSuite - -@pytest.fixture(scope="session") -def event_loop(): - loop = asyncio.get_event_loop() - yield loop - loop.close() - - -@pytest.fixture(scope="module") -def machine_ray_instance(): - """ - Starts a ray instance for the current machine - """ - subprocess.check_call( - ["ray", "start", "--head", "--include-dashboard", "False"], - cwd=str(prefect.__root_path__), - ) - try: - yield "ray://127.0.0.1:10001" - finally: - subprocess.run(["ray", "stop"]) - - -@pytest.fixture -@pytest.mark.service("ray") -def default_ray_task_runner(): - with warnings.catch_warnings(): - # Ray does not properly close resources and we do not want their warnings to - # bubble into our test suite - # https://github.com/ray-project/ray/pull/22419 - warnings.simplefilter("ignore", ResourceWarning) - - yield RayTaskRunner() - - -@pytest.fixture -def ray_task_runner_with_existing_cluster( - machine_ray_instance, use_hosted_orion, hosted_orion_api -): - """ - Generate a ray task runner that's connected to a ray instance running in a separate - process. - - This tests connection via `ray://` which is a client-based connection. - """ - yield RayTaskRunner( - address=machine_ray_instance, - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [tests] - } - }, - ) - - -@pytest.fixture(scope="module") -def inprocess_ray_cluster(): - """ - Starts a ray cluster in-process - """ - cluster = ray.cluster_utils.Cluster(initialize_head=True) - try: - cluster.add_node() # We need to add a second node for parallelism - yield cluster - finally: - cluster.shutdown() - - -@pytest.fixture -def ray_task_runner_with_inprocess_cluster( - inprocess_ray_cluster, use_hosted_orion, hosted_orion_api -): - """ - Generate a ray task runner that's connected to an in-process cluster. - - This tests connection via 'localhost' which is not a client-based connection. - """ - - yield RayTaskRunner( - address=inprocess_ray_cluster.address, - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [tests] - } - }, - ) - - -@pytest.fixture -def ray_task_runner_with_temporary_cluster(use_hosted_orion, hosted_orion_api): - """ - Generate a ray task runner that creates a temporary cluster. - - This tests connection via 'localhost' which is not a client-based connection. - """ - - yield RayTaskRunner( - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [tests] - } - }, - ) +# unable to get this to import automatically within pytest fixture +from .conftest import ( + default_ray_task_runner, + ray_task_runner_with_existing_cluster, + ray_task_runner_with_inprocess_cluster, +) @pytest.mark.service("ray") @@ -140,6 +28,13 @@ def task_runner(self, request): request.param._pytestfixturefunction.name or request.param.__name__ ) + def get_sleep_time(self) -> float: + """ + Return an amount of time to sleep for concurrency tests. + The RayTaskRunner is prone to flaking on concurrency tests. + """ + return 5.0 + # Ray wraps the exception, interrupts will result in "Cancelled" tasks # or "Killed" workers while normal errors will result in a "RayTaskError". # We care more about the crash detection and From 97ce9ca3acb0e75a6a4a8b4a77f98744204067e8 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 11 May 2022 15:05:49 -0600 Subject: [PATCH 05/25] Lint and add to dev req --- prefect_ray/__init__.py | 2 +- requirements_dev.txt | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/prefect_ray/__init__.py b/prefect_ray/__init__.py index 26e1812..d9730a8 100644 --- a/prefect_ray/__init__.py +++ b/prefect_ray/__init__.py @@ -3,4 +3,4 @@ __version__ = _version.get_versions()["version"] -from .task_runners import RayTaskRunner +from .task_runners import RayTaskRunner # noqa diff --git a/requirements_dev.txt b/requirements_dev.txt index ead4728..4fcbeff 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -11,4 +11,5 @@ pytest-asyncio mock; python_version < '3.8' mkdocs-gen-files interrogate -coverage \ No newline at end of file +coverage +ray From 0154819795179153e19d5a9f92786f96db264dd8 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 11 May 2022 15:08:28 -0600 Subject: [PATCH 06/25] Fix reqs? --- requirements.txt | 4 +--- requirements_dev.txt | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index 107ddf5..cdffd6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,2 @@ prefect>=2.0a13 -ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" -ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" - +ray[default]>=1.11.0 diff --git a/requirements_dev.txt b/requirements_dev.txt index 4fcbeff..100aaa5 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -12,4 +12,3 @@ mock; python_version < '3.8' mkdocs-gen-files interrogate coverage -ray From 2397f4b7563c3943badd6463f876d9667d6eabba Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 11 May 2022 15:13:49 -0600 Subject: [PATCH 07/25] Fix reqs and lint --- .pre-commit-config.yaml | 4 ++-- requirements.txt | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e09fb4c..47b4b4b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ repos: - id: isort language_version: python3 - repo: https://github.com/psf/black - rev: 22.1.0 + rev: 22.3.0 hooks: - id: black language_version: python3 @@ -18,4 +18,4 @@ repos: hooks: - id: interrogate args: [-vv] - pass_filenames: false \ No newline at end of file + pass_filenames: false diff --git a/requirements.txt b/requirements.txt index cdffd6f..107ddf5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ prefect>=2.0a13 -ray[default]>=1.11.0 +ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" +ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" + From 30f232bcb83b5ec521d2f6b34fc4a6b8a18f4cf7 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 11 May 2022 15:37:43 -0600 Subject: [PATCH 08/25] Update docs --- README.md | 26 ++++++++++++++++---------- mkdocs.yml | 3 +-- prefect_ray/task_runners.py | 2 -- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 613b198..0002bb9 100644 --- a/README.md +++ b/README.md @@ -25,19 +25,25 @@ pip install prefect-ray ### Write and run a flow ```python -from prefect import flow -from prefect_ray.tasks import ( - goodbye_prefect_ray, - hello_prefect_ray, -) +from prefect import flow, task +from prefect.task_runners import RayTaskRunner +@task +def say_hello(name): + print(f"hello {name}") -@flow -def example_flow(): - hello_prefect_ray - goodbye_prefect_ray +@task +def say_goodbye(name): + print(f"goodbye {name}") -example_flow() +@flow(task_runner=RayTaskRunner()) +def greetings(names): + for name in names: + say_hello(name) + say_goodbye(name) + +if __name__ == "__main__": + greetings(["arthur", "trillian", "ford", "marvin"]) ``` ## Resources diff --git a/mkdocs.yml b/mkdocs.yml index ca1d1e6..aea4137 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -35,6 +35,5 @@ plugins: nav: - Home: index.md - - Tasks: tasks.md - - Flows: flows.md + - Task Runners: task_runners.md diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 8914de2..8033cb0 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -36,8 +36,6 @@ hello marvin goodbye ford goodbye trillian - -For usage details, see the [Task Runners](/concepts/task-runners/) documentation. """ from contextlib import AsyncExitStack From 03d15aafa17b115c4ae2eda7e76db5e9d8003488 Mon Sep 17 00:00:00 2001 From: Andrew <15331990+ahuang11@users.noreply.github.com> Date: Wed, 11 May 2022 15:44:02 -0600 Subject: [PATCH 09/25] Update task_runners.py --- prefect_ray/task_runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 8033cb0..bc3baba 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -26,7 +26,7 @@ Switching to a `RayTaskRunner`: >>> from prefect.task_runners import RayTaskRunner - >>> flow.task_runner = DaskTaskRunner() + >>> flow.task_runner = RayTaskRunner() >>> greetings(["arthur", "trillian", "ford", "marvin"]) hello arthur goodbye arthur From dc3592bc26c7480a5fd3ad3a4bf1613907f4f8a0 Mon Sep 17 00:00:00 2001 From: Andrew <15331990+ahuang11@users.noreply.github.com> Date: Wed, 11 May 2022 16:52:32 -0600 Subject: [PATCH 10/25] Remove 3.10 --- .github/workflows/tests.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 96e078b..3cc3763 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,9 @@ jobs: - "3.7" - "3.8" - "3.9" - - "3.10" + # temporarily remove because Ray doesn't support yet + # https://github.com/ray-project/ray/issues/19116 + # - "3.10" fail-fast: false steps: - uses: actions/checkout@v3 From 3b578062cade0b48bd71bc449286d8e904fc6246 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 12 May 2022 12:12:13 -0600 Subject: [PATCH 11/25] Remove service mark and conftest --- prefect_ray/task_runners.py | 32 --------- tests/conftest.py | 124 ----------------------------------- tests/test_task_runners.py | 127 ++++++++++++++++++++++++++++++++++-- 3 files changed, 121 insertions(+), 162 deletions(-) delete mode 100644 tests/conftest.py diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 8033cb0..f86b6d6 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -4,38 +4,6 @@ responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow. - -Example: - >>> from prefect import flow, task - >>> from prefect.task_runners import SequentialTaskRunner - >>> from typing import List - >>> - >>> @task - >>> def say_hello(name): - ... print(f"hello {name}") - >>> - >>> @task - >>> def say_goodbye(name): - ... print(f"goodbye {name}") - >>> - >>> @flow(task_runner=SequentialTaskRunner()) - >>> def greetings(names: List[str]): - ... for name in names: - ... say_hello(name) - ... say_goodbye(name) - - Switching to a `RayTaskRunner`: - >>> from prefect.task_runners import RayTaskRunner - >>> flow.task_runner = DaskTaskRunner() - >>> greetings(["arthur", "trillian", "ford", "marvin"]) - hello arthur - goodbye arthur - hello trillian - hello ford - goodbye marvin - hello marvin - goodbye ford - goodbye trillian """ from contextlib import AsyncExitStack diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index e1e43aa..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,124 +0,0 @@ -import asyncio -import subprocess -import warnings - -import prefect -import pytest -import ray -import ray.cluster_utils -from prefect.testing.fixtures import hosted_orion_api, use_hosted_orion # noqa: F401 - -import tests -from prefect_ray import RayTaskRunner - - -@pytest.fixture(scope="session") -def event_loop(): - loop = asyncio.get_event_loop() - yield loop - loop.close() - - -@pytest.fixture(scope="module") -def machine_ray_instance(): - """ - Starts a ray instance for the current machine - """ - subprocess.check_call( - ["ray", "start", "--head", "--include-dashboard", "False"], - cwd=str(prefect.__root_path__), - ) - try: - yield "ray://127.0.0.1:10001" - finally: - subprocess.run(["ray", "stop"]) - - -@pytest.fixture -@pytest.mark.service("ray") -def default_ray_task_runner(): - with warnings.catch_warnings(): - # Ray does not properly close resources and we do not want their warnings to - # bubble into our test suite - # https://github.com/ray-project/ray/pull/22419 - warnings.simplefilter("ignore", ResourceWarning) - - yield RayTaskRunner() - - -@pytest.fixture -def ray_task_runner_with_existing_cluster( - machine_ray_instance, use_hosted_orion, hosted_orion_api # noqa: F811 -): - """ - Generate a ray task runner that's connected to a ray instance running in a separate - process. - - This tests connection via `ray://` which is a client-based connection. - """ - yield RayTaskRunner( - address=machine_ray_instance, - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [prefect, tests] - } - }, - ) - - -@pytest.fixture(scope="module") -def inprocess_ray_cluster(): - """ - Starts a ray cluster in-process - """ - cluster = ray.cluster_utils.Cluster(initialize_head=True) - try: - cluster.add_node() # We need to add a second node for parallelism - yield cluster - finally: - cluster.shutdown() - - -@pytest.fixture -def ray_task_runner_with_inprocess_cluster( - inprocess_ray_cluster, use_hosted_orion, hosted_orion_api # noqa: F811 -): - """ - Generate a ray task runner that's connected to an in-process cluster. - - This tests connection via 'localhost' which is not a client-based connection. - """ - - yield RayTaskRunner( - address=inprocess_ray_cluster.address, - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [prefect, tests] - } - }, - ) - - -@pytest.fixture -def ray_task_runner_with_temporary_cluster( - use_hosted_orion, hosted_orion_api # noqa: F811 -): - """ - Generate a ray task runner that creates a temporary cluster. - - This tests connection via 'localhost' which is not a client-based connection. - """ - - yield RayTaskRunner( - init_kwargs={ - "runtime_env": { - # Ship the 'tests' module to the workers or they will not be able to - # deserialize test tasks / flows - "py_modules": [prefect, tests] - } - }, - ) diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 8447692..bda711c 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -1,26 +1,141 @@ +import asyncio +import subprocess +import warnings from uuid import uuid4 +import prefect import pytest +import ray +import ray.cluster_utils from prefect.orion.schemas.core import TaskRun from prefect.states import State from prefect.task_runners import TaskConcurrencyType +from prefect.testing.fixtures import hosted_orion_api, use_hosted_orion # noqa: F401 from prefect.testing.standard_test_suites import TaskRunnerStandardTestSuite -# unable to get this to import automatically within pytest fixture -from .conftest import ( - default_ray_task_runner, - ray_task_runner_with_existing_cluster, - ray_task_runner_with_inprocess_cluster, -) +import tests +from prefect_ray import RayTaskRunner +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="module") +def machine_ray_instance(): + """ + Starts a ray instance for the current machine + """ + subprocess.check_call( + ["ray", "start", "--head", "--include-dashboard", "False"], + cwd=str(prefect.__root_path__), + ) + try: + yield "ray://127.0.0.1:10001" + finally: + subprocess.run(["ray", "stop"]) + + +@pytest.fixture @pytest.mark.service("ray") +def default_ray_task_runner(): + with warnings.catch_warnings(): + # Ray does not properly close resources and we do not want their warnings to + # bubble into our test suite + # https://github.com/ray-project/ray/pull/22419 + warnings.simplefilter("ignore", ResourceWarning) + + yield RayTaskRunner() + + +@pytest.fixture +def ray_task_runner_with_existing_cluster( + machine_ray_instance, use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that's connected to a ray instance running in a separate + process. + + This tests connection via `ray://` which is a client-based connection. + """ + yield RayTaskRunner( + address=machine_ray_instance, + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [tests] + } + }, + ) + + +@pytest.fixture(scope="module") +def inprocess_ray_cluster(): + """ + Starts a ray cluster in-process + """ + cluster = ray.cluster_utils.Cluster(initialize_head=True) + try: + cluster.add_node() # We need to add a second node for parallelism + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture +def ray_task_runner_with_inprocess_cluster( + inprocess_ray_cluster, use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that's connected to an in-process cluster. + + This tests connection via 'localhost' which is not a client-based connection. + """ + + yield RayTaskRunner( + address=inprocess_ray_cluster.address, + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [tests] + } + }, + ) + + +@pytest.fixture +def ray_task_runner_with_temporary_cluster( + use_hosted_orion, hosted_orion_api # noqa: F811 +): + """ + Generate a ray task runner that creates a temporary cluster. + + This tests connection via 'localhost' which is not a client-based connection. + """ + + yield RayTaskRunner( + init_kwargs={ + "runtime_env": { + # Ship the 'tests' module to the workers or they will not be able to + # deserialize test tasks / flows + "py_modules": [tests] + } + }, + ) + + class TestRayTaskRunner(TaskRunnerStandardTestSuite): @pytest.fixture( params=[ default_ray_task_runner, ray_task_runner_with_existing_cluster, ray_task_runner_with_inprocess_cluster, + ray_task_runner_with_temporary_cluster, ] ) def task_runner(self, request): From 3a5d44990188ca241932006da22e4f4139e29e6e Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 12 May 2022 12:43:10 -0600 Subject: [PATCH 12/25] Remove service mark --- tests/test_task_runners.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index bda711c..52e4533 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -40,7 +40,6 @@ def machine_ray_instance(): @pytest.fixture -@pytest.mark.service("ray") def default_ray_task_runner(): with warnings.catch_warnings(): # Ray does not properly close resources and we do not want their warnings to From 4326e645a0378c0c90124f5ec1fd5b82cadc3931 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 12 May 2022 17:54:26 -0600 Subject: [PATCH 13/25] Add orion main to dev req --- requirements_dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index 100aaa5..5b6c52f 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -12,3 +12,4 @@ mock; python_version < '3.8' mkdocs-gen-files interrogate coverage +git+https://github.com/PrefectHQ/orion.git@main From 9c7055e77563b09d3918bd472f8c743599139421 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 12 May 2022 17:59:09 -0600 Subject: [PATCH 14/25] Fix reqs --- requirements_dev.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 5b6c52f..af2d092 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -12,4 +12,5 @@ mock; python_version < '3.8' mkdocs-gen-files interrogate coverage -git+https://github.com/PrefectHQ/orion.git@main +orion @ git+https://github.com/PrefectHQ/orion.git + From 1f5ebfc145e9907534a93511c68644a4f0839c7f Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Fri, 13 May 2022 09:57:04 -0500 Subject: [PATCH 15/25] Install from git --- requirements.txt | 2 +- requirements_dev.txt | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index 107ddf5..3d36d82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -prefect>=2.0a13 +git+https://github.com/PrefectHQ/prefect@orion ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" diff --git a/requirements_dev.txt b/requirements_dev.txt index af2d092..100aaa5 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -12,5 +12,3 @@ mock; python_version < '3.8' mkdocs-gen-files interrogate coverage -orion @ git+https://github.com/PrefectHQ/orion.git - From 494c7ee4def7b3cba59d8db35b707075ad0daca5 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Fri, 13 May 2022 12:54:26 -0500 Subject: [PATCH 16/25] Fix requirement compat with setuptools --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 3d36d82..ae126c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/PrefectHQ/prefect@orion +prefect @ git+https://github.com/PrefectHQ/prefect@orion ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" From 2c075534dc1960e63b6b8676d8a8f57623db1290 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 May 2022 16:08:48 -0600 Subject: [PATCH 17/25] Add flaky --- requirements_dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index 100aaa5..0b1f87c 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,6 +1,7 @@ pytest black flake8 +flaky mypy mkdocs mkdocs-material From 67dcd3c17d1a4aa0fd79a72621c58eeeb2472741 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 May 2022 18:21:37 -0600 Subject: [PATCH 18/25] Copy event loop --- tests/test_task_runners.py | 42 ++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 52e4533..56da293 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -1,5 +1,7 @@ import asyncio +import logging import subprocess +import sys import warnings from uuid import uuid4 @@ -18,10 +20,42 @@ @pytest.fixture(scope="session") -def event_loop(): - loop = asyncio.get_event_loop() - yield loop - loop.close() +def event_loop(request): + """ + Redefine the event loop to support session/module-scoped fixtures; + see https://github.com/pytest-dev/pytest-asyncio/issues/68 + When running on Windows we need to use a non-default loop for subprocess support. + """ + if sys.platform == "win32" and sys.version_info >= (3, 8): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + + policy = asyncio.get_event_loop_policy() + + if sys.version_info < (3, 8) and sys.platform != "win32": + from prefect.utilities.compat import ThreadedChildWatcher + + # Python < 3.8 does not use a `ThreadedChildWatcher` by default which can + # lead to errors in tests as the previous default `SafeChildWatcher` is not + # compatible with threaded event loops. + policy.set_child_watcher(ThreadedChildWatcher()) + + loop = policy.new_event_loop() + + # configure asyncio logging to capture long running tasks + asyncio_logger = logging.getLogger("asyncio") + asyncio_logger.setLevel("WARNING") + asyncio_logger.addHandler(logging.StreamHandler()) + loop.set_debug(True) + loop.slow_callback_duration = 0.25 + + try: + yield loop + finally: + loop.close() + + # Workaround for failures in pytest_asyncio 0.17; + # see https://github.com/pytest-dev/pytest-asyncio/issues/257 + policy.set_event_loop(loop) @pytest.fixture(scope="module") From 83ea0dac147d23db1c7a89a9c9598f22753d7dae Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 May 2022 18:22:44 -0600 Subject: [PATCH 19/25] Remove delayed import --- prefect_ray/task_runners.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index bc3baba..50bbbb0 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -144,24 +144,6 @@ async def wait( return result - @property - def _ray(self) -> "ray": - """ - Delayed import of `ray` allowing configuration of the task runner - without the extra installed and improves `prefect` import times. - """ - global ray - - if ray is None: - try: - import ray - except ImportError as exc: - raise RuntimeError( - "Using the `RayTaskRunner` requires `ray` to be installed." - ) from exc - - return ray - async def _start(self, exit_stack: AsyncExitStack): """ Start the task runner and prep for context exit. @@ -182,7 +164,7 @@ async def _start(self, exit_stack: AsyncExitStack): # In ray < 1.11.0, connecting to an out-of-process cluster (e.g. ray://ip) # returns a `ClientContext` otherwise it returns a `dict`. # In ray >= 1.11.0, a context is always returned. - context_or_metadata = self._ray.init(*init_args, **self.init_kwargs) + context_or_metadata = ray.init(*init_args, **self.init_kwargs) if hasattr(context_or_metadata, "__enter__"): context = context_or_metadata metadata = getattr(context, "address_info", {}) @@ -213,7 +195,7 @@ async def _shutdown_ray(self): Shuts down the cluster. """ self.logger.debug("Shutting down Ray cluster...") - self._ray.shutdown() + ray.shutdown() def _get_ray_ref(self, prefect_future: PrefectFuture) -> "ray.ObjectRef": """ From b67314b1716f402333790cdbc94a8ea791e9e17b Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 19 May 2022 12:54:46 -0600 Subject: [PATCH 20/25] Update requirements to fix tests? --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index ae126c6..2aaf364 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ prefect @ git+https://github.com/PrefectHQ/prefect@orion -ray[default] >= 1.12.0; python_version > '3.7' and python_version < '3.10' and platform_machine == "x86_64" -ray[default] >= 1.9, < 1.11.0; python_version == '3.7' and platform_machine == "x86_64" +ray[default] >= 1.12.0; python_version >= '3.7' and python_version < '3.10' and platform_machine == "x86_64" From f284fd1233c941f0cf44230f4acc743e8f76dfc4 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Thu, 19 May 2022 13:25:28 -0600 Subject: [PATCH 21/25] Remove support for <1.12 --- prefect_ray/task_runners.py | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 50bbbb0..d9a0eba 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -161,24 +161,9 @@ async def _start(self, exit_stack: AsyncExitStack): self.logger.info("Creating a local Ray instance") init_args = () - # In ray < 1.11.0, connecting to an out-of-process cluster (e.g. ray://ip) - # returns a `ClientContext` otherwise it returns a `dict`. - # In ray >= 1.11.0, a context is always returned. - context_or_metadata = ray.init(*init_args, **self.init_kwargs) - if hasattr(context_or_metadata, "__enter__"): - context = context_or_metadata - metadata = getattr(context, "address_info", {}) - dashboard_url = getattr(context, "dashboard_url", None) - else: - context = None - metadata = context_or_metadata - dashboard_url = metadata.get("webui_url") - - if context: - exit_stack.push(context) - else: - # If not given a context, call shutdown manually at exit - exit_stack.push_async_callback(self._shutdown_ray) + context = ray.init(*init_args, **self.init_kwargs) + dashboard_url = getattr(context, "dashboard_url", None) + exit_stack.push(context) # Display some information about the cluster nodes = ray.nodes() From 9851cde711d188c01ef22dad541e4cd14736e162 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 27 May 2022 15:39:58 -0500 Subject: [PATCH 22/25] Addresses review comments and updates prefect version --- prefect_ray/task_runners.py | 9 --------- requirements.txt | 2 +- tests/test_task_runners.py | 8 -------- 3 files changed, 1 insertion(+), 18 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index d9a0eba..4e6b433 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -76,9 +76,6 @@ def __init__( address: str = None, init_kwargs: dict = None, ): - """ - Initialize keywords. - """ # Store settings self.address = address self.init_kwargs = init_kwargs.copy() if init_kwargs else {} @@ -93,9 +90,6 @@ def __init__( @property def concurrency_type(self) -> TaskConcurrencyType: - """ - Set the concurrency type; parallel for Ray. - """ return TaskConcurrencyType.PARALLEL async def submit( @@ -105,9 +99,6 @@ async def submit( run_kwargs: Dict[str, Any], asynchronous: A = True, ) -> PrefectFuture[R, A]: - """ - Submit task. - """ if not self._started: raise RuntimeError( "The task runner must be started before submitting work." diff --git a/requirements.txt b/requirements.txt index 2aaf364..69b6204 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -prefect @ git+https://github.com/PrefectHQ/prefect@orion +prefect>=2.0a5 ray[default] >= 1.12.0; python_version >= '3.7' and python_version < '3.10' and platform_machine == "x86_64" diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 56da293..bfde9cc 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -183,10 +183,6 @@ def get_sleep_time(self) -> float: """ return 5.0 - # Ray wraps the exception, interrupts will result in "Cancelled" tasks - # or "Killed" workers while normal errors will result in a "RayTaskError". - # We care more about the crash detection and - # lack of re-raise here than the equality of the exception. @pytest.mark.parametrize("exception", [KeyboardInterrupt(), ValueError("test")]) async def test_wait_captures_exceptions_as_crashed_state( self, task_runner, exception @@ -197,10 +193,6 @@ async def test_wait_captures_exceptions_as_crashed_state( We care more about the crash detection and lack of re-raise here than the equality of the exception. """ - if task_runner.concurrency_type != TaskConcurrencyType.PARALLEL: - pytest.skip( - f"This will raise for {task_runner.concurrency_type} task runners." - ) task_run = TaskRun(flow_run_id=uuid4(), task_key="foo", dynamic_key="bar") From 63ee8e2a688db8fa78c48c1edc9886c05316d520 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 27 May 2022 15:51:42 -0500 Subject: [PATCH 23/25] Ignores task_runners module for interrogate since we want the base class docstrings --- prefect_ray/task_runners.py | 3 --- setup.cfg | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py index 4e6b433..1569261 100644 --- a/prefect_ray/task_runners.py +++ b/prefect_ray/task_runners.py @@ -118,9 +118,6 @@ async def wait( prefect_future: PrefectFuture, timeout: float = None, ) -> Optional[State]: - """ - Wait for task to complete. - """ ref = self._get_ray_ref(prefect_future) result = None diff --git a/setup.cfg b/setup.cfg index 6d7a80a..09e2455 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,7 +23,7 @@ parentdir_prefix = [tool:interrogate] ignore-init-module = True -exclude = prefect_ray/_version.py, tests, setup.py, versioneer.py, docs, site +exclude = prefect_ray/_version.py, tests, setup.py, versioneer.py, docs, site, prefect_ray/task_runners.py fail-under = 95 omit-covered-files = True From ec7fa189793e174125c8d1e700c66e853b3575a5 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Sat, 28 May 2022 06:31:27 -0500 Subject: [PATCH 24/25] Resolves static analysis errors --- setup.cfg | 4 +++- tests/test_task_runners.py | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index 09e2455..bc82a07 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,7 +23,9 @@ parentdir_prefix = [tool:interrogate] ignore-init-module = True -exclude = prefect_ray/_version.py, tests, setup.py, versioneer.py, docs, site, prefect_ray/task_runners.py +exclude = prefect_ray/_version.py, tests, setup.py, versioneer.py, docs, site +ignore_init_method = True +ignore_regex = submit,wait,concurrency_type fail-under = 95 omit-covered-files = True diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index bfde9cc..638e8fd 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -11,7 +11,6 @@ import ray.cluster_utils from prefect.orion.schemas.core import TaskRun from prefect.states import State -from prefect.task_runners import TaskConcurrencyType from prefect.testing.fixtures import hosted_orion_api, use_hosted_orion # noqa: F401 from prefect.testing.standard_test_suites import TaskRunnerStandardTestSuite From 4eb2ce3f3b540918227b98355171219f5063ca97 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 31 May 2022 14:51:22 -0700 Subject: [PATCH 25/25] Added changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05d3e9d..067a434 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,4 +26,4 @@ Released on ????? ?th, 20??. ### Added -- `task_name` task - [#1](https://github.com/PrefectHQ/prefect-ray/pull/1) +- Migrated `RayTaskRunner` - [#7](https://github.com/PrefectHQ/prefect-ray/pull/7)