Skip to content

Commit

Permalink
use conda_subprocess package
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Jun 25, 2024
1 parent ea8e174 commit adeca66
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 32 deletions.
11 changes: 8 additions & 3 deletions pympipool/shared/communication.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from socket import gethostname
from typing import Optional

import cloudpickle
import zmq
Expand Down Expand Up @@ -66,14 +67,14 @@ def bind_to_random_port(self):
"""
return self._socket.bind_to_random_port("tcp://*")

def bootup(self, command_lst: list[str]):
def bootup(self, command_lst: list[str], prefix_name: Optional[str] = None, prefix_path: Optional[str] = None,):
"""
Boot up the client process to connect to the SocketInterface.
Args:
command_lst (list): list of strings to start the client process
"""
self._interface.bootup(command_lst=command_lst)
self._interface.bootup(command_lst=command_lst, prefix_name=prefix_name, prefix_path=prefix_path)

def shutdown(self, wait: bool = True):
result = None
Expand All @@ -99,6 +100,8 @@ def interface_bootup(
command_lst: list[str],
connections,
hostname_localhost: bool = False,
prefix_name: Optional[str] = None,
prefix_path: Optional[str] = None,
):
"""
Start interface for ZMQ communication
Expand All @@ -114,6 +117,8 @@ def interface_bootup(
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
prefix_name (str): name of the conda environment to initialize
prefix_path (str): path of the conda environment to initialize
Returns:
pympipool.shared.communication.SocketInterface: socket interface for zmq communication
Expand All @@ -128,7 +133,7 @@ def interface_bootup(
"--zmqport",
str(interface.bind_to_random_port()),
]
interface.bootup(command_lst=command_lst)
interface.bootup(command_lst=command_lst, prefix_name=prefix_name, prefix_path=prefix_path)
return interface


Expand Down
15 changes: 3 additions & 12 deletions pympipool/shared/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ def execute_parallel_tasks(
interface = interface_bootup(
command_lst=_get_backend_path(
cores=cores,
prefix_path=prefix_path,
prefix_name=prefix_name,
),
connections=interface_class(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
prefix_path=prefix_path,
prefix_name=prefix_name,
)
if init_function is not None:
interface.send_dict(
Expand Down Expand Up @@ -441,26 +441,17 @@ def get_command_path(executable: str) -> str:

def _get_backend_path(
cores: int,
prefix_name: Optional[str] = None,
prefix_path: Optional[str] = None,
) -> list:
"""
Get command to call backend as a list of two strings
Args:
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py
prefix_name (str): name of the conda environment to initialize
prefix_path (str): path of the conda environment to initialize
Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
if prefix_name is not None:
command_lst = ["conda", "run", "-n", prefix_name, sys.executable]
elif prefix_path is not None:
command_lst = ["conda", "run", "-p", prefix_path, sys.executable]
else:
command_lst = [sys.executable]
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst += [get_command_path(executable="interactive_parallel.py")]
elif cores > 1:
Expand Down
22 changes: 16 additions & 6 deletions pympipool/shared/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@ def __init__(
)
self._process = None

def bootup(self, command_lst: list[str]):
self._process = subprocess.Popen(
args=self.generate_command(command_lst=command_lst),
cwd=self._cwd,
stdin=subprocess.DEVNULL,
)
def bootup(self, command_lst: list[str], prefix_name: Optional[str] = None, prefix_path: Optional[str] = None,):
if prefix_name is None and prefix_path is None:
self._process = subprocess.Popen(
args=self.generate_command(command_lst=command_lst),
cwd=self._cwd,
stdin=subprocess.DEVNULL,
)
else:
import conda_subprocess
self._process = conda_subprocess.Popen(
args=self.generate_command(command_lst=command_lst),
cwd=self._cwd,
stdin=subprocess.DEVNULL,
prefix_path=prefix_path,
prefix_name=prefix_name,
)

def generate_command(self, command_lst: list[str]) -> list[str]:
return command_lst
Expand Down
22 changes: 11 additions & 11 deletions tests/test_executor_conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ def test_python_executor_conda_path(self):
self.assertEqual(fs.result(), self.env_path)
self.assertTrue(fs.done())

# def test_python_executor_conda_name(self):
# with Executor(
# max_cores=1,
# hostname_localhost=True,
# backend="local",
# conda_environment_name=self.env_name,
# ) as exe:
# cloudpickle_register(ind=1)
# fs = exe.submit(get_conda_env_prefix)
# self.assertEqual(fs.result(), self.env_path)
# self.assertTrue(fs.done())
def test_python_executor_conda_name(self):
with Executor(
max_cores=1,
hostname_localhost=True,
backend="local",
conda_environment_name=self.env_name,
) as exe:
cloudpickle_register(ind=1)
fs = exe.submit(get_conda_env_prefix)
self.assertEqual(fs.result(), self.env_path)
self.assertTrue(fs.done())

0 comments on commit adeca66

Please sign in to comment.