-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slurm agent #3005
base: master
Are you sure you want to change the base?
Slurm agent #3005
Changes from 31 commits
421d1b8
1d1f806
5d97126
2e7f0f2
9644b99
e41b181
6db24dc
122c7f1
e9760a7
e68fda9
470637c
1579ab4
0e538f0
8229418
9e6d8a6
e07b09a
3a7eb6d
a815fd9
a3ea014
a109bd8
e5da665
0a3d9f1
1b0f6df
26cc201
16d953e
c743917
e365dee
c1064d4
361fbd1
fc3e34e
5cd58ec
9985305
e00a5db
805548c
34c7c51
4e85b0a
27cdc8d
1f30e04
079d101
1ce5cf8
7b9b38f
b23174a
3727f4d
f291e0b
a465607
7b75a51
d0967bd
f80ccd4
ac25446
aa6e3ae
5fcf6c5
64e9e06
9d06ed1
6cda6ae
3b64ddd
b107b07
37e4ee1
77f4d61
68ab8a6
a968c01
13cd31e
52d73d2
6b27668
5aa4e87
3cc29fe
19de56a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -250,7 +250,10 @@ def __init__( | |||||||
|
||||||||
if task_config is not None: | ||||||||
fully_qualified_class_name = task_config.__module__ + "." + task_config.__class__.__name__ | ||||||||
if not fully_qualified_class_name == "flytekitplugins.pod.task.Pod": | ||||||||
if fully_qualified_class_name not in [ | ||||||||
"flytekitplugins.pod.task.Pod", | ||||||||
"flytekitplugins.slurm.script.task.Slurm", | ||||||||
]: | ||||||||
raise ValueError("TaskConfig can either be empty - indicating simple container task or a PodConfig.") | ||||||||
|
||||||||
# Each instance of NotebookTask instantiates an underlying task with a dummy function that will only be used | ||||||||
|
@@ -259,11 +262,14 @@ def __init__( | |||||||
# errors. | ||||||||
# This seem like a hack. We should use a plugin_class that doesn't require a fake-function to make work. | ||||||||
plugin_class = TaskPlugins.find_pythontask_plugin(type(task_config)) | ||||||||
self._config_task_instance = plugin_class(task_config=task_config, task_function=_dummy_task_func) | ||||||||
# Rename the internal task so that there are no conflicts at serialization time. Technically these internal | ||||||||
# tasks should not be serialized at all, but we don't currently have a mechanism for skipping Flyte entities | ||||||||
# at serialization time. | ||||||||
self._config_task_instance._name = f"_bash.{name}" | ||||||||
if plugin_class.__name__ in ["SlurmShellTask"]: | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider using fully qualified class name
Consider using a more explicit condition by checking the fully qualified class name instead of just the class name. The current check Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #e8c545 Should Bito avoid suggestions like this for future reviews? (Manage Rules)
|
||||||||
self._config_task_instance = None | ||||||||
else: | ||||||||
self._config_task_instance = plugin_class(task_config=task_config, task_function=_dummy_task_func) | ||||||||
# Rename the internal task so that there are no conflicts at serialization time. Technically these internal | ||||||||
# tasks should not be serialized at all, but we don't currently have a mechanism for skipping Flyte entities | ||||||||
# at serialization time. | ||||||||
self._config_task_instance._name = f"_bash.{name}" | ||||||||
self._script = script | ||||||||
self._script_file = script_file | ||||||||
self._debug = debug | ||||||||
|
@@ -275,7 +281,9 @@ def __init__( | |||||||
super().__init__( | ||||||||
name, | ||||||||
task_config, | ||||||||
task_type=self._config_task_instance.task_type, | ||||||||
task_type=kwargs.pop("task_type") | ||||||||
if self._config_task_instance is None | ||||||||
else self._config_task_instance.task_type, | ||||||||
interface=Interface(inputs=inputs, outputs=outputs), | ||||||||
**kwargs, | ||||||||
) | ||||||||
|
@@ -309,7 +317,10 @@ def script_file(self) -> typing.Optional[os.PathLike]: | |||||||
return self._script_file | ||||||||
|
||||||||
def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: | ||||||||
return self._config_task_instance.pre_execute(user_params) | ||||||||
if self._config_task_instance is None: | ||||||||
return user_params | ||||||||
else: | ||||||||
return self._config_task_instance.pre_execute(user_params) | ||||||||
|
||||||||
def execute(self, **kwargs) -> typing.Any: | ||||||||
""" | ||||||||
|
@@ -367,7 +378,10 @@ def execute(self, **kwargs) -> typing.Any: | |||||||
return None | ||||||||
|
||||||||
def post_execute(self, user_params: ExecutionParameters, rval: typing.Any) -> typing.Any: | ||||||||
return self._config_task_instance.post_execute(user_params, rval) | ||||||||
if self._config_task_instance is None: | ||||||||
return rval | ||||||||
else: | ||||||||
return self._config_task_instance.post_execute(user_params, rval) | ||||||||
|
||||||||
|
||||||||
class RawShellTask(ShellTask): | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# Flytekit Slurm Plugin | ||
|
||
The Slurm agent is designed to integrate Flyte workflows with Slurm-managed high-performance computing (HPC) clusters, enabling users to leverage Slurm's capability of compute resource allocation, scheduling, and monitoring. | ||
|
||
This [guide](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md) provides a concise overview of the design philosophy behind the Slurm agent and explains how to set up a local environment for testing the agent. |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Amazing Graph. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, bro. |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,264 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Slurm Agent Demo | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
> Note: This document is still a work in progress, focusing on demonstrating the initial implementation. It will be updated and refined frequently until a stable version is ready. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
In this guide, we will briefly introduce how to setup an environment to test Slurm agent locally without running the backend service (e.g., flyte agent gRPC server). It covers both basic and advanced use cases: the basic use case involves executing a shell script directly, while the advanced use case enables running user-defined functions on a Slurm cluster. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Table of Content | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [Overview](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#overview) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [Setup a Local Test Environment](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#setup-a-local-test-environment) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [Flyte Client (Localhost)](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [Remote Tiny Slurm Cluster](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#remote-tiny-slurm-cluster) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [SSH Configuration](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#ssh-configuration) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [(Optional) Setup Amazon S3 Bucket](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#optional-setup-amazon-s3-bucket) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [Rich Use Cases](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#rich-use-cases) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [`SlurmTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmtask) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [`SlurmShellTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmshelltask) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* [`SlurmFunctionTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmfunctiontask) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Overview | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Slurm agent on the highest level has three core methods to interact with a Slurm cluster: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
1. `create`: Use `srun` or `sbatch` to run a job on a Slurm cluster | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
2. `get`: Use `scontrol show job <job_id>` to monitor the Slurm job state | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
3. `delete`: Use `scancel <job_id>` to cancel the Slurm job (this method is still under test) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
In the simplest form, Slurm agent supports directly running a batch script using `sbatch` on a Slurm cluster as shown below: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Setup a Local Test Environment | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Without running the backend service, we can setup an environment to test Slurm agent locally. The setup consists of two main components: a client (localhost) and a remote tiny Slurm cluster. Then, we need to configure SSH connection to facilitate communication between the two, which relies on `asyncssh`. Additionally, an S3-compatible object storage is needed for advanced use cases and we choose [Amazon S3](https://us-west-2.console.aws.amazon.com/s3/get-started?region=us-west-2&bucketType=general) for demonstration here. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
> Note: A persistence layer (such as S3-compatible object storage) becomes essential as scenarios grow more complex, especially when integrating heterogeneous task types into a workflow in the future. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### Flyte Client (Localhost) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
1. Setup a local Flyte cluster following this [official guide](https://docs.flyte.org/en/latest/community/contribute/contribute_code.html#how-to-setup-dev-environment-for-flytekit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
2. Build a virtual environment (e.g., [poetry](https://python-poetry.org/), [conda](https://docs.conda.io/en/latest/)) and activate it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
3. Clone Flytekit [repo](https://github.com/flyteorg/flytekit), checkout the Slurm agent [PR](https://github.com/flyteorg/flytekit/pull/3005/), and install Flytekit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
git clone https://github.com/flyteorg/flytekit.git | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
gh pr checkout 3005 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
make setup && pip install -e . | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
4. Install Flytekit Slurm agent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
cd plugins/flytekit-slurm/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
pip install -e . | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### Remote Tiny Slurm Cluster | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
To simplify the setup process, we follow this [guide](https://github.com/JiangJiaWei1103/Slurm-101) to configure a single-host Slurm cluster, covering `slurmctld` (the central management daemon) and `slurmd` (the compute node daemon). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
After building a Slurm cluster, we need to install Flytekit and Slurm agent, just as what we did in the previous [section](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
1. Build a virtual environment and activate it (we take `poetry` as an example): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
poetry new demo-env | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
# For running a subshell with the virtual environment activated | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
poetry self add poetry-plugin-shell | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Activate the virtual environment | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
poetry shell | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
2. Clone Flytekit [repo](https://github.com/flyteorg/flytekit), checkout the Slurm agent [PR](https://github.com/flyteorg/flytekit/pull/3005/), and install Flytekit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
git clone https://github.com/flyteorg/flytekit.git | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
gh pr checkout 3005 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
make setup && pip install -e . | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
3. Install Flytekit Slurm agent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
cd plugins/flytekit-slurm/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
pip install -e . | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### SSH Configuration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
To facilitate communication between the Flyte client and the remote Slurm cluster, we setup SSH on the Flyte client side as follows: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
1. Create a new authentication key pair | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssh-keygen -t rsa -b 4096 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
2. Copy the public key into the remote Slurm cluster | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssh-copy-id <username>@<remote_server_ip> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
3. Enable key-based authentication | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ~/.ssh/config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Host <host_alias> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
HostName <remote_server_ip> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Port <ssh_port> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
User <username> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
IdentityFile <path_to_private_key> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Then, run a sanity check to make sure we can connect to the Slurm cluster: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssh <host_alias> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Simple and elegant! | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### (Optional) Setup Amazon S3 Bucket | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
For those interested in advanced use cases, in which user-defined functions are sent and executed on the Slurm cluster, an S3-compitable object storage becomes a necessary component. Following summarizes the setup process: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
1. Click "Create bucket" button (marked in yellow) to create a bucket on this [page](https://us-west-2.console.aws.amazon.com/s3/get-started?region=us-west-2&bucketType=general) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Give the cluster an unique name and leave other settings as default | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
2. Click the user on the top right corner and go to "Security credentials" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
3. Create an access key and save it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
4. Configure AWS access on **both** machines | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ~/.aws/config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
[default] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
region=<your_region> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
# ~/.aws/credentials | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
[default] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
aws_access_key_id=<aws_access_key_id> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
aws_secret_access_key=<aws_secret_access_key> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
Now, both machines have access to the Amazon S3 bucket. Perfect! | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Rich Use Cases | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
In this section, we will demonstrate three supported use cases, ranging from basic to advanced. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### `SlurmTask` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
In the simplest use case, we specify the path to the batch script that is already available on the cluster. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
Suppose we have a batch script as follows: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
#!/bin/bash | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
echo "Hello AWS slurm, run a Flyte SlurmTask!" >> ./echo_aws.txt | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
We use the following python script to test Slurm agent on the [client](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
```python | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit import workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekitplugins.slurm import SlurmRemoteScript, SlurmTask | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
echo_job = SlurmTask( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
name="<task-name>", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_config=SlurmRemoteScript( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
slurm_host="<slurm-host>", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
batch_script_path="<remote-batch-script-path>", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
sbatch_conf={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"partition": "debug", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"job-name": "tiny-slurm", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
@workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def wf() -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
echo_job() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.clis.sdk_in_container import pyflyte | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from click.testing import CliRunner | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
runner = CliRunner() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
path = os.path.realpath(__file__) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(f">>> LOCAL EXEC <<<") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
result = runner.invoke(pyflyte.main, ["run", path, "wf"]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(result.output) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### `SlurmShellTask` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
`SlurmShellTask` offers users the flexibility to define the content of shell scripts. Below is an example of creating a task that executes a Python script already present on the Slurm cluster: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
```python | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit import workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekitplugins.slurm import Slurm, SlurmShellTask | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
shell_task = SlurmShellTask( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
name="test-shell", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
script="""#!/bin/bash | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# We can define sbatch options here, but using sbatch_conf can be more neat | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
echo "Run a Flyte SlurmShellTask...\n" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Run a python script on Slurm | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Activate the virtual env first if any | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
python3 <path_to_python_script> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
""", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_config=Slurm( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
slurm_host="<slurm-host>", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
sbatch_conf={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"partition": "debug", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"job-name": "tiny-slurm", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+191
to
+195
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is for future not needed now |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
@workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def wf() -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
shell_task() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.clis.sdk_in_container import pyflyte | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from click.testing import CliRunner | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
runner = CliRunner() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
path = os.path.realpath(__file__) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(f">>> LOCAL EXEC <<<") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
result = runner.invoke(pyflyte.main, ["run", path, "wf"]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(result.output) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
### `SlurmFunctionTask` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
In the most advanced use case, `SlurmFunctionTask` allows users to define custom Python functions that are sent to and executed on the Slurm cluster. Following figure demonstrates the process of running a `SlurmFunctionTask`: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
```python | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit import task, workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekitplugins.slurm import SlurmFunction | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
@task( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_config=SlurmFunction( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
slurm_host="<slurm-host>", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
sbatch_conf={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"partition": "debug", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"job-name": "tiny-slurm", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def plus_one(x: int) -> int: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return x + 1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+229
to
+239
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where, task.fn should be replaced with the tasks execution command something like pflyte-fast-execute ... -- pyflyte-execute ... This command is in container.args There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ketan, Great point! This approach allows users to define behaviors for both pre-execution (e.g., setting up env vars, activating a virtual env) and post-execution (e.g., tearing down tmp files). As for the task execution command, as you mentioned, we currently construct the entry points using If that is the case, I propose using Please let me know your thoughts. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regarding For |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
@task | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def greet(year: int) -> str: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return f"Hello {year}!!!" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
@workflow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def wf(x: int) -> str: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
x = plus_one(x=x) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
msg = greet(year=x) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return msg | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.clis.sdk_in_container import pyflyte | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
from click.testing import CliRunner | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
runner = CliRunner() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
path = os.path.realpath(__file__) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(f">>> LOCAL EXEC <<<") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "<s3-bucket-uri>", path, "wf", "--x", 2024]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
print(result.output) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .function.agent import SlurmFunctionAgent | ||
from .function.task import SlurmFunction, SlurmFunctionTask | ||
from .script.agent import SlurmScriptAgent | ||
from .script.task import Slurm, SlurmRemoteScript, SlurmShellTask, SlurmTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this?
is this for
shell task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we define a
SlurmTask
without specifyingcontainer_image
(as the example python script provided above),ctx.serialization_settings
will beNone
. Then, an error is raised which describes thatPythonAutoContainerTask
needs an image.I think this is just a temporary workaround for local test and I'm still pondering how to better handle this issue.