Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MLEngine Commands Implementation #773

Merged
merged 7 commits into from
Feb 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion component_sdk/python/kfp_component/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._kfp_execution_context import KfpExecutionContext
from ._kfp_execution_context import KfpExecutionContext
from . import _display as display
104 changes: 104 additions & 0 deletions component_sdk/python/kfp_component/core/_display.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
import threading
import logging

# Location of the UI metadata JSON file. Taken from the KFP_UI_METADATA_PATH
# environment variable when set, otherwise the default path below.
_OUTPUT_PATH = os.environ.get('KFP_UI_METADATA_PATH', '/mlpipeline-ui-metadata.json')
# Held by _output_ui_metadata around its read-modify-write of _OUTPUT_PATH.
_OUTPUT_FILE_LOCK = threading.Lock()

def display(obj):
    """Display an object to KFP UI.

    Every supported representation found on ``obj`` is emitted; an object
    exposing none of them is silently ignored.

    Args:
        obj (object): the object to output the display metadata. It follows
            the same convention defined by the IPython display API. The
            currently supported representation functions:

            * `_repr_html_`: it returns html content which will be converted
              into a web-app metadata entry for the KFP UI.
            * `_repr_kfpmetadata_`: it returns a KFP metadata json object,
              which follows the convention from
              https://www.kubeflow.org/docs/pipelines/output-viewer/.

            The supported builtin objects are HTML, Tensorboard, Link.
    """
    # Inspect the attribute listing once, before any output is produced.
    attribute_names = dir(obj)
    has_html = '_repr_html_' in attribute_names
    has_kfp_metadata = '_repr_kfpmetadata_' in attribute_names

    if has_html:
        display_html(obj)
    if has_kfp_metadata:
        display_kfpmetadata(obj)

def display_html(obj):
    """Display the html representation of an object to KFP UI.

    Args:
        obj (object): an object exposing a ``_repr_html_`` function that
            returns the html content to show.

    Raises:
        ValueError: if ``_repr_html_`` is not listed in ``dir(obj)``.
    """
    supports_html = '_repr_html_' in dir(obj)
    if not supports_html:
        raise ValueError('_repr_html_ function is not present.')

    markup = obj._repr_html_()
    # NOTE(review): at review time the 'html' key was not yet described in
    # https://www.kubeflow.org/docs/pipelines/output-viewer/; it is a new
    # feature tracked by issue #795.
    web_app_entry = {
        'type': 'web-app',
        'html': markup
    }
    _output_ui_metadata(web_app_entry)

def display_kfpmetadata(obj):
    """Display an object's KFP UI metadata representation.

    Args:
        obj (object): an object exposing a ``_repr_kfpmetadata_`` function
            that returns the metadata entry to output.

    Raises:
        ValueError: if ``_repr_kfpmetadata_`` is not listed in ``dir(obj)``.
    """
    supports_metadata = '_repr_kfpmetadata_' in dir(obj)
    if not supports_metadata:
        raise ValueError('_repr_kfpmetadata_ function is not present.')

    entry = obj._repr_kfpmetadata_()
    _output_ui_metadata(entry)

def _output_ui_metadata(output):
    """Append one entry to the 'outputs' list of the UI metadata file.

    The file at ``_OUTPUT_PATH`` is read if it exists, ``output`` is appended
    to its 'outputs' list (created when missing) and the result is written
    back. The whole read-modify-write runs under ``_OUTPUT_FILE_LOCK``.

    Args:
        output: a JSON-serializable metadata entry.

    Raises:
        TypeError: if the resulting metadata cannot be serialized to JSON;
            in that case the existing file is left untouched.
    """
    logging.info('Dumping metadata: {}'.format(output))
    with _OUTPUT_FILE_LOCK:
        metadata = {}
        if os.path.isfile(_OUTPUT_PATH):
            with open(_OUTPUT_PATH, 'r') as f:
                metadata = json.load(f)

        metadata.setdefault('outputs', []).append(output)
        # Serialize BEFORE opening for write: mode 'w' truncates the file, so
        # a serialization failure after opening would wipe existing entries.
        serialized = json.dumps(metadata)

        with open(_OUTPUT_PATH, 'w') as f:
            f.write(serialized)

class HTML(object):
    """Holder for raw html data that can be shown through ``display``."""

    def __init__(self, data):
        """Store the raw html.

        Args:
            data: the html content, returned unchanged by ``_repr_html_``.
        """
        self._html = data

    def _repr_html_(self):
        """Return the html content given at construction time."""
        return self._html

class Tensorboard(object):
    """Holder for tensorboard metadata."""

    def __init__(self, job_dir):
        """Remember the job directory.

        Args:
            job_dir: value reported as the 'source' of the metadata entry.
        """
        self._job_dir = job_dir

    def _repr_kfpmetadata_(self):
        """Return the metadata entry of type 'tensorboard' for this job dir."""
        entry = {'type': 'tensorboard'}
        entry['source'] = self._job_dir
        return entry

class Link(HTML):
    """Holder for the html of a single hyperlink."""

    def __init__(self, href, text):
        """Build the anchor element.

        Args:
            href: value placed in the ``href`` attribute, inserted verbatim.
            text: content placed between the anchor tags, inserted verbatim.
        """
        anchor = '<a href="{}">{}</a>'.format(href, text)
        super(Link, self).__init__(anchor)
13 changes: 13 additions & 0 deletions component_sdk/python/kfp_component/google/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 changes: 15 additions & 0 deletions component_sdk/python/kfp_component/google/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._utils import normalize_name, dump_file, check_resource_changed
92 changes: 92 additions & 0 deletions component_sdk/python/kfp_component/google/common/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import os

def normalize_name(name,
                   valid_first_char_pattern='a-zA-Z',
                   valid_char_pattern='0-9a-zA-Z_',
                   invalid_char_placeholder='_',
                   prefix_placeholder='x_'):
    """Normalize a name to a valid resource name.

    Uses ``valid_first_char_pattern`` and ``valid_char_pattern`` regex pattern
    to find invalid characters from ``name`` and replaces them with
    ``invalid_char_placeholder`` or prefix the name with ``prefix_placeholder``.
    Each run of consecutive invalid characters is replaced by a single
    placeholder.

    Args:
        name: The name to be normalized.
        valid_first_char_pattern: The regex pattern for the first character.
        valid_char_pattern: The regex pattern for all the characters in the name.
        invalid_char_placeholder: The placeholder to replace invalid characters.
        prefix_placeholder: The placeholder to prefix the name if the first char
            is invalid.

    Returns:
        The normalized name. Unchanged if all characters are valid, and
        returned as-is when ``name`` is empty or None.
    """
    if not name:
        return name
    normalized_name = re.sub('[^{}]+'.format(valid_char_pattern),
                             invalid_char_placeholder, name)
    # re.match only matches at position 0, so this checks the first character
    # without indexing. Indexing would raise IndexError when an empty
    # placeholder leaves nothing behind (e.g. name='--', placeholder='').
    if not re.match('[{}]'.format(valid_first_char_pattern), normalized_name):
        normalized_name = prefix_placeholder + normalized_name
    if name != normalized_name:
        logging.info('Normalize name from "{}" to "{}".'.format(
            name, normalized_name))
    return normalized_name

def dump_file(path, content):
    """Dumps string into local file.

    Missing parent directories are created. When the parent directory already
    exists and the file is present, a warning is logged before overwriting.

    Args:
        path: the local path to the file.
        content: the string content to dump.
    """
    directory = os.path.dirname(path)
    # A bare filename has an empty dirname; os.makedirs('') would raise, so
    # only create the directory when there is one to create.
    if directory and not os.path.exists(directory):
        os.makedirs(directory)
    elif os.path.exists(path):
        logging.warning('The file {} will be overwritten.'.format(path))
    with open(path, 'w') as f:
        f.write(content)

def check_resource_changed(requested_resource,
                           existing_resource, property_names):
    """Check if a resource has been changed.

    Compares the requested resource with the existing one, looking only at
    the given property names. A property that is absent from
    ``requested_resource`` is ignored; a property that is present there but
    missing from ``existing_resource`` is compared against None.

    Args:
        requested_resource: the user requested resource payload.
        existing_resource: the existing resource payload from data storage.
        property_names: a list of property names.

    Return:
        True if any compared property of ``requested_resource`` differs from
        ``existing_resource``, False otherwise.
    """
    return any(
        requested_resource[key] != existing_resource.get(key, None)
        for key in property_names
        if key in requested_resource)

30 changes: 30 additions & 0 deletions component_sdk/python/kfp_component/google/ml_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that contains a set of commands to call ML Engine APIs

The commands are aware of the KFP execution context and can work under
retry and cancellation context. The currently supported commands
are: create_job, train, batch_predict, create_model, create_version
and delete_version.

TODO(hongyes): Provides full ML Engine API support.
"""

from ._create_job import create_job
from ._create_model import create_model
from ._create_version import create_version
from ._delete_version import delete_version
from ._train import train
from ._batch_predict import batch_predict
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from ._create_job import create_job

def batch_predict(project_id, model_path, input_paths, input_data_format,
    output_path, region, output_data_format=None, prediction_input=None, job_id_prefix=None,
    wait_interval=30):
    """Creates a MLEngine batch prediction job.

    The explicit arguments are merged on top of ``prediction_input``: any
    non-empty argument overrides the corresponding key. The caller's
    ``prediction_input`` dict is not modified.

    Args:
        project_id (str): Required. The ID of the parent project of the job.
        model_path (str): Required. The path to the model. It can be either:
            `projects/[PROJECT_ID]/models/[MODEL_ID]` or
            `projects/[PROJECT_ID]/models/[MODEL_ID]/versions/[VERSION_ID]`
            or a GCS path of a model file.
        input_paths (list): Required. The Google Cloud Storage location of
            the input data files. May contain wildcards.
        input_data_format (str): Required. The format of the input data files.
            See https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#DataFormat.
        output_path (str): Required. The output Google Cloud Storage location.
        region (str): Required. The Google Compute Engine region to run the
            prediction job in.
        output_data_format (str): Optional. Format of the output data files,
            defaults to JSON.
        prediction_input (dict): Input parameters to create a prediction job.
        job_id_prefix (str): the prefix of the generated job id.
        wait_interval (int): optional wait interval between calls
            to get job status. Defaults to 30.

    Raises:
        ValueError: if ``model_path`` is empty or is neither a model name,
            a model version name nor a ``gs://`` path.
    """
    # Work on a shallow copy so the keys set below never leak into the dict
    # owned by the caller.
    prediction_input = dict(prediction_input) if prediction_input else {}
    if not model_path:
        raise ValueError('model_path must be provided.')
    if _is_model_name(model_path):
        prediction_input['modelName'] = model_path
    elif _is_model_version_name(model_path):
        prediction_input['versionName'] = model_path
    elif _is_gcs_path(model_path):
        prediction_input['uri'] = model_path
    else:
        raise ValueError('model_path value is invalid.')

    # Only non-empty arguments override what prediction_input already holds.
    overrides = (
        ('inputPaths', input_paths),
        ('dataFormat', input_data_format),
        ('outputPath', output_path),
        ('outputDataFormat', output_data_format),
        ('region', region),
    )
    for key, value in overrides:
        if value:
            prediction_input[key] = value
    job = {
        'predictionInput': prediction_input
    }
    create_job(project_id, job, job_id_prefix, wait_interval)

def _is_model_name(name):
return re.match(r'/projects/[^/]+/models/[^/]+$', name)

def _is_model_version_name(name):
return re.match(r'/projects/[^/]+/models/[^/]+/versions/[^/]+$', name)

def _is_gcs_path(name):
return name.startswith('gs://')
Loading