Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable cache precomputation for run templates #3156

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/zenml/orchestrators/base_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,16 @@ def prepare_or_run_pipeline(
environment: Environment variables to set in the orchestration
environment. These don't need to be set if running locally.

Returns:
The optional return value from this method will be returned by the
`pipeline_instance.run()` call when someone is running a pipeline.
Yields:
Metadata for the pipeline run.
"""

def run(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
) -> None:
"""Runs a pipeline on a stack.

Args:
Expand Down
5 changes: 4 additions & 1 deletion src/zenml/pipelines/pipeline_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,10 @@ def _run(
)

deploy_pipeline(
deployment=deployment_model, stack=stack, placeholder_run=run
deployment=deployment_model,
stack=stack,
placeholder_run=run,
cleanup_placeholder_run=True,
)
if run:
return Client().get_pipeline_run(run.id)
Expand Down
37 changes: 25 additions & 12 deletions src/zenml/pipelines/run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
PipelineRunResponse,
StackResponse,
)
from zenml.orchestrators.publish_utils import publish_failed_pipeline_run
from zenml.orchestrators.utils import get_run_name
from zenml.stack import Flavor, Stack
from zenml.utils import code_utils, notebook_utils, source_utils
Expand Down Expand Up @@ -119,41 +120,53 @@ def deploy_pipeline(
deployment: "PipelineDeploymentResponse",
stack: "Stack",
placeholder_run: Optional["PipelineRunResponse"] = None,
cleanup_placeholder_run: bool = False,
) -> None:
"""Run a deployment.

Args:
deployment: The deployment to run.
stack: The stack on which to run the deployment.
placeholder_run: An optional placeholder run for the deployment. This
will be deleted in case the pipeline deployment failed.
will be deleted in case the pipeline deployment failed and
`cleanup_placeholder_run` is set to True.
cleanup_placeholder_run: If True, the placeholder run will be deleted
in case the pipeline deployment failed.

Raises:
Exception: Any exception that happened while deploying or running
(in case it happens synchronously) the pipeline.
"""
stack.prepare_pipeline_deployment(deployment=deployment)

def _cleanup_after_failure() -> None:
if not placeholder_run:
return

refreshed_run = Client().get_pipeline_run(
placeholder_run.id, hydrate=False
)
if refreshed_run.status != ExecutionStatus.INITIALIZING:
# The run is failed or some steps have already started
return

if cleanup_placeholder_run:
Client().delete_pipeline_run(placeholder_run.id)
else:
publish_failed_pipeline_run(placeholder_run.id)

# Prevent execution of nested pipelines which might lead to
# unexpected behavior
previous_value = constants.SHOULD_PREVENT_PIPELINE_EXECUTION
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
try:
stack.prepare_pipeline_deployment(deployment=deployment)
stack.deploy_pipeline(
deployment=deployment,
placeholder_run=placeholder_run,
)
except Exception as e:
if (
placeholder_run
and Client().get_pipeline_run(placeholder_run.id).status
== ExecutionStatus.INITIALIZING
):
# The run hasn't actually started yet, which means that we
# failed during initialization -> We don't want the
# placeholder run to stay in the database
Client().delete_pipeline_run(placeholder_run.id)

# TODO: Ideally this would not run if the orchestrator already created a run on their end?
schustmi marked this conversation as resolved.
Show resolved Hide resolved
_cleanup_after_failure()
raise e
finally:
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = previous_value
Expand Down
8 changes: 2 additions & 6 deletions src/zenml/stack/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,18 +810,14 @@ def deploy_pipeline(
self,
deployment: "PipelineDeploymentResponse",
placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
) -> None:
"""Deploys a pipeline on this stack.

Args:
deployment: The pipeline deployment.
placeholder_run: An optional placeholder run for the deployment.
This will be deleted in case the pipeline deployment failed.

Returns:
The return value of the call to `orchestrator.run_pipeline(...)`.
"""
return self.orchestrator.run(
self.orchestrator.run(
deployment=deployment, stack=self, placeholder_run=placeholder_run
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
from zenml.entrypoints.base_entrypoint_configuration import (
BaseEntrypointConfiguration,
)
from zenml.pipelines.run_utils import (
deploy_pipeline,
)
from zenml.pipelines.run_utils import deploy_pipeline, get_placeholder_run


class RunnerEntrypointConfiguration(BaseEntrypointConfiguration):
Expand All @@ -36,4 +34,12 @@ def run(self) -> None:
stack = Client().active_stack
assert deployment.stack and stack.id == deployment.stack.id

deploy_pipeline(deployment=deployment, stack=stack)
placeholder_run = get_placeholder_run(deployment_id=deployment.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a little bit unsure about this part. Is the relationship between a deployment_id and a placeholder_run always 1-to-1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually 1 to 0-or-1: a deployment may not have a placeholder run at all, which is why this function returns an optional placeholder run.

# We don't want to clean up the placeholder run here, as it contains
# the logs that help the user see what happened/why it might have failed
deploy_pipeline(
deployment=deployment,
stack=stack,
placeholder_run=placeholder_run,
cleanup_placeholder_run=False,
)
Loading