diff --git a/components/arena/docker/Dockerfile b/components/arena/docker/Dockerfile
new file mode 100644
index 00000000000..ebf65b7ade8
--- /dev/null
+++ b/components/arena/docker/Dockerfile
@@ -0,0 +1,44 @@
+FROM golang:1.10-stretch as build
+
+RUN mkdir -p /go/src/github.com/kubeflow && \
+    cd /go/src/github.com/kubeflow && \
+    git clone https://github.com/kubeflow/arena.git
+
+WORKDIR /go/src/github.com/kubeflow/arena
+
+RUN cd /go/src/github.com/kubeflow/arena && make
+
+RUN wget https://storage.googleapis.com/kubernetes-helm/helm-v2.9.1-linux-amd64.tar.gz && \
+    tar -xvf helm-v2.9.1-linux-amd64.tar.gz && \
+    mv linux-amd64/helm /usr/local/bin/helm && \
+    chmod u+x /usr/local/bin/helm
+
+ENV K8S_VERSION v1.11.2
+RUN curl -o /usr/local/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/${K8S_VERSION}/bin/linux/amd64/kubectl && chmod +x /usr/local/bin/kubectl
+
+# FROM python:3.6.8-stretch
+
+FROM python:3.7-alpine3.9
+
+RUN apk update && \
+    apk add --no-cache ca-certificates py-dev python-setuptools wget unzip git bash && \
+    rm -rf /var/cache/apk/*
+
+RUN pip install --upgrade pip && \
+    pip install pyyaml==3.12 six==1.11.0 requests==2.18.4
+
+COPY --from=build /go/src/github.com/kubeflow/arena/bin/arena /usr/local/bin/arena
+
+COPY --from=build /usr/local/bin/helm /usr/local/bin/helm
+
+COPY --from=build /go/src/github.com/kubeflow/arena/kubernetes-artifacts /root/kubernetes-artifacts
+
+COPY --from=build /usr/local/bin/kubectl /usr/local/bin/kubectl
+
+COPY --from=build /go/src/github.com/kubeflow/arena/charts /charts
+
+COPY arena_launcher.py /root
+
+WORKDIR /root
+
+ENTRYPOINT ["python","arena_launcher.py"]
diff --git a/components/arena/docker/arena_launcher.py b/components/arena/docker/arena_launcher.py
new file mode 100644
index 00000000000..7fa528224a5
--- /dev/null
+++ b/components/arena/docker/arena_launcher.py
@@ -0,0 +1,397 @@
+"""
+Usage:
+python arena_launcher.py
+    --name=tf-test
+    --tensorboard=true
+    mpijob
+    --gpus=1
+    --workers=2
+    --image=registry.cn-hangzhou.aliyuncs.com/tensorflow-samples/horovod:0.13.11-tf1.10.0-torch0.4.0-py3.5
+    --
+    mpirun python /benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py --model resnet101 --batch_size 64 --variable_update horovod --train_dir=/training_logs --summary_verbosity=3 --save_summaries_steps=10
+"""
+# TODO: Add unit/integration tests
+
+import argparse
+import datetime
+import json
+import os
+import sys
+import logging
+import requests
+import subprocess
+import six
+import time
+import yaml
+from subprocess import Popen, PIPE
+from shlex import split
+
+def setup_custom_logging():
+    logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
+                        level=logging.INFO,
+                        datefmt='%Y-%m-%d %H:%M:%S')
+
+def _submit_job(command):
+    logging.info("command: {0}".format(command))
+    try:
+        output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
+        result = output.decode()
+    except subprocess.CalledProcessError as exc:
+        logging.error("Status: FAIL, return code: {0}, output: {1}".format(exc.returncode, exc.output))
+        sys.exit(-1)
+    logging.info('Submit Job: %s.' % result)
+
+def _is_active_status(status):
+    logging.info("status: {0}".format(status))
+    return status == 'PENDING' or status == 'RUNNING'
+
+def _is_pending_status(status):
+    logging.info("status: {0}".format(status))
+    return status == 'PENDING'
+
+def _wait_job_done(name, job_type, timeout):
+    end_time = datetime.datetime.now() + timeout
+    logging.info("expect done time: {0}".format(end_time))
+    status = _get_job_status(name, job_type)
+    while _is_active_status(status):
+        if datetime.datetime.now() > end_time:
+            timeoutMsg = "Timeout waiting for job {0} with job type {1} completing.".format(name, job_type)
+            logging.error(timeoutMsg)
+            raise Exception(timeoutMsg)
+        time.sleep(3)
+        status = _get_job_status(name, job_type)
+        logging.info("job {0} with type {1} status is {2}".format(name, job_type, status))
+
+def _wait_job_running(name, job_type, timeout):
+    end_time = datetime.datetime.now() + timeout
+    logging.info("expect running time: {0}".format(end_time))
+    status = _get_job_status(name, job_type)
+    while _is_pending_status(status):
+        if datetime.datetime.now() > end_time:
+            timeoutMsg = "Timeout waiting for job {0} with job type {1} running.".format(name, job_type)
+            logging.error(timeoutMsg)
+            raise Exception(timeoutMsg)
+        time.sleep(3)
+        status = _get_job_status(name, job_type)
+        logging.info("job {0} with type {1} status is {2}".format(name, job_type, status))
+
+def _job_logging(name, job_type):
+    logging_cmd = "arena logs -f %s" % (name)
+    process = Popen(split(logging_cmd), stdout=PIPE, stderr=PIPE, encoding='utf8')
+    while True:
+        output = process.stdout.readline()
+        if output == "" and process.poll() is not None:
+            break
+        if output:
+            logging.info(output.strip())
+    rc = process.poll()
+    return rc
+
+def _collect_metrics(name, job_type, metric_name):
+    metrics_cmd = "arena logs --tail=50 %s | grep -e '%s=' -e '%s:' | tail -1" % (name, metric_name, metric_name)
+    metric = 0
+    logging.info("search metric_name %s" % (metric_name))
+    try:
+        import re
+        output = subprocess.check_output(metrics_cmd, stderr=subprocess.STDOUT, shell=True)
+        result = output.decode().strip()
+        split_unit = ''
+        if metric_name + "=" in result:
+            split_unit = "="
+        elif metric_name + ":" in result:
+            split_unit = ":"
+        else:
+            return 0
+        array = result.split("%s%s" % (metric_name, split_unit))
+        if len(array) > 0:
+            logging.info(array)
+            # Extract the leading numeric value, dropping any trailing unit or text.
+            result = re.findall(r'\d+\.*\d*', array[-1])
+            if result:
+                metric = float(result[0])
+    except Exception as e:
+        logging.warning("Failed to collect metric {0} due to {1}".format(metric_name, str(e)))
+        return 0
+
+    return metric
+
+
+def _get_job_status(name, job_type):
+    get_cmd = "arena get %s --type %s | grep -i STATUS:|awk -F: '{print $NF}'" % (name, job_type)
+    status = ""
+    try:
+        output = subprocess.check_output(get_cmd, stderr=subprocess.STDOUT, shell=True)
+        status = output.decode().strip()
+    except subprocess.CalledProcessError as e:
+        logging.warning("Failed to get job status due to {0}".format(str(e)))
+
+    return status
+
+def _get_tensorboard_url(name, job_type):
+    get_cmd = "arena get %s --type %s | tail -1" % (name, job_type)
+    url = "N/A"
+    try:
+        output = subprocess.check_output(get_cmd, stderr=subprocess.STDOUT, shell=True)
+        url = output.decode()
+    except subprocess.CalledProcessError as e:
+        logging.warning("Failed to get tensorboard url due to {0}".format(str(e)))
+
+    return url
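+
+# A hedged sketch of how the helpers above compose (the job name and type
+# are illustrative; all of the helpers shell out to the `arena` CLI on PATH):
+#
+#   _submit_job('arena submit tfjob --name=tf-test --image=IMAGE "python main.py"')
+#   _wait_job_running('tf-test', 'tfjob', datetime.timedelta(minutes=30))
+#   _job_logging('tf-test', 'tfjob')
+#   _wait_job_done('tf-test', 'tfjob', datetime.timedelta(hours=240))
+#   accuracy = _collect_metrics('tf-test', 'tfjob', 'Train-accuracy')
+#
+# _collect_metrics expects the training code to log a line like
+# "Train-accuracy=0.97" or "Train-accuracy: 0.97".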
+
+# Generate standalone job
+def generate_job_command(args):
+    name = args.name
+    gpus = args.gpus
+    cpu = args.cpu
+    memory = args.memory
+    image = args.image
+    output_data = args.output_data
+    data = args.data
+    env = args.env
+    tensorboard_image = args.tensorboard_image
+    tensorboard = str2bool(args.tensorboard)
+    log_dir = args.log_dir
+
+    commandArray = [
+        'arena', 'submit', 'tfjob',
+        '--name={0}'.format(name),
+        '--image={0}'.format(image),
+    ]
+
+    if gpus > 0:
+        commandArray.append("--gpus={0}".format(gpus))
+
+    if cpu > 0:
+        commandArray.append("--cpu={0}".format(cpu))
+
+    if memory > 0:
+        commandArray.append("--memory={0}".format(memory))
+
+    if tensorboard_image != "tensorflow/tensorflow:1.12.0":
+        commandArray.append("--tensorboardImage={0}".format(tensorboard_image))
+
+    if tensorboard:
+        commandArray.append("--tensorboard")
+
+    if os.path.isdir(args.log_dir):
+        commandArray.append("--logdir={0}".format(args.log_dir))
+    else:
+        logging.info("skip log dir: {0}".format(args.log_dir))
+
+    if len(data) > 0:
+        for d in data:
+            commandArray.append("--data={0}".format(d))
+
+    if len(env) > 0:
+        for e in env:
+            commandArray.append("--env={0}".format(e))
+
+    return commandArray, "tfjob"
+
+# Generate mpi job
+def generate_mpjob_command(args):
+    name = args.name
+    workers = args.workers
+    gpus = args.gpus
+    cpu = args.cpu
+    memory = args.memory
+    image = args.image
+    output_data = args.output_data
+    data = args.data
+    env = args.env
+    tensorboard_image = args.tensorboard_image
+    tensorboard = str2bool(args.tensorboard)
+    rdma = str2bool(args.rdma)
+    log_dir = args.log_dir
+
+    commandArray = [
+        'arena', 'submit', 'mpijob',
+        '--name={0}'.format(name),
+        '--workers={0}'.format(workers),
+        '--image={0}'.format(image),
+    ]
+
+    if gpus > 0:
+        commandArray.append("--gpus={0}".format(gpus))
+
+    if cpu > 0:
+        commandArray.append("--cpu={0}".format(cpu))
+
+    if memory > 0:
+        commandArray.append("--memory={0}".format(memory))
+
+    if tensorboard_image != "tensorflow/tensorflow:1.12.0":
+        commandArray.append("--tensorboardImage={0}".format(tensorboard_image))
+
+    if tensorboard:
+        commandArray.append("--tensorboard")
+
+    if rdma:
+        commandArray.append("--rdma")
+
+    if os.path.isdir(args.log_dir):
+        commandArray.append("--logdir={0}".format(args.log_dir))
+    else:
+        logging.info("skip log dir: {0}".format(args.log_dir))
+
+    if len(data) > 0:
+        for d in data:
+            commandArray.append("--data={0}".format(d))
+
+    if len(env) > 0:
+        for e in env:
+            commandArray.append("--env={0}".format(e))
+
+    return commandArray, "mpijob"
+
+def str2bool(v):
+    return v.lower() in ("yes", "true", "t", "1")
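+
+# For reference, generate_mpjob_command(args) with --gpus=1 --workers=2
+# --tensorboard=true yields roughly (a hedged illustration):
+#
+#   ['arena', 'submit', 'mpijob', '--name=tf-test', '--workers=2',
+#    '--image=IMAGE', '--gpus=1', '--tensorboard']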
+
+def main(argv=None):
+    setup_custom_logging()
+    all_args = sys.argv[1:]
+    logging.info("args: {0}".format(' '.join(sys.argv)))
+    parser = argparse.ArgumentParser(description='Arena launcher')
+    parser.add_argument('--name', type=str,
+                        help='The job name to specify.', default=None)
+    parser.add_argument('--tensorboard', type=str, default="False")
+    parser.add_argument('--rdma', type=str, default="False")
+    parser.add_argument('--tensorboard-image', type=str, default='tensorflow/tensorflow:1.12.0')
+    parser.add_argument('--timeout-hours', type=int,
+                        default=200,
+                        help='Time in hours to wait for the Job submitted by arena to complete')
+    # parser.add_argument('--command', type=str)
+    parser.add_argument('--output-dir', type=str, default='')
+    parser.add_argument('--output-data', type=str, default='None')
+    parser.add_argument('--log-dir', type=str, default='')
+
+    parser.add_argument('--image', type=str)
+    parser.add_argument('--gpus', type=int, default=0)
+    parser.add_argument('--cpu', type=int, default=0)
+    parser.add_argument('--memory', type=int, default=0)
+    parser.add_argument('--workers', type=int, default=2)
+
+    parser.add_argument('--env', action='append', type=str, default=[])
+    parser.add_argument('--data', action='append', type=str, default=[])
+    parser.add_argument('--metric', action='append', type=str, default=[])
+
+    subparsers = parser.add_subparsers(help='arena sub-command help')
+
+    # create the parser for the 'mpijob' command
+    parser_mpi = subparsers.add_parser('mpijob', help='mpijob help')
+    parser_mpi.set_defaults(func=generate_mpjob_command)
+
+    # create the parser for the 'job' command
+    parser_job = subparsers.add_parser('job', help='job help')
+    parser_job.set_defaults(func=generate_job_command)
+
+    separator_idx = all_args.index('--')
+    launcher_args = all_args[:separator_idx]
+    remaining_args = all_args[separator_idx + 1:]
+
+    args = parser.parse_args(launcher_args)
+    commandArray, job_type = args.func(args)
+
+    args_dict = vars(args)
+    if args.name is None:
+        logging.error("Please specify the name.")
+        sys.exit(-1)
+    if len(remaining_args) == 0:
+        logging.error("Please specify the command.")
+        sys.exit(-1)
+
+    internalCommand = ' '.join(remaining_args)
+
+    name = args.name
+    fullname = name + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+    timeout_hours = args_dict.pop('timeout_hours')
+    logging.info("timeout_hours: {0}".format(timeout_hours))
+
+    enableTensorboard = str2bool(args.tensorboard)
+
+    commandArray.append('"{0}"'.format(internalCommand))
+    command = ' '.join(commandArray)
+
+    command = command.replace("--name={0}".format(name), "--name={0}".format(fullname))
+
+    logging.info('Start training {0}.'.format(command))
+
+    _submit_job(command)
+
+    # with open('/mlpipeline-ui-metadata.json', 'w') as f:
+    #     json.dump(metadata, f)
+
+    succ = True
+
+    # wait for the job to leave the PENDING state
+    _wait_job_running(fullname, job_type, datetime.timedelta(minutes=30))
+
+    rc = _job_logging(fullname, job_type)
+    logging.info("rc: {0}".format(rc))
+
+    # wait for the job to complete
+    _wait_job_done(fullname, job_type, datetime.timedelta(hours=timeout_hours))
+
+    status = _get_job_status(fullname, job_type)
+
+    if status == "SUCCEEDED":
+        logging.info("Training Job {0} success.".format(fullname))
+        if len(args.metric) > 0:
+            metrics_data = {
+                'metrics': []
+            }
+            metric_list = []
+            metric_unit = "RAW"
+            for m in args.metric:
+                mArray = m.split(":")
+                metric_name = mArray[0]
+                if len(mArray) > 1:
+                    metric_unit = mArray[1]
+                logging.info("determine metric name {0} with metric unit {1}".format(metric_name, metric_unit))
+                value = _collect_metrics(fullname, job_type, metric_name)
+                if value > 0:
+                    import re
+                    p = re.compile('^[a-z]([-a-z0-9]{0,62}[a-z0-9])?')
+                    result = p.search(metric_name.lower())
+                    if result is None:
+                        logging.info("Failed to parse metric_name {0}, skip".format(metric_name))
+                        continue
+                    else:
+                        metric_name = result.group(0)
+
+                    metric_data = {
+                        'name': metric_name.lower(),  # The name of the metric. Visualized as the column name in the runs table.
+                        'numberValue': value,  # The value of the metric. Must be a numeric value.
+                        'format': metric_unit,  # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
+                    }
+                    logging.info("metric data: {0}".format(metric_data))
+                    metric_list.append(metric_data)
+            metrics_data['metrics'] = metric_list
+            with open('/mlpipeline-metrics.json', 'w') as f:
+                logging.info("metrics: {0}".format(metrics_data))
+                json.dump(metrics_data, f)
+            logging.info("wrote /mlpipeline-metrics.json")
+    elif status == "FAILED":
+        logging.error("Training Job {0} fail.".format(fullname))
+        sys.exit(-1)
+    else:
+        logging.error("Training Job {0}'s status {1}".format(fullname, status))
+        sys.exit(-1)
+
+    # TODO(cheyang): copy the output.txt from training job
+    output = ""
+    with open('/output.txt', 'w') as f:
+        f.write(output)
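+
+# When --metric Train-accuracy:PERCENTAGE is passed and the job succeeds,
+# main() writes an /mlpipeline-metrics.json along the lines of this hedged
+# illustration:
+#
+#   {"metrics": [{"name": "train-accuracy", "numberValue": 0.97,
+#                 "format": "PERCENTAGE"}]}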
+
+if __name__ == "__main__":
+    main()
diff --git a/components/arena/docker/requirements.txt b/components/arena/docker/requirements.txt
new file mode 100644
index 00000000000..0795e0a4f9b
--- /dev/null
+++ b/components/arena/docker/requirements.txt
@@ -0,0 +1,3 @@
+requests
+six
+pyyaml
\ No newline at end of file
diff --git a/components/arena/python/arena/__init__.py b/components/arena/python/arena/__init__.py
new file mode 100644
index 00000000000..b3422656e47
--- /dev/null
+++ b/components/arena/python/arena/__init__.py
@@ -0,0 +1,17 @@
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._arena_mpi_op import mpi_job_op
+from ._arena_standalone_op import standalone_job_op
+from ._arena_distributed_tf_op import estimator_op, parameter_servers_op
diff --git a/components/arena/python/arena/_arena_distributed_tf_op.py b/components/arena/python/arena/_arena_distributed_tf_op.py
new file mode 100644
index 00000000000..f8a09065fb6
--- /dev/null
+++ b/components/arena/python/arena/_arena_distributed_tf_op.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import kfp.dsl as dsl
+import datetime
+import logging
+
+
+def estimator_op(name, image, command,
+                 chief_cpu, chief_memory, chief_port,
+                 workers, worker_image, worker_cpu, worker_memory,
+                 parameter_servers, ps_image, ps_cpu, ps_memory, ps_port,
+                 gpus, rdma,
+                 tensorboard,
+                 worker_port, annotations=[],
+                 evaluator=False, evaluator_cpu=0, evaluator_memory=0,
+                 env=[], data=[], sync_source=None,
+                 metrics=['Train-accuracy:PERCENTAGE'],
+                 arena_image='cheyang/arena_launcher:v0.3',
+                 timeout_hours=240):
+    """This function submits a Distributed TFJob in Estimator mode.
+
+    Args:
+        name: the name of estimator_op
+        image: the docker image name of training job
+        data: specify the datasource to mount to the job, like 'user-susan:/training'
+        command: the command to run
+    """
+    return distributed_tf_op(name=name, image=image, command=command, env=env, data=data, sync_source=sync_source,
+                             workers=workers, worker_image=worker_image, worker_cpu=worker_cpu, worker_memory=worker_memory,
+                             parameter_servers=parameter_servers, ps_image=ps_image, ps_cpu=ps_cpu, ps_memory=ps_memory,
+                             gpus=gpus, rdma=rdma,
+                             chief=True,
+                             chief_cpu=chief_cpu,
+                             chief_memory=chief_memory,
+                             worker_port=worker_port,
+                             ps_port=ps_port,
+                             tensorboard=tensorboard,
+                             metrics=metrics,
+                             arena_image=arena_image,
+                             timeout_hours=timeout_hours)
+
+# def DistributeTFOp(name, image, gpus: int, ):
+
+def parameter_servers_op(name, image, command, env, data, sync_source, annotations,
+                         workers, worker_image, worker_cpu, worker_memory,
+                         parameter_servers, ps_image, ps_cpu, ps_memory,
+                         gpus, rdma,
+                         tensorboard,
+                         worker_port, ps_port,
+                         metrics=['Train-accuracy:PERCENTAGE'],
+                         arena_image='cheyang/arena_launcher:v0.3',
+                         timeout_hours=240):
+    """This function submits a Distributed TFJob in Parameter Servers mode.
+
+    Args:
+        name: the name of parameter_servers_op
+        image: the docker image name of training job
+        data: specify the datasource to mount to the job, like 'user-susan:/training'
+        command: the command to run
+    """
+    return distributed_tf_op(name=name, image=image, command=command, env=env, data=data, sync_source=sync_source,
+                             workers=workers, worker_image=worker_image, worker_cpu=worker_cpu, worker_memory=worker_memory,
+                             parameter_servers=parameter_servers, ps_image=ps_image, ps_cpu=ps_cpu, ps_memory=ps_memory,
+                             gpus=gpus, rdma=rdma,
+                             worker_port=worker_port,
+                             ps_port=ps_port,
+                             tensorboard=tensorboard,
+                             metrics=metrics,
+                             arena_image=arena_image,
+                             timeout_hours=timeout_hours)
+
+
+def distributed_tf_op(name, image, command, env=[], data=[], sync_source=None,
+                      chief=False, chief_cpu=0, chief_memory=0,
+                      workers=0, worker_image=None, worker_cpu=0, worker_memory=0,
+                      parameter_servers=0, ps_image=None, ps_cpu=0, ps_memory=0,
+                      evaluator=False, evaluator_cpu=0, evaluator_memory=0,
+                      gpus=0, rdma=False,
+                      chief_port=22222,
+                      worker_port=22222,
+                      ps_port=22224,
+                      tensorboard=False,
+                      metrics=['Train-accuracy:PERCENTAGE'],
+                      arena_image='cheyang/arena_launcher:v0.3',
+                      timeout_hours=240):
+    """This function submits a Distributed TFJob.
+
+    Args:
+        name: the name of distributed_tf_op
+        image: the docker image name of training job
+        data: specify the datasource to mount to the job, like 'user-susan:/training'
+        command: the command to run
+    """
+    options = []
+    if sync_source:
+        options.append('--sync-source')
+        options.append(str(sync_source))
+
+    for e in env:
+        options.append('--env')
+        options.append(str(e))
+
+    for d in data:
+        options.append('--data')
+        options.append(str(d))
+
+    for m in metrics:
+        options.append('--metric')
+        options.append(str(m))
+
+    return dsl.ContainerOp(
+        name=name,
+        image=arena_image,
+        command=['python', 'arena_launcher.py'],
+        arguments=["--name", name,
+                   "--tensorboard", str(tensorboard),
+                   "--rdma", str(rdma),
+                   "--image", str(image),
+                   "--gpus", str(gpus),
+                   "--cpu", str(chief_cpu),
+                   "--memory", str(chief_memory),
+                   "--workers", str(workers),
+                   "--timeout-hours", str(timeout_hours),
+                   ] + options +
+                   [
+                   "tfjob",
+                   "--", str(command)],
+        file_outputs={'train': '/output.txt'}
+    )
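+
+# A hedged usage sketch (argument values are illustrative, not defaults):
+#
+#   parameter_servers_op(name='mnist-ps', image='tensorflow/tensorflow:1.12.0-gpu-py3',
+#                        command='python train.py', env=[], data=['user-susan:/training'],
+#                        sync_source=None, annotations=[],
+#                        workers=2, worker_image=None, worker_cpu=2, worker_memory=4,
+#                        parameter_servers=1, ps_image=None, ps_cpu=2, ps_memory=4,
+#                        gpus=1, rdma=False, tensorboard=False,
+#                        worker_port=22222, ps_port=22224)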
diff --git a/components/arena/python/arena/_arena_mpi_op.py b/components/arena/python/arena/_arena_mpi_op.py
new file mode 100644
index 00000000000..3cd8dbaada6
--- /dev/null
+++ b/components/arena/python/arena/_arena_mpi_op.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python3
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import kfp.dsl as dsl
+import datetime
+import logging
+
+def mpi_job_op(name, image, command, workers=1, gpus=0, cpu=0, memory=0, env=[], annotations=[],
+               data=[], sync_source=None,
+               rdma=False,
+               tensorboard=False, tensorboard_image=None,
+               metrics=['Train-accuracy:PERCENTAGE'],
+               arena_image='cheyang/arena_launcher:v0.3',
+               timeout_hours=240):
+    """This function submits an MPI Job, which runs allreduce-style distributed training.
+
+    Args:
+        name: the name of mpi_job_op
+        image: the docker image name of training job
+        data: specify the datasource to mount to the job, like 'user-susan:/training'
+        command: the command to run
+    """
+    if not name:
+        raise ValueError("name must be specified")
+    if not image:
+        raise ValueError("image must be specified")
+    if not command:
+        raise ValueError("command must be specified")
+
+    options = []
+    if sync_source:
+        if not sync_source.startswith("http"):
+            raise ValueError("sync_source must be an http git url")
+        options.append('--sync-source')
+        options.append(str(sync_source))
+
+    for e in env:
+        options.append('--env')
+        options.append(str(e))
+
+    for d in data:
+        options.append('--data')
+        options.append(str(d))
+
+    for m in metrics:
+        options.append('--metric')
+        options.append(str(m))
+
+    if tensorboard_image:
+        options.append('--tensorboard-image')
+        options.append(str(tensorboard_image))
+
+    return dsl.ContainerOp(
+        name=name,
+        image=arena_image,
+        command=['python', 'arena_launcher.py'],
+        arguments=["--name", name,
+                   "--tensorboard", str(tensorboard),
+                   "--rdma", str(rdma),
+                   "--image", str(image),
+                   "--gpus", str(gpus),
+                   "--cpu", str(cpu),
+                   "--memory", str(memory),
+                   "--workers", str(workers),
+                   "--timeout-hours", str(timeout_hours),
+                   ] + options +
+                   [
+                   "mpijob",
+                   "--", str(command)],
+        file_outputs={'train': '/output.txt'}
+    )
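+
+# A hedged usage sketch inside a pipeline function (values are illustrative):
+#
+#   train = mpi_job_op(name='all-reduce', workers=2, gpus=1,
+#                      image='registry.cn-hangzhou.aliyuncs.com/tensorflow-samples/horovod:0.13.11-tf1.10.0-torch0.4.0-py3.5',
+#                      data=['user-susan:/training'],
+#                      command='mpirun python /benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py')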
\ No newline at end of file
diff --git a/components/arena/python/arena/_arena_standalone_op.py b/components/arena/python/arena/_arena_standalone_op.py
new file mode 100644
index 00000000000..e71f9e00b7c
--- /dev/null
+++ b/components/arena/python/arena/_arena_standalone_op.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import kfp.dsl as dsl
+import datetime
+import logging
+
+
+def standalone_job_op(name, image, command, gpus=0, cpu=0, memory=0, env=[],
+                      tensorboard=False, tensorboard_image=None,
+                      data=[], sync_source=None, annotations=[],
+                      metrics=['Train-accuracy:PERCENTAGE'],
+                      arena_image='cheyang/arena_launcher:v0.3',
+                      timeout_hours=240):
+    """This function submits a standalone training Job.
+
+    Args:
+        name: the name of standalone_job_op
+        image: the docker image name of training job
+        data: specify the datasource to mount to the job, like 'user-susan:/training'
+        command: the command to run
+    """
+    if not name:
+        raise ValueError("name must be specified")
+    if not image:
+        raise ValueError("image must be specified")
+    if not command:
+        raise ValueError("command must be specified")
+
+    options = []
+    if sync_source:
+        if not sync_source.startswith("http"):
+            raise ValueError("sync_source must be an http git url")
+        options.append('--sync-source')
+        options.append(str(sync_source))
+
+    for e in env:
+        options.append('--env')
+        options.append(str(e))
+
+    for d in data:
+        options.append('--data')
+        options.append(str(d))
+
+    for m in metrics:
+        options.append('--metric')
+        options.append(str(m))
+
+    if tensorboard_image:
+        options.append('--tensorboard-image')
+        options.append(str(tensorboard_image))
+
+    return dsl.ContainerOp(
+        name=name,
+        image=arena_image,
+        command=['python', 'arena_launcher.py'],
+        arguments=["--name", name,
+                   "--tensorboard", str(tensorboard),
+                   "--image", str(image),
+                   "--gpus", str(gpus),
+                   "--cpu", str(cpu),
+                   "--memory", str(memory),
+                   "--timeout-hours", str(timeout_hours),
+                   ] + options +
+                   [
+                   "job",
+                   "--", str(command)],
+        file_outputs={'train': '/output.txt'}
+    )
diff --git a/components/arena/python/arena/_utils.py b/components/arena/python/arena/_utils.py
new file mode 100644
index 00000000000..783d22e560a
--- /dev/null
+++ b/components/arena/python/arena/_utils.py
@@ -0,0 +1,8 @@
+# The default Data of training job
+default_data = 'None'
+
+def set_defaultData(data):
+    global default_data
+    default_data = data
+
+def get_defaultData():
+    return default_data
\ No newline at end of file
diff --git a/components/arena/python/build.sh b/components/arena/python/build.sh
new file mode 100755
index 00000000000..28809bc1074
--- /dev/null
+++ b/components/arena/python/build.sh
@@ -0,0 +1,21 @@
+#!/bin/bash -ex
+
+get_abs_filename() {
+  # $1 : relative filename
+  echo "$(cd "$(dirname "$1")" && pwd)/$(basename "$1")"
+}
+
+target_archive_file=${1:-kfp-arena-0.3.tar.gz}
+target_archive_file=$(get_abs_filename "$target_archive_file")
+
+DIR=$(mktemp -d)
+
+
+cp -r arena $DIR
+cp ./setup.py $DIR
+
+# Build tarball package.
+cd $DIR
+python setup.py sdist --format=gztar
+cp $DIR/dist/*.tar.gz "$target_archive_file"
+rm -rf $DIR
\ No newline at end of file
diff --git a/components/arena/python/setup.py b/components/arena/python/setup.py
new file mode 100644
index 00000000000..ad65e966a02
--- /dev/null
+++ b/components/arena/python/setup.py
@@ -0,0 +1,36 @@
+from setuptools import setup
+
+
+NAME = 'kfp-arena'
+VERSION = '0.3'
+
+REQUIRES = ['kfp >= 0.1']
+
+setup(
+    name=NAME,
+    version=VERSION,
+    description='Kubeflow Pipelines Extended Arena SDK',
+    author='cheyang',
+    author_email="cheyang@163.com",
+    install_requires=REQUIRES,
+    packages=[
+        'arena',
+    ],
+    classifiers=[
+        'Intended Audience :: Developers',
+        'Intended Audience :: Education',
+        'Intended Audience :: Science/Research',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
+        'Topic :: Scientific/Engineering',
+        'Topic :: Scientific/Engineering :: Artificial Intelligence',
+        'Topic :: Software Development',
+        'Topic :: Software Development :: Libraries',
+        'Topic :: Software Development :: Libraries :: Python Modules',
+    ],
+    python_requires='>=3.5.3',
+    include_package_data=True
+)
diff --git a/samples/arena-samples/README.md b/samples/arena-samples/README.md
new file mode 100644
index 00000000000..99d587acf02
--- /dev/null
+++ b/samples/arena-samples/README.md
@@ -0,0 +1,25 @@
+# Arena demo
+
+This is a series of examples showing how to build deep learning models with [Arena](https://github.com/kubeflow/arena). The demos show how to run a standalone Job, an MPI Job, a TFJob (PS mode) and a TensorFlow Estimator Job in a pipeline.
+
+## Setup
+
+1. Install arena:
+
+```
+kubectl create -f https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/arena-samples/arena.yaml
+```
+
+2. Add an additional RBAC role to the service account `pipeline-runner`:
+
+```
+kubectl create -f https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/arena-samples/arena_launcher_rbac.yaml
+```
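+
+You can check that the installer pod has completed before moving on (a hedged check; `arena-installer` is the pod created by `arena.yaml` in `kube-system`):
+
+```
+kubectl get pod arena-installer -n kube-system
+```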
+
+## Demos
+
+- [Standalone Job](standalonejob/README.md)
+- [MPI Job]()
+- [TensorFlow Estimator Job]()
+- [TFJob]()
+
diff --git a/samples/arena-samples/arena.yaml b/samples/arena-samples/arena.yaml
new file mode 100644
index 00000000000..5beecfafff8
--- /dev/null
+++ b/samples/arena-samples/arena.yaml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: arena-installer
+  namespace: kube-system
+spec:
+  restartPolicy: Never
+  serviceAccountName: admin
+  hostNetwork: true
+  containers:
+  - name: arena
+    image: registry.cn-beijing.aliyuncs.com/acs/arena:0.2.0-f6b6188
\ No newline at end of file
diff --git a/samples/arena-samples/arena_launcher_rbac.yaml b/samples/arena-samples/arena_launcher_rbac.yaml
new file mode 100644
index 00000000000..e5e5aaf3cdc
--- /dev/null
+++ b/samples/arena-samples/arena_launcher_rbac.yaml
@@ -0,0 +1,34 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: arena
+  namespace: kubeflow
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - configmaps
+  verbs:
+  - '*'
+- apiGroups:
+  - ""
+  resources:
+  - services/proxy
+  verbs:
+  - get
+
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: pipeline-runner-arena-role
+  namespace: kubeflow
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: arena
+subjects:
+- kind: ServiceAccount
+  name: pipeline-runner
+  namespace: kubeflow
\ No newline at end of file
diff --git a/samples/arena-samples/mpi/mpi_run.py b/samples/arena-samples/mpi/mpi_run.py
new file mode 100644
index 00000000000..a373ee8c3c6
--- /dev/null
+++ b/samples/arena-samples/mpi/mpi_run.py
@@ -0,0 +1,60 @@
+import kfp
+import arena
+import kfp.dsl as dsl
+import argparse
+
+FLAGS = None
+
+@dsl.pipeline(
+    name='pipeline to run mpi job',
+    description='shows how to run an mpi job.'
+)
+def sample_mpirun_pipeline(image="registry.cn-hangzhou.aliyuncs.com/tensorflow-samples/horovod:0.13.11-tf1.10.0-torch0.4.0-py3.5",
+                           batch_size="64",
+                           optimizer='momentum'):
+    """A pipeline for end to end machine learning workflow."""
+    data = ["user-susan:/training"]
+    env = ["NCCL_DEBUG=INFO"]
+    gpus = 1
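+    # Hedged notes on the conventions above: each entry in `data` is
+    # 'pvc-name:/mount/path', and each entry in `env` is 'KEY=value'.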
+
+    train = arena.mpi_job_op(
+        name="all-reduce",
+        image=image,
+        env=env,
+        data=data,
+        gpus=gpus,
+        command="""
+        mpirun python /benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py --model resnet101 \
+        --batch_size {0} --variable_update horovod --optimizer {1} \
+        --summary_verbosity=3 --save_summaries_steps=10
+        """.format(batch_size, optimizer)
+    )
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--image', type=str,
+                        default="registry.cn-hangzhou.aliyuncs.com/tensorflow-samples/horovod:0.13.11-tf1.10.0-torch0.4.0-py3.5",
+                        help='docker image.')
+    parser.add_argument('--batch_size', type=str, default="64",
+                        help='batch size.')
+    parser.add_argument('--optimizer', type=str, default="momentum",
+                        help='optimizer.')
+    FLAGS, unparsed = parser.parse_known_args()
+
+    image = FLAGS.image
+    batch_size = FLAGS.batch_size
+    optimizer = FLAGS.optimizer
+
+    EXPERIMENT_NAME = "resnet101-allreduce"
+    RUN_ID = "run"
+    KFP_SERVICE = "ml-pipeline.kubeflow.svc.cluster.local:8888"
+    import kfp.compiler as compiler
+    compiler.Compiler().compile(sample_mpirun_pipeline, __file__ + '.tar.gz')
+    client = kfp.Client(host=KFP_SERVICE)
+    try:
+        experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
+    except:
+        experiment_id = client.create_experiment(EXPERIMENT_NAME).id
+    run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
+                              params={'image': image,
+                                      'batch_size': batch_size,
+                                      'optimizer': optimizer})
\ No newline at end of file
diff --git a/samples/arena-samples/standalonejob/README.md b/samples/arena-samples/standalonejob/README.md
new file mode 100644
index 00000000000..a09786da4c9
--- /dev/null
+++ b/samples/arena-samples/standalonejob/README.md
@@ -0,0 +1,98 @@
+# Run a Standalone Job
+
+The `standalone_pipeline.py` sample creates a pipeline that prepares the dataset and ML code, trains the model and exports it.
+
+## Requirements
+
+- [Install arena](https://github.com/kubeflow/arena/blob/master/docs/installation/README.md)
+
+- This sample requires distributed storage. Here we use NFS as an example.
+
+1\. Create `/data` on the NFS server:
+
+```
+# mkdir -p /nfs
+# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
+# mkdir -p /nfs/data
+# cd /
+# umount /nfs
+```
+
+2\. Create the Persistent Volume. Modify `NFS_SERVER_IP` to your own:
+
+```
+# cat nfs-pv.yaml
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: user-susan
+  labels:
+    user-susan: pipelines
+spec:
+  persistentVolumeReclaimPolicy: Retain
+  capacity:
+    storage: 10Gi
+  accessModes:
+  - ReadWriteMany
+  nfs:
+    server: NFS_SERVER_IP
+    path: "/data"
+
+# kubectl create -f nfs-pv.yaml
+```
+
+3\. Create the Persistent Volume Claim:
+
+```
+# cat nfs-pvc.yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: user-susan
+  annotations:
+    description: "this is the mnist demo"
+    owner: Tom
+spec:
+  accessModes:
+    - ReadWriteMany
+  resources:
+    requests:
+      storage: 5Gi
+  selector:
+    matchLabels:
+      user-susan: pipelines
+# kubectl create -f nfs-pvc.yaml
+```
+
+> Note: adding `description` and `owner` is suggested.
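+
+You can verify that the claim is bound before running the sample (a hedged check; assumes the PVC was created in the namespace where the training jobs run):
+
+```
+kubectl get pvc user-susan
+```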
+
+## Instructions
+
+### 1. With the command line
+
+First, install the necessary Python packages:
+
+```shell
+pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
+pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.3.tar.gz --upgrade
+```
+
+Then run [standalone_pipeline.py](standalone_pipeline.py) with different parameters:
+
+```
+python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
+```
+
+```
+python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3
+```
+
+### 2. With Jupyter Notebook
+
+Run `jupyter notebook` to start your Jupyter server and load the notebook `standalone_pipeline.ipynb`.
+
+
+### 3. Compare the results in the pipelines dashboard
+
+
+![](demo.jpg)
+
+
diff --git a/samples/arena-samples/standalonejob/demo.jpg b/samples/arena-samples/standalonejob/demo.jpg
new file mode 100644
index 00000000000..88e1c3347a0
Binary files /dev/null and b/samples/arena-samples/standalonejob/demo.jpg differ
diff --git a/samples/arena-samples/standalonejob/standalone_pipeline.ipynb b/samples/arena-samples/standalonejob/standalone_pipeline.ipynb
new file mode 100644
index 00000000000..79cf6299020
--- /dev/null
+++ b/samples/arena-samples/standalonejob/standalone_pipeline.ipynb
@@ -0,0 +1,370 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Arena Kubeflow Pipeline Notebook demo\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Prepare data volume\n",
+    "\n",
+    "You should prepare the data volume `user-susan` by following the [docs](https://github.com/kubeflow/arena/blob/master/docs/userguide/4-tfjob-distributed-data.md). \n",
+    "\n",
+    "Then run `arena data list` to check if it's created."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "NAME            ACCESSMODE     DESCRIPTION  OWNER  AGE\r\n",
+      "katib-mysql     ReadWriteOnce                      49d\r\n",
+      "minio-pv-claim  ReadWriteOnce                      49d\r\n",
+      "mysql-pv-claim  ReadWriteOnce                      49d\r\n",
+      "user-susan      ReadWriteMany                      49d\r\n"
+     ]
+    }
+   ],
+   "source": [
+    "! arena data list"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Define the necessary environment variables and install the KubeFlow Pipeline SDK\n",
+    "We assume this notebook kernel has access to Python's site-packages and is in Python3.\n",
+    "\n",
+    "**Please fill in the environment variables below with your own settings.**\n",
+    "\n",
+    "- **EXPERIMENT_NAME**: A unique experiment name that will be created for this notebook demo.\n",
+    "- **KFP_PACKAGE**: The latest release of the kubeflow pipeline platform library.\n",
+    "- **KUBEFLOW_PIPELINE_LINK**: The link to access the KubeFlow pipeline API.\n",
+    "- **MOUNT**: The mount configuration to map data above into the training job. 
The format is 'data:/directory'.\n",
+    "- **GPUs**: The number of GPUs for training.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "EXPERIMENT_NAME = 'myjob'\n",
+    "RUN_ID = \"run\"\n",
+    "KFP_SERVICE = \"ml-pipeline.kubeflow.svc.cluster.local:8888\"\n",
+    "KFP_PACKAGE = 'http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz'\n",
+    "KFP_ARENA_PACKAGE = 'http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.3.tar.gz'\n",
+    "KUBEFLOW_PIPELINE_LINK = ''\n",
+    "MOUNT = ['user-susan:/training']\n",
+    "GPUs = 1"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Install the necessary python packages\n",
+    "\n",
+    "Note: Please change pip3 to the package manager that's used for this Notebook Kernel."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Collecting http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/v0.4.0/kfp.tar.gz\n",
+      "\u001b[?25l  Downloading http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/v0.4.0/kfp.tar.gz (133kB)\n",
+      "\u001b[K    100% |████████████████████████████████| 143kB 2.0MB/s ta 0:00:01\n",
+      "\u001b[?25hRequirement already satisfied, skipping upgrade: urllib3>=1.15 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.22)\n",
+      "Requirement already satisfied, skipping upgrade: six>=1.10 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.11.0)\n",
+      "Requirement already satisfied, skipping upgrade: certifi in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2018.11.29)\n",
+      "Requirement already satisfied, skipping upgrade: python-dateutil in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.7.5)\n",
+      "Requirement already satisfied, skipping upgrade: PyYAML in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (3.13)\n",
+      "Requirement already satisfied, skipping upgrade: google-cloud-storage==1.13.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.13.0)\n",
+      "Requirement already satisfied, skipping upgrade: kubernetes==8.0.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (8.0.0)\n",
+      "Requirement already satisfied, skipping upgrade: PyJWT==1.6.4 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.4)\n",
+      "Requirement already satisfied, skipping upgrade: cryptography==2.4.2 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.4.2)\n",
+      "Requirement already satisfied, skipping upgrade: google-auth==1.6.1 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.1)\n",
+      "Requirement already satisfied, skipping upgrade: requests_toolbelt==0.8.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (0.8.0)\n",
+      "Requirement already satisfied, skipping upgrade: google-cloud-core<0.29dev,>=0.28.0 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.28.1)\n",
+      "Requirement already satisfied, skipping upgrade: google-resumable-media>=0.3.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.3.1)\n",
+      "Requirement already satisfied, skipping upgrade: google-api-core<2.0.0dev,>=0.1.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (1.6.0)\n",
+      "Requirement already satisfied, skipping upgrade: adal>=1.0.2 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.2.1)\n",
+      "Requirement already satisfied, skipping upgrade: 
requests-oauthlib in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.0.0)\n", + "Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (0.54.0)\n", + "Requirement already satisfied, skipping upgrade: requests in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (2.18.4)\n", + "Requirement already satisfied, skipping upgrade: setuptools>=21.0.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (38.4.0)\n", + "Requirement already satisfied, skipping upgrade: cffi!=1.11.3,>=1.7 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (1.11.4)\n", + "Requirement already satisfied, skipping upgrade: idna>=2.1 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (2.6)\n", + "Requirement already satisfied, skipping upgrade: asn1crypto>=0.21.0 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (0.24.0)\n", + "Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (0.2.2)\n", + "Requirement already satisfied, skipping upgrade: cachetools>=2.0.0 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (3.0.0)\n", + "Requirement already satisfied, skipping upgrade: rsa>=3.1.4 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (4.0)\n", + "Requirement already satisfied, skipping upgrade: protobuf>=3.4.0 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (3.6.1)\n", + "Requirement already satisfied, skipping upgrade: pytz in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (2018.7)\n", + "Requirement already satisfied, skipping upgrade: googleapis-common-protos!=1.5.4,<2.0dev,>=1.5.3 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (1.5.5)\n", + "Requirement already satisfied, skipping upgrade: oauthlib>=0.6.2 in /opt/conda/lib/python3.6/site-packages (from requests-oauthlib->kubernetes==8.0.0->kfp==0.1) (2.1.0)\n", + "Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /opt/conda/lib/python3.6/site-packages (from requests->kubernetes==8.0.0->kfp==0.1) (3.0.4)\n", + "Requirement already satisfied, skipping upgrade: pycparser in /opt/conda/lib/python3.6/site-packages (from cffi!=1.11.3,>=1.7->cryptography==2.4.2->kfp==0.1) (2.18)\n", + "Requirement already satisfied, skipping upgrade: pyasn1<0.5.0,>=0.4.1 in /opt/conda/lib/python3.6/site-packages (from pyasn1-modules>=0.2.1->google-auth==1.6.1->kfp==0.1) (0.4.4)\n", + "Building wheels for collected packages: kfp\n", + " Running setup.py bdist_wheel for kfp ... 
\u001b[?25ldone\n", + "\u001b[?25h Stored in directory: /tmp/pip-ephem-wheel-cache-1wm5ld15/wheels/f0/e0/47/2f1e28c1a54da10332867d1f9cc25bfb916c0a4b8ea47029db\n", + "Successfully built kfp\n", + "Installing collected packages: kfp\n", + " Found existing installation: kfp 0.1\n", + " Uninstalling kfp-0.1:\n", + " Successfully uninstalled kfp-0.1\n", + "Successfully installed kfp-0.1\n", + "\u001b[33mYou are using pip version 18.1, however version 19.0.3 is available.\n", + "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n" + ] + } + ], + "source": [ + "!pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note: Install arena's python package" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collecting http://kubeflow.oss-cn-beijing.aliyuncs.com/kip-arena/kfp-arena-0.1.tar.gz\n", + " Downloading http://kubeflow.oss-cn-beijing.aliyuncs.com/kip-arena/kfp-arena-0.1.tar.gz\n", + "Requirement already satisfied, skipping upgrade: kfp>=0.1 in /opt/conda/lib/python3.6/site-packages (from kfp-arena==0.1) (0.1)\n", + "Requirement already satisfied, skipping upgrade: python-dateutil in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (2.7.5)\n", + "Requirement already satisfied, skipping upgrade: urllib3>=1.15 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (1.22)\n", + "Requirement already satisfied, skipping upgrade: six>=1.10 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (1.11.0)\n", + "Requirement already satisfied, skipping upgrade: requests-toolbelt==0.8.0 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (0.8.0)\n", + "Requirement already satisfied, skipping upgrade: google-auth==1.6.1 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (1.6.1)\n", + "Requirement already satisfied, skipping upgrade: PyYAML in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (3.13)\n", + "Requirement already satisfied, skipping upgrade: kubernetes==8.0.0 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (8.0.0)\n", + "Requirement already satisfied, skipping upgrade: certifi in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (2018.11.29)\n", + "Requirement already satisfied, skipping upgrade: google-cloud-storage==1.13.0 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (1.13.0)\n", + "Requirement already satisfied, skipping upgrade: cryptography==2.4.2 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (2.4.2)\n", + "Requirement already satisfied, skipping upgrade: PyJWT==1.6.4 in /opt/conda/lib/python3.6/site-packages (from kfp>=0.1->kfp-arena==0.1) (1.6.4)\n", + "Requirement already satisfied, skipping upgrade: requests<3.0.0,>=2.0.1 in /opt/conda/lib/python3.6/site-packages (from requests-toolbelt==0.8.0->kfp>=0.1->kfp-arena==0.1) (2.18.4)\n", + "Requirement already satisfied, skipping upgrade: cachetools>=2.0.0 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp>=0.1->kfp-arena==0.1) (3.0.0)\n", + "Requirement already satisfied, skipping upgrade: rsa>=3.1.4 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp>=0.1->kfp-arena==0.1) (4.0)\n", + "Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in 
/opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp>=0.1->kfp-arena==0.1) (0.2.2)\n", + "Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp>=0.1->kfp-arena==0.1) (0.54.0)\n", + "Requirement already satisfied, skipping upgrade: requests-oauthlib in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp>=0.1->kfp-arena==0.1) (1.0.0)\n", + "Requirement already satisfied, skipping upgrade: setuptools>=21.0.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp>=0.1->kfp-arena==0.1) (38.4.0)\n", + "Requirement already satisfied, skipping upgrade: adal>=1.0.2 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp>=0.1->kfp-arena==0.1) (1.2.1)\n", + "Requirement already satisfied, skipping upgrade: google-resumable-media>=0.3.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (0.3.1)\n", + "Requirement already satisfied, skipping upgrade: google-api-core<2.0.0dev,>=0.1.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (1.6.0)\n", + "Requirement already satisfied, skipping upgrade: google-cloud-core<0.29dev,>=0.28.0 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (0.28.1)\n", + "Requirement already satisfied, skipping upgrade: cffi!=1.11.3,>=1.7 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp>=0.1->kfp-arena==0.1) (1.11.4)\n", + "Requirement already satisfied, skipping upgrade: asn1crypto>=0.21.0 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp>=0.1->kfp-arena==0.1) (0.24.0)\n", + "Requirement already satisfied, skipping upgrade: idna>=2.1 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp>=0.1->kfp-arena==0.1) (2.6)\n", + "Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /opt/conda/lib/python3.6/site-packages (from requests<3.0.0,>=2.0.1->requests-toolbelt==0.8.0->kfp>=0.1->kfp-arena==0.1) (3.0.4)\n", + "Requirement already satisfied, skipping upgrade: pyasn1>=0.1.3 in /opt/conda/lib/python3.6/site-packages (from rsa>=3.1.4->google-auth==1.6.1->kfp>=0.1->kfp-arena==0.1) (0.4.4)\n", + "Requirement already satisfied, skipping upgrade: oauthlib>=0.6.2 in /opt/conda/lib/python3.6/site-packages (from requests-oauthlib->kubernetes==8.0.0->kfp>=0.1->kfp-arena==0.1) (2.1.0)\n", + "Requirement already satisfied, skipping upgrade: protobuf>=3.4.0 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (3.6.1)\n", + "Requirement already satisfied, skipping upgrade: pytz in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (2018.7)\n", + "Requirement already satisfied, skipping upgrade: googleapis-common-protos!=1.5.4,<2.0dev,>=1.5.3 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp>=0.1->kfp-arena==0.1) (1.5.5)\n", + "Requirement already satisfied, skipping upgrade: pycparser in /opt/conda/lib/python3.6/site-packages (from cffi!=1.11.3,>=1.7->cryptography==2.4.2->kfp>=0.1->kfp-arena==0.1) (2.18)\n", + "Building wheels for collected packages: kfp-arena\n", + " Running setup.py bdist_wheel for kfp-arena ... 
\u001b[?25ldone\n",
+      "\u001b[?25h  Stored in directory: /home/jovyan/.cache/pip/wheels/6a/d3/d5/f99c7966cacbcbad2922bf614c88c523c869c16d26e549a087\n",
+      "Successfully built kfp-arena\n",
+      "Installing collected packages: kfp-arena\n",
+      "Successfully installed kfp-arena-0.1\n",
+      "\u001b[33mYou are using pip version 18.1, however version 19.0.3 is available.\n",
+      "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n"
+     ]
+    }
+   ],
+   "source": [
+    "!pip3 install $KFP_ARENA_PACKAGE --upgrade"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Import the KubeFlow Pipeline library and define the client and experiment "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import kfp\n",
+    "\n",
+    "client = kfp.Client(KUBEFLOW_PIPELINE_LINK)\n",
+    "\n",
+    "try:\n",
+    "    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id\n",
+    "except:\n",
+    "    experiment_id = client.create_experiment(EXPERIMENT_NAME).id"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### 2. Define pipeline tasks using the kfp library. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import arena\n",
+    "import kfp.dsl as dsl\n",
+    "import kfp.compiler as compiler\n",
+    "\n",
+    "@dsl.pipeline(\n",
+    "    name='pipeline to run jobs',\n",
+    "    description='shows how to run pipeline jobs.'\n",
+    ")\n",
+    "def sample_pipeline(learning_rate='0.01',\n",
+    "                    dropout='0.9',\n",
+    "                    model_version='1'):\n",
+    "    \"\"\"A pipeline for end to end machine learning workflow.\"\"\"\n",
+    "\n",
+    "    # 1. prepare data\n",
+    "    prepare_data = arena.standalone_job_op(\n",
+    "        name=\"prepare-data\",\n",
+    "        image=\"byrnedo/alpine-curl\",\n",
+    "        data=MOUNT,\n",
+    "        command=\"mkdir -p /training/dataset/mnist && \\\n",
+    "            cd /training/dataset/mnist && \\\n",
+    "            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \\\n",
+    "            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \\\n",
+    "            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \\\n",
+    "            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz\")\n",
+    "    # 2. prepare source code\n",
+    "    prepare_code = arena.standalone_job_op(\n",
+    "        name=\"source-code\",\n",
+    "        image=\"alpine/git\",\n",
+    "        data=MOUNT,\n",
+    "        command=\"mkdir -p /training/models/ && \\\n",
+    "            cd /training/models/ && \\\n",
+    "            if [ ! -d /training/models/tensorflow-sample-code ]; then git clone https://github.com/cheyang/tensorflow-sample-code.git; else echo no need download;fi\")\n",
+    "\n",
+    "    # 3. train the models\n",
+    "    train = arena.standalone_job_op(\n",
+    "        name=\"train\",\n",
+    "        image=\"tensorflow/tensorflow:1.11.0-gpu-py3\",\n",
+    "        gpus=GPUs,\n",
+    "        data=MOUNT,\n",
+    "        command=\"echo %s; \\\n",
+    "            echo %s; \\\n",
+    "            python /training/models/tensorflow-sample-code/tfjob/docker/mnist/main.py --max_steps 500 --data_dir /training/dataset/mnist --log_dir /training/output/mnist\" % (prepare_data.output, prepare_code.output),\n",
+    "        metrics=[\"Train-accuracy:PERCENTAGE\"])\n",
+    "    # 4. export the model\n",
+    "    export_model = arena.standalone_job_op(\n",
+    "        name=\"export-model\",\n",
+    "        image=\"tensorflow/tensorflow:1.11.0-py3\",\n",
+    "        data=MOUNT,\n",
+    "        command=\"echo %s; \\\n",
+    "            python /training/models/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_step=400 --checkpoint_path=/training/output/mnist /training/output/models\" % (train.output, model_version))\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "Run link here"
+      ],
+      "text/plain": [
+       ""
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "The above run link is assuming you ran this cell on JupyterHub that is deployed on the same cluster. The actual run link is /#/runs/details/cac8aef4-4aaa-11e9-8264-00163e13f33e\n"
+     ]
+    }
+   ],
+   "source": [
+    "learning_rate = \"0.001\"\n",
+    "dropout = \"0.8\"\n",
+    "model_version = \"1\"\n",
+    "\n",
+    "compiler.Compiler().compile(sample_pipeline, 'standalone.tar.gz')\n",
+    "\n",
+    "run = client.run_pipeline(experiment_id, 'mnist', 'standalone.tar.gz', params={'learning_rate':learning_rate,\n",
+    "                                                                               'dropout':dropout,\n",
+    "                                                                               'model_version':model_version})\n",
+    "\n",
+    "print('The above run link is assuming you ran this cell on JupyterHub that is deployed on the same cluster. ' +\n",
+    "      'The actual run link is ' + KUBEFLOW_PIPELINE_LINK + '/#/runs/details/' + run.id)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.6.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/samples/arena-samples/standalonejob/standalone_pipeline.py b/samples/arena-samples/standalonejob/standalone_pipeline.py
new file mode 100644
index 00000000000..749507a4573
--- /dev/null
+++ b/samples/arena-samples/standalonejob/standalone_pipeline.py
@@ -0,0 +1,82 @@
+import kfp
+import arena
+import kfp.dsl as dsl
+import argparse
+
+FLAGS = None
+
+@dsl.pipeline(
+    name='pipeline to run jobs',
+    description='shows how to run pipeline jobs.'
+)
+def sample_pipeline(learning_rate='0.01',
+                    dropout='0.9',
+                    model_version='1'):
+    """A pipeline for end to end machine learning workflow."""
+    data = ["user-susan:/training"]
+    gpus = 1
+
+    # 1. prepare data
+    prepare_data = arena.standalone_job_op(
+        name="prepare-data",
+        image="byrnedo/alpine-curl",
+        data=data,
+        command="mkdir -p /training/dataset/mnist && \
+            cd /training/dataset/mnist && \
+            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
+            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
+            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
+            curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
+    # 2. prepare source code
+    prepare_code = arena.standalone_job_op(
+        name="source-code",
+        image="alpine/git",
+        data=data,
+        command="mkdir -p /training/models/ && \
+            cd /training/models/ && \
+            if [ ! -d /training/models/tensorflow-sample-code ]; then git clone https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git; else echo no need download;fi")
+
+    # 3. train the models
+    train = arena.standalone_job_op(
+        name="train",
+        image="tensorflow/tensorflow:1.11.0-gpu-py3",
+        gpus=gpus,
+        data=data,
+        command="echo %s;echo %s;python /training/models/tensorflow-sample-code/tfjob/docker/mnist/main.py --max_steps 500 --data_dir /training/dataset/mnist --log_dir /training/output/mnist --learning_rate %s --dropout %s" % (prepare_data.output, prepare_code.output, learning_rate, dropout),
+        metrics=["Train-accuracy:PERCENTAGE"])
+    # 4. export the model
+    export_model = arena.standalone_job_op(
+        name="export-model",
+        image="tensorflow/tensorflow:1.11.0-py3",
+        data=data,
+        command="echo %s;python /training/models/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--model_version', type=str,
+                        default="1",
+                        help='model version.')
+    parser.add_argument('--dropout', type=str, default="0.9",
+                        help='Keep probability for training dropout.')
+    parser.add_argument('--learning_rate', type=str, default="0.001",
+                        help='Initial learning rate.')
+    FLAGS, unparsed = parser.parse_known_args()
+
+    model_version = FLAGS.model_version
+    dropout = FLAGS.dropout
+    learning_rate = FLAGS.learning_rate
+
+    EXPERIMENT_NAME = "mnist"
+    RUN_ID = "run"
+    KFP_SERVICE = "ml-pipeline.kubeflow.svc.cluster.local:8888"
+    import kfp.compiler as compiler
+    compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
+    client = kfp.Client(host=KFP_SERVICE)
+    try:
+        experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
+    except:
+        experiment_id = client.create_experiment(EXPERIMENT_NAME).id
+    run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
+                              params={'learning_rate': learning_rate,
+                                      'dropout': dropout,
+                                      'model_version': model_version})
\ No newline at end of file