Replies: 6 comments 6 replies
-
Here is an initial design for the new evaluator class that can run experiments on a remote SLURM HPC cluster:
With this design, users should be able to run experiments on the remote SLURM HPC cluster in a similar manner to using the existing evaluators. A sample implementation could look like this. It requires paramiko for authorization and scp for file transfers:
import os
import paramiko
import tempfile
from scp import SCPClient
from ema_workbench import BaseEvaluator
import socket
import time
class SLURMEvaluator(BaseEvaluator):
    """Evaluator sketch that runs experiments as a batch job on a SLURM cluster.

    The evaluator logs in to the DelftBlue login node through a TU Delft
    bastion host, copies a model file, a data file and a generated batch
    script to the cluster, submits the script with ``sbatch``, polls
    ``squeue`` until the job has left the queue and copies the job output
    back to the local machine.

    The SSH connection is opened in ``__init__`` and stays open until
    ``finalize`` is called, so ``evaluate_experiments`` can be called more
    than once on the same evaluator.

    Parameters
    ----------
    msis : collection of models
    n_processes : int
        stored on the instance; not used by the code in this class yet
    netid : str
        user name for the bastion host and the login node; also used to
        build the remote home directory ``/home/<netid>``
    password : str, optional
        used for both hops when no ``key_filename`` is given
    key_filename : str, optional
        private key file used for both hops; takes precedence over
        ``password``
    remote_env_path : str, optional
        environment activated with ``source activate`` at the top of the
        batch script; when omitted no activation line is written
    is_student : bool, optional
        selects the student bastion host instead of the employee one

    """

    login_host = "login.delftblue.tudelft.nl"

    def __init__(
        self,
        msis,
        n_processes,
        netid,
        password=None,
        key_filename=None,
        remote_env_path=None,
        is_student=True,
    ):
        super().__init__(msis)
        self.n_processes = n_processes
        self.netid = netid
        self.password = password
        self.key_filename = key_filename
        self.remote_env_path = remote_env_path
        self.is_student = is_student
        # The bastion client has to outlive the tunnelled connection, so it
        # is kept on the instance and only closed in finalize().
        self._bastion = None
        self.ssh = self.connect_to_cluster()

    def connect_to_cluster(self):
        """Open an SSH connection to the login node via the bastion host.

        Returns
        -------
        paramiko.SSHClient
            client connected to the login node; the bastion client that
            carries the tunnel is stored in ``self._bastion``

        """
        bastion_host = (
            "student-linux.tudelft.nl" if self.is_student else "linux-bastion.tudelft.nl"
        )
        if self.key_filename:
            credentials = {"key_filename": self.key_filename}
        else:
            credentials = {"password": self.password}

        # First, connect to the bastion server.
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known host keys instead.
        bastion = paramiko.SSHClient()
        bastion.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        bastion.connect(bastion_host, username=self.netid, **credentials)

        try:
            # Forward a channel from the bastion server to the login node.
            channel = bastion.get_transport().open_channel(
                kind="direct-tcpip",
                dest_addr=(self.login_host, 22),
                src_addr=("localhost", 0),
            )
            # Connect to the login node over the forwarded channel.
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(self.login_host, username=self.netid, sock=channel, **credentials)
        except Exception:
            bastion.close()
            raise

        # Closing the bastion client here would also close the channel the
        # login-node connection runs over, so it is kept open.
        self._bastion = bastion
        return ssh

    def initialize(self):
        """Do nothing; the connection is already opened in ``__init__``.

        NOTE(review): presumably called by ``BaseEvaluator`` when the
        evaluator is used as a context manager -- confirm.
        """

    def finalize(self):
        """Close the connection to the login node and to the bastion host."""
        for client in (self.ssh, self._bastion):
            if client is not None:
                client.close()

    def create_or_update_remote_env(self):
        """Create or update the remote environment (not implemented yet)."""
        # This method should create or update the remote environment based on the provided specifications.
        # For example, you can use a remote 'environment.yml' file and use 'conda env update' or use 'pip' to install packages.
        pass

    def transfer_files(self, local_path, remote_path, is_upload=True):
        """Copy a file between the local machine and the cluster.

        Parameters
        ----------
        local_path : str
        remote_path : str
        is_upload : bool, optional
            True copies ``local_path`` to ``remote_path``; False copies
            ``remote_path`` to ``local_path``

        """
        with SCPClient(self.ssh.get_transport()) as scp:
            if is_upload:
                scp.put(local_path, remote_path)
            else:
                scp.get(remote_path, local_path)

    def submit_job(self, batch_script):
        """Submit a batch script with ``sbatch`` and return the job id.

        Parameters
        ----------
        batch_script : str
            path of the batch script on the cluster

        Returns
        -------
        str
            the last whitespace-separated token of the ``sbatch`` output

        Raises
        ------
        RuntimeError
            if ``sbatch`` exits with a non-zero status or prints nothing

        """
        import shlex

        _, stdout, stderr = self.ssh.exec_command(f"sbatch {shlex.quote(batch_script)}")
        reply = stdout.read().decode("utf-8").strip()
        if stdout.channel.recv_exit_status() != 0 or not reply:
            details = stderr.read().decode("utf-8").strip()
            raise RuntimeError(f"sbatch failed for {batch_script}: {details}")
        return reply.split()[-1]

    def check_job_status(self, job_id):
        """Return True while ``squeue`` still lists the job."""
        stdin, stdout, stderr = self.ssh.exec_command(f'squeue -u {self.netid} -j {job_id}')
        status = stdout.readlines()
        return len(status) > 1  # If there's more than one line, the job is still running.

    def generate_batch_script(
        self,
        job_name,
        commands,
        output_file,
        error_file,
        time_limit=None,
        partition=None,
        nodes=None,
    ):
        """Build the text of a SLURM batch script.

        Parameters
        ----------
        job_name : str
        commands : list of str
            shell commands, written one per line after the header
        output_file : str
        error_file : str
        time_limit : str, optional
        partition : str, optional
        nodes : int, optional

        Returns
        -------
        str
            the script, terminated by a newline

        """
        lines = [
            "#!/bin/bash",
            f"#SBATCH --job-name={job_name}",
            f"#SBATCH --output={output_file}",
            f"#SBATCH --error={error_file}",
        ]
        optional = (("time", time_limit), ("partition", partition), ("nodes", nodes))
        lines.extend(f"#SBATCH --{flag}={value}" for flag, value in optional if value)
        if self.remote_env_path:
            # Assuming a conda environment
            lines.append(f"source activate {self.remote_env_path}")
        lines.extend(commands)
        return "\n".join(lines) + "\n"

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """Run one batch job on the cluster and hand its output to ``callback``.

        NOTE(review): ``scenarios``, ``policies`` and ``combine`` are not
        sent to the remote script yet. The original sketch called
        ``experiment_generator`` and ``add_tasks(..., self._pool, ...)``
        here, but none of these are imported or defined for this class, so
        those calls were removed. The local paths below are placeholders.

        NOTE(review): ``callback`` receives the raw text of the job's output
        file as a single argument -- confirm this matches what the caller
        expects.
        """
        import shlex

        home = f"/home/{self.netid}"

        # Transfer required model and data files to the remote cluster.
        local_model_file = "path/to/local/model/file"  # Replace with the actual path.
        remote_model_file = f"{home}/remote_model_file"  # Replace with the desired remote path.
        self.transfer_files(local_model_file, remote_model_file)
        local_data_file = "path/to/local/data/file"  # Replace with the actual path.
        remote_data_file = f"{home}/remote_data_file"  # Replace with the desired remote path.
        self.transfer_files(local_data_file, remote_data_file)

        # Generate the batch script to run the experiments on the remote cluster.
        job_name = "ema_workbench_experiment"
        output_file = "ema_workbench_out.txt"
        error_file = "ema_workbench_err.txt"
        commands = [
            f"python remote_experiment_script.py --model {remote_model_file} --data {remote_data_file}",
        ]
        batch_script_content = self.generate_batch_script(
            job_name, commands, output_file, error_file
        )
        with tempfile.NamedTemporaryFile(delete=False) as local_batch_script:
            local_batch_script.write(batch_script_content.encode())

        # Transfer the batch script to the remote cluster; the local copy is
        # not needed afterwards.
        remote_batch_script = f"{home}/{os.path.basename(local_batch_script.name)}"
        try:
            self.transfer_files(local_batch_script.name, remote_batch_script)
        finally:
            os.unlink(local_batch_script.name)

        # Submit the job to the remote cluster and wait for it to complete.
        job_id = self.submit_job(remote_batch_script)
        while self.check_job_status(job_id):
            time.sleep(60)  # Wait for 1 minute before checking the job status again.

        # Transfer the results back to the local machine.
        local_result_file = "path/to/local/result/file"  # Replace with the actual path.
        remote_result_file = f"{home}/{output_file}"
        self.transfer_files(local_result_file, remote_result_file, is_upload=False)

        # Pass the results to the callback function.
        with open(local_result_file, "r") as f:
            results = f.read()
        callback(results)

        # Clean up temporary files on the remote cluster and wait for the
        # command to finish before returning.
        leftovers = " ".join(
            shlex.quote(path)
            for path in (
                remote_model_file,
                remote_data_file,
                remote_batch_script,
                remote_result_file,
            )
        )
        _, stdout, _ = self.ssh.exec_command(f"rm -f {leftovers}")
        stdout.channel.recv_exit_status()
# This implementation completes the @quaquel I will still do a lot of experimentation with this, but let me know if you have any feedback in this stage! |
Beta Was this translation helpful? Give feedback.
-
You are tackling the problem in a different way from what I had in mind. This is not right or wrong, just an observation. You solve the problem at the level of SLURM. I have no idea how this would work out if you want to claim multiple nodes, etc. Also, how would this work out for optimization, given that it will call perform_experiments repeatedly? My own thinking was to solve the problem at the level of MPI, which would mean SLURM has already allocated one or more nodes. You would get something along the lines of:
import os
import mpi4py
from mpi4py.futures import MPIPoolExecutor
from mpi4py import MPI
from ema_workbench.em_framework.evaluators import BaseEvaluator
from ema_workbench.em_framework.evaluators import BaseEvaluator
from ema_workbench.em_framework.ema_multiprocessing import setup_working_directories
from ema_workbench.em_framework.util import NamedObjectMap
from ema_workbench.em_framework.points import experiment_generator
from ema_workbench.em_framework.experiment_runner import ExperimentRunner
from ema_workbench.em_framework.model import AbstractModel
from ema_workbench.util import EMAError, get_module_logger, ema_logging
from ema_mpi import setup_logging, MPIFileHandler
def worker(experiment):
    """Run a single experiment on the module-level experiment runner.

    Parameters
    ----------
    experiment : dict

    Returns
    -------
    tuple
        the experiment that was passed in, followed by whatever
        ``experiment_runner.run_experiment`` returned for it

    """
    # experiment_runner is a module-level global; it is only read here
    outcome = experiment_runner.run_experiment(experiment)
    return experiment, outcome
def initializer(*args):
    """Prepare a worker process before it receives experiments.

    Parameters
    ----------
    *args : tuple
        exactly three items, in this order: ``models`` (list of
        AbstractModel instances), ``root_dir`` (passed on to
        ``setup_working_directories``) and ``loglevel`` (passed on to
        ``setup_logging``)

    The function
    * stores an ExperimentRunner for the models in the module-level
      ``experiment_runner``
    * sets up the logging
    * sets up the working directories

    """
    global experiment_runner

    models, root_dir, loglevel = args

    # the experiment runner works on a NamedObjectMap of the models
    registry = NamedObjectMap(AbstractModel)
    registry.extend(models)
    experiment_runner = ExperimentRunner(registry)

    # TODO: ensure that log level filtering happens client side; check how
    # this is done elsewhere in the workbench
    setup_logging(loglevel)

    # make a root temp directory and copy each model directory into it
    tmpdir = setup_working_directories(models, root_dir)

    if tmpdir:
        # TODO: register a cleanup finalizer that removes the root temp
        # directory (os.path.abspath(tmpdir)) when the worker exits
        pass
class MPI4PyEvaluator(BaseEvaluator):
    """Evaluator that distributes experiments over an mpi4py MPIPoolExecutor.

    Parameters
    ----------
    msis : collection of models

    """

    reporting_frequency = 3

    def __init__(self, msis):
        super().__init__(msis)

    def __enter__(self):
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finalize()
        # never suppress an exception raised inside the ``with`` body
        return False

    def initialize(self):
        """Create the working directory root and start the executor."""
        # random and string are only needed here and are not imported at
        # module level
        import random
        import string

        # organize logging
        try:
            loglevel = ema_logging._rootlogger.getEffectiveLevel()
        except AttributeError:
            # _rootlogger is not usable; fall back to 30 (logging.WARNING)
            loglevel = 30

        # sort out working directories: a temporary root is only created
        # when every model has a working_directory
        if all(hasattr(model, "working_directory") for model in self._msis):
            random_part = "".join(
                random.choice(string.ascii_letters + string.digits) for _ in range(5)
            )
            # not sure whether we want and need this on mpi because it means
            # all processes will write to the nfs?
            self.root_dir = os.path.abspath("tmp" + random_part)
            os.makedirs(self.root_dir)
        else:
            self.root_dir = None

        # NOTE(review): this handler is constructed but never attached to a
        # logger; kept because constructing it on the main process may be
        # intentional -- confirm.
        handler = MPIFileHandler("test.log")

        self._executor = MPIPoolExecutor(
            initializer=initializer, initargs=(self._msis, self.root_dir, loglevel)
        )
        get_module_logger(__name__).info("MPIPoolExecutor started")

    def finalize(self):
        """Shut down the executor."""
        # NOTE(review): self.root_dir is not removed here -- confirm whether
        # the temporary root should be cleaned up on finalize.
        self._executor.shutdown()

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """Run all experiments on the executor; used by ema_workbench.

        ``callback`` is called once per finished experiment with the
        experiment and the value returned for it by ``worker``.
        """
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
        for experiment, outcomes in self._executor.map(worker, ex_gen):
            callback(experiment, outcomes)
with the accompanying `ema_mpi` module:
import logging
import os
import sys
from mpi4py import MPI
from ema_workbench import ema_logging
class MPIIOStream:
    """Minimal stream wrapper intended for synchronised MPI file I/O.

    NOTE(review): the ``MPI.File.Open`` call in ``__init__`` is currently
    disabled, so ``self._file`` is never assigned and ``write``, ``sync``
    and ``close`` raise ``AttributeError`` until it is re-enabled.
    """

    def __init__(self, filename, comm, mode):
        # debug output: the communicator and the absolute log file path
        print(comm)
        print(os.path.abspath(filename))
        # disabled for now:
        # self._file = MPI.File.Open(comm, filename, mode)
        # self._file.Set_atomicity(True)

    def write(self, msg):
        """Write ``msg`` through the shared file pointer."""
        # text is encoded first; objects without ``encode`` are written as is
        try:
            payload = msg.encode()
        except AttributeError:
            payload = msg
        self._file.Write_shared(payload)

    def sync(self):
        """Synchronise the processes."""
        self._file.Sync()

    def close(self):
        """Synchronise once more and close the underlying file."""
        self.sync()
        self._file.Close()
class MPIFileHandler(logging.StreamHandler):
    """A basic MPI file handler for writing log files.

    Internally opens a synchronised MPI I/O stream via MPIIOStream.

    Ideas and some code from:
    * https://groups.google.com/forum/#!topic/mpi4py/SaNzc8bdj6U
    * https://gist.github.com/JohnCEarls/8172807
    * https://stackoverflow.com/questions/45680050/cannot-write-to-shared-mpi-file-with-mpi4py

    Parameters
    ----------
    filename : str
    mode : int, optional
        MPI file access mode passed on to MPIIOStream
    comm : MPI communicator, optional
        communicator passed on to MPIIOStream

    """

    def __init__(
        self, filename, mode=MPI.MODE_WRONLY | MPI.MODE_CREATE, comm=MPI.COMM_WORLD
    ):
        self.filename = filename
        self.mode = mode
        self.comm = comm
        super().__init__(self._open())

    def _open(self):
        """Create the MPIIOStream this handler writes to."""
        return MPIIOStream(self.filename, self.comm, self.mode)

    def close(self):
        """Close the stream and release the handler.

        The base-class ``close`` is always called, even when closing the
        stream fails, so the handler is deregistered from the logging
        module's internal bookkeeping.
        """
        self.acquire()
        try:
            stream, self.stream = self.stream, None
            if stream:
                stream.close()
        finally:
            super().close()
            self.release()

    def emit(self, record):
        """Emit a record.

        We have to override emit, as the logging.StreamHandler has 2 calls
        to 'write'. The first for the message, and the second for the
        terminator. This posed a problem for mpi, where a second process
        could call 'write' in between these two calls and create a
        conjoined log message.

        Errors raised while formatting or writing are passed to
        ``handleError``, following the convention for logging handlers.
        """
        try:
            msg = self.format(record)
            self.stream.write(f"{msg}{self.terminator}")
            self.flush()
        except RecursionError:
            raise
        except Exception:
            self.handleError(record)
def setup_logging(log_level):
    """Set up a rank-specific logger for a worker process.

    Parameters
    ----------
    log_level : int
        NOTE(review): currently unused, the setLevel call is disabled

    """
    rank = MPI.COMM_WORLD.Get_rank()

    # one logger per rank, installed as the logger used by ema_logging
    rank_logger = logging.getLogger(f"{ema_logging.LOGGER_NAME}.{rank}")
    ema_logging._logger = rank_logger
    rank_logger.handlers = []

    # NOTE(review): the handler is created but not attached; formatter,
    # addHandler and setLevel are still disabled:
    # handler.setFormatter(ema_logging.LOG_FORMAT)
    # rank_logger.addHandler(handler)
    # rank_logger.setLevel(log_level)
    handler = MPIFileHandler("test.log")
|
Beta Was this translation helpful? Give feedback.
-
A few considerations I'm having right now:
|
Beta Was this translation helpful? Give feedback.
-
I think both questions go back to whether you want to solve it at the level of the scheduler (e.g., SLURM) or, given a set of nodes, solve the distribution of tasks over those nodes (e.g., through MPI or ipyparallel). |
Beta Was this translation helpful? Give feedback.
-
Not really that exciting, but I figured out the boring stuff like moving files to and from DelftBlue, submitting jobs with sbatch, and getting data back. Nothing really complicated yet. However, I can now only use the "Education share", which is only a small portion and has long waiting times.
Can I get access to the TPM research share (probably |
Beta Was this translation helpful? Give feedback.
-
Done
To-do
Abandoned
Dev branch: https://github.com/quaquel/EMAworkbench/tree/multi-node-development |
Beta Was this translation helpful? Give feedback.
-
In the next two months I will be developing an evaluator that works with multi-node systems, including HPC clusters. This discussion thread is a place to discuss general ideas about this implementation.
Beta Was this translation helpful? Give feedback.
All reactions