Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slurm agent #3005

Open
wants to merge 66 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
421d1b8
Add slurm plugin blank components
JiangJiaWei1103 Dec 14, 2024
1d1f806
feat: Add naive slurm agent create and get with rest api
JiangJiaWei1103 Dec 16, 2024
5d97126
Use asyncssh instead of REST API
JiangJiaWei1103 Dec 17, 2024
2e7f0f2
Test ssh communication and run sbatch
JiangJiaWei1103 Dec 18, 2024
9644b99
Add delete method and support slurm job state
JiangJiaWei1103 Dec 19, 2024
e41b181
feat: Submit and run SlurmTask on a remote Slurm cluster
JiangJiaWei1103 Dec 27, 2024
6db24dc
refactor: Remove redundant task_module transfer
JiangJiaWei1103 Dec 28, 2024
122c7f1
refactor: Remove redundant env var
JiangJiaWei1103 Dec 28, 2024
e9760a7
docs: Add env setup guide for local test
JiangJiaWei1103 Dec 30, 2024
e68fda9
docs: Add links and figures
JiangJiaWei1103 Dec 30, 2024
470637c
docs: Fix commit sha
JiangJiaWei1103 Dec 30, 2024
1579ab4
docs: Fix commit sha for demo guide
JiangJiaWei1103 Dec 30, 2024
0e538f0
docs: Fix links
JiangJiaWei1103 Dec 30, 2024
8229418
feat: Support SSH config in task config
JiangJiaWei1103 Dec 31, 2024
9e6d8a6
docs: Include ssh config in demo example
JiangJiaWei1103 Dec 31, 2024
e07b09a
refactor: Reduce ssh_conf option to slurm_host only
JiangJiaWei1103 Jan 7, 2025
3a7eb6d
feat: Support Slurm agent with ShellTask
JiangJiaWei1103 Jan 7, 2025
a815fd9
feat: Simplify Slurm job submission logic
JiangJiaWei1103 Jan 9, 2025
a3ea014
Added script args to agent and task
pryce-turner Jan 10, 2025
a109bd8
Add asyncssh to dependencies
JiangJiaWei1103 Jan 11, 2025
e5da665
docs: Update setup and demo for a basic use case
JiangJiaWei1103 Jan 11, 2025
0a3d9f1
docs: Update basic arch figure path
JiangJiaWei1103 Jan 11, 2025
1b0f6df
docs: Fix typo and hyperlink
JiangJiaWei1103 Jan 11, 2025
26cc201
fix: A tmp workaround to test agent locally without container_image
JiangJiaWei1103 Jan 11, 2025
16d953e
feat: Support user-defined batch script content with SlurmShellTask
JiangJiaWei1103 Jan 14, 2025
c743917
feat: Fall back to PythonTask for naive use cases
JiangJiaWei1103 Jan 15, 2025
e365dee
refactor: Define Slurm as a base task config and extend for remote sc…
JiangJiaWei1103 Jan 15, 2025
c1064d4
feat: Support PythonFunctionTask and reorganize agent structure
JiangJiaWei1103 Jan 16, 2025
361fbd1
Use poetry virtual env to avoid contamination
JiangJiaWei1103 Jan 22, 2025
fc3e34e
docs: Complete local test env setup process
JiangJiaWei1103 Jan 23, 2025
5cd58ec
docs: Add use cases ranging from basic to advanced
JiangJiaWei1103 Jan 23, 2025
9985305
feat: Add a script option for the Slurm function task
JiangJiaWei1103 Feb 6, 2025
e00a5db
fix: Avoid attaching async resource to different event loops
JiangJiaWei1103 Feb 6, 2025
805548c
Merge branch 'master' into slurm-agent-dev
Future-Outlier Feb 6, 2025
34c7c51
use await self._connect(slurm_host) in slurm agent
Future-Outlier Feb 12, 2025
4e85b0a
change
Future-Outlier Feb 12, 2025
27cdc8d
print more info
Future-Outlier Feb 13, 2025
1f30e04
use logger
Future-Outlier Feb 13, 2025
079d101
print more infor
Future-Outlier Feb 13, 2025
1ce5cf8
print
Future-Outlier Feb 13, 2025
7b9b38f
Use sbatch for running Slurm function task
JiangJiaWei1103 Feb 13, 2025
b23174a
update
Future-Outlier Feb 14, 2025
3727f4d
push
Future-Outlier Feb 14, 2025
f291e0b
feat: Show stdout and stderr msg of the Slurm cluster
JiangJiaWei1103 Feb 14, 2025
a465607
feat: Show stdout and stderr msg of the Slurm cluster for SlurmFuncti…
JiangJiaWei1103 Feb 14, 2025
7b75a51
feat: Make an SSH connetion based on client config file or ssh_config
JiangJiaWei1103 Feb 16, 2025
d0967bd
Clarify SSH connection logic
JiangJiaWei1103 Feb 17, 2025
f80ccd4
feat: Interpolate the script with dynamic input values
JiangJiaWei1103 Feb 18, 2025
ac25446
feat: Interpolate the script with dynamic output values
JiangJiaWei1103 Feb 18, 2025
aa6e3ae
Merge branch 'master' into slurm-agent-dev
Future-Outlier Feb 19, 2025
5fcf6c5
add assertion
Future-Outlier Feb 19, 2025
64e9e06
update
Future-Outlier Feb 19, 2025
9d06ed1
update
Future-Outlier Feb 19, 2025
6cda6ae
Fix Script agent bug
Future-Outlier Feb 19, 2025
3b64ddd
agent service for shell task
Future-Outlier Feb 20, 2025
b107b07
Remove remote path to avoid race condition
Future-Outlier Feb 20, 2025
37e4ee1
Revert agent server change
Future-Outlier Feb 20, 2025
77f4d61
use key val to run ssh config
Future-Outlier Feb 20, 2025
68ab8a6
update
Future-Outlier Feb 20, 2025
a968c01
use _get_or_create_ssh_connection
Future-Outlier Feb 20, 2025
13cd31e
update
Future-Outlier Feb 20, 2025
52d73d2
use SlurmCluster and hash
Future-Outlier Feb 20, 2025
6b27668
updagte
Future-Outlier Feb 20, 2025
5aa4e87
update
Future-Outlier Feb 20, 2025
3cc29fe
update
Future-Outlier Feb 20, 2025
19de56a
refactor: Simplify validation process and clean up legacy code
JiangJiaWei1103 Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/extend/backend/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class AsyncAgentExecutorMixin:

def execute(self: PythonTask, **kwargs) -> LiteralMap:
ctx = FlyteContext.current_context()
ss = ctx.serialization_settings or SerializationSettings(ImageConfig())
ss = ctx.serialization_settings or SerializationSettings(ImageConfig.auto_default_image())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?
is this for shell task?

Copy link
Contributor Author

@JiangJiaWei1103 JiangJiaWei1103 Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we define a SlurmTask without specifying container_image (as the example python script provided above), ctx.serialization_settings will be None. Then, an error is raised which describes that PythonAutoContainerTask needs an image.

I think this is just a temporary workaround for local test and I'm still pondering how to better handle this issue.

output_prefix = ctx.file_access.get_random_remote_directory()
self.resource_meta = None

Expand Down
4 changes: 2 additions & 2 deletions flytekit/extend/backend/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def convert_to_flyte_phase(state: str) -> TaskExecution.Phase:
Convert the state from the agent to the phase in flyte.
"""
state = state.lower()
if state in ["failed", "timeout", "timedout", "canceled", "skipped", "internal_error"]:
if state in ["failed", "timeout", "timedout", "canceled", "cancelled", "skipped", "internal_error"]:
return TaskExecution.FAILED
elif state in ["done", "succeeded", "success"]:
elif state in ["done", "succeeded", "success", "completed"]:
return TaskExecution.SUCCEEDED
elif state in ["running", "terminating"]:
return TaskExecution.RUNNING
Expand Down
32 changes: 23 additions & 9 deletions flytekit/extras/tasks/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ def __init__(

if task_config is not None:
fully_qualified_class_name = task_config.__module__ + "." + task_config.__class__.__name__
if not fully_qualified_class_name == "flytekitplugins.pod.task.Pod":
if fully_qualified_class_name not in [
"flytekitplugins.pod.task.Pod",
"flytekitplugins.slurm.script.task.Slurm",
]:
raise ValueError("TaskConfig can either be empty - indicating simple container task or a PodConfig.")

# Each instance of NotebookTask instantiates an underlying task with a dummy function that will only be used
Expand All @@ -259,11 +262,14 @@ def __init__(
# errors.
# This seem like a hack. We should use a plugin_class that doesn't require a fake-function to make work.
plugin_class = TaskPlugins.find_pythontask_plugin(type(task_config))
self._config_task_instance = plugin_class(task_config=task_config, task_function=_dummy_task_func)
# Rename the internal task so that there are no conflicts at serialization time. Technically these internal
# tasks should not be serialized at all, but we don't currently have a mechanism for skipping Flyte entities
# at serialization time.
self._config_task_instance._name = f"_bash.{name}"
if plugin_class.__name__ in ["SlurmShellTask"]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using fully qualified class name

Consider using a more explicit condition by checking the fully qualified class name instead of just the class name. The current check plugin_class.__name__ in ["SlurmShellTask"] may be fragile if class names change or if there are name collisions.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if plugin_class.__name__ in ["SlurmShellTask"]:
plugin_class_fqn = f"{plugin_class.__module__}.{plugin_class.__name__}"
if plugin_class_fqn in ["flytekitplugins.slurm.script.task.SlurmShellTask"]:

Code Review Run #e8c545


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

self._config_task_instance = None
else:
self._config_task_instance = plugin_class(task_config=task_config, task_function=_dummy_task_func)
# Rename the internal task so that there are no conflicts at serialization time. Technically these internal
# tasks should not be serialized at all, but we don't currently have a mechanism for skipping Flyte entities
# at serialization time.
self._config_task_instance._name = f"_bash.{name}"
self._script = script
self._script_file = script_file
self._debug = debug
Expand All @@ -275,7 +281,9 @@ def __init__(
super().__init__(
name,
task_config,
task_type=self._config_task_instance.task_type,
task_type=kwargs.pop("task_type")
if self._config_task_instance is None
else self._config_task_instance.task_type,
interface=Interface(inputs=inputs, outputs=outputs),
**kwargs,
)
Expand Down Expand Up @@ -309,7 +317,10 @@ def script_file(self) -> typing.Optional[os.PathLike]:
return self._script_file

def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
return self._config_task_instance.pre_execute(user_params)
if self._config_task_instance is None:
return user_params
else:
return self._config_task_instance.pre_execute(user_params)

def execute(self, **kwargs) -> typing.Any:
"""
Expand Down Expand Up @@ -367,7 +378,10 @@ def execute(self, **kwargs) -> typing.Any:
return None

def post_execute(self, user_params: ExecutionParameters, rval: typing.Any) -> typing.Any:
return self._config_task_instance.post_execute(user_params, rval)
if self._config_task_instance is None:
return rval
else:
return self._config_task_instance.post_execute(user_params, rval)


class RawShellTask(ShellTask):
Expand Down
5 changes: 5 additions & 0 deletions plugins/flytekit-slurm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Flytekit Slurm Plugin

The Slurm agent is designed to integrate Flyte workflows with Slurm-managed high-performance computing (HPC) clusters, enabling users to leverage Slurm's capability of compute resource allocation, scheduling, and monitoring.

This [guide](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md) provides a concise overview of the design philosophy behind the Slurm agent and explains how to set up a local environment for testing the agent.
Binary file added plugins/flytekit-slurm/assets/basic_arch.png
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing Graph.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, bro.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added plugins/flytekit-slurm/assets/flyte_client.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added plugins/flytekit-slurm/assets/overview_v2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
264 changes: 264 additions & 0 deletions plugins/flytekit-slurm/demo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
# Slurm Agent Demo

> Note: This document is still a work in progress, focusing on demonstrating the initial implementation. It will be updated and refined frequently until a stable version is ready.

In this guide, we will briefly introduce how to setup an environment to test Slurm agent locally without running the backend service (e.g., flyte agent gRPC server). It covers both basic and advanced use cases: the basic use case involves executing a shell script directly, while the advanced use case enables running user-defined functions on a Slurm cluster.

## Table of Content
* [Overview](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#overview)
* [Setup a Local Test Environment](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#setup-a-local-test-environment)
* [Flyte Client (Localhost)](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost)
* [Remote Tiny Slurm Cluster](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#remote-tiny-slurm-cluster)
* [SSH Configuration](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#ssh-configuration)
* [(Optional) Setup Amazon S3 Bucket](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#optional-setup-amazon-s3-bucket)
* [Rich Use Cases](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#rich-use-cases)
* [`SlurmTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmtask)
* [`SlurmShellTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmshelltask)
* [`SlurmFunctionTask`](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#slurmfunctiontask)

## Overview
Slurm agent on the highest level has three core methods to interact with a Slurm cluster:
1. `create`: Use `srun` or `sbatch` to run a job on a Slurm cluster
2. `get`: Use `scontrol show job <job_id>` to monitor the Slurm job state
3. `delete`: Use `scancel <job_id>` to cancel the Slurm job (this method is still under test)

In the simplest form, Slurm agent supports directly running a batch script using `sbatch` on a Slurm cluster as shown below:

![](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/assets/basic_arch.png)

## Setup a Local Test Environment
Without running the backend service, we can setup an environment to test Slurm agent locally. The setup consists of two main components: a client (localhost) and a remote tiny Slurm cluster. Then, we need to configure SSH connection to facilitate communication between the two, which relies on `asyncssh`. Additionally, an S3-compatible object storage is needed for advanced use cases and we choose [Amazon S3](https://us-west-2.console.aws.amazon.com/s3/get-started?region=us-west-2&bucketType=general) for demonstration here.
> Note: A persistence layer (such as S3-compatible object storage) becomes essential as scenarios grow more complex, especially when integrating heterogeneous task types into a workflow in the future.

### Flyte Client (Localhost)
1. Setup a local Flyte cluster following this [official guide](https://docs.flyte.org/en/latest/community/contribute/contribute_code.html#how-to-setup-dev-environment-for-flytekit)
2. Build a virtual environment (e.g., [poetry](https://python-poetry.org/), [conda](https://docs.conda.io/en/latest/)) and activate it
3. Clone Flytekit [repo](https://github.com/flyteorg/flytekit), checkout the Slurm agent [PR](https://github.com/flyteorg/flytekit/pull/3005/), and install Flytekit
```
git clone https://github.com/flyteorg/flytekit.git
gh pr checkout 3005
make setup && pip install -e .
```
4. Install Flytekit Slurm agent
```
cd plugins/flytekit-slurm/
pip install -e .
```

### Remote Tiny Slurm Cluster
To simplify the setup process, we follow this [guide](https://github.com/JiangJiaWei1103/Slurm-101) to configure a single-host Slurm cluster, covering `slurmctld` (the central management daemon) and `slurmd` (the compute node daemon).

After building a Slurm cluster, we need to install Flytekit and Slurm agent, just as what we did in the previous [section](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost).
1. Build a virtual environment and activate it (we take `poetry` as an example):
```
poetry new demo-env

# For running a subshell with the virtual environment activated
poetry self add poetry-plugin-shell

# Activate the virtual environment
poetry shell
```
2. Clone Flytekit [repo](https://github.com/flyteorg/flytekit), checkout the Slurm agent [PR](https://github.com/flyteorg/flytekit/pull/3005/), and install Flytekit
```
git clone https://github.com/flyteorg/flytekit.git
gh pr checkout 3005
make setup && pip install -e .
```
3. Install Flytekit Slurm agent
```
cd plugins/flytekit-slurm/
pip install -e .
```

### SSH Configuration
To facilitate communication between the Flyte client and the remote Slurm cluster, we setup SSH on the Flyte client side as follows:
1. Create a new authentication key pair
```
ssh-keygen -t rsa -b 4096
```
2. Copy the public key into the remote Slurm cluster
```
ssh-copy-id <username>@<remote_server_ip>
```
3. Enable key-based authentication
```
# ~/.ssh/config
Host <host_alias>
HostName <remote_server_ip>
Port <ssh_port>
User <username>
IdentityFile <path_to_private_key>
```
Then, run a sanity check to make sure we can connect to the Slurm cluster:
```
ssh <host_alias>
```
Simple and elegant!

### (Optional) Setup Amazon S3 Bucket
For those interested in advanced use cases, in which user-defined functions are sent and executed on the Slurm cluster, an S3-compitable object storage becomes a necessary component. Following summarizes the setup process:
1. Click "Create bucket" button (marked in yellow) to create a bucket on this [page](https://us-west-2.console.aws.amazon.com/s3/get-started?region=us-west-2&bucketType=general)
* Give the cluster an unique name and leave other settings as default
2. Click the user on the top right corner and go to "Security credentials"
3. Create an access key and save it
4. Configure AWS access on **both** machines
```
# ~/.aws/config
[default]
region=<your_region>

# ~/.aws/credentials
[default]
aws_access_key_id=<aws_access_key_id>
aws_secret_access_key=<aws_secret_access_key>
```

Now, both machines have access to the Amazon S3 bucket. Perfect!


## Rich Use Cases
In this section, we will demonstrate three supported use cases, ranging from basic to advanced.

### `SlurmTask`
In the simplest use case, we specify the path to the batch script that is already available on the cluster.

Suppose we have a batch script as follows:
```
#!/bin/bash

echo "Hello AWS slurm, run a Flyte SlurmTask!" >> ./echo_aws.txt
```

We use the following python script to test Slurm agent on the [client](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost):
```python
import os

from flytekit import workflow
from flytekitplugins.slurm import SlurmRemoteScript, SlurmTask


echo_job = SlurmTask(
name="<task-name>",
task_config=SlurmRemoteScript(
slurm_host="<slurm-host>",
batch_script_path="<remote-batch-script-path>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
)
)


@workflow
def wf() -> None:
echo_job()


if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner

runner = CliRunner()
path = os.path.realpath(__file__)

print(f">>> LOCAL EXEC <<<")
result = runner.invoke(pyflyte.main, ["run", path, "wf"])
print(result.output)
```

### `SlurmShellTask`
`SlurmShellTask` offers users the flexibility to define the content of shell scripts. Below is an example of creating a task that executes a Python script already present on the Slurm cluster:
```python
import os

from flytekit import workflow
from flytekitplugins.slurm import Slurm, SlurmShellTask


shell_task = SlurmShellTask(
name="test-shell",
script="""#!/bin/bash
# We can define sbatch options here, but using sbatch_conf can be more neat
echo "Run a Flyte SlurmShellTask...\n"

# Run a python script on Slurm
# Activate the virtual env first if any
python3 <path_to_python_script>
""",
task_config=Slurm(
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
Comment on lines +191 to +195
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
slurm_host_ssh_key="x",
secrets=[Secret("x")]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for future not needed now

),
)


@workflow
def wf() -> None:
shell_task()


if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner

runner = CliRunner()
path = os.path.realpath(__file__)

print(f">>> LOCAL EXEC <<<")
result = runner.invoke(pyflyte.main, ["run", path, "wf"])
print(result.output)
```

### `SlurmFunctionTask`
In the most advanced use case, `SlurmFunctionTask` allows users to define custom Python functions that are sent to and executed on the Slurm cluster. Following figure demonstrates the process of running a `SlurmFunctionTask`:

![](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/assets/overview_v2.png)

```python
import os

from flytekit import task, workflow
from flytekitplugins.slurm import SlurmFunction


@task(
task_config=SlurmFunction(
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
)
)
def plus_one(x: int) -> int:
return x + 1
Comment on lines +229 to +239
Copy link
Contributor

@kumare3 kumare3 Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@task(
task_config=SlurmFunction(
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
)
)
def plus_one(x: int) -> int:
return x + 1
@task(
task_config=SlurmFunction(
slurm_host="<slurm-host>",
sbatch_conf={
"partition": "debug",
"job-name": "tiny-slurm",
}
script="""
export ENV_VAR={inputs.v}
{task.fn}
rm -rf /tmp/files"""
)
)
def plus_one(x: int) -> int:
return x + 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where, task.fn should be replaced with the tasks execution command something like

pflyte-fast-execute ... -- pyflyte-execute ...

This command is in container.args

Copy link
Contributor Author

@JiangJiaWei1103 JiangJiaWei1103 Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ketan,

Great point! This approach allows users to define behaviors for both pre-execution (e.g., setting up env vars, activating a virtual env) and post-execution (e.g., tearing down tmp files).

As for the task execution command, as you mentioned, we currently construct the entry points using task_template.container.args. My question is, does this mean users are always required to hardcode {task.fn} to specify where the task function should run?

If that is the case, I propose using replace to insert the entry points. Alternatively, we could split pre-execution and post-execution into separate parameters while abstracting {task.fn} from users, so they don’t need to write it explicitly.

Please let me know your thoughts. Thanks!

Copy link
Contributor Author

@JiangJiaWei1103 JiangJiaWei1103 Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding {inputs.v}, should we support this additional input for both SlurmFunctionTask and SlurmShellTask, or only for SlurmShellTask?

For SlurmShellTask, I would apply string interpolation.



@task
def greet(year: int) -> str:
return f"Hello {year}!!!"


@workflow
def wf(x: int) -> str:
x = plus_one(x=x)
msg = greet(year=x)
return msg


if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner

runner = CliRunner()
path = os.path.realpath(__file__)

print(f">>> LOCAL EXEC <<<")
result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "<s3-bucket-uri>", path, "wf", "--x", 2024])
print(result.output)
```
4 changes: 4 additions & 0 deletions plugins/flytekit-slurm/flytekitplugins/slurm/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .function.agent import SlurmFunctionAgent
from .function.task import SlurmFunction, SlurmFunctionTask
from .script.agent import SlurmScriptAgent
from .script.task import Slurm, SlurmRemoteScript, SlurmShellTask, SlurmTask
Loading
Loading