chore: Rollback telemetry related changes #4088

Merged · 3 commits · Jun 27, 2020
Changes from all commits
87 changes: 4 additions & 83 deletions sdk/python/kfp/compiler/_default_transformers.py
@@ -12,43 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import re
-from typing import Callable, Dict, Optional, Text
-
 from ..dsl._container_op import BaseOp, ContainerOp
 
-# Pod label indicating the SDK type from which the pipeline is
-# generated. By default it's set to kfp.
-_SDK_ENV_LABEL = 'pipelines.kubeflow.org/pipeline-sdk-type'
-_SDK_ENV_DEFAULT = 'kfp'
-
-# Common prefix of KFP OOB components url paths.
-_OOB_COMPONENT_PATH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/'\
-                             'pipelines'
-
-# Key for component origin path pod label.
-COMPONENT_PATH_LABEL_KEY = 'pipelines.kubeflow.org/component_origin_path'
-
-# Key for component spec digest pod label.
-COMPONENT_DIGEST_LABEL_KEY = 'pipelines.kubeflow.org/component_digest'
-
-
-def get_default_telemetry_labels() -> Dict[Text, Text]:
-  """Returns the default pod labels for telemetry purpose."""
-  result = {
-      _SDK_ENV_LABEL: _SDK_ENV_DEFAULT,
-  }
-  return result
-
 
 def add_pod_env(op: BaseOp) -> BaseOp:
   """Adds pod environment info to ContainerOp.
   """
-  if isinstance(op, ContainerOp) and op.pod_labels and op.pod_labels.get('add-pod-env', None) == 'true':
+  if isinstance(op, ContainerOp) and op.pod_labels and 'add-pod-env' in op.pod_labels and op.pod_labels['add-pod-env'] == 'true':
     from kubernetes import client as k8s_client
     op.container.add_env_variable(
       k8s_client.V1EnvVar(
-        name='KFP_POD_NAME',
+        name='KFP_POD_NAME',
         value_from=k8s_client.V1EnvVarSource(
           field_ref=k8s_client.V1ObjectFieldSelector(
             field_path='metadata.name'
@@ -57,64 +30,12 @@ def add_pod_env(op: BaseOp) -> BaseOp:
         )
     ).add_env_variable(
       k8s_client.V1EnvVar(
-        name='KFP_NAMESPACE',
+        name='KFP_NAMESPACE',
         value_from=k8s_client.V1EnvVarSource(
           field_ref=k8s_client.V1ObjectFieldSelector(
             field_path='metadata.namespace'
           )
         )
       )
     )
-  return op
-
-
-def add_pod_labels(labels: Optional[Dict[Text, Text]] = None) -> Callable:
-  """Adds provided pod labels to each pod."""
-
-  def _add_pod_labels(task):
-    for k, v in labels.items():
-      # Only append but not update.
-      # This is needed to bypass TFX pipelines/components.
-      if k not in task.pod_labels:
-        task.add_pod_label(k, v)
-    return task
-
-  return _add_pod_labels
-
-
-def _remove_suffix(string: Text, suffix: Text) -> Text:
-  """Removes the suffix from a string."""
-  if suffix and string.endswith(suffix):
-    return string[:-len(suffix)]
-  else:
-    return string
-
-
-def add_name_for_oob_components() -> Callable:
-  """Adds the OOB component name if applicable."""
-
-  def _add_name_for_oob_components(task):
-    # Detect the component origin uri in component_ref if exists, and
-    # attach the OOB component name as a pod label.
-    component_ref = getattr(task, '_component_ref', None)
-    if component_ref:
-      if component_ref.url:
-        origin_path = _remove_suffix(
-            component_ref.url, 'component.yaml').rstrip('/')
-        # Only include KFP OOB components.
-        if origin_path.startswith(_OOB_COMPONENT_PATH_PREFIX):
-          origin_path = origin_path.split('/', 7)[-1]
-        else:
-          return task
-        # Clean the label to comply with the k8s label convention.
-        origin_path = re.sub('[^-a-z0-9A-Z_.]', '.', origin_path)
-        origin_path_label = origin_path[-63:].strip('-_.')
-        task.add_pod_label(COMPONENT_PATH_LABEL_KEY, origin_path_label)
-      if component_ref.digest:
-        # We can only preserve the first 63 digits of the digest.
-        task.add_pod_label(
-            COMPONENT_DIGEST_LABEL_KEY, component_ref.digest[:63])
-
-    return task
-
-  return _add_name_for_oob_components
+  return op
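For context, a minimal sketch (not part of this PR) of how the retained `add_pod_env` transformer behaves: it only touches ops that opt in through the `add-pod-env` pod label, and it injects the pod's own name and namespace via the Kubernetes downward API. The pipeline and op below are illustrative placeholders.

```python
from kfp import dsl
from kfp.compiler._default_transformers import add_pod_env

# Ops must be created inside a pipeline context; this one is illustrative.
with dsl.Pipeline('env-demo'):
  op = dsl.ContainerOp(
      name='echo',
      image='library/bash',
      command=['sh', '-c', 'echo "$KFP_POD_NAME in $KFP_NAMESPACE"'])
  op.add_pod_label('add-pod-env', 'true')  # the opt-in label checked above

add_pod_env(op)
# op.container now carries two downward-API env vars:
#   KFP_POD_NAME  <- metadata.name
#   KFP_NAMESPACE <- metadata.namespace
```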
34 changes: 6 additions & 28 deletions sdk/python/kfp/compiler/compiler.py
@@ -27,7 +27,7 @@
 from .. import dsl
 from ._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
 from ._op_to_template import _op_to_template
-from ._default_transformers import add_pod_env, add_pod_labels, add_name_for_oob_components, get_default_telemetry_labels
+from ._default_transformers import add_pod_env
 
 from ..components.structures import InputSpec
 from ..components._yaml_utils import dump_yaml
@@ -768,8 +768,7 @@ def _create_workflow(self,
                        pipeline_description: Text=None,
                        params_list: List[dsl.PipelineParam]=None,
                        pipeline_conf: dsl.PipelineConf = None,
-                       allow_telemetry: bool = True,
-                       ) -> Dict[Text, Any]:
+                       ) -> Dict[Text, Any]:
     """ Internal implementation of create_workflow."""
     params_list = params_list or []
 
@@ -829,14 +828,6 @@ def _create_workflow(self,
                          default=param.value) for param in params_list]
 
     op_transformers = [add_pod_env]
-    # By default adds telemetry instruments. Users can opt out toggling
-    # allow_telemetry.
-    # Also, TFX pipelines will be bypassed for pipeline compiled by tfx>0.21.4.
-    if allow_telemetry:
-      pod_labels = get_default_telemetry_labels()
-      op_transformers.append(add_pod_labels(pod_labels))
-      op_transformers.append(add_name_for_oob_components())
-
     op_transformers.extend(pipeline_conf.op_transformers)
 
     workflow = self._create_pipeline_workflow(
@@ -898,24 +889,14 @@ def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow."""
     return self._create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
 
-  def compile(
-      self,
-      pipeline_func,
-      package_path,
-      type_check=True,
-      pipeline_conf: dsl.PipelineConf = None,
-      allow_telemetry: bool = True,
-  ):
+  def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow yaml.
 
     Args:
       pipeline_func: pipeline functions with @dsl.pipeline decorator.
       package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
       type_check: whether to enable the type check or not, default: False.
       pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
-      allow_telemetry: If set to true, two pod labels will be attached to k8s
-        pods spawned by this pipeline: 1) pipeline SDK style, 2) pipeline random
-        ID.
     """
     import kfp
     type_check_old_value = kfp.TYPE_CHECK
@@ -924,8 +905,7 @@ def compile(
       self._create_and_write_workflow(
           pipeline_func=pipeline_func,
           pipeline_conf=pipeline_conf,
-          package_path=package_path,
-          allow_telemetry=allow_telemetry)
+          package_path=package_path)
     finally:
       kfp.TYPE_CHECK = type_check_old_value
 
@@ -972,17 +952,15 @@ def _create_and_write_workflow(
       pipeline_description: Text=None,
       params_list: List[dsl.PipelineParam]=None,
       pipeline_conf: dsl.PipelineConf=None,
-      package_path: Text=None,
-      allow_telemetry: bool=True
+      package_path: Text=None
   ) -> None:
     """Compile the given pipeline function and dump it to specified file format."""
     workflow = self._create_workflow(
         pipeline_func,
         pipeline_name,
         pipeline_description,
         params_list,
-        pipeline_conf,
-        allow_telemetry)
+        pipeline_conf)
     self._write_workflow(workflow, package_path)
     _validate_workflow(workflow)
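To make the API change concrete, a hedged before-and-after sketch of compiling a trivial pipeline; the pipeline body and output path are placeholders.

```python
import kfp
from kfp import dsl


@dsl.pipeline(name='hello', description='A one-step example pipeline.')
def hello_pipeline():
  dsl.ContainerOp(name='echo', image='library/bash',
                  command=['sh', '-c', 'echo hello'])


# Before this rollback the compiler exposed an opt-out flag:
#   kfp.compiler.Compiler().compile(hello_pipeline, 'hello.tar.gz',
#                                   allow_telemetry=False)
# After it, the parameter is gone and no telemetry labels are attached:
kfp.compiler.Compiler().compile(hello_pipeline, 'hello.tar.gz')
```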
49 changes: 8 additions & 41 deletions sdk/python/kfp/compiler/main.py
@@ -23,16 +23,6 @@
 import tempfile
 from deprecated.sphinx import deprecated
 
-def _str2bool(v):
-  if isinstance(v, bool):
-    return v
-  if v.lower() in ('yes', 'true', 't', 'y', '1'):
-    return True
-  elif v.lower() in ('no', 'false', 'f', 'n', '0'):
-    return False
-  else:
-    raise argparse.ArgumentTypeError('Boolean value expected.')
-
 
 def parse_arguments():
   """Parse command line arguments."""
@@ -57,16 +47,12 @@ def parse_arguments():
   parser.add_argument('--disable-type-check',
                       action='store_true',
                       help='disable the type check, default is enabled.')
-  parser.add_argument('--disable-telemetry',
-                      action='store_true',
-                      help='disable adding telemetry labels, default is enabled.')
 
   args = parser.parse_args()
   return args
 
 
-def _compile_pipeline_function(
-    pipeline_funcs, function_name, output_path, type_check, allow_telemetry):
+def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check):
   if len(pipeline_funcs) == 0:
     raise ValueError('A function with @dsl.pipeline decorator is required in the py file.')
 
@@ -82,8 +68,7 @@ def _compile_pipeline_function(
   else:
     pipeline_func = pipeline_funcs[0]
 
-  kfp.compiler.Compiler().compile(
-      pipeline_func, output_path, type_check, allow_telemetry=allow_telemetry)
+  kfp.compiler.Compiler().compile(pipeline_func, output_path, type_check)
 
 
 class PipelineCollectorContext():
@@ -105,31 +90,26 @@ def __exit__(self, *args):
     Please switch to compiling pipeline files or functions.
     If you use this feature please create an issue in https://github.com/kubeflow/pipelines/issues .'''
 )
-def compile_package(
-    package_path, namespace, function_name, output_path, type_check, allow_telemetry):
+def compile_package(package_path, namespace, function_name, output_path, type_check):
   tmpdir = tempfile.mkdtemp()
   sys.path.insert(0, tmpdir)
   try:
     subprocess.check_call(['python3', '-m', 'pip', 'install', package_path, '-t', tmpdir])
     with PipelineCollectorContext() as pipeline_funcs:
       __import__(namespace)
-      _compile_pipeline_function(
-          pipeline_funcs, function_name, output_path, type_check,
-          allow_telemetry=allow_telemetry)
+      _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check)
   finally:
     del sys.path[0]
     shutil.rmtree(tmpdir)
 
 
-def compile_pyfile(pyfile, function_name, output_path, type_check, allow_telemetry):
+def compile_pyfile(pyfile, function_name, output_path, type_check):
   sys.path.insert(0, os.path.dirname(pyfile))
   try:
     filename = os.path.basename(pyfile)
     with PipelineCollectorContext() as pipeline_funcs:
       __import__(os.path.splitext(filename)[0])
-      _compile_pipeline_function(
-          pipeline_funcs, function_name, output_path, type_check,
-          allow_telemetry=allow_telemetry)
+      _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check)
   finally:
     del sys.path[0]
 
@@ -140,22 +120,9 @@ def main():
       (args.py is not None and args.package is not None)):
     raise ValueError('Either --py or --package is needed but not both.')
   if args.py:
-    compile_pyfile(
-        args.py,
-        args.function,
-        args.output,
-        not args.disable_type_check,
-        not args.disable_telemetry
-    )
+    compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
   else:
     if args.namespace is None:
       raise ValueError('--namespace is required for compiling packages.')
-    compile_package(
-        args.package,
-        args.namespace,
-        args.function,
-        args.output,
-        not args.disable_type_check,
-        not args.disable_telemetry
-    )
+    compile_package(args.package, args.namespace, args.function, args.output, not args.disable_type_check)
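For reference, a sketch of driving this module programmatically. It assumes the SDK's console entry point (commonly installed as `dsl-compile`) simply calls `main()` here; the file names are placeholders.

```python
import sys

from kfp.compiler import main as compiler_main

# Equivalent to: dsl-compile --py my_pipeline.py --output my_pipeline.tar.gz
sys.argv = ['dsl-compile', '--py', 'my_pipeline.py',
            '--output', 'my_pipeline.tar.gz']
compiler_main.main()

# After this rollback, adding '--disable-telemetry' above would make
# argparse exit with an "unrecognized arguments" error; only
# '--disable-type-check' remains as an opt-out flag.
```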

28 changes: 0 additions & 28 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -26,8 +26,6 @@
 import unittest
 import yaml
 
-from kfp import components
-from kfp.compiler._default_transformers import COMPONENT_DIGEST_LABEL_KEY, COMPONENT_PATH_LABEL_KEY
 from kfp.dsl._component import component
 from kfp.dsl import ContainerOp, pipeline
 from kfp.dsl.types import Integer, InconsistentTypeException
@@ -42,11 +40,6 @@ def some_op():
       command=['sleep 1'],
   )
 
-_TEST_GCS_DOWNLOAD_COMPONENT_URL = 'https://raw.githubusercontent.com/kubeflow/'\
-                                   'pipelines/2dac60c400ad8767b452649d08f328df'\
-                                   'af230f96/components/google-cloud/storage/'\
-                                   'download/component.yaml'
-
 
 class TestCompiler(unittest.TestCase):
   # Define the places of samples covered by unit tests.
@@ -718,27 +711,6 @@ def some_pipeline():
       container = template.get('container', None)
       if container:
         self.assertEqual(template['retryStrategy']['limit'], 5)
-
-  def test_oob_component_label(self):
-    gcs_download_op = components.load_component_from_url(
-        _TEST_GCS_DOWNLOAD_COMPONENT_URL)
-
-    @dsl.pipeline(name='some_pipeline')
-    def some_pipeline():
-      _download_task = gcs_download_op('gs://some_bucket/some_dir/some_file')
-
-    workflow_dict = compiler.Compiler()._compile(some_pipeline)
-
-    found_download_task = False
-    for template in workflow_dict['spec']['templates']:
-      if template.get('container', None):
-        found_download_task = True
-        self.assertEqual(
-            template['metadata']['labels'][COMPONENT_PATH_LABEL_KEY],
-            'google-cloud.storage.download')
-        self.assertIsNotNone(
-            template['metadata']['labels'].get(COMPONENT_DIGEST_LABEL_KEY))
-    self.assertTrue(found_download_task, 'download task not found in workflow.')
 
   def test_image_pull_policy(self):
     def some_op():
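The removed test above pins down exactly what label `add_name_for_oob_components` used to derive. As a standalone sketch, here is the deleted derivation replayed on the removed fixture URL; nothing below survives this PR.

```python
import re

# The OOB component URL from the removed test fixture.
url = ('https://raw.githubusercontent.com/kubeflow/pipelines/'
       '2dac60c400ad8767b452649d08f328dfaf230f96/components/'
       'google-cloud/storage/download/component.yaml')

origin_path = url[:-len('component.yaml')].rstrip('/')  # _remove_suffix, then rstrip
origin_path = origin_path.split('/', 7)[-1]             # drop the KFP OOB URL prefix
origin_path = re.sub('[^-a-z0-9A-Z_.]', '.', origin_path)  # k8s label charset
label = origin_path[-63:].strip('-_.')                  # keep at most 63 chars
assert label == 'google-cloud.storage.download'
```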
1 change: 0 additions & 1 deletion sdk/python/tests/compiler/testdata/add_pod_env.yaml
@@ -28,7 +28,6 @@ spec:
     image: library/bash
   metadata:
     labels:
-      pipelines.kubeflow.org/pipeline-sdk-type: kfp
       add-pod-env: 'true'
   name: echo
 - dag:
Empty file.
6 changes: 0 additions & 6 deletions sdk/python/tests/compiler/testdata/basic.yaml
@@ -57,7 +57,6 @@ spec:
     - -c
     image: python:3.5-jessie
   name: exiting
-  metadata: {'labels': {'pipelines.kubeflow.org/pipeline-sdk-type': 'kfp'}}
 - container:
     args:
     - python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split());
@@ -73,9 +72,6 @@
       parameters:
       - name: message
   name: get-frequent
-  metadata:
-    labels:
-      pipelines.kubeflow.org/pipeline-sdk-type: kfp
   outputs:
     artifacts:
     - name: get-frequent-word
@@ -102,8 +98,6 @@ spec:
     - name: get-frequent-word
     - name: outputpath
   metadata:
-    labels:
-      pipelines.kubeflow.org/pipeline-sdk-type: kfp
     annotations:
       tf-version.cloud-tpus.google.com: "1.12"
   name: save