From fa99eec24110c76a95109d7b2fa385038e59b114 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Wed, 5 Jun 2019 16:35:30 -0700 Subject: [PATCH 1/4] kfp CLI --- sdk/python/kfp/__main__.py | 66 ++++++++++++++++++++++++++++++++++++++ sdk/python/setup.py | 7 ++-- 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 sdk/python/kfp/__main__.py diff --git a/sdk/python/kfp/__main__.py b/sdk/python/kfp/__main__.py new file mode 100644 index 00000000000..6243ac6384a --- /dev/null +++ b/sdk/python/kfp/__main__.py @@ -0,0 +1,66 @@ +import fire +from ._client import Client +import sys +import subprocess +import pprint +import time +import json + +from tabulate import tabulate + +class KFPWrapper(object): + def __init__(self, host = None, client_id = None, namespace = 'kubeflow'): + self._client = Client(host, client_id, namespace) + self._namespace = namespace + + def run(self, experiment_name, run_name = None, pipeline_package_path = None, pipeline_id = None, watch = True, args = {}): + if not run_name: + run_name = experiment_name + + if not pipeline_package_path and not pipeline_id: + print('You must provide one of [pipeline_package_path, pipeline_id].') + sys.exit(1) + + print(args) + experiment = self._client.create_experiment(experiment_name) + run = self._client.run_pipeline(experiment.id, run_name, pipeline_package_path, args, pipeline_id) + print('Run {} is submitted'.format(run.id)) + self.get_run(run.id, watch) + + def get_run(self, run_id, watch=True): + run = self._client.get_run(run_id).run + self._print_runs([run]) + if not watch: + return + argo_workflow_name = None + while True: + time.sleep(1) + run_detail = self._client.get_run(run_id) + run = run_detail.run + if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest: + manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest) + if manifest['metadata'] and manifest['metadata']['name']: + argo_workflow_name = manifest['metadata']['name'] + break + if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: + print('Run is finished with status {}.'.format(run_detail.run.status)) + return + if argo_workflow_name: + subprocess.run(['argo', 'watch', argo_workflow_name, '-n', self._namespace]) + self._print_runs([run]) + + def list_runs(self, experiment_id = None, page_size=100): + response = self._client.list_runs(experiment_id=experiment_id, page_size=page_size, sort_by='created_at desc') + self._print_runs(response.runs) + + def _print_runs(self, runs): + headers = ['run id', 'name', 'status', 'created at'] + data = [[run.id, run.name, run.status, run.created_at.isoformat()] for run in runs] + print(tabulate(data, headers=headers, tablefmt='grid')) + + +def main(): + fire.Fire(KFPWrapper, name='kfp') + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 8ebb651541b..f5ec024094d 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -32,7 +32,8 @@ 'cloudpickle', 'kfp-server-api >= 0.1.18, < 0.1.19', #Update the upper version whenever a new version of the kfp-server-api package is released. Update the lower version when there is a breaking change in kfp-server-api. 'argo-models == 2.2.1a', #2.2.1a is equivalent to argo 2.2.1 - 'jsonschema >= 3.0.1' + 'jsonschema >= 3.0.1', + 'tabulate == 0.8.3' ] setup( @@ -67,4 +68,6 @@ ], python_requires='>=3.5.3', include_package_data=True, - entry_points={'console_scripts': ['dsl-compile = kfp.compiler.main:main',]}) + entry_points={'console_scripts': [ + 'dsl-compile = kfp.compiler.main:main', + 'kfp=kfp.__main__:main']}) From 889059af9fc31d618290b2c13071b22731b58945 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Wed, 5 Jun 2019 17:10:12 -0700 Subject: [PATCH 2/4] Add fire dependency --- sdk/python/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index f5ec024094d..9571ab1e890 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -33,7 +33,8 @@ 'kfp-server-api >= 0.1.18, < 0.1.19', #Update the upper version whenever a new version of the kfp-server-api package is released. Update the lower version when there is a breaking change in kfp-server-api. 'argo-models == 2.2.1a', #2.2.1a is equivalent to argo 2.2.1 'jsonschema >= 3.0.1', - 'tabulate == 0.8.3' + 'tabulate == 0.8.3', + 'fire == 0.1.3' ] setup( From 096b11673de78bade393755868a6c86b2eafd703 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Thu, 6 Jun 2019 22:17:26 -0700 Subject: [PATCH 3/4] Use click instead of fire --- sdk/python/kfp/__main__.py | 135 ++++++++++++++++++++++++------------ sdk/python/requirements.txt | 2 + sdk/python/setup.py | 2 +- 3 files changed, 93 insertions(+), 46 deletions(-) diff --git a/sdk/python/kfp/__main__.py b/sdk/python/kfp/__main__.py index 6243ac6384a..38294cf73d7 100644 --- a/sdk/python/kfp/__main__.py +++ b/sdk/python/kfp/__main__.py @@ -1,66 +1,111 @@ -import fire +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from ._client import Client import sys import subprocess import pprint import time import json +import click from tabulate import tabulate -class KFPWrapper(object): - def __init__(self, host = None, client_id = None, namespace = 'kubeflow'): - self._client = Client(host, client_id, namespace) - self._namespace = namespace +@click.group() +@click.option('--endpoint', help='Endpoint of the KFP API service to connect.') +@click.option('--iap-client-id', help='Client ID for IAP protected endpoint.') +@click.option('-n', '--namespace', default='kubeflow', help='Kubernetes namespace to connect to the KFP API.') +@click.pass_context +def cli(ctx, endpoint, iap_client_id, namespace): + """kfp is the command line interface to KFP service.""" + ctx.obj['client'] = Client(endpoint, iap_client_id, namespace) + ctx.obj['namespace']= namespace - def run(self, experiment_name, run_name = None, pipeline_package_path = None, pipeline_id = None, watch = True, args = {}): - if not run_name: - run_name = experiment_name +@cli.command() +@click.option('-e', '--experiment-id', help='Parent experiment ID of listed runs.') +@click.option('--max-size', default=100, help='Max size of the listed runs.') +@click.pass_context +def list_runs(ctx, experiment_id, max_size): + """list recent KFP runs""" + client = ctx.obj['client'] + response = client.list_runs(experiment_id=experiment_id, page_size=max_size, sort_by='created_at desc') + _print_runs(response.runs) - if not pipeline_package_path and not pipeline_id: - print('You must provide one of [pipeline_package_path, pipeline_id].') - sys.exit(1) +@cli.command() +@click.option('-e', '--experiment-name', required=True, help='Experiment name of the run.') +@click.option('-r', '--run-name', help='Name of the run.') +@click.option('-f', '--package-file', type=click.Path(exists=True, dir_okay=False), help='Path of the pipeline package file.') +@click.option('-p', '--pipeline-id', help='ID of the pipeline template.') +@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') +@click.argument('args', nargs=-1) +@click.pass_context +def run(ctx, experiment_name, run_name, package_file, pipeline_id, watch, args): + """submit a KFP run""" + client = ctx.obj['client'] + namespace = ctx.obj['namespace'] + if not run_name: + run_name = experiment_name - print(args) - experiment = self._client.create_experiment(experiment_name) - run = self._client.run_pipeline(experiment.id, run_name, pipeline_package_path, args, pipeline_id) - print('Run {} is submitted'.format(run.id)) - self.get_run(run.id, watch) + if not package_file and not pipeline_id: + print('You must provide one of [package_file, pipeline_id].') + sys.exit(1) - def get_run(self, run_id, watch=True): - run = self._client.get_run(run_id).run - self._print_runs([run]) - if not watch: - return - argo_workflow_name = None - while True: - time.sleep(1) - run_detail = self._client.get_run(run_id) - run = run_detail.run - if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest: - manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest) - if manifest['metadata'] and manifest['metadata']['name']: - argo_workflow_name = manifest['metadata']['name'] - break - if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: - print('Run is finished with status {}.'.format(run_detail.run.status)) - return - if argo_workflow_name: - subprocess.run(['argo', 'watch', argo_workflow_name, '-n', self._namespace]) - self._print_runs([run]) + arg_dict = dict(arg.split('=') for arg in args) + experiment = client.create_experiment(experiment_name) + run = client.run_pipeline(experiment.id, run_name, package_file, arg_dict, pipeline_id) + print('Run {} is submitted'.format(run.id)) + _display_run(client, namespace, run.id, watch) - def list_runs(self, experiment_id = None, page_size=100): - response = self._client.list_runs(experiment_id=experiment_id, page_size=page_size, sort_by='created_at desc') - self._print_runs(response.runs) +@cli.command() +@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') +@click.argument('run-id') +@click.pass_context +def get_run(ctx, watch, run_id): + """display the details of a KFP run""" + client = ctx.obj['client'] + namespace = ctx.obj['namespace'] + _display_run(client, namespace, run_id, watch) - def _print_runs(self, runs): - headers = ['run id', 'name', 'status', 'created at'] - data = [[run.id, run.name, run.status, run.created_at.isoformat()] for run in runs] - print(tabulate(data, headers=headers, tablefmt='grid')) +def _display_run(client, namespace, run_id, watch): + run = client.get_run(run_id).run + _print_runs([run]) + if not watch: + return + argo_workflow_name = None + while True: + time.sleep(1) + run_detail = client.get_run(run_id) + run = run_detail.run + if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest: + manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest) + if manifest['metadata'] and manifest['metadata']['name']: + argo_workflow_name = manifest['metadata']['name'] + break + if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: + print('Run is finished with status {}.'.format(run_detail.run.status)) + return + if argo_workflow_name: + subprocess.run(['argo', 'watch', argo_workflow_name, '-n', namespace]) + _print_runs([run]) +def _print_runs(runs): + headers = ['run id', 'name', 'status', 'created at'] + data = [[run.id, run.name, run.status, run.created_at.isoformat()] for run in runs] + print(tabulate(data, headers=headers, tablefmt='grid')) def main(): - fire.Fire(KFPWrapper, name='kfp') + cli(obj={}, auto_envvar_prefix='KFP') if __name__ == '__main__': main() \ No newline at end of file diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index 4c6e5a59016..dee38d3cae9 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -12,3 +12,5 @@ requests_toolbelt>=0.8.0 kfp-server-api >= 0.1.18, < 0.1.19 argo-models == 2.2.1a jsonschema >= 3.0.1 +tabulate == 0.8.3 +click == 7.0 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 9571ab1e890..9f4c5e4eb24 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -34,7 +34,7 @@ 'argo-models == 2.2.1a', #2.2.1a is equivalent to argo 2.2.1 'jsonschema >= 3.0.1', 'tabulate == 0.8.3', - 'fire == 0.1.3' + 'click == 7.0' ] setup( From 107a75894490b0dde6550d756c74479fa03bed23 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Fri, 7 Jun 2019 16:29:44 -0700 Subject: [PATCH 4/4] Refactor the cli command groups. --- sdk/python/kfp/__main__.py | 98 ++----------------------------- sdk/python/kfp/cli/__init__.py | 13 +++++ sdk/python/kfp/cli/cli.py | 31 ++++++++++ sdk/python/kfp/cli/run.py | 103 +++++++++++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 93 deletions(-) create mode 100644 sdk/python/kfp/cli/__init__.py create mode 100644 sdk/python/kfp/cli/cli.py create mode 100644 sdk/python/kfp/cli/run.py diff --git a/sdk/python/kfp/__main__.py b/sdk/python/kfp/__main__.py index 38294cf73d7..545e54ff4eb 100644 --- a/sdk/python/kfp/__main__.py +++ b/sdk/python/kfp/__main__.py @@ -12,100 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._client import Client -import sys -import subprocess -import pprint -import time -import json -import click +from .cli.cli import main -from tabulate import tabulate - -@click.group() -@click.option('--endpoint', help='Endpoint of the KFP API service to connect.') -@click.option('--iap-client-id', help='Client ID for IAP protected endpoint.') -@click.option('-n', '--namespace', default='kubeflow', help='Kubernetes namespace to connect to the KFP API.') -@click.pass_context -def cli(ctx, endpoint, iap_client_id, namespace): - """kfp is the command line interface to KFP service.""" - ctx.obj['client'] = Client(endpoint, iap_client_id, namespace) - ctx.obj['namespace']= namespace - -@cli.command() -@click.option('-e', '--experiment-id', help='Parent experiment ID of listed runs.') -@click.option('--max-size', default=100, help='Max size of the listed runs.') -@click.pass_context -def list_runs(ctx, experiment_id, max_size): - """list recent KFP runs""" - client = ctx.obj['client'] - response = client.list_runs(experiment_id=experiment_id, page_size=max_size, sort_by='created_at desc') - _print_runs(response.runs) - -@cli.command() -@click.option('-e', '--experiment-name', required=True, help='Experiment name of the run.') -@click.option('-r', '--run-name', help='Name of the run.') -@click.option('-f', '--package-file', type=click.Path(exists=True, dir_okay=False), help='Path of the pipeline package file.') -@click.option('-p', '--pipeline-id', help='ID of the pipeline template.') -@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') -@click.argument('args', nargs=-1) -@click.pass_context -def run(ctx, experiment_name, run_name, package_file, pipeline_id, watch, args): - """submit a KFP run""" - client = ctx.obj['client'] - namespace = ctx.obj['namespace'] - if not run_name: - run_name = experiment_name - - if not package_file and not pipeline_id: - print('You must provide one of [package_file, pipeline_id].') - sys.exit(1) - - arg_dict = dict(arg.split('=') for arg in args) - experiment = client.create_experiment(experiment_name) - run = client.run_pipeline(experiment.id, run_name, package_file, arg_dict, pipeline_id) - print('Run {} is submitted'.format(run.id)) - _display_run(client, namespace, run.id, watch) - -@cli.command() -@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') -@click.argument('run-id') -@click.pass_context -def get_run(ctx, watch, run_id): - """display the details of a KFP run""" - client = ctx.obj['client'] - namespace = ctx.obj['namespace'] - _display_run(client, namespace, run_id, watch) - -def _display_run(client, namespace, run_id, watch): - run = client.get_run(run_id).run - _print_runs([run]) - if not watch: - return - argo_workflow_name = None - while True: - time.sleep(1) - run_detail = client.get_run(run_id) - run = run_detail.run - if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest: - manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest) - if manifest['metadata'] and manifest['metadata']['name']: - argo_workflow_name = manifest['metadata']['name'] - break - if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: - print('Run is finished with status {}.'.format(run_detail.run.status)) - return - if argo_workflow_name: - subprocess.run(['argo', 'watch', argo_workflow_name, '-n', namespace]) - _print_runs([run]) - -def _print_runs(runs): - headers = ['run id', 'name', 'status', 'created at'] - data = [[run.id, run.name, run.status, run.created_at.isoformat()] for run in runs] - print(tabulate(data, headers=headers, tablefmt='grid')) - -def main(): - cli(obj={}, auto_envvar_prefix='KFP') +# TODO(hongyes): add more commands: +# kfp compile (migrate from dsl-compile) +# kfp experiment (manage experiments) +# kfp pipeline (manage pipelines) if __name__ == '__main__': main() \ No newline at end of file diff --git a/sdk/python/kfp/cli/__init__.py b/sdk/python/kfp/cli/__init__.py new file mode 100644 index 00000000000..c2fc82ab83f --- /dev/null +++ b/sdk/python/kfp/cli/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/sdk/python/kfp/cli/cli.py b/sdk/python/kfp/cli/cli.py new file mode 100644 index 00000000000..1db6469ffd0 --- /dev/null +++ b/sdk/python/kfp/cli/cli.py @@ -0,0 +1,31 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import click +from .._client import Client +from .run import run + +@click.group() +@click.option('--endpoint', help='Endpoint of the KFP API service to connect.') +@click.option('--iap-client-id', help='Client ID for IAP protected endpoint.') +@click.option('-n', '--namespace', default='kubeflow', help='Kubernetes namespace to connect to the KFP API.') +@click.pass_context +def cli(ctx, endpoint, iap_client_id, namespace): + """kfp is the command line interface to KFP service.""" + ctx.obj['client'] = Client(endpoint, iap_client_id, namespace) + ctx.obj['namespace']= namespace + +def main(): + cli.add_command(run) + cli(obj={}, auto_envvar_prefix='KFP') \ No newline at end of file diff --git a/sdk/python/kfp/cli/run.py b/sdk/python/kfp/cli/run.py new file mode 100644 index 00000000000..4fb76661953 --- /dev/null +++ b/sdk/python/kfp/cli/run.py @@ -0,0 +1,103 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .._client import Client +import sys +import subprocess +import pprint +import time +import json +import click + +from tabulate import tabulate + +@click.group() +def run(): + """manage run resources""" + pass + +@run.command() +@click.option('-e', '--experiment-id', help='Parent experiment ID of listed runs.') +@click.option('--max-size', default=100, help='Max size of the listed runs.') +@click.pass_context +def list(ctx, experiment_id, max_size): + """list recent KFP runs""" + client = ctx.obj['client'] + response = client.list_runs(experiment_id=experiment_id, page_size=max_size, sort_by='created_at desc') + if response and response.runs: + _print_runs(response.runs) + else: + print('No runs found.') + +@run.command() +@click.option('-e', '--experiment-name', required=True, help='Experiment name of the run.') +@click.option('-r', '--run-name', help='Name of the run.') +@click.option('-f', '--package-file', type=click.Path(exists=True, dir_okay=False), help='Path of the pipeline package file.') +@click.option('-p', '--pipeline-id', help='ID of the pipeline template.') +@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') +@click.argument('args', nargs=-1) +@click.pass_context +def submit(ctx, experiment_name, run_name, package_file, pipeline_id, watch, args): + """submit a KFP run""" + client = ctx.obj['client'] + namespace = ctx.obj['namespace'] + if not run_name: + run_name = experiment_name + + if not package_file and not pipeline_id: + print('You must provide one of [package_file, pipeline_id].') + sys.exit(1) + + arg_dict = dict(arg.split('=') for arg in args) + experiment = client.create_experiment(experiment_name) + run = client.run_pipeline(experiment.id, run_name, package_file, arg_dict, pipeline_id) + print('Run {} is submitted'.format(run.id)) + _display_run(client, namespace, run.id, watch) + +@run.command() +@click.option('-w', '--watch', is_flag=True, default=False, help='Watch the run status until it finishes.') +@click.argument('run-id') +@click.pass_context +def get(ctx, watch, run_id): + """display the details of a KFP run""" + client = ctx.obj['client'] + namespace = ctx.obj['namespace'] + _display_run(client, namespace, run_id, watch) + +def _display_run(client, namespace, run_id, watch): + run = client.get_run(run_id).run + _print_runs([run]) + if not watch: + return + argo_workflow_name = None + while True: + time.sleep(1) + run_detail = client.get_run(run_id) + run = run_detail.run + if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest: + manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest) + if manifest['metadata'] and manifest['metadata']['name']: + argo_workflow_name = manifest['metadata']['name'] + break + if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: + print('Run is finished with status {}.'.format(run_detail.run.status)) + return + if argo_workflow_name: + subprocess.run(['argo', 'watch', argo_workflow_name, '-n', namespace]) + _print_runs([run]) + +def _print_runs(runs): + headers = ['run id', 'name', 'status', 'created at'] + data = [[run.id, run.name, run.status, run.created_at.isoformat()] for run in runs] + print(tabulate(data, headers=headers, tablefmt='grid')) \ No newline at end of file