chore: Rollback telemetry related changes (#4088)
* Revert "fix length (#3934)"

This reverts commit 7fbb7ca

* Revert "[SDK] Add first party component label (#3861)"

This reverts commit 1e2b9d4

* Revert "[SDK] Add pod labels for telemetry purpose. (#3578)"

This reverts commit aa8da64
Jiaxiao Zheng authored and Bobgy committed Jul 2, 2020
1 parent 51872e4 commit f64e105
Showing 37 changed files with 192 additions and 556 deletions.
87 changes: 4 additions & 83 deletions sdk/python/kfp/compiler/_default_transformers.py
@@ -12,43 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import Callable, Dict, Optional, Text

from ..dsl._container_op import BaseOp, ContainerOp

# Pod label indicating the SDK type from which the pipeline is
# generated. By default it's set to kfp.
_SDK_ENV_LABEL = 'pipelines.kubeflow.org/pipeline-sdk-type'
_SDK_ENV_DEFAULT = 'kfp'

# Common prefix of KFP OOB components url paths.
_OOB_COMPONENT_PATH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/'\
'pipelines'

# Key for component origin path pod label.
COMPONENT_PATH_LABEL_KEY = 'pipelines.kubeflow.org/component_origin_path'

# Key for component spec digest pod label.
COMPONENT_DIGEST_LABEL_KEY = 'pipelines.kubeflow.org/component_digest'


def get_default_telemetry_labels() -> Dict[Text, Text]:
"""Returns the default pod labels for telemetry purpose."""
result = {
_SDK_ENV_LABEL: _SDK_ENV_DEFAULT,
}
return result


def add_pod_env(op: BaseOp) -> BaseOp:
"""Adds pod environment info to ContainerOp.
"""
- if isinstance(op, ContainerOp) and op.pod_labels and op.pod_labels.get('add-pod-env', None) == 'true':
+ if isinstance(op, ContainerOp) and op.pod_labels and 'add-pod-env' in op.pod_labels and op.pod_labels['add-pod-env'] == 'true':
from kubernetes import client as k8s_client
op.container.add_env_variable(
k8s_client.V1EnvVar(
name='KFP_POD_NAME',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.name'
@@ -57,64 +30,12 @@ def add_pod_env(op: BaseOp) -> BaseOp:
)
).add_env_variable(
k8s_client.V1EnvVar(
name='KFP_NAMESPACE',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.namespace'
)
)
)
)
return op


def add_pod_labels(labels: Optional[Dict[Text, Text]] = None) -> Callable:
"""Adds provided pod labels to each pod."""

def _add_pod_labels(task):
for k, v in labels.items():
# Only append but not update.
# This is needed to bypass TFX pipelines/components.
if k not in task.pod_labels:
task.add_pod_label(k, v)
return task

return _add_pod_labels


def _remove_suffix(string: Text, suffix: Text) -> Text:
"""Removes the suffix from a string."""
if suffix and string.endswith(suffix):
return string[:-len(suffix)]
else:
return string


def add_name_for_oob_components() -> Callable:
"""Adds the OOB component name if applicable."""

def _add_name_for_oob_components(task):
# Detect the component origin uri in component_ref if exists, and
# attach the OOB component name as a pod label.
component_ref = getattr(task, '_component_ref', None)
if component_ref:
if component_ref.url:
origin_path = _remove_suffix(
component_ref.url, 'component.yaml').rstrip('/')
# Only include KFP OOB components.
if origin_path.startswith(_OOB_COMPONENT_PATH_PREFIX):
origin_path = origin_path.split('/', 7)[-1]
else:
return task
# Clean the label to comply with the k8s label convention.
origin_path = re.sub('[^-a-z0-9A-Z_.]', '.', origin_path)
origin_path_label = origin_path[-63:].strip('-_.')
task.add_pod_label(COMPONENT_PATH_LABEL_KEY, origin_path_label)
if component_ref.digest:
# We can only preserve the first 63 digits of the digest.
task.add_pod_label(
COMPONENT_DIGEST_LABEL_KEY, component_ref.digest[:63])

return task

return _add_name_for_oob_components
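
With get_default_telemetry_labels and add_pod_labels gone, compiled pipelines no longer receive the SDK-type pod label automatically. For anyone who still wants it, a minimal user-side sketch, assuming dsl.PipelineConf.add_op_transformer remains available as the extension point (it feeds the same pipeline_conf.op_transformers list the compiler consumes below):

from kfp import dsl

# Label key and default value taken from the reverted code.
_SDK_ENV_LABEL = 'pipelines.kubeflow.org/pipeline-sdk-type'

def _add_sdk_label(op):
  # Append only, never overwrite -- mirrors the reverted transformer,
  # which skipped keys already set (e.g. by TFX pipelines).
  if _SDK_ENV_LABEL not in op.pod_labels:
    op.add_pod_label(_SDK_ENV_LABEL, 'kfp')
  return op

conf = dsl.PipelineConf()
conf.add_op_transformer(_add_sdk_label)  # assumed hook; pass via compile(..., pipeline_conf=conf)
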
34 changes: 6 additions & 28 deletions sdk/python/kfp/compiler/compiler.py
@@ -27,7 +27,7 @@
from .. import dsl
from ._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
from ._op_to_template import _op_to_template
- from ._default_transformers import add_pod_env, add_pod_labels, add_name_for_oob_components, get_default_telemetry_labels
+ from ._default_transformers import add_pod_env

from ..components.structures import InputSpec
from ..components._yaml_utils import dump_yaml
@@ -768,8 +768,7 @@ def _create_workflow(self,
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf = None,
- allow_telemetry: bool = True,
) -> Dict[Text, Any]:
""" Internal implementation of create_workflow."""
params_list = params_list or []

@@ -829,14 +828,6 @@ def _create_workflow(self,
default=param.value) for param in params_list]

op_transformers = [add_pod_env]
- # By default adds telemetry instruments. Users can opt out toggling
- # allow_telemetry.
- # Also, TFX pipelines will be bypassed for pipeline compiled by tfx>0.21.4.
- if allow_telemetry:
-   pod_labels = get_default_telemetry_labels()
-   op_transformers.append(add_pod_labels(pod_labels))
-   op_transformers.append(add_name_for_oob_components())

op_transformers.extend(pipeline_conf.op_transformers)

workflow = self._create_pipeline_workflow(
@@ -895,24 +886,14 @@ def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
"""Compile the given pipeline function into workflow."""
return self._create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)

- def compile(
-     self,
-     pipeline_func,
-     package_path,
-     type_check=True,
-     pipeline_conf: dsl.PipelineConf = None,
-     allow_telemetry: bool = True,
- ):
+ def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
"""Compile the given pipeline function into workflow yaml.
Args:
pipeline_func: pipeline functions with @dsl.pipeline decorator.
package_path: the output workflow tar.gz file path. For example, "~/a.tar.gz"
type_check: whether to enable the type check or not, default: True.
pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
- allow_telemetry: If set to true, two pod labels will be attached to k8s
-   pods spawned by this pipeline: 1) pipeline SDK style, 2) pipeline random
-   ID.
"""
import kfp
type_check_old_value = kfp.TYPE_CHECK
@@ -921,8 +902,7 @@ def compile(
self._create_and_write_workflow(
pipeline_func=pipeline_func,
pipeline_conf=pipeline_conf,
- package_path=package_path,
- allow_telemetry=allow_telemetry)
+ package_path=package_path)
finally:
kfp.TYPE_CHECK = type_check_old_value

@@ -969,17 +949,15 @@ def _create_and_write_workflow(
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf=None,
- package_path: Text=None,
- allow_telemetry: bool=True
+ package_path: Text=None
) -> None:
"""Compile the given pipeline function and dump it to specified file format."""
workflow = self._create_workflow(
pipeline_func,
pipeline_name,
pipeline_description,
params_list,
- pipeline_conf,
- allow_telemetry)
+ pipeline_conf)
self._write_workflow(workflow, package_path)
_validate_workflow(workflow)

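
After the revert, the public compile() call is back to four parameters. A quick usage sketch (the demo pipeline below is hypothetical):

import kfp.compiler as compiler
from kfp import dsl

@dsl.pipeline(name='demo')  # hypothetical minimal pipeline
def my_pipeline():
  dsl.ContainerOp(name='echo', image='library/bash',
                  command=['sh', '-c', 'echo hello'])

# allow_telemetry is gone; only these parameters remain.
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.tar.gz',  # output workflow archive
    type_check=True,
)
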
49 changes: 8 additions & 41 deletions sdk/python/kfp/compiler/main.py
@@ -23,16 +23,6 @@
import tempfile
from deprecated.sphinx import deprecated

def _str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')


def parse_arguments():
"""Parse command line arguments."""
@@ -57,16 +47,12 @@ def parse_arguments():
parser.add_argument('--disable-type-check',
action='store_true',
help='disable the type check, default is enabled.')
- parser.add_argument('--disable-telemetry',
-     action='store_true',
-     help='disable adding telemetry labels, default is enabled.')

args = parser.parse_args()
return args


- def _compile_pipeline_function(
-     pipeline_funcs, function_name, output_path, type_check, allow_telemetry):
+ def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check):
if len(pipeline_funcs) == 0:
raise ValueError('A function with @dsl.pipeline decorator is required in the py file.')

@@ -82,8 +68,7 @@ def _compile_pipeline_function(
else:
pipeline_func = pipeline_funcs[0]

- kfp.compiler.Compiler().compile(
-     pipeline_func, output_path, type_check, allow_telemetry=allow_telemetry)
+ kfp.compiler.Compiler().compile(pipeline_func, output_path, type_check)


class PipelineCollectorContext():
@@ -105,31 +90,26 @@ def __exit__(self, *args):
Please switch to compiling pipeline files or functions.
If you use this feature please create an issue in https://github.com/kubeflow/pipelines/issues .'''
)
- def compile_package(
-     package_path, namespace, function_name, output_path, type_check, allow_telemetry):
+ def compile_package(package_path, namespace, function_name, output_path, type_check):
tmpdir = tempfile.mkdtemp()
sys.path.insert(0, tmpdir)
try:
subprocess.check_call(['python3', '-m', 'pip', 'install', package_path, '-t', tmpdir])
with PipelineCollectorContext() as pipeline_funcs:
__import__(namespace)
- _compile_pipeline_function(
-     pipeline_funcs, function_name, output_path, type_check,
-     allow_telemetry=allow_telemetry)
+ _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check)
finally:
del sys.path[0]
shutil.rmtree(tmpdir)


- def compile_pyfile(pyfile, function_name, output_path, type_check, allow_telemetry):
+ def compile_pyfile(pyfile, function_name, output_path, type_check):
sys.path.insert(0, os.path.dirname(pyfile))
try:
filename = os.path.basename(pyfile)
with PipelineCollectorContext() as pipeline_funcs:
__import__(os.path.splitext(filename)[0])
- _compile_pipeline_function(
-     pipeline_funcs, function_name, output_path, type_check,
-     allow_telemetry=allow_telemetry)
+ _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check)
finally:
del sys.path[0]

@@ -140,22 +120,9 @@ def main():
(args.py is not None and args.package is not None)):
raise ValueError('Either --py or --package is needed but not both.')
if args.py:
- compile_pyfile(
-     args.py,
-     args.function,
-     args.output,
-     not args.disable_type_check,
-     not args.disable_telemetry
- )
+ compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
else:
if args.namespace is None:
raise ValueError('--namespace is required for compiling packages.')
- compile_package(
-     args.package,
-     args.namespace,
-     args.function,
-     args.output,
-     not args.disable_type_check,
-     not args.disable_telemetry
- )
+ compile_package(args.package, args.namespace, args.function, args.output, not args.disable_type_check)
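
With --disable-telemetry gone, the CLI entry points only thread the type-check flag through. A sketch of the post-rollback call shape when driving the compiler programmatically (pipeline.py and the output path are hypothetical):

from kfp.compiler.main import compile_pyfile

# Post-rollback signature: no allow_telemetry parameter.
compile_pyfile(
    pyfile='pipeline.py',   # hypothetical file defining a @dsl.pipeline function
    function_name=None,     # None falls back to the first collected pipeline function
    output_path='pipeline.tar.gz',
    type_check=True,
)
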

28 changes: 0 additions & 28 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -26,8 +26,6 @@
import unittest
import yaml

- from kfp import components
- from kfp.compiler._default_transformers import COMPONENT_DIGEST_LABEL_KEY, COMPONENT_PATH_LABEL_KEY
from kfp.dsl._component import component
from kfp.dsl import ContainerOp, pipeline
from kfp.dsl.types import Integer, InconsistentTypeException
@@ -42,11 +40,6 @@ def some_op():
command=['sleep 1'],
)

_TEST_GCS_DOWNLOAD_COMPONENT_URL = 'https://raw.githubusercontent.com/kubeflow/'\
'pipelines/2dac60c400ad8767b452649d08f328df'\
'af230f96/components/google-cloud/storage/'\
'download/component.yaml'


class TestCompiler(unittest.TestCase):
# Define the places of samples covered by unit tests.
@@ -718,27 +711,6 @@ def some_pipeline():
container = template.get('container', None)
if container:
self.assertEqual(template['retryStrategy']['limit'], 5)

def test_oob_component_label(self):
gcs_download_op = components.load_component_from_url(
_TEST_GCS_DOWNLOAD_COMPONENT_URL)

@dsl.pipeline(name='some_pipeline')
def some_pipeline():
_download_task = gcs_download_op('gs://some_bucket/some_dir/some_file')

workflow_dict = compiler.Compiler()._compile(some_pipeline)

found_download_task = False
for template in workflow_dict['spec']['templates']:
if template.get('container', None):
found_download_task = True
self.assertEqual(
template['metadata']['labels'][COMPONENT_PATH_LABEL_KEY],
'google-cloud.storage.download')
self.assertIsNotNone(
template['metadata']['labels'].get(COMPONENT_DIGEST_LABEL_KEY))
self.assertTrue(found_download_task, 'download task not found in workflow.')

def test_image_pull_policy(self):
def some_op():
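
The deleted test_oob_component_label exercised how the reverted helper turned a component URL into a Kubernetes-safe pod label. A standalone sketch of that derivation, mirroring the removed add_name_for_oob_components logic shown earlier:

import re

_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines'

def component_path_label(url: str) -> str:
  # Drop the trailing 'component.yaml', keep the repo-relative path,
  # replace characters invalid in a k8s label value with '.', and
  # keep at most the last 63 characters (the k8s label length limit).
  path = url[:-len('component.yaml')] if url.endswith('component.yaml') else url
  path = path.rstrip('/')
  if not path.startswith(_PREFIX):
    return ''  # only KFP OOB components were labeled
  path = path.split('/', 7)[-1]
  path = re.sub('[^-a-z0-9A-Z_.]', '.', path)
  return path[-63:].strip('-_.')

# For the GCS download component URL in the removed test, this
# yields 'google-cloud.storage.download'.
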
1 change: 0 additions & 1 deletion sdk/python/tests/compiler/testdata/add_pod_env.yaml
@@ -28,7 +28,6 @@ spec:
image: library/bash
metadata:
labels:
- pipelines.kubeflow.org/pipeline-sdk-type: kfp
add-pod-env: 'true'
name: echo
- dag:
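
Note that the add-pod-env: 'true' label survives the revert: it is the opt-in switch the remaining add_pod_env transformer keys on. A sketch of setting it from the DSL:

from kfp import dsl

def echo_op():
  op = dsl.ContainerOp(name='echo', image='library/bash',
                       command=['sh', '-c', 'echo hello'])
  # With this label set, add_pod_env injects KFP_POD_NAME and
  # KFP_NAMESPACE via the Kubernetes downward API.
  op.add_pod_label('add-pod-env', 'true')
  return op
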
6 changes: 0 additions & 6 deletions sdk/python/tests/compiler/testdata/basic.yaml
@@ -57,7 +57,6 @@ spec:
- -c
image: python:3.5-jessie
name: exiting
- metadata: {'labels': {'pipelines.kubeflow.org/pipeline-sdk-type': 'kfp'}}
- container:
args:
- python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split());
@@ -73,9 +72,6 @@
parameters:
- name: message
name: get-frequent
- metadata:
-   labels:
-     pipelines.kubeflow.org/pipeline-sdk-type: kfp
outputs:
artifacts:
- name: get-frequent-word
@@ -102,8 +98,6 @@
- name: get-frequent-word
- name: outputpath
metadata:
-   labels:
-     pipelines.kubeflow.org/pipeline-sdk-type: kfp
annotations:
tf-version.cloud-tpus.google.com: "1.12"
name: save
