Skip to content

Commit

Permalink
Use placeholder runs to show pipeline runs in the dashboard without delay (#2048)
Browse files Browse the repository at this point in the history

* POC

* Maybe solve concurrency issue

* Re-add log

* Some comments and small fixes

* Small cleanup

* Docstrings

* rename constraint

* Add new execution status

* Finish docstring sentence

* Delete run if failed during initialization

* Remove wait method

* cleanup

* Replace emoji

* Prevent empty run names

* Fix return annotation

* Remove unique constraint

* Add index for pipeline run table

* Better explanation

* Switch back to unique constraint

* Manual migration to add dummy orchestrator run id for old runs

* Remove useless check that is handled by unique constraint

* Add additional check to make sure we're only replacing placeholder runs

* Improve docstring

* Add tests

* Fix mypy and unit tests

* Mypy fixes

* Fix typos

* Fix get_or_create_run for pipelines with multiple steps

* Silence darglint

* Verify unique values during migration

* Fix alembic order

* Improve robustness of test_logs_are_recorded_properly test

* Fix alembic order

* Fix alembic order

* Fix ruff ignore

* Fix typo

* Import cleanup

* Formatting

* Fix alembic order

* Auto-update of E2E template

* Fix alembic order

* Auto-update of Starter template

* Auto-update of E2E template

* Auto-update of NLP template

---------

Co-authored-by: GitHub Actions <actions@github.com>
Co-authored-by: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 3, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent f4fa7e4 commit 79d967e
Showing 17 changed files with 588 additions and 138 deletions.
2 changes: 2 additions & 0 deletions src/zenml/cli/utils.py
Original file line number Diff line number Diff line change
@@ -2235,6 +2235,8 @@ def get_execution_status_emoji(status: "ExecutionStatus") -> str:
"""
from zenml.enums import ExecutionStatus

if status == ExecutionStatus.INITIALIZING:
return ":hourglass_flowing_sand:"
if status == ExecutionStatus.FAILED:
return ":x:"
if status == ExecutionStatus.RUNNING:
14 changes: 14 additions & 0 deletions src/zenml/enums.py
Original file line number Diff line number Diff line change
@@ -56,11 +56,25 @@ class VisualizationType(StrEnum):
class ExecutionStatus(StrEnum):
"""Enum that represents the current status of a step or pipeline run."""

INITIALIZING = "initializing"
FAILED = "failed"
COMPLETED = "completed"
RUNNING = "running"
CACHED = "cached"

@property
def is_finished(self) -> bool:
"""Whether the execution status refers to a finished execution.
Returns:
Whether the execution status refers to a finished execution.
"""
return self in {
ExecutionStatus.FAILED,
ExecutionStatus.COMPLETED,
ExecutionStatus.CACHED,
}


class LoggingLevels(Enum):
"""Enum for logging levels."""
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@

import sky

from zenml.client import Client
from zenml.entrypoints import PipelineEntrypointConfiguration
from zenml.enums import StackComponentType
from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import (
@@ -31,7 +30,6 @@
from zenml.orchestrators import (
ContainerizedOrchestrator,
)
from zenml.orchestrators import utils as orchestrator_utils
from zenml.stack import StackValidator
from zenml.utils import string_utils

@@ -264,12 +262,7 @@ def prepare_or_run_pipeline(
self.prepare_environment_variable(set=False)

run_duration = time.time() - start_time
run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
orchestrator=self, orchestrator_run_id=orchestrator_run_id
)
run_model = Client().zen_store.get_run(run_id)
logger.info(
"Pipeline run `%s` has finished in `%s`.\n",
run_model.name,
"Pipeline run has finished in `%s`.",
string_utils.get_human_readable_time(run_duration),
)
1 change: 0 additions & 1 deletion src/zenml/models/v2/core/pipeline_run.py
Original file line number Diff line number Diff line change
@@ -62,7 +62,6 @@
class PipelineRunRequest(WorkspaceScopedRequest):
"""Request model for pipeline runs."""

id: UUID
name: str = Field(
title="The name of the pipeline run.",
max_length=STR_FIELD_MAX_LENGTH,
66 changes: 47 additions & 19 deletions src/zenml/new/pipelines/pipeline.py
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@
from zenml.config.pipeline_spec import PipelineSpec
from zenml.config.schedule import Schedule
from zenml.config.step_configurations import StepConfigurationUpdate
from zenml.enums import StackComponentType
from zenml.enums import ExecutionStatus, StackComponentType
from zenml.hooks.hook_validators import resolve_and_validate_hook
from zenml.logger import get_logger
from zenml.models import (
@@ -65,11 +65,13 @@
PipelineDeploymentResponse,
PipelineRequest,
PipelineResponse,
PipelineRunRequest,
PipelineRunResponse,
ScheduleRequest,
)
from zenml.new.pipelines import build_utils
from zenml.new.pipelines.model_utils import NewModelVersionRequest
from zenml.orchestrators.utils import get_run_name
from zenml.stack import Stack
from zenml.steps import BaseStep
from zenml.steps.entrypoint_function_utils import (
@@ -569,7 +571,7 @@ def _run(
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
) -> None:
) -> Optional[PipelineRunResponse]:
"""Runs the pipeline on the active stack.
Args:
@@ -597,6 +599,10 @@ def _run(
Raises:
Exception: bypass any exception from pipeline up.
Returns:
Model of the pipeline run if running without a schedule, `None` if
running with a schedule.
"""
if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
# An environment variable was set to stop the execution of
@@ -609,7 +615,7 @@ def _run(
self.name,
constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
)
return
return None

logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")

@@ -734,24 +740,52 @@ def _run(

self.log_pipeline_deployment_metadata(deployment_model)

run = None
if not schedule:
run_request = PipelineRunRequest(
name=get_run_name(
run_name_template=deployment_model.run_name_template
),
# We set the start time on the placeholder run already to
# make it consistent with the {time} placeholder in the
# run name. This means the placeholder run will usually
# have longer durations than scheduled runs, as for them
# the start_time is only set once the first step starts
# running.
start_time=datetime.utcnow(),
orchestrator_run_id=None,
user=Client().active_user.id,
workspace=deployment_model.workspace.id,
deployment=deployment_model.id,
pipeline=deployment_model.pipeline.id
if deployment_model.pipeline
else None,
status=ExecutionStatus.INITIALIZING,
)
run = Client().zen_store.create_run(run_request)

# Prevent execution of nested pipelines which might lead to
# unexpected behavior
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
try:
stack.deploy_pipeline(deployment=deployment_model)
except Exception as e:
if (
run
and Client().get_pipeline_run(run.id).status
== ExecutionStatus.INITIALIZING
):
# The run hasn't actually started yet, which means that we
# failed during initialization -> We don't want the
# placeholder run to stay in the database
Client().delete_pipeline_run(run.id)

raise e
finally:
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

runs = Client().list_pipeline_runs(
deployment_id=deployment_model.id,
sort_by="desc:start_time",
size=1,
)

if runs.items:
run_url = dashboard_utils.get_run_url(runs[0])
if run:
run_url = dashboard_utils.get_run_url(run)
if run_url:
logger.info(f"Dashboard URL: {run_url}")
else:
@@ -760,14 +794,8 @@ def _run(
"Dashboard`. In order to try it locally, please run "
"`zenml up`."
)
else:
logger.warning(
f"Your orchestrator '{stack.orchestrator.name}' is "
f"running remotely. Note that the pipeline run will "
f"only show up on the ZenML dashboard once the first "
f"step has started executing on the remote "
f"infrastructure.",
)

return run

@staticmethod
def log_pipeline_deployment_metadata(
9 changes: 1 addition & 8 deletions src/zenml/orchestrators/local/local_orchestrator.py
Original file line number Diff line number Diff line change
@@ -16,10 +16,8 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
from uuid import uuid4

from zenml.client import Client
from zenml.logger import get_logger
from zenml.orchestrators import BaseOrchestrator
from zenml.orchestrators import utils as orchestrator_utils
from zenml.orchestrators.base_orchestrator import (
BaseOrchestratorConfig,
BaseOrchestratorFlavor,
@@ -81,13 +79,8 @@ def prepare_or_run_pipeline(
)

run_duration = time.time() - start_time
run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
orchestrator=self, orchestrator_run_id=self._orchestrator_run_id
)
run_model = Client().zen_store.get_run(run_id)
logger.info(
"Run `%s` has finished in `%s`.",
run_model.name,
"Pipeline run has finished in `%s`.",
string_utils.get_human_readable_time(run_duration),
)
self._orchestrator_run_id = None
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@
from docker.errors import ContainerError
from pydantic import validator

from zenml.client import Client
from zenml.config.base_settings import BaseSettings
from zenml.config.global_config import GlobalConfiguration
from zenml.constants import (
@@ -38,7 +37,6 @@
BaseOrchestratorFlavor,
ContainerizedOrchestrator,
)
from zenml.orchestrators import utils as orchestrator_utils
from zenml.stack import Stack, StackValidator
from zenml.utils import string_utils

@@ -193,13 +191,8 @@ def prepare_or_run_pipeline(
raise RuntimeError(error_message)

run_duration = time.time() - start_time
run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
orchestrator=self, orchestrator_run_id=orchestrator_run_id
)
run_model = Client().zen_store.get_run(run_id)
logger.info(
"Pipeline run `%s` has finished in `%s`.\n",
run_model.name,
"Pipeline run has finished in `%s`.",
string_utils.get_human_readable_time(run_duration),
)

26 changes: 8 additions & 18 deletions src/zenml/orchestrators/step_launcher.py
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@
from zenml.environment import get_run_environment_dict
from zenml.logger import get_logger
from zenml.logging import step_logging
from zenml.logging.step_logging import StepLogsStorageContext
from zenml.model.utils import link_artifact_config_to_model_version
from zenml.models import (
ArtifactVersionResponse,
@@ -50,7 +49,6 @@
)
from zenml.orchestrators import utils as orchestrator_utils
from zenml.orchestrators.step_runner import StepRunner
from zenml.orchestrators.utils import is_setting_enabled
from zenml.stack import Stack
from zenml.utils import string_utils

@@ -152,7 +150,7 @@ def launch(self) -> None:
if handle_bool_env_var(ENV_ZENML_DISABLE_STEP_LOGS_STORAGE, False):
step_logging_enabled = False
else:
step_logging_enabled = is_setting_enabled(
step_logging_enabled = orchestrator_utils.is_setting_enabled(
is_enabled_on_step=self._step.config.enable_step_logs,
is_enabled_on_pipeline=self._deployment.pipeline_configuration.enable_step_logs,
)
@@ -167,7 +165,9 @@ def launch(self) -> None:
self._step.config.name,
)

logs_context = StepLogsStorageContext(logs_uri=logs_uri) # type: ignore[assignment]
logs_context = step_logging.StepLogsStorageContext(
logs_uri=logs_uri
) # type: ignore[assignment]

logs_model = LogsRequest(
uri=logs_uri,
@@ -275,24 +275,14 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
The created or existing pipeline run,
and a boolean indicating whether the run was created or reused.
"""
run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
orchestrator=self._stack.orchestrator,
orchestrator_run_id=self._orchestrator_run_id,
)

date = datetime.utcnow().strftime("%Y_%m_%d")
time = datetime.utcnow().strftime("%H_%M_%S_%f")
run_name = self._deployment.run_name_template.format(
date=date, time=time
run_name = orchestrator_utils.get_run_name(
run_name_template=self._deployment.run_name_template
)

logger.debug(
"Creating pipeline run with ID: %s, name: %s", run_id, run_name
)
logger.debug("Creating pipeline run %s", run_name)

client = Client()
pipeline_run = PipelineRunRequest(
id=run_id,
name=run_name,
orchestrator_run_id=self._orchestrator_run_id,
user=client.active_user.id,
@@ -346,7 +336,7 @@ def _prepare(
step_run.parent_step_ids = parent_step_ids
step_run.cache_key = cache_key

cache_enabled = is_setting_enabled(
cache_enabled = orchestrator_utils.is_setting_enabled(
is_enabled_on_step=self._step.config.enable_cache,
is_enabled_on_pipeline=self._deployment.pipeline_configuration.enable_cache,
)
Loading

0 comments on commit 79d967e

Please sign in to comment.