Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor subprocesses to avoid zombies #18218

Merged
merged 21 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions src/lightning/fabric/strategies/launchers/base.py

This file was deleted.

41 changes: 39 additions & 2 deletions src/lightning/fabric/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
import signal
import subprocess
import sys
from typing import Any, Callable, Optional, Sequence, Tuple
import time
from threading import Thread
from typing import Any, Callable, Optional, Sequence, Tuple, List

from lightning_utilities.core.imports import RequirementCache

from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning.fabric.strategies.launchers.launcher import _Launcher

_logger = logging.getLogger(__name__)
_HYDRA_AVAILABLE = RequirementCache("hydra-core")


Expand Down Expand Up @@ -71,6 +77,7 @@ def __init__(
self.cluster_environment = cluster_environment
self.num_processes = num_processes
self.num_nodes = num_nodes
self.procs: List[subprocess.Popen] = [] # launched child subprocesses, does not include the launcher

@property
def is_interactive_compatible(self) -> bool:
Expand All @@ -87,6 +94,7 @@ def launch(self, function: Callable, *args: Any, **kwargs: Any) -> Any:
"""
if not self.cluster_environment.creates_processes_externally:
self._call_children_scripts()
_launch_process_monitor(self.procs)
return function(*args, **kwargs)

def _call_children_scripts(self) -> None:
Expand Down Expand Up @@ -122,9 +130,13 @@ def _call_children_scripts(self) -> None:
command, cwd = _hydra_subprocess_cmd(local_rank=local_rank)
else:
command = _basic_subprocess_cmd()
subprocess.Popen(command, env=env_copy, cwd=cwd)

proc = subprocess.Popen(command, env=env_copy, cwd=cwd)
self.procs.append(proc)

def _check_can_spawn_children(self) -> None:
if len(self.procs) > 0:
raise RuntimeError(f"The launcher can only create subprocesses once.")
if self.cluster_environment.local_rank() != 0:
raise RuntimeError(
"Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen."
Expand Down Expand Up @@ -159,3 +171,28 @@ def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]:
# Set output_subdir null since we don't want different subprocesses trying to write to config.yaml
command += [f"hydra.run.dir={rundir}", f"hydra.job.name=train_ddp_process_{local_rank}", "hydra.output_subdir=null"]
return command, cwd


def _launch_process_monitor(child_processes: List[subprocess.Popen]) -> None:
# A thread that runs along the main process and monitors the health of all processes
monitor_thread = Thread(
target=_monitor_child_processes,
kwargs={"child_processes": child_processes, "main_pid": os.getpid()},
daemon=True, # thread stops if the main process exits
)
monitor_thread.start()


def _monitor_child_processes(main_pid: int, child_processes: List[subprocess.Popen], sleep_period: int = 5) -> None:
for proc in itertools.cycle(child_processes):
time.sleep(sleep_period)
exit_code = proc.poll()
if exit_code not in (None, 0):
_logger.info(
f"Child process with PID {proc.pid} terminated with code {exit_code}."
f" Forcefully terminating all other processes to avoid zombies 🧟"
)
for p in child_processes:
p.send_signal(signal.SIGKILL)
os.kill(main_pid, signal.SIGKILL)
break
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import lightning.pytorch as pl
from lightning.fabric.plugins import ClusterEnvironment
from lightning.fabric.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd
from lightning.fabric.strategies.launchers.subprocess_script import (
_basic_subprocess_cmd,
_hydra_subprocess_cmd,
_launch_process_monitor
)
from lightning.pytorch.strategies.launchers.launcher import _Launcher
from lightning.pytorch.trainer.connectors.signal_connector import _SIGNUM

Expand Down Expand Up @@ -70,7 +74,7 @@ def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int,
self.cluster_environment = cluster_environment
self.num_processes = num_processes
self.num_nodes = num_nodes
self.procs: List[subprocess.Popen] = [] # launched subprocesses. does not include the launcher
self.procs: List[subprocess.Popen] = [] # launched child subprocesses, does not include the launcher

@property
def is_interactive_compatible(self) -> bool:
Expand All @@ -88,6 +92,7 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"]
"""
if not self.cluster_environment.creates_processes_externally:
self._call_children_scripts()
_launch_process_monitor(self.procs)
return function(*args, **kwargs)

def kill(self, signum: _SIGNUM) -> None:
Expand Down