diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py
index 3a701559297c..236ca850c42e 100644
--- a/dashboard/modules/job/job_manager.py
+++ b/dashboard/modules/job/job_manager.py
@@ -4,6 +4,7 @@
 import logging
 import os
 import random
+import signal
 import string
 import subprocess
 import sys
@@ -140,6 +141,8 @@ class JobSupervisor:
     Job supervisor actor should fate share with subprocess it created.
     """
 
+    # Seconds to wait after SIGTERM before the job's child process is force-killed.
+    WAIT_FOR_JOB_TERMINATION_S = 3
     SUBPROCESS_POLL_PERIOD_S = 0.1
 
     def __init__(
@@ -304,20 +307,19 @@ def _get_driver_env_vars(self, resources_specified: bool) -> Dict[str, str]:
         }
 
     async def _polling(self, child_process: subprocess.Popen) -> int:
-        try:
-            while child_process is not None:
-                return_code = child_process.poll()
-                if return_code is not None:
-                    # subprocess finished with return code
-                    return return_code
-                else:
-                    # still running, yield control, 0.1s by default
-                    await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)
-        except Exception:
-            if child_process:
-                # TODO (jiaodong): Improve this with SIGTERM then SIGKILL
-                child_process.kill()
-            return 1
+        """Poll ``child_process`` until it exits and return its exit code.
+
+        This coroutine never signals the child process itself; terminating it
+        is left to the caller.
+        """
+        while child_process is not None:
+            return_code = child_process.poll()
+            if return_code is not None:
+                # subprocess finished with return code
+                return return_code
+            else:
+                # still running, yield control, 0.1s by default
+                await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)
 
     async def run(
         self,
@@ -375,12 +377,39 @@ async def run(
             )
 
             if self._stop_event.is_set():
-                polling_task.cancel()
                 if sys.platform == "win32" and self._win32_job_object:
+                    polling_task.cancel()
                     win32job.TerminateJobObject(self._win32_job_object, -1)
                 elif sys.platform != "win32":
-                    # TODO (jiaodong): Improve this with SIGTERM then SIGKILL
-                    child_process.kill()
+                    try:
+                        # Signal the child's whole process group rather than
+                        # only the child process itself.
+                        os.killpg(os.getpgid(child_process.pid), signal.SIGTERM)
+                    except ProcessLookupError:
+                        # Process already completed.
+                        logger.info(
+                            f"Job {self._job_id} completed on its own before it could "
+                            "be manually terminated."
+                        )
+                    else:
+                        # Wait for job to terminate gracefully, otherwise kill process
+                        # forcefully after timeout.
+                        try:
+                            await asyncio.wait_for(
+                                polling_task, self.WAIT_FOR_JOB_TERMINATION_S
+                            )
+                            logger.info(
+                                f"Job {self._job_id} has been terminated gracefully."
+                            )
+                        except asyncio.TimeoutError:
+                            logger.warning(
+                                f"Attempt to gracefully terminate job {self._job_id} "
+                                "through SIGTERM has timed out after "
+                                f"{self.WAIT_FOR_JOB_TERMINATION_S} seconds. Job is "
+                                "now being force-killed."
+                            )
+                            polling_task.cancel()
+                            child_process.kill()
                 await self._job_info_client.put_status(self._job_id, JobStatus.STOPPED)
             else:
                 # Child process finished execution and no stop event is set
diff --git a/dashboard/modules/job/sdk.py b/dashboard/modules/job/sdk.py
index 635ab158546a..3fd768cd8892 100644
--- a/dashboard/modules/job/sdk.py
+++ b/dashboard/modules/job/sdk.py
@@ -232,6 +232,9 @@ def stop_job(
     ) -> bool:
         """Request a job to exit asynchronously.
 
+        Attempts to terminate the job process gracefully first, then force-kills
+        it if it is still running after a timeout.
+
         Example:
             >>> from ray.job_submission import JobSubmissionClient
             >>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py
index 9ca297b3e966..d2e321a4e964 100644
--- a/dashboard/modules/job/tests/test_job_manager.py
+++ b/dashboard/modules/job/tests/test_job_manager.py
@@ -21,7 +21,11 @@
     async_wait_for_condition_async_predicate,
 )
 from ray.dashboard.modules.job.common import JOB_ID_METADATA_KEY, JOB_NAME_METADATA_KEY
-from ray.dashboard.modules.job.job_manager import JobManager, generate_job_id
+from ray.dashboard.modules.job.job_manager import (
+    JobManager,
+    JobSupervisor,
+    generate_job_id,
+)
 from ray.dashboard.consts import RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR
 from ray.job_submission import JobStatus
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy  # noqa: F401
@@ -778,6 +782,78 @@ async def test_stopped_job(self, job_manager):
             )
 
 
+@pytest.mark.asyncio
+async def test_stop_job_gracefully(job_manager):
+    """
+    Stop job should send SIGTERM to child process (before trying to kill).
+    """
+    entrypoint = """python -c \"
+import sys
+import signal
+import time
+def handler(*args):
+    print('SIGTERM signal handled!');
+    sys.exit()
+signal.signal(signal.SIGTERM, handler)
+
+while True:
+    print('Waiting...')
+    time.sleep(1)\"
+"""
+    job_id = await job_manager.submit_job(entrypoint=entrypoint)
+
+    await async_wait_for_condition(
+        lambda: "Waiting..." in job_manager.get_job_logs(job_id)
+    )
+
+    assert job_manager.stop_job(job_id) is True
+
+    await async_wait_for_condition_async_predicate(
+        check_job_stopped, job_manager=job_manager, job_id=job_id
+    )
+
+    assert "SIGTERM signal handled!" in job_manager.get_job_logs(job_id)
+
+
+@pytest.mark.asyncio
+async def test_stop_job_timeout(job_manager):
+    """
+    Stop job should send SIGTERM first, then if timeout occurs, send SIGKILL.
+    """
+    entrypoint = """python -c \"
+import sys
+import signal
+import time
+def handler(*args):
+    print('SIGTERM signal handled!');
+    pass
+signal.signal(signal.SIGTERM, handler)
+
+while True:
+    print('Waiting...')
+    time.sleep(1)\"
+"""
+    job_id = await job_manager.submit_job(entrypoint=entrypoint)
+
+    await async_wait_for_condition(
+        lambda: "Waiting..." in job_manager.get_job_logs(job_id)
+    )
+
+    assert job_manager.stop_job(job_id) is True
+
+    await async_wait_for_condition(
+        lambda: "SIGTERM signal handled!" in job_manager.get_job_logs(job_id)
+    )
+    await async_wait_for_condition_async_predicate(
+        check_job_stopped,
+        job_manager=job_manager,
+        job_id=job_id,
+        # Allow slack beyond the graceful-termination window: the job is only
+        # marked STOPPED after the supervisor's SIGTERM wait has timed out.
+        timeout=JobSupervisor.WAIT_FOR_JOB_TERMINATION_S + 5,
+    )
+
+
 @pytest.mark.asyncio
 async def test_logs_streaming(job_manager):
     """Test that logs are streamed during the job, not just at the end."""