Skip to content

Commit

Permalink
[Jobs] Use SIGTERM followed by SIGKILL to stop a job (#30851)
Browse files Browse the repository at this point in the history
Currently, when a user wants to stop a job, we directly send a `SIGKILL` signal. Instead, we want to send a `SIGTERM` signal first, and then send a `SIGKILL` signal a few seconds later if the child process still has not terminated.
  • Loading branch information
zcin authored Dec 15, 2022
1 parent 64d744b commit 22af732
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 18 deletions.
56 changes: 39 additions & 17 deletions dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import random
import signal
import string
import subprocess
import sys
Expand Down Expand Up @@ -140,6 +141,7 @@ class JobSupervisor:
Job supervisor actor should fate share with subprocess it created.
"""

WAIT_FOR_JOB_TERMINATION_S = 3
SUBPROCESS_POLL_PERIOD_S = 0.1

def __init__(
Expand Down Expand Up @@ -304,20 +306,14 @@ def _get_driver_env_vars(self, resources_specified: bool) -> Dict[str, str]:
}

async def _polling(self, child_process: subprocess.Popen) -> int:
# Poll `child_process` until it exits and return its return code.
#
# NOTE(review): this is a rendered diff hunk whose +/- markers and
# indentation were lost. The counts reported for this file (39 additions,
# 17 deletions) are consistent with the try/except body directly below being
# the REMOVED version and the bare while-loop after `return 1` being its
# replacement -- confirm against the actual commit.
#
# --- presumably removed by this commit ---
# Poll loop wrapped in try/except: on any Exception the child is killed with
# child_process.kill() and 1 is returned as the return code.
try:
while child_process is not None:
return_code = child_process.poll()
if return_code is not None:
# subprocess finished with return code
return return_code
else:
# still running, yield control, 0.1s by default
await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)
except Exception:
if child_process:
# TODO (jiaodong): Improve this with SIGTERM then SIGKILL
child_process.kill()
return 1
# --- presumably added by this commit ---
# Same poll loop with no exception handling and no kill() call of its own.
# Note that if `child_process` is None the loop body never runs and the
# coroutine falls through without an explicit return value.
while child_process is not None:
return_code = child_process.poll()
if return_code is not None:
# subprocess finished with return code
return return_code
else:
# still running, yield control, 0.1s by default
await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)

async def run(
self,
Expand Down Expand Up @@ -375,12 +371,38 @@ async def run(
)

if self._stop_event.is_set():
polling_task.cancel()
if sys.platform == "win32" and self._win32_job_object:
polling_task.cancel()
win32job.TerminateJobObject(self._win32_job_object, -1)
elif sys.platform != "win32":
# TODO (jiaodong): Improve this with SIGTERM then SIGKILL
child_process.kill()
try:
os.killpg(os.getpgid(child_process.pid), signal.SIGTERM)
except ProcessLookupError:
# Process already completed.
logger.info(
f"Job {self._job_id} completed on its own before it could "
"be manually terminated."
)
pass
else:
# Wait for job to terminate gracefully, otherwise kill process
# forcefully after timeout.
try:
await asyncio.wait_for(
polling_task, self.WAIT_FOR_JOB_TERMINATION_S
)
logger.info(
f"Job {self._job_id} has been terminated gracefully."
)
except asyncio.TimeoutError:
logger.warning(
f"Attempt to gracefully terminate job {self._job_id} "
"through SIGTERM has timed out after "
f"{self.WAIT_FOR_JOB_TERMINATION_S} seconds. Job is "
"now being force-killed."
)
polling_task.cancel()
child_process.kill()
await self._job_info_client.put_status(self._job_id, JobStatus.STOPPED)
else:
# Child process finished execution and no stop event is set
Expand Down
2 changes: 2 additions & 0 deletions dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ def stop_job(
) -> bool:
"""Request a job to exit asynchronously.
Attempts to terminate process first, then kills process after timeout.
Example:
>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
Expand Down
76 changes: 75 additions & 1 deletion dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
async_wait_for_condition_async_predicate,
)
from ray.dashboard.modules.job.common import JOB_ID_METADATA_KEY, JOB_NAME_METADATA_KEY
from ray.dashboard.modules.job.job_manager import JobManager, generate_job_id
from ray.dashboard.modules.job.job_manager import (
JobManager,
JobSupervisor,
generate_job_id,
)
from ray.dashboard.consts import RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR
from ray.job_submission import JobStatus
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy # noqa: F401
Expand Down Expand Up @@ -778,6 +782,76 @@ async def test_stopped_job(self, job_manager):
)


@pytest.mark.asyncio
async def test_stop_job_gracefully(job_manager):
    """
    Stop job should send SIGTERM to child process (before trying to kill).

    The submitted job installs a SIGTERM handler that prints a marker line and
    exits. The test passes only if that marker appears in the job logs once
    `check_job_stopped` succeeds, i.e. the handler actually ran.
    """
    # The embedded script must keep its own indentation: it is passed verbatim
    # to `python -c`, so the bodies of `handler` and the `while` loop have to
    # be indented for the interpreter to accept it.
    entrypoint = """python -c \"
import sys
import signal
import time
def handler(*args):
    print('SIGTERM signal handled!');
    sys.exit()
signal.signal(signal.SIGTERM, handler)
while True:
    print('Waiting...')
    time.sleep(1)\"
"""
    job_id = await job_manager.submit_job(entrypoint=entrypoint)

    # The script registers its handler before printing the first
    # 'Waiting...', so seeing that line means the handler is in place.
    await async_wait_for_condition(
        lambda: "Waiting..." in job_manager.get_job_logs(job_id)
    )

    assert job_manager.stop_job(job_id) is True

    await async_wait_for_condition_async_predicate(
        check_job_stopped, job_manager=job_manager, job_id=job_id
    )

    assert "SIGTERM signal handled!" in job_manager.get_job_logs(job_id)

@pytest.mark.asyncio
async def test_stop_job_timeout(job_manager):
    """
    Stop job should send SIGTERM first, then if timeout occurs, send SIGKILL.

    The submitted job installs a SIGTERM handler that prints a marker line but
    does NOT exit, so the job keeps looping after SIGTERM. The test then
    requires `check_job_stopped` to succeed within
    `JobSupervisor.WAIT_FOR_JOB_TERMINATION_S` seconds.
    """
    # The embedded script must keep its own indentation: it is passed verbatim
    # to `python -c`, so the bodies of `handler` and the `while` loop have to
    # be indented for the interpreter to accept it.
    entrypoint = """python -c \"
import sys
import signal
import time
def handler(*args):
    print('SIGTERM signal handled!');
    pass
signal.signal(signal.SIGTERM, handler)
while True:
    print('Waiting...')
    time.sleep(1)\"
"""
    job_id = await job_manager.submit_job(entrypoint=entrypoint)

    # The script registers its handler before printing the first
    # 'Waiting...', so seeing that line means the handler is in place.
    await async_wait_for_condition(
        lambda: "Waiting..." in job_manager.get_job_logs(job_id)
    )

    assert job_manager.stop_job(job_id) is True

    # First confirm the SIGTERM was delivered and ignored by the job ...
    await async_wait_for_condition(
        lambda: "SIGTERM signal handled!" in job_manager.get_job_logs(job_id)
    )
    # ... then require the stop to complete within the supervisor's
    # termination wait period.
    await async_wait_for_condition_async_predicate(
        check_job_stopped,
        job_manager=job_manager,
        job_id=job_id,
        timeout=JobSupervisor.WAIT_FOR_JOB_TERMINATION_S,
    )


@pytest.mark.asyncio
async def test_logs_streaming(job_manager):
"""Test that logs are streamed during the job, not just at the end."""
Expand Down

0 comments on commit 22af732

Please sign in to comment.