Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dragon iteration two, clean history #5

Merged
merged 14 commits into from
Mar 6, 2024
32 changes: 28 additions & 4 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from smartsim import Experiment
from smartsim.entity import Model
from smartsim.database import Orchestrator
from smartsim.log import get_logger
from smartsim.settings import (
SrunSettings,
AprunSettings,
Expand All @@ -50,10 +51,12 @@
from subprocess import run
import sys
import tempfile
import time
import typing as t
import uuid
import warnings

logger = get_logger(__name__)

# pylint: disable=redefined-outer-name,invalid-name,global-statement

Expand All @@ -68,6 +71,7 @@
test_port = CONFIG.test_port
test_account = CONFIG.test_account or ""
test_batch_resources: t.Dict[t.Any,t.Any] = CONFIG.test_batch_resources
test_output_dirs = 0

# Fill this at runtime if needed
test_hostlist = None
Expand Down Expand Up @@ -119,6 +123,9 @@ def pytest_sessionstart(
if os.path.isdir(test_output_root):
shutil.rmtree(test_output_root)
os.makedirs(test_output_root)
while not os.path.isdir(test_output_root):
time.sleep(0.1)

print_test_configuration()


Expand All @@ -130,10 +137,20 @@ def pytest_sessionfinish(
returning the exit status to the system.
"""
if exitstatus == 0:
shutil.rmtree(test_output_root)
else:
# kill all spawned processes in case of error
kill_all_test_spawned_processes()
cleanup_attempts = 5
while cleanup_attempts > 0:
try:
shutil.rmtree(test_output_root)
except OSError as e:
cleanup_attempts -= 1
time.sleep(1)
if not cleanup_attempts:
raise
else:
break

# kill all spawned processes
kill_all_test_spawned_processes()


def kill_all_test_spawned_processes() -> None:
Expand Down Expand Up @@ -455,6 +472,13 @@ def environment_cleanup(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("SSKEYOUT", raising=False)


@pytest.fixture(scope="function", autouse=True)
def check_output_dir() -> None:
    """Check before every test that the shared output root is still intact.

    Asserts that ``test_output_root`` exists and that it holds at least as
    many entries as were counted the last time this fixture ran, then stores
    the current count in the module-level ``test_output_dirs``.
    """
    global test_output_dirs
    assert os.path.isdir(test_output_root)
    # List the directory exactly once so that the count being asserted is the
    # same count that is recorded as the baseline for the next test; listing
    # twice lets a change between the two calls slip past the check.
    current_entries = len(os.listdir(test_output_root))
    assert current_entries >= test_output_dirs
    test_output_dirs = current_entries
MattToast marked this conversation as resolved.
Show resolved Hide resolved

@pytest.fixture
def dbutils() -> t.Type[DBUtils]:
    """Provide the ``DBUtils`` class itself (not an instance) to tests."""
    return DBUtils
Expand Down
8 changes: 8 additions & 0 deletions smartsim/_core/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ def telemetry_cooldown(self) -> int:
def telemetry_subdir(self) -> str:
return ".smartsim/telemetry"

@property
def dragon_default_subdir(self) -> str:
    """Return the default Dragon subdirectory as a relative path.

    The value is meant to be joined onto an experiment directory.
    """
    return ".smartsim/dragon"

@property
def dragon_log_filename(self) -> str:
    """Return the file name used for the Dragon log."""
    return "dragon_config.log"


@lru_cache(maxsize=128, typed=False)
def get_config() -> Config:
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def start(
logger.warning(msg)
self._launcher.connect_to_dragon(dragon_server_paths[0])
else:
dragon_path = osp.join(exp_path, ".smartsim", "dragon")
dragon_path = osp.join(exp_path, CONFIG.dragon_default_subdir)
self._launcher.connect_to_dragon(dragon_path)
if not self._launcher.is_connected:
raise LauncherError("Could not connect to Dragon server")
Expand Down
23 changes: 17 additions & 6 deletions smartsim/_core/entrypoints/dragon.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import os
import signal
import socket
import textwrap
import typing as t
from types import FrameType
Expand All @@ -36,14 +37,20 @@

import smartsim._core.utils.helpers as _helpers
from smartsim._core.launcher.dragon.dragonBackend import DragonBackend
from smartsim._core.schemas import DragonBootstrapRequest, DragonBootstrapResponse
from smartsim._core.schemas import (
DragonBootstrapRequest,
DragonBootstrapResponse,
DragonShutdownResponse,
)
from smartsim._core.schemas.dragonRequests import request_serializer
from smartsim._core.schemas.dragonResponses import response_serializer
from smartsim._core.utils.network import get_best_interface_and_address

# kill is not catchable
SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM, signal.SIGABRT]

SHUTDOWN_INITIATED = False


def handle_signal(signo: int, _frame: t.Optional[FrameType]) -> None:
if not signo:
Expand All @@ -56,7 +63,7 @@ def handle_signal(signo: int, _frame: t.Optional[FrameType]) -> None:
context = zmq.Context()

"""
Redis/KeyDB entrypoint script
Dragon server entrypoint script
"""

DBPID: t.Optional[int] = None
Expand All @@ -71,6 +78,7 @@ def print_summary(network_interface: str, ip_address: str) -> None:
-------- Dragon Configuration --------
IPADDRESS: {ip_address}
NETWORK: {network_interface}
HOSTNAME: {socket.gethostname()}
DRAGON_SERVER_CONFIG: {json.dumps(zmq_config)}
--------------------------------------

Expand All @@ -81,20 +89,22 @@ def print_summary(network_interface: str, ip_address: str) -> None:


def run(dragon_head_address: str) -> None:
    """Serve Dragon requests on a ZMQ REP socket until shutdown is requested.

    A reply socket is bound to ``dragon_head_address``. Each iteration
    receives one JSON request, deserializes it, hands it to a
    ``DragonBackend`` and sends the serialized response back. The loop ends
    once ``SHUTDOWN_INITIATED`` is set; this function sets it itself right
    after replying with a ``DragonShutdownResponse``.

    The socket is closed on the way out, both on a normal shutdown and when
    an exception escapes the loop.

    :param dragon_head_address: address the reply socket is bound to
    """
    global SHUTDOWN_INITIATED  # pylint: disable=global-statement
    print(f"Opening socket {dragon_head_address}")
    dragon_head_socket = context.socket(zmq.REP)
    try:
        dragon_head_socket.bind(dragon_head_address)

        dragon_backend = DragonBackend()

        # NOTE(review): recv_json presumably blocks until a request arrives,
        # so a flag set from elsewhere is only observed after the next
        # request has been served -- confirm whether that is acceptable.
        while not SHUTDOWN_INITIATED:
            print(f"Listening to {dragon_head_address}")
            req = dragon_head_socket.recv_json()
            print(f"Received request: {req}")
            drg_req = request_serializer.deserialize_from_json(str(req))
            resp = dragon_backend.process_request(drg_req)
            print(f"Sending response {resp}", flush=True)
            dragon_head_socket.send_json(response_serializer.serialize_to_json(resp))
            # Reply first, then stop: the client gets its shutdown
            # acknowledgement before the loop exits.
            if isinstance(resp, DragonShutdownResponse):
                SHUTDOWN_INITIATED = True
    finally:
        # Release the socket instead of leaving it open until interpreter exit.
        dragon_head_socket.close()


def main(args: argparse.Namespace) -> int:
Expand Down Expand Up @@ -132,11 +142,14 @@ def main(args: argparse.Namespace) -> int:

run(dragon_head_address=dragon_head_address)

print("Shutting down! Bye bye!")
return 0


def cleanup() -> None:
    """Announce cleanup and flag the server loop for shutdown.

    Prints a notice (flushed immediately) and sets the module-level
    ``SHUTDOWN_INITIATED`` flag to ``True``.
    """
    global SHUTDOWN_INITIATED  # pylint: disable=global-statement
    print("Cleaning up", flush=True)
    SHUTDOWN_INITIATED = True


if __name__ == "__main__":
Expand All @@ -156,8 +169,6 @@ def cleanup() -> None:
)
args_ = parser.parse_args()

print(args_)

# make sure to register the cleanup before starting
# the process so our signaller will be able to stop
# the database process.
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/generation/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _gen_exp_dir(self) -> None:
)
if not path.isdir(self.gen_path):
# keep exists ok for race conditions on NFS
pathlib.Path(self.gen_path).mkdir(exist_ok=True)
pathlib.Path(self.gen_path).mkdir(exist_ok=True, parents=True)
else:
logger.log(
level=self.log_level, msg="Working in previously created experiment"
Expand Down Expand Up @@ -177,7 +177,7 @@ def _gen_orc_dir(self, orchestrator_list: t.List[Orchestrator]) -> None:
# Always remove orchestrator files if present.
if path.isdir(orc_path):
shutil.rmtree(orc_path, ignore_errors=True)
pathlib.Path(orc_path).mkdir(exist_ok=self.overwrite)
pathlib.Path(orc_path).mkdir(exist_ok=self.overwrite, parents=True)

def _gen_entity_list_dir(self, entity_lists: t.List[Ensemble]) -> None:
"""Generate directories for Ensemble instances
Expand Down
42 changes: 37 additions & 5 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
DragonResponse,
DragonRunRequest,
DragonRunResponse,
DragonShutdownRequest,
DragonShutdownResponse,
DragonStopRequest,
DragonStopResponse,
DragonUpdateStatusRequest,
Expand All @@ -51,6 +53,9 @@
STATUS_RUNNING,
)

# from dragon.infrastructure.policy import Policy, GS_DEFAULT_POLICY


# pylint: enable=import-error


Expand Down Expand Up @@ -79,6 +84,7 @@ def process_request(self, request: DragonRequest) -> DragonResponse:

@process_request.register
def _(self, request: DragonRunRequest) -> DragonRunResponse:

proc = TemplateProcess(
target=request.exe,
args=request.exe_args,
Expand All @@ -88,7 +94,24 @@ def _(self, request: DragonRunRequest) -> DragonRunResponse:
# stderr=Popen.PIPE,
)

grp = ProcessGroup(restart=False, pmi_enabled=True)
# WORK IN PROGRESS, needs new policy groups
# global_policy = Policy(placement=Policy.Placement.HOST_NAME,
# host_name=Node(node_list[request.nodes]).hostname)
# grp = ProcessGroup(restart=False, policy=global_policy)

# # create a process group that runs on a subset of nodes
# for node_num in range(num_nodes_to_use):
# node_name = Node(node_list[node_num]).hostname
# local_policy = Policy(placement=Policy.Placement.HOST_NAME,
# host_name=node_name)
# grp.add_process(nproc=num_procs_per_node,
# template=TemplateProcess(target=placement_info,
# args=args,
# cwd=cwd,
# stdout=Popen.PIPE,
# policy=local_policy))
MattToast marked this conversation as resolved.
Show resolved Hide resolved

grp = ProcessGroup(restart=False, pmi_enabled=request.pmi_enabled)
grp.add_process(nproc=request.tasks, template=proc)
step_id = self._get_new_id()
grp.init()
Expand All @@ -108,10 +131,13 @@ def _(self, request: DragonUpdateStatusRequest) -> DragonUpdateStatusResponse:
updated_statuses[step_id] = (STATUS_RUNNING, return_codes)
else:
if all(proc_id is not None for proc_id in proc_group_tuple[1]):
return_codes = [
Process(None, ident=puid).returncode
for puid in proc_group_tuple[1]
]
try:
return_codes = [
Process(None, ident=puid).returncode
for puid in proc_group_tuple[1]
]
except (ValueError, TypeError):
return_codes = [-1 for _ in proc_group_tuple[1]]
else:
return_codes = [0]
status = (
Expand Down Expand Up @@ -141,3 +167,9 @@ def _(self, request: DragonStopRequest) -> DragonStopResponse:
# pylint: disable-next=no-self-use,unused-argument
def _(self, request: DragonHandshakeRequest) -> DragonHandshakeResponse:
return DragonHandshakeResponse()

@process_request.register
# Deliberately suppressing errors so that overloads have the same signature
# pylint: disable-next=no-self-use,unused-argument
def _(self, request: DragonShutdownRequest) -> DragonShutdownResponse:
    """Answer a shutdown request with a fresh ``DragonShutdownResponse``.

    The request itself is not inspected.

    :param request: the shutdown request being processed
    :return: a new ``DragonShutdownResponse``
    """
    return DragonShutdownResponse()
Loading
Loading