Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Add pod labels for telemetry purpose. #3578

Merged
merged 7 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions sdk/python/kfp/compiler/_default_transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Dict, Optional, Text
from ..dsl._container_op import BaseOp, ContainerOp

# Pod label indicating the SDK type from which the pipeline is
# generated. By default it's set to kfp.
_SDK_ENV_LABEL = 'pipelines.kubeflow.org/pipeline-sdk-type'
_SDK_ENV_DEFAULT = 'kfp'


def get_default_telemetry_labels() -> Dict[Text, Text]:
"""Returns the default pod labels for telemetry purpose."""
result = {
_SDK_ENV_LABEL: _SDK_ENV_DEFAULT,
numerology marked this conversation as resolved.
Show resolved Hide resolved
}
return result


def add_pod_env(op: BaseOp) -> BaseOp:
"""Adds pod environment info to ContainerOp.
"""
if isinstance(op, ContainerOp) and op.pod_labels and 'add-pod-env' in op.pod_labels and op.pod_labels['add-pod-env'] == 'true':
if isinstance(op, ContainerOp) and op.pod_labels and op.pod_labels.get('add-pod-env', None) == 'true':
from kubernetes import client as k8s_client
op.container.add_env_variable(
k8s_client.V1EnvVar(
name='KFP_POD_NAME',
name='KFP_POD_NAME',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.name'
Expand All @@ -30,12 +45,26 @@ def add_pod_env(op: BaseOp) -> BaseOp:
)
).add_env_variable(
k8s_client.V1EnvVar(
name='KFP_NAMESPACE',
name='KFP_NAMESPACE',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.namespace'
)
)
)
)
return op
return op


def add_pod_labels(labels: Optional[Dict[Text, Text]] = None) -> Callable:
  """Returns an op transformer that appends the provided pod labels.

  Args:
    labels: Mapping from label key to label value to attach to each pod.
      If None (the default), the transformer is a no-op. The original
      implementation crashed with AttributeError on None; treat it as empty.

  Returns:
    A transformer taking a ContainerOp-like task and returning the same task
    with any missing labels appended.
  """

  def _add_pod_labels(task):
    for k, v in (labels or {}).items():
      # Only append but not update.
      # This is needed to bypass TFX pipelines/components.
      if k not in task.pod_labels:
        task.add_pod_label(k, v)
    return task

  return _add_pod_labels
33 changes: 27 additions & 6 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .. import dsl
from ._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
from ._op_to_template import _op_to_template
from ._default_transformers import add_pod_env
from ._default_transformers import add_pod_env, add_pod_labels, get_default_telemetry_labels

from ..components.structures import InputSpec
from ..components._yaml_utils import dump_yaml
Expand Down Expand Up @@ -757,7 +757,8 @@ def _create_workflow(self,
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf = None,
) -> Dict[Text, Any]:
allow_telemetry: bool = True,
) -> Dict[Text, Any]:
""" Internal implementation of create_workflow."""
params_list = params_list or []
argspec = inspect.getfullargspec(pipeline_func)
Expand Down Expand Up @@ -819,6 +820,13 @@ def _create_workflow(self,
default=param.value) for param in params_list]

op_transformers = [add_pod_env]
# By default adds telemetry instruments. Users can opt out toggling
# allow_telemetry.
# Also, TFX pipelines will be bypassed for pipeline compiled by tfx>0.21.4.
if allow_telemetry:
pod_labels = get_default_telemetry_labels()
op_transformers.append(add_pod_labels(pod_labels))

op_transformers.extend(pipeline_conf.op_transformers)

workflow = self._create_pipeline_workflow(
Expand Down Expand Up @@ -869,14 +877,24 @@ def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
"""Compile the given pipeline function into workflow."""
return self._create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)

def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
def compile(
self,
pipeline_func,
package_path,
type_check=True,
pipeline_conf: dsl.PipelineConf = None,
allow_telemetry: bool = True,
):
"""Compile the given pipeline function into workflow yaml.

Args:
pipeline_func: pipeline functions with @dsl.pipeline decorator.
package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
type_check: whether to enable the type check or not, default: False.
pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
allow_telemetry: If set to true, two pod labels will be attached to k8s
pods spawned by this pipeline: 1) pipeline SDK style, 2) pipeline random
ID.
"""
import kfp
type_check_old_value = kfp.TYPE_CHECK
Expand All @@ -885,7 +903,8 @@ def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: d
self._create_and_write_workflow(
pipeline_func=pipeline_func,
pipeline_conf=pipeline_conf,
package_path=package_path)
package_path=package_path,
allow_telemetry=allow_telemetry)
finally:
kfp.TYPE_CHECK = type_check_old_value

Expand Down Expand Up @@ -932,15 +951,17 @@ def _create_and_write_workflow(
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf=None,
package_path: Text=None
package_path: Text=None,
allow_telemetry: bool=True
) -> None:
"""Compile the given pipeline function and dump it to specified file format."""
workflow = self._create_workflow(
pipeline_func,
pipeline_name,
pipeline_description,
params_list,
pipeline_conf)
pipeline_conf,
allow_telemetry)
self._write_workflow(workflow, package_path)
_validate_workflow(workflow)

Expand Down
49 changes: 41 additions & 8 deletions sdk/python/kfp/compiler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
import tempfile
from deprecated.sphinx import deprecated

def _str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')


def parse_arguments():
"""Parse command line arguments."""
Expand All @@ -47,12 +57,16 @@ def parse_arguments():
parser.add_argument('--disable-type-check',
action='store_true',
help='disable the type check, default is enabled.')
parser.add_argument('--disable-telemetry',
action='store_true',
help='disable adding telemetry labels, default is enabled.')

args = parser.parse_args()
return args


def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check):
def _compile_pipeline_function(
pipeline_funcs, function_name, output_path, type_check, allow_telemetry):
if len(pipeline_funcs) == 0:
raise ValueError('A function with @dsl.pipeline decorator is required in the py file.')

Expand All @@ -68,7 +82,8 @@ def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_
else:
pipeline_func = pipeline_funcs[0]

kfp.compiler.Compiler().compile(pipeline_func, output_path, type_check)
kfp.compiler.Compiler().compile(
pipeline_func, output_path, type_check, allow_telemetry=allow_telemetry)


class PipelineCollectorContext():
Expand All @@ -90,26 +105,31 @@ def __exit__(self, *args):
Please switch to compiling pipeline files or functions.
If you use this feature please create an issue in https://github.com/kubeflow/pipelines/issues .'''
)
def compile_package(
    package_path, namespace, function_name, output_path, type_check, allow_telemetry):
  """Compiles a pipeline shipped as a pip-installable package.

  The package is pip-installed into a temporary directory which is prepended
  to sys.path, the given namespace is imported so that @dsl.pipeline
  functions register themselves, and the selected function is compiled to
  output_path. The temporary directory is always removed afterwards.
  """
  staging_dir = tempfile.mkdtemp()
  sys.path.insert(0, staging_dir)
  try:
    subprocess.check_call(
        ['python3', '-m', 'pip', 'install', package_path, '-t', staging_dir])
    with PipelineCollectorContext() as pipeline_funcs:
      __import__(namespace)
      _compile_pipeline_function(
          pipeline_funcs, function_name, output_path, type_check,
          allow_telemetry=allow_telemetry)
  finally:
    # Undo the sys.path insertion and clean up the staging install.
    del sys.path[0]
    shutil.rmtree(staging_dir)


def compile_pyfile(pyfile, function_name, output_path, type_check, allow_telemetry):
  """Compiles a pipeline authored in a standalone Python file.

  Temporarily prepends the file's directory to sys.path, imports the module
  so that @dsl.pipeline functions register themselves, and compiles the
  selected function to output_path.
  """
  sys.path.insert(0, os.path.dirname(pyfile))
  try:
    module_name = os.path.splitext(os.path.basename(pyfile))[0]
    with PipelineCollectorContext() as pipeline_funcs:
      __import__(module_name)
      _compile_pipeline_function(
          pipeline_funcs, function_name, output_path, type_check,
          allow_telemetry=allow_telemetry)
  finally:
    # Restore sys.path regardless of import/compile failures.
    del sys.path[0]

Expand All @@ -120,9 +140,22 @@ def main():
(args.py is not None and args.package is not None)):
raise ValueError('Either --py or --package is needed but not both.')
if args.py:
compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
compile_pyfile(
args.py,
args.function,
args.output,
not args.disable_type_check,
not args.disable_telemetry
)
else:
if args.namespace is None:
raise ValueError('--namespace is required for compiling packages.')
compile_package(args.package, args.namespace, args.function, args.output, not args.disable_type_check)
compile_package(
args.package,
args.namespace,
args.function,
args.output,
not args.disable_type_check,
not args.disable_telemetry
)

1 change: 1 addition & 0 deletions sdk/python/tests/compiler/testdata/add_pod_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
image: library/bash
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
add-pod-env: 'true'
name: echo
- dag:
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions sdk/python/tests/compiler/testdata/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ spec:
- -c
image: python:3.5-jessie
name: exiting
metadata: {'labels': {'pipelines.kubeflow.org/pipeline-sdk-type': 'kfp'}}
- container:
args:
- python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split());
Expand All @@ -72,6 +73,9 @@ spec:
parameters:
- name: message
name: get-frequent
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: get-frequent-word
Expand All @@ -98,6 +102,8 @@ spec:
- name: get-frequent-word
- name: outputpath
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
annotations:
tf-version.cloud-tpus.google.com: "1.12"
name: save
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/tests/compiler/testdata/basic_no_decorator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ spec:
- -c
image: python:3.5-jessie
name: exiting
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
- container:
args:
- python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split());
Expand All @@ -73,6 +76,9 @@ spec:
parameters:
- name: message
name: get-frequent
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: get-frequent-word
Expand All @@ -99,6 +105,8 @@ spec:
- name: get-frequent-word
- name: outputpath
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
annotations:
tf-version.cloud-tpus.google.com: "1.12"
name: save
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/tests/compiler/testdata/coin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ spec:
- -c
image: python:alpine3.6
name: flip
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: flip-output
Expand All @@ -92,6 +95,9 @@ spec:
- -c
image: python:alpine3.6
name: flip-again
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: flip-again-output
Expand Down Expand Up @@ -129,6 +135,9 @@ spec:
parameters:
- name: flip-again-output
name: print1
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
- container:
command:
- echo
Expand All @@ -138,3 +147,6 @@ spec:
parameters:
- name: flip-again-output
name: print2
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
9 changes: 9 additions & 0 deletions sdk/python/tests/compiler/testdata/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ spec:
parameters:
- name: url
name: download
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: download-downloaded
Expand Down Expand Up @@ -87,6 +90,9 @@ spec:
parameters:
- name: download-downloaded
name: get-frequent
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: get-frequent-word
Expand All @@ -108,3 +114,6 @@ spec:
- name: get-frequent-word
- name: outputpath
name: save
metadata:
labels:
pipelines.kubeflow.org/pipeline-sdk-type: kfp
Loading