-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slurm agent #3005
base: master
Are you sure you want to change the base?
Slurm agent #3005
Conversation
Signed-off-by: jiangjiawei1103 <waynechuang97@gmail.com>
Signed-off-by: jiangjiawei1103 <waynechuang97@gmail.com>
Signed-off-by: jiangjiawei1103 <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3005 +/- ##
===========================================
- Coverage 78.20% 46.62% -31.58%
===========================================
Files 292 206 -86
Lines 25401 21786 -3615
Branches 2779 2839 +60
===========================================
- Hits 19864 10157 -9707
- Misses 4726 11114 +6388
+ Partials 811 515 -296 ☔ View full report in Codecov by Sentry. |
Successfully submit and run the user-defined task as a normal python function on a remote Slurm cluster. 1. Inherit from PythonFunctionTask instead of PythonTask 2. Transfer the task module through sftp 3. Interact with amazon s3 bucket on both localhost and Slurm cluster Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Specifying `--raw-output-data-prefix` option handles task_module download. Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
Add `ssh_conf` filed to let users specify connection secret Note that reconnection is done in both `get` and `delete`. This is just a temporary workaround. Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
For data scientists and MLEs developing flyte wf with Slurm agent, they don't actually need to know ssh connection details. We assume they only need to specify which Slurm cluster to use by hostname. Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
1. Write user-defined batch script to a tmp file 2. Transfer the batch script through sftp 3. Construct sbatch command to run on Slurm cluster Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
1. Remove SFTP for batch script transfer * Assume Slurm batch script is present on Slurm cluster 2. Support directly specifying a remote batch script path Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
Signed-off-by: pryce-turner <pryce.turner@gmail.com>
Code Review Agent Run Status
|
Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Code Review Agent Run Status
|
…onTask Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@task(
task_config=SlurmFunction(
slurm_host="aws2",
sbatch_conf={
...
},
script="""#!/bin/bash
# == Pre-Execution ==
echo "Hello, world!"
# Setup env vars
export MY_ENV_VAR=123
# Activate virtual env
. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate
# == Execute Flyte Task Function ==
{task.fn}
# == Post-Execution ==
echo "Success!!"
"""
)
)
def plus_one(x: int) -> int:
print(os.getenv("MY_ENV_VAR"))
return x + 1
@task(
task_config=SlurmFunction(
slurm_host="aws2",
sbatch_conf={
...
},
script="""#!/bin/bash
# == Pre-Execution ==
echo "Let's make this task fail..."
# Setup env vars
export MY_ENV_VAR=123
# Activate virtual env
. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate
# == Execute Flyte Task Function ==
{task.fn}
# == Post-Execution ==
echo "Success!!"
"""
)
)
def divide_zero(x: int) -> float:
print(os.getenv("MY_ENV_VAR"))
return x / 0 As can be observed, the root cause
|
Code Review Agent Run Status
|
1. Make SSH `host` and `username` required fields 2. Support SSH connection based on the default OpenSSH client config file `~/.ssh/config` 3. Support SSH connection via public key auth either by user-specified `client_keys` or the secret for key `FLYTE_SLURM_PRIVATE_KEY` Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Code Review Agent Run Status
|
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Code Review Agent Run Status
|
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Code Review Agent Run #e8c545Actionable Suggestions - 8
Additional Suggestions - 2
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
name=name, | ||
task_config=task_config, | ||
# Dummy interface, will support this after discussion | ||
interface=Interface(None, None), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interface initialization with None
values in SlurmTask.__init__()
may cause issues. Consider defining a proper interface with input/output types or providing a clearer explanation for using None
values.
Code suggestion
Check the AI-generated fix before applying
interface=Interface(None, None), | |
interface=Interface(inputs={}, outputs={}), |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
# tasks should not be serialized at all, but we don't currently have a mechanism for skipping Flyte entities | ||
# at serialization time. | ||
self._config_task_instance._name = f"_bash.{name}" | ||
if plugin_class.__name__ in ["SlurmShellTask"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more explicit condition by checking the fully qualified class name instead of just the class name. The current check plugin_class.__name__ in ["SlurmShellTask"]
may be fragile if class names change or if there are name collisions.
Code suggestion
Check the AI-generated fix before applying
if plugin_class.__name__ in ["SlurmShellTask"]: | |
plugin_class_fqn = f"{plugin_class.__module__}.{plugin_class.__name__}" | |
if plugin_class_fqn in ["flytekitplugins.slurm.script.task.SlurmShellTask"]: |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
_conn: Optional[SSHClientConnection] = None | ||
|
||
# Tmp remote path of the batch script | ||
REMOTE_PATH = "/tmp/task.slurm" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using hardcoded temporary file path /tmp/task.slurm
could be insecure. Consider using tempfile.mkstemp()
instead.
Code suggestion
Check the AI-generated fix before applying
REMOTE_PATH = "/tmp/task.slurm" | |
_, REMOTE_PATH = tempfile.mkstemp(suffix='.slurm') |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
_conn: Optional[SSHClientConnection] = None | ||
|
||
# Tmp remote path of the batch script | ||
REMOTE_PATH = "/tmp/echo_shell.slurm" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded temporary file path /tmp/echo_shell.slurm
could pose a security risk. Consider using tempfile.mkstemp()
to generate secure temporary files.
Code suggestion
Check the AI-generated fix before applying
REMOTE_PATH = "/tmp/echo_shell.slurm" | |
_tmp_fd, REMOTE_PATH = tempfile.mkstemp(suffix='.slurm') | |
os.close(_tmp_fd) |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
# Determine the current flyte phase from Slurm job state | ||
job_state = "running" | ||
for o in job_res.stdout.split(" "): | ||
if "JobState" in o: | ||
job_state = o.split("=")[1].strip().lower() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the case where StdOut
path is not found in the job output. Currently, msg
will be undefined if the StdOut
path is not present in the output, which could lead to a runtime error. Consider initializing msg
with a default value.
Code suggestion
Check the AI-generated fix before applying
# Determine the current flyte phase from Slurm job state | |
job_state = "running" | |
for o in job_res.stdout.split(" "): | |
if "JobState" in o: | |
job_state = o.split("=")[1].strip().lower() | |
# Determine the current flyte phase from Slurm job state | |
job_state = "running" | |
msg = "No output available" | |
for o in job_res.stdout.split(" "): | |
if "JobState" in o: | |
job_state = o.split("=")[1].strip().lower() |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
for arg in batch_script_args: | ||
cmd.append(arg) | ||
|
||
cmd = " ".join(cmd) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using shlex.join()
instead of " ".join()
for shell command construction to properly handle arguments containing spaces or special characters.
Code suggestion
Check the AI-generated fix before applying
cmd = " ".join(cmd) | |
import shlex | |
cmd = shlex.join(cmd) |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
# Write the private key to a local path | ||
# This may not be a good practice... | ||
with open("./slurm_private_key", "w") as f: | ||
f.write(default_client_key) | ||
client_keys.append("./slurm_private_key") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more secure approach for storing the private key. Writing sensitive data to a local file at ./slurm_private_key
could pose security risks. Consider using environment variables or a secure key management system.
Code suggestion
Check the AI-generated fix before applying
# Write the private key to a local path | |
# This may not be a good practice... | |
with open("./slurm_private_key", "w") as f: | |
f.write(default_client_key) | |
client_keys.append("./slurm_private_key") | |
# Store private key in protected directory with secure permissions | |
key_dir = os.path.expanduser("~/.ssh") | |
os.makedirs(key_dir, mode=0o700, exist_ok=True) | |
key_path = os.path.join(key_dir, "slurm_private_key") | |
with open(key_path, "w") as f: | |
os.chmod(key_path, 0o600) | |
f.write(default_client_key) | |
client_keys.append(key_path) |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
with open("./slurm_private_key", "w") as f: | ||
f.write(default_client_key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using blocking open()
in an async function can impact performance. Consider using aiofiles
for async file operations.
Code suggestion
Check the AI-generated fix before applying
with open("./slurm_private_key", "w") as f: | |
f.write(default_client_key) | |
async with aiofiles.open("./slurm_private_key", "w") as f: | |
await f.write(default_client_key) |
Code Review Run #e8c545
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Support passing files across multiple `SlurmShellTask` Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Code Review Agent Run #0e549bActionable Suggestions - 1
Review Details
|
msg = msg_res.stdout | ||
cur_phase = convert_to_flyte_phase(job_state) | ||
|
||
return Resource(phase=cur_phase, message=msg, outputs=resource_meta.outputs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider validating resource_meta.outputs
before using it in the Resource
constructor. The outputs field could potentially be None
which might cause issues downstream.
Code suggestion
Check the AI-generated fix before applying
return Resource(phase=cur_phase, message=msg, outputs=resource_meta.outputs) | |
outputs = resource_meta.outputs if resource_meta.outputs is not None else {} | |
return Resource(phase=cur_phase, message=msg, outputs=outputs) |
Code Review Run #0e549b
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Tracking issue
flyteorg/flyte#5634
Why are the changes needed?
What changes were proposed in this pull request?
Implement the Slurm agent, which submits the user-defined flytekit task to a remote Slurm cluster to run. Following describe three core methods:
create
: Submit a Slurm job withsbatch
to run a batch script on Slurm clusterget
: Check the Slurm job statedelete
(haven't been tested): Cancel the Slurm jobHow was this patch tested?
We test
create
andget
in the development environment described as follows:flytekit
installedslurmctld
andslurmd
runningasyncssh
Suppose we have a batch script to run on Slurm cluster:
We use the following python script to test Slurm agent on the client side:
The test result is shown as follows:

Setup process
As stated above
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
Implementation of a new Slurm agent for executing Flyte tasks on remote clusters, featuring job submission, monitoring, and cancellation capabilities via SSH. The enhancement adds support for output location interpolation and improved job metadata handling, with focus on dynamic script generation and better output management in the SlurmScriptAgent class.Unit tests added: False
Estimated effort to review (1-5, lower is better): 3