Skip to content

Commit

Permalink
add PipelineConfig to DSL to re-implement pipeline-level config
Browse files Browse the repository at this point in the history
KFP v1 supported setting pipeline-level configuration via a
`PipelineConf` class. This class was deprecated and no replacement
was added to KFP v2.

add new PipelineConfig class to support setting pipeline-level
configuration in KFP v2.

Co-authored-by: Ricardo M. Oliveira <rmartine@redhat.com>

Signed-off-by: Greg Sheremeta <gshereme@redhat.com>
  • Loading branch information
gregsheremeta committed Oct 28, 2024
1 parent 009132a commit 9525a11
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 6 deletions.
20 changes: 20 additions & 0 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from kfp.dsl import component_factory
from kfp.dsl import for_loop
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_context
from kfp.dsl import pipeline_task
from kfp.dsl import placeholders
Expand Down Expand Up @@ -1879,13 +1880,15 @@ def create_pipeline_spec(
pipeline: pipeline_context.Pipeline,
component_spec: structures.ComponentSpec,
pipeline_outputs: Optional[Any] = None,
pipeline_config: pipeline_config.PipelineConfig = None,
) -> Tuple[pipeline_spec_pb2.PipelineSpec, pipeline_spec_pb2.PlatformSpec]:
"""Creates a pipeline spec object.
Args:
pipeline: The instantiated pipeline object.
component_spec: The component spec structures.
pipeline_outputs: The pipeline outputs via return.
pipeline_config: The pipeline config object.
Returns:
A PipelineSpec proto representing the compiled pipeline.
Expand Down Expand Up @@ -1947,6 +1950,12 @@ def create_pipeline_spec(
)

platform_spec = pipeline_spec_pb2.PlatformSpec()
if pipeline_config is not None:
_merge_pipeline_config(
pipelineConfig=pipeline_config,
platformSpec=platform_spec
)

for group in all_groups:
build_spec_by_group(
pipeline_spec=pipeline_spec,
Expand Down Expand Up @@ -2062,6 +2071,17 @@ def write_pipeline_spec_to_file(
f'The output path {package_path} should end with ".yaml".')


def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig,
platformSpec: pipeline_spec_pb2.PlatformSpec):
# TODO: add pipeline config options (ttl, semaphore, etc.) to the dict
# json_format.ParseDict(
# {'pipelineConfig': {
# '<some pipeline config option>': pipelineConfig.<get that value>,
# }}, platformSpec.platforms['kubernetes'])

return platformSpec


def extract_comments_from_pipeline_spec(pipeline_spec: dict,
pipeline_description: str) -> str:

Expand Down
2 changes: 2 additions & 0 deletions sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def my_pipeline():
from kfp.dsl.for_loop import Collected
from kfp.dsl.importer_node import importer
from kfp.dsl.pipeline_channel import OneOf
from kfp.dsl.pipeline_config import PipelineConfig
from kfp.dsl.pipeline_context import pipeline
from kfp.dsl.pipeline_task import PipelineTask
from kfp.dsl.placeholders import ConcatPlaceholder
Expand Down Expand Up @@ -292,4 +293,5 @@ def my_pipeline():
'IfPresentPlaceholder',
'ConcatPlaceholder',
'PipelineTask',
'PipelineConfig',
])
3 changes: 3 additions & 0 deletions sdk/python/kfp/dsl/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from kfp.dsl import container_component_artifact_channel
from kfp.dsl import container_component_class
from kfp.dsl import graph_component
from kfp.dsl import pipeline_config
from kfp.dsl import placeholders
from kfp.dsl import python_component
from kfp.dsl import structures
Expand Down Expand Up @@ -676,6 +677,7 @@ def create_graph_component_from_func(
name: Optional[str] = None,
description: Optional[str] = None,
display_name: Optional[str] = None,
pipeline_config: pipeline_config.PipelineConfig = None,
) -> graph_component.GraphComponent:
"""Implementation for the @pipeline decorator.
Expand All @@ -692,6 +694,7 @@ def create_graph_component_from_func(
component_spec=component_spec,
pipeline_func=func,
display_name=display_name,
pipeline_config=pipeline_config,
)


Expand Down
4 changes: 4 additions & 0 deletions sdk/python/kfp/dsl/graph_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from kfp.compiler import pipeline_spec_builder as builder
from kfp.dsl import base_component
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_context
from kfp.dsl import structures
from kfp.pipeline_spec import pipeline_spec_pb2
Expand All @@ -37,9 +38,11 @@ def __init__(
component_spec: structures.ComponentSpec,
pipeline_func: Callable,
display_name: Optional[str] = None,
pipeline_config: pipeline_config.PipelineConfig = None,
):
super().__init__(component_spec=component_spec)
self.pipeline_func = pipeline_func
self.pipeline_config = pipeline_config

args_list = []
signature = inspect.signature(pipeline_func)
Expand Down Expand Up @@ -69,6 +72,7 @@ def __init__(
pipeline=dsl_pipeline,
component_spec=self.component_spec,
pipeline_outputs=pipeline_outputs,
pipeline_config=pipeline_config,
)

pipeline_root = getattr(pipeline_func, 'pipeline_root', None)
Expand Down
23 changes: 23 additions & 0 deletions sdk/python/kfp/dsl/pipeline_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline-level config options."""


class PipelineConfig:
"""PipelineConfig contains pipeline-level config options."""

def __init__(self):
pass

# TODO add pipeline level configs
18 changes: 12 additions & 6 deletions sdk/python/kfp/dsl/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
from typing import Callable, Optional

from kfp.dsl import component_factory
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_task
from kfp.dsl import tasks_group
from kfp.dsl import utils


def pipeline(func: Optional[Callable] = None,
*,
name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None,
display_name: Optional[str] = None) -> Callable:
def pipeline(
func: Optional[Callable] = None,
*,
name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None,
display_name: Optional[str] = None,
pipeline_config: pipeline_config.PipelineConfig = None) -> Callable:
"""Decorator used to construct a pipeline.
Example
Expand All @@ -50,6 +53,7 @@ def my_pipeline(a: str, b: int):
pipeline_root: The root directory from which to read input and output
parameters and artifacts.
display_name: A human-readable name for the pipeline.
pipeline_config: Pipeline-level config options.
"""
if func is None:
return functools.partial(
Expand All @@ -58,6 +62,7 @@ def my_pipeline(a: str, b: int):
description=description,
pipeline_root=pipeline_root,
display_name=display_name,
pipeline_config=pipeline_config,
)

if pipeline_root:
Expand All @@ -68,6 +73,7 @@ def my_pipeline(a: str, b: int):
name=name,
description=description,
display_name=display_name,
pipeline_config=pipeline_config,
)


Expand Down

0 comments on commit 9525a11

Please sign in to comment.