-
Notifications
You must be signed in to change notification settings - Fork 466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Skypilot with Kubernetes #3033
Skypilot with Kubernetes #3033
Changes from 17 commits
c5aead6
342fa34
7590584
03951d8
89718f0
775253f
4b42940
305322e
edae0e8
2c68fa6
38922b2
364f8ac
85e7f63
2c421b0
f4639f6
46194cc
abf58c4
b3a4520
1eef87a
615c083
5b48068
98973a6
126eefb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -250,6 +250,7 @@ def prepare_or_run_pipeline( | |
entrypoint_str = " ".join(command) | ||
arguments_str = " ".join(args) | ||
|
||
task_envs = environment | ||
docker_environment_str = " ".join( | ||
f"-e {k}={v}" for k, v in environment.items() | ||
) | ||
|
@@ -271,29 +272,33 @@ def prepare_or_run_pipeline( | |
f"sudo docker login --username $DOCKER_USERNAME --password " | ||
f"$DOCKER_PASSWORD {stack.container_registry.config.uri}" | ||
) | ||
task_envs = { | ||
"DOCKER_USERNAME": docker_username, | ||
"DOCKER_PASSWORD": docker_password, | ||
} | ||
task_envs["DOCKER_USERNAME"] = docker_username | ||
task_envs["DOCKER_PASSWORD"] = docker_password | ||
else: | ||
setup = None | ||
task_envs = None | ||
|
||
# Run the entire pipeline | ||
|
||
# Set the service connector AWS profile ENV variable | ||
self.prepare_environment_variable(set=True) | ||
|
||
try: | ||
if isinstance(self.cloud, sky.clouds.Kubernetes): | ||
run_command = f"$VIRTUAL_ENV{entrypoint_str} {arguments_str}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this doesn't work right, And we need to make sure this is actually working There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes this is important @safoinme |
||
setup = None | ||
down = False | ||
idle_minutes_to_autostop = None | ||
safoinme marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" | ||
down = settings.down | ||
idle_minutes_to_autostop = settings.idle_minutes_to_autostop | ||
task = sky.Task( | ||
run=f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}", | ||
run=run_command, | ||
setup=setup, | ||
envs=task_envs, | ||
) | ||
logger.debug( | ||
f"Running run: sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" | ||
) | ||
logger.debug(f"Running run: {setup}") | ||
logger.debug(f"Running run: {run_command}") | ||
|
||
task = task.set_resources( | ||
sky.Resources( | ||
cloud=self.cloud, | ||
|
@@ -306,15 +311,24 @@ def prepare_or_run_pipeline( | |
job_recovery=settings.job_recovery, | ||
region=settings.region, | ||
zone=settings.zone, | ||
image_id=settings.image_id, | ||
image_id=image | ||
if isinstance(self.cloud, sky.clouds.Kubernetes) | ||
else settings.image_id, | ||
disk_size=settings.disk_size, | ||
disk_tier=settings.disk_tier, | ||
) | ||
) | ||
|
||
# Set the cluster name | ||
cluster_name = settings.cluster_name | ||
if cluster_name is None: | ||
if settings.cluster_name: | ||
sky.exec( | ||
task, | ||
settings.cluster_name, | ||
down=down, | ||
stream_logs=settings.stream_logs, | ||
backend=None, | ||
detach_run=True, | ||
) | ||
else: | ||
# Find existing cluster | ||
for i in sky.status(refresh=True): | ||
if isinstance( | ||
|
@@ -324,21 +338,19 @@ def prepare_or_run_pipeline( | |
logger.info( | ||
f"Found existing cluster {cluster_name}. Reusing..." | ||
) | ||
if cluster_name is None: | ||
cluster_name = self.sanitize_cluster_name( | ||
f"{orchestrator_run_name}" | ||
) | ||
|
||
# Launch the cluster | ||
sky.launch( | ||
task, | ||
cluster_name, | ||
retry_until_up=settings.retry_until_up, | ||
idle_minutes_to_autostop=settings.idle_minutes_to_autostop, | ||
down=settings.down, | ||
stream_logs=settings.stream_logs, | ||
detach_setup=True, | ||
) | ||
# Launch the cluster | ||
sky.launch( | ||
task, | ||
cluster_name, | ||
retry_until_up=settings.retry_until_up, | ||
idle_minutes_to_autostop=idle_minutes_to_autostop, | ||
down=down, | ||
stream_logs=settings.stream_logs, | ||
detach_setup=True, | ||
) | ||
|
||
except Exception as e: | ||
logger.error(f"Pipeline run failed: {e}") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# Copyright (c) ZenML GmbH 2024. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
# or implied. See the License for the specific language governing | ||
# permissions and limitations under the License. | ||
"""Initialization of the Skypilot Kubernetes integration for ZenML. | ||
|
||
The Skypilot integration sub-module powers an alternative to the local | ||
orchestrator for a remote orchestration of ZenML pipelines on VMs. | ||
""" | ||
from typing import List, Type | ||
|
||
from zenml.integrations.constants import ( | ||
SKYPILOT_KUBERNETES, | ||
) | ||
from zenml.integrations.integration import Integration | ||
from zenml.stack import Flavor | ||
|
||
SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR = "vm_kubernetes" | ||
|
||
|
||
class SkypilotKubernetesIntegration(Integration): | ||
"""Definition of Skypilot Kubernetes Integration for ZenML.""" | ||
|
||
NAME = SKYPILOT_KUBERNETES | ||
# all 0.6.x versions of skypilot[kubernetes] are compatible | ||
REQUIREMENTS = ["skypilot[kubernetes]~=0.6.1"] | ||
APT_PACKAGES = ["openssh-client", "rsync"] | ||
|
||
@classmethod | ||
def flavors(cls) -> List[Type[Flavor]]: | ||
"""Declare the stack component flavors for the Skypilot Kubernetes integration. | ||
|
||
Returns: | ||
List of stack component flavors for this integration. | ||
""" | ||
from zenml.integrations.skypilot_kubernetes.flavors import ( | ||
SkypilotKubernetesOrchestratorFlavor, | ||
) | ||
|
||
return [SkypilotKubernetesOrchestratorFlavor] | ||
|
||
|
||
SkypilotKubernetesIntegration.check_installation() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Copyright (c) ZenML GmbH 2024. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
# or implied. See the License for the specific language governing | ||
# permissions and limitations under the License. | ||
"""Skypilot integration flavor for Skypilot Kubernetes orchestrator.""" | ||
|
||
from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import ( | ||
SkypilotKubernetesOrchestratorConfig, | ||
SkypilotKubernetesOrchestratorFlavor, | ||
SkypilotKubernetesOrchestratorSettings, | ||
) | ||
|
||
__all__ = [ | ||
"SkypilotKubernetesOrchestratorConfig", | ||
"SkypilotKubernetesOrchestratorFlavor", | ||
"SkypilotKubernetesOrchestratorSettings", | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might seem mostly cosmetic, but I'll say it anyway: you're adding implementation specific conditions in the base class, which is supposed to be implementation agnostic. The correct way to do this is through static attributes, properties or methods that the implementations can override.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well i think i want to re-work the whole Skypilot integration if possible so instead of have a base abstraction and many sub integration which blocks one of the most important features of skypilot
cross-cloud/env failover
, tho I totally agree with your comment