diff --git a/src/zenml/orchestrators/base_orchestrator.py b/src/zenml/orchestrators/base_orchestrator.py index dd6d1903192..51e501ead99 100644 --- a/src/zenml/orchestrators/base_orchestrator.py +++ b/src/zenml/orchestrators/base_orchestrator.py @@ -166,9 +166,8 @@ def prepare_or_run_pipeline( environment: Environment variables to set in the orchestration environment. These don't need to be set if running locally. - Returns: - The optional return value from this method will be returned by the - `pipeline_instance.run()` call when someone is running a pipeline. + Yields: + Metadata for the pipeline run. """ def run( @@ -176,7 +175,7 @@ def run( deployment: "PipelineDeploymentResponse", stack: "Stack", placeholder_run: Optional["PipelineRunResponse"] = None, - ) -> Any: + ) -> None: """Runs a pipeline on a stack. Args: diff --git a/src/zenml/pipelines/pipeline_definition.py b/src/zenml/pipelines/pipeline_definition.py index f5e40ebb18a..69e1d12e022 100644 --- a/src/zenml/pipelines/pipeline_definition.py +++ b/src/zenml/pipelines/pipeline_definition.py @@ -782,7 +782,9 @@ def _run( ) deploy_pipeline( - deployment=deployment_model, stack=stack, placeholder_run=run + deployment=deployment_model, + stack=stack, + placeholder_run=run, ) if run: return Client().get_pipeline_run(run.id) diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 11d4430b406..9fb307dc016 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -22,6 +22,7 @@ PipelineRunResponse, StackResponse, ) +from zenml.orchestrators.publish_utils import publish_failed_pipeline_run from zenml.orchestrators.utils import get_run_name from zenml.stack import Flavor, Stack from zenml.utils import code_utils, notebook_utils, source_utils @@ -125,20 +126,18 @@ def deploy_pipeline( Args: deployment: The deployment to run. stack: The stack on which to run the deployment. - placeholder_run: An optional placeholder run for the deployment. This - will be deleted in case the pipeline deployment failed. + placeholder_run: An optional placeholder run for the deployment. Raises: Exception: Any exception that happened while deploying or running (in case it happens synchronously) the pipeline. """ - stack.prepare_pipeline_deployment(deployment=deployment) - # Prevent execution of nested pipelines which might lead to # unexpected behavior previous_value = constants.SHOULD_PREVENT_PIPELINE_EXECUTION constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True try: + stack.prepare_pipeline_deployment(deployment=deployment) stack.deploy_pipeline( deployment=deployment, placeholder_run=placeholder_run, @@ -146,13 +145,14 @@ def deploy_pipeline( except Exception as e: if ( placeholder_run - and Client().get_pipeline_run(placeholder_run.id).status + and Client() + .get_pipeline_run(placeholder_run.id, hydrate=False) + .status == ExecutionStatus.INITIALIZING ): - # The run hasn't actually started yet, which means that we - # failed during initialization -> We don't want the - # placeholder run to stay in the database - Client().delete_pipeline_run(placeholder_run.id) + # The run failed during the initialization phase -> We change it's + # status to `Failed` + publish_failed_pipeline_run(placeholder_run.id) raise e finally: diff --git a/src/zenml/stack/stack.py b/src/zenml/stack/stack.py index 07602c6c796..06cf6445377 100644 --- a/src/zenml/stack/stack.py +++ b/src/zenml/stack/stack.py @@ -810,18 +810,14 @@ def deploy_pipeline( self, deployment: "PipelineDeploymentResponse", placeholder_run: Optional["PipelineRunResponse"] = None, - ) -> Any: + ) -> None: """Deploys a pipeline on this stack. Args: deployment: The pipeline deployment. placeholder_run: An optional placeholder run for the deployment. - This will be deleted in case the pipeline deployment failed. - - Returns: - The return value of the call to `orchestrator.run_pipeline(...)`. """ - return self.orchestrator.run( + self.orchestrator.run( deployment=deployment, stack=self, placeholder_run=placeholder_run ) diff --git a/src/zenml/zen_server/template_execution/runner_entrypoint_configuration.py b/src/zenml/zen_server/template_execution/runner_entrypoint_configuration.py index 16f9ed3adfc..dec897e3c87 100644 --- a/src/zenml/zen_server/template_execution/runner_entrypoint_configuration.py +++ b/src/zenml/zen_server/template_execution/runner_entrypoint_configuration.py @@ -17,9 +17,7 @@ from zenml.entrypoints.base_entrypoint_configuration import ( BaseEntrypointConfiguration, ) -from zenml.pipelines.run_utils import ( - deploy_pipeline, -) +from zenml.pipelines.run_utils import deploy_pipeline, get_placeholder_run class RunnerEntrypointConfiguration(BaseEntrypointConfiguration): @@ -36,4 +34,9 @@ def run(self) -> None: stack = Client().active_stack assert deployment.stack and stack.id == deployment.stack.id - deploy_pipeline(deployment=deployment, stack=stack) + placeholder_run = get_placeholder_run(deployment_id=deployment.id) + deploy_pipeline( + deployment=deployment, + stack=stack, + placeholder_run=placeholder_run, + ) diff --git a/tests/unit/pipelines/test_base_pipeline.py b/tests/unit/pipelines/test_base_pipeline.py index 0d583dd22f2..565dbe1d89f 100644 --- a/tests/unit/pipelines/test_base_pipeline.py +++ b/tests/unit/pipelines/test_base_pipeline.py @@ -605,23 +605,18 @@ def test_rerunning_deloyment_does_not_fail( assert runs.total == 2 -def test_failure_during_initialization_deletes_placeholder_run( +def test_failure_during_initialization_marks_placeholder_run_as_failed( clean_client, empty_pipeline, # noqa: F811 mocker, ): """Tests that when a pipeline run fails during initialization, the - placeholder run that was created for it is deleted.""" + placeholder run is marked as failed.""" mock_create_run = mocker.patch.object( type(clean_client.zen_store), "create_run", wraps=clean_client.zen_store.create_run, ) - mock_delete_run = mocker.patch.object( - type(clean_client.zen_store), - "delete_run", - wraps=clean_client.zen_store.delete_run, - ) pipeline_instance = empty_pipeline assert clean_client.list_pipeline_runs().total == 0 @@ -634,9 +629,10 @@ def test_failure_during_initialization_deletes_placeholder_run( pipeline_instance() mock_create_run.assert_called_once() - mock_delete_run.assert_called_once() - assert clean_client.list_pipeline_runs().total == 0 + runs = clean_client.list_pipeline_runs() + assert len(runs) == 1 + assert runs[0].status == ExecutionStatus.FAILED def test_running_scheduled_pipeline_does_not_create_placeholder_run( diff --git a/tests/unit/stack/test_stack.py b/tests/unit/stack/test_stack.py index 418e635efae..dfcc142e88c 100644 --- a/tests/unit/stack/test_stack.py +++ b/tests/unit/stack/test_stack.py @@ -160,12 +160,6 @@ def test_stack_deployment( components.""" # Mock the pipeline run registering which tries (and fails) to serialize # our mock objects - - pipeline_run_return_value = object() - stack_with_mock_components.orchestrator.run.return_value = ( - pipeline_run_return_value - ) - with empty_pipeline: empty_pipeline.entrypoint() deployment = Compiler().compile( @@ -173,19 +167,15 @@ def test_stack_deployment( stack=stack_with_mock_components, run_configuration=PipelineRunConfiguration(), ) - return_value = stack_with_mock_components.deploy_pipeline( + stack_with_mock_components.deploy_pipeline( deployment=deployment, ) - # for component in stack_with_mock_components.components.values(): - # component.prepare_step_run.assert_called_once() - stack_with_mock_components.orchestrator.run.assert_called_once_with( deployment=deployment, stack=stack_with_mock_components, placeholder_run=None, ) - assert return_value is pipeline_run_return_value def test_requires_remote_server(stack_with_mock_components, mocker):