From 5b9591683785e94a128cdd82e0b550e745ed89de Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 12 Sep 2017 14:32:17 +0800 Subject: [PATCH 1/5] publish files --- go/cmd/paddlecloud/paddlecloud.go | 1 + paddlecloud/notebook/models.py | 6 +++ paddlecloud/paddlecloud/urls.py | 2 + paddlecloud/paddlejob/views.py | 61 ++++++++++++++++++++++++++++++- 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/go/cmd/paddlecloud/paddlecloud.go b/go/cmd/paddlecloud/paddlecloud.go index 5bc48fc0..a2a5872b 100644 --- a/go/cmd/paddlecloud/paddlecloud.go +++ b/go/cmd/paddlecloud/paddlecloud.go @@ -20,6 +20,7 @@ func main() { subcommands.Register(&paddlecloud.SimpleFileCmd{}, "") subcommands.Register(&paddlecloud.RegistryCmd{}, "") subcommands.Register(&paddlecloud.DeleteCommand{}, "") + subcommands.Register(&paddlecloud.PublishCmd{}, "") subcommands.Register(&pfsmod.LsCmd{}, "PFS") subcommands.Register(&pfsmod.CpCmd{}, "PFS") subcommands.Register(&pfsmod.RmCmd{}, "PFS") diff --git a/paddlecloud/notebook/models.py b/paddlecloud/notebook/models.py index 1bd0d388..1a732c87 100644 --- a/paddlecloud/notebook/models.py +++ b/paddlecloud/notebook/models.py @@ -9,3 +9,9 @@ class PaddleUser(models.Model): school = models.CharField(max_length=256) studentID = models.CharField(max_length=512) major = models.CharField(max_length=256) + +class FilePublish(models.Model): + path = models.CharField(max_length=4096) + url = models.CharField(max_length=4096) + uuid = models.CharField(max_length=256) + user = models.ForeignKey(User) diff --git a/paddlecloud/paddlecloud/urls.py b/paddlecloud/paddlecloud/urls.py index 202ef716..cb73a358 100644 --- a/paddlecloud/paddlecloud/urls.py +++ b/paddlecloud/paddlecloud/urls.py @@ -36,6 +36,8 @@ url(r"^api/v1/token2user/", paddlejob.views.GetUserView.as_view()), url(r"^api/v1/filelist/", paddlejob.views.SimpleFileList.as_view()), url(r"^api/v1/registry/", paddlejob.registry.RegistryView.as_view()), + url(r"^api/v1/publish/", paddlejob.views.FilePublishAPIView.as_view()), + url(r"^filepub/", paddlejob.views.file_publish_view), ] urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index d9e926df..e73e716d 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -1,6 +1,8 @@ -from django.http import HttpResponseRedirect, HttpResponse, JsonResponse +from django.http import HttpResponseRedirect, HttpResponse, JsonResponse, HttpResponseNotFound, HttpResponseForbidden from django.contrib import messages from django.conf import settings +from django.utils.encoding import smart_str +from django.contrib.auth.decorators import login_required from kubernetes import client, config from kubernetes.client.rest import ApiException from . import PaddleJob @@ -16,6 +18,63 @@ import volume import os import copy +from notebook.models import FilePublish +import uuid + +def file_publish_view(request): + """ + view for download published files + """ + username = request.user.username + publish_uuid = request.GET.get("uuid") + if not publish_uuid: + return HttpResponseNotFound() + record = FilePublish.objects.get(uuid=publish_uuid) + if not record: + return HttpResponseNotFound() + # FIXME(typhoonzero): not support folder currently + if record.path.endswith("/"): + return HttpResponseNotFound() + + real_path = "/".join([settings.STORAGE_PATH] + record.path.split("/")[4:]) + logging.info("downloading file from: %s, record(%s)", real_path, record.path) + + # mimetype is replaced by content_type for django 1.7 + response = HttpResponse(open(real_path), content_type='application/force-download') + response['Content-Disposition'] = 'attachment; filename=%s' % os.path.basename(record.path) + # It's usually a good idea to set the 'Content-Length' header too. + # You can also set any other required headers: Cache-Control, etc. + return response + +class FilePublishAPIView(APIView): + permission_classes = (permissions.IsAuthenticated,) + + def get(self, request, format=None): + """ + return a list of published files for current user + """ + record = FilePublish.objects.filter(user=request.user) + file_list = [rec.path for rec in record] + url_list = [rec.url for rec in record] + return Response({"files": file_list, "urls": url_list}) + + def post(self, request, format=None): + """ + given a pfs path generate a uniq sharing url for the path + """ + post_body = json.loads(request.body) + file_path = post_body.get("path") + publish_uuid = uuid.uuid4() + publish_url = "http://%s/filepub/?uuid=%s" % (request.META["HTTP_HOST"], publish_uuid) + # save publish_url to mysql + publish_record = FilePublish() + publish_record.url = publish_url + publish_record.user = request.user + publish_record.path = file_path + publish_record.uuid = publish_uuid + publish_record.save() + return Response({"url": publish_url}) + class JobsView(APIView): permission_classes = (permissions.IsAuthenticated,) From 04ab2f59db3a4fa9b192349d44c09fb394d1fc19 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 12 Sep 2017 14:33:41 +0800 Subject: [PATCH 2/5] publish files --- go/paddlecloud/publish.go | 57 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 go/paddlecloud/publish.go diff --git a/go/paddlecloud/publish.go b/go/paddlecloud/publish.go new file mode 100644 index 00000000..c7ec0d73 --- /dev/null +++ b/go/paddlecloud/publish.go @@ -0,0 +1,57 @@ +package paddlecloud + +import ( + "context" + "flag" + "fmt" + "net/url" + + "github.com/PaddlePaddle/cloud/go/utils/restclient" + "github.com/google/subcommands" +) + +// PublishCmd used for publish file for download and list published files. +type PublishCmd struct { +} + +// Name is subcommands name. +func (*PublishCmd) Name() string { return "publish" } + +// Synopsis is subcommands synopsis. +func (*PublishCmd) Synopsis() string { + return "publish file for download and list published files." +} + +// Usage is subcommands Usage. +func (*PublishCmd) Usage() string { + return `publish [path] + path must be like /pfs/[datacenter]/home/[username] + if path not specified, will return a list of current published files. +` +} + +// SetFlags registers subcommands flags. +func (p *PublishCmd) SetFlags(f *flag.FlagSet) { +} + +// Execute publish ops. +func (p *PublishCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() > 1 { + f.Usage() + return subcommands.ExitFailure + } + if f.NArg() == 0 { + queries := url.Values{} + ret, err := restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/publish/", queries) + if err != nil { + return subcommands.ExitFailure + } + fmt.Printf("%s\n", ret) + } else if f.NArg() == 1 { + queries := url.Values{} + queries.Set("path", f.Arg(0)) + restclient.PostCall(Config.ActiveConfig.Endpoint+"/api/v1/publish/", []byte("{\"path\": \""+f.Arg(0)+"\"}")) + } + + return subcommands.ExitSuccess +} From 63f1c957bfb0becbfa913c89439b878462e9c79a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 12 Sep 2017 17:01:24 +0800 Subject: [PATCH 3/5] fix travis error --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4ac9bff2..7c33468e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,6 @@ matrix: - mysql -e 'create database paddlecloud;' - mkdir $HOME/.kube && cp ./k8s/config $HOME/.kube/ - pip install -r paddlecloud/requirements.txt - - cd paddlecloud && python manage.py migrate + - cd paddlecloud && python manage.py makemigrations && python manage.py migrate script: - python manage.py test From 1913198bc23cc0f4d49ff90adf94539695e4fc5b Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 20 Sep 2017 14:57:22 +0800 Subject: [PATCH 4/5] split backend and job abstraction --- .../paddlejob/cloudprovider/__init__.py | 0 .../paddlejob/cloudprovider/k8s_provider.py | 309 +++++++++++++++ paddlecloud/paddlejob/cloudprovider/utils.py | 52 +++ .../paddlejob/{ => cloudprovider}/volume.py | 0 paddlecloud/paddlejob/paddle_job.py | 373 ++++++------------ paddlecloud/paddlejob/specs/__init__.py | 1 + paddlecloud/paddlejob/specs/spec_master.py | 53 +++ paddlecloud/paddlejob/specs/spec_pserver.py | 40 ++ paddlecloud/paddlejob/specs/spec_trainer.py | 44 +++ paddlecloud/paddlejob/views.py | 319 ++------------- 10 files changed, 662 insertions(+), 529 deletions(-) create mode 100644 paddlecloud/paddlejob/cloudprovider/__init__.py create mode 100644 paddlecloud/paddlejob/cloudprovider/k8s_provider.py create mode 100644 paddlecloud/paddlejob/cloudprovider/utils.py rename paddlecloud/paddlejob/{ => cloudprovider}/volume.py (100%) create mode 100644 paddlecloud/paddlejob/specs/__init__.py create mode 100644 paddlecloud/paddlejob/specs/spec_master.py create mode 100644 paddlecloud/paddlejob/specs/spec_pserver.py create mode 100644 paddlecloud/paddlejob/specs/spec_trainer.py diff --git a/paddlecloud/paddlejob/cloudprovider/__init__.py b/paddlecloud/paddlejob/cloudprovider/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/paddlecloud/paddlejob/cloudprovider/k8s_provider.py b/paddlecloud/paddlejob/cloudprovider/k8s_provider.py new file mode 100644 index 00000000..c6bacb45 --- /dev/null +++ b/paddlecloud/paddlejob/cloudprovider/k8s_provider.py @@ -0,0 +1,309 @@ +from kubernetes import client, config +from kubernetes.client.rest import ApiException +# FIXME(typhoonzero): still need to import settings +from django.conf import settings + +import copy +import os +import logging +import traceback + +import utils +import volume + +# FIXME(typhoonzero): need a base class to define the interfaces? +class K8sProvider: + """ + Kubernetes Cloud Porvider + Provide interfaces for manage jobs and resources. + """ + def __init__(self): + pass + + def get_jobs(self, username): + namespace = utils.email_escape(username) + api_instance =\ + client.BatchV1Api(api_client=utils.get_user_api_client(username)) + job_list = api_instance.list_namespaced_job(namespace) + # NOTE: when job is deleted, some pods of the job will be at "Terminating" status + # for a while, which may cause submit fail. Find all pods that are still "Terminating". + user_pod_list =\ + client.CoreV1Api(api_client=utils.get_user_api_client(username))\ + .list_namespaced_pod(namespace) + terminating_jobs = [] + for pod in user_pod_list.items: + jobname = "" + if not pod.metadata.labels: + continue + if "paddle-job" in pod.metadata.labels: + jobname = pod.metadata.labels["paddle-job"] + elif "paddle-job-master" in pod.metadata.labels: + jobname = pod.metadata.labels["paddle-job-master"] + elif "paddle-job-pserver" in pod.metadata.labels: + jobname = pod.metadata.labels["paddle-job-pserver"] + if pod.metadata.deletion_timestamp and jobname: + if jobname not in terminating_jobs: + terminating_jobs.append(jobname) + # NOTE: put it in the original dict for backward compability + ret_dict = copy.deepcopy(job_list.to_dict()) + ret_dict["terminating"] = terminating_jobs + return ret_dict + + def __setup_volumes(self, paddlejob, username): + volumes = [] + for k, cfg in settings.DATACENTERS.items(): + if k != paddlejob.dc and k != "public": + continue + fstype = cfg["fstype"] + if fstype == settings.FSTYPE_CEPHFS: + if k == "public": + mount_path = cfg["mount_path"] % paddlejob.dc + cephfs_path = cfg["cephfs_path"] + else: + mount_path = cfg["mount_path"] % (paddlejob.dc, username) + cephfs_path = cfg["cephfs_path"] % username + volumes.append(volume.get_volume_config( + fstype = fstype, + name = k.replace("_", "-"), + monitors_addr = cfg["monitors_addr"], + secret = cfg["secret"], + user = cfg["user"], + mount_path = mount_path, + cephfs_path = cephfs_path, + admin_key = cfg["admin_key"], + read_only = cfg.get("read_only", False) + )) + elif fstype == settings.FSTYPE_HOSTPATH: + if k == "public": + mount_path = cfg["mount_path"] % paddlejob.dc + host_path = cfg["host_path"] + else: + mount_path = cfg["mount_path"] % (paddlejob.dc, username) + host_path = cfg["host_path"] % username + + volumes.append(volume.get_volume_config( + fstype = fstype, + name = k.replace("_", "-"), + mount_path = mount_path, + host_path = host_path + )) + else: + pass + paddlejob.volumes = volumes + + def submit_job(self, paddlejob, username): + namespace = utils.email_escape(username) + api_client = utils.get_user_api_client(username) + self.__setup_volumes(paddlejob, username) + if not paddlejob.registry_secret: + paddlejob.registry_secret = settings.JOB_DOCKER_IMAGE.get("registry_secret", None) + if not paddlejob.image: + if paddlejob.gpu > 0: + paddlejob.image = settings.JOB_DOCKER_IMAGE["image_gpu"] + else: + paddlejob.image = settings.JOB_DOCKER_IMAGE["image"] + # jobPackage validation: startwith /pfs + # NOTE: job packages are uploaded to /pfs/[dc]/home/[user]/jobs/[jobname] + package_in_pod = os.path.join("/pfs/%s/home/%s"%(paddlejob.dc, username), "jobs", paddlejob.name) + + logging.info("current package: %s", package_in_pod) + # package must be ready before submit a job + current_package_path = package_in_pod.replace("/pfs/%s/home"%paddlejob.dc, settings.STORAGE_PATH) + if not os.path.exists(current_package_path): + current_package_path = package_in_pod.replace("/pfs/%s/home/%s"%(paddlejob.dc, username), settings.STORAGE_PATH) + if not os.path.exists(current_package_path): + raise Exception("package not exist in cloud: %s"%current_package_path) + logging.info("current package in pod: %s", current_package_path) + # GPU quota management + # TODO(Yancey1989) We should move this to Kubernetes + if 'GPU_QUOTA' in dir(settings) and int(paddlejob.gpu) > 0: + gpu_usage = 0 + pods = client.CoreV1Api(api_client=api_client).list_namespaced_pod(namespace=namespace) + for pod in pods.items: + # only statistics trainer GPU resource, pserver does not use GPU + if pod.metadata.labels and 'paddle-job' in pod.metadata.labels and \ + pod.status.phase == 'Running': + gpu_usage += int(pod.spec.containers[0].resources.limits.get('alpha.kubernetes.io/nvidia-gpu', '0')) + if username in settings.GPU_QUOTA: + gpu_quota = settings.GPU_QUOTA[username]['limit'] + else: + gpu_quota = settings.GPU_QUOTA['DEFAULT']['limit'] + gpu_available = gpu_quota - gpu_usage + gpu_request = int(paddlejob.gpu) * int(paddlejob.parallelism) + logging.info('gpu available: %d, gpu request: %d' % (gpu_available, gpu_request)) + if gpu_available < gpu_request: + raise Exception("You don't have enought GPU quota," + \ + "request: %d, usage: %d, limit: %d" % (gpu_request, gpu_usage, gpu_quota)) + + # add Nvidia lib volume if training with GPU + if paddlejob.gpu > 0: + volumes.append(volume.get_volume_config( + fstype = settings.FSTYPE_HOSTPATH, + name = "nvidia-libs", + mount_path = "/usr/local/nvidia/lib64", + host_path = settings.NVIDIA_LIB_PATH + )) + # ========== submit master ReplicaSet if using fault_tolerant feature == + # FIXME: alpha features in separate module + if paddlejob.fault_tolerant: + try: + ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( + namespace, + paddlejob.new_master_job()) + except ApiException, e: + logging.error("error submitting master job: %s", traceback.format_exc()) + raise e + # ========================= submit pserver job ========================= + try: + ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( + namespace, + paddlejob.new_pserver_job()) + except ApiException, e: + logging.error("error submitting pserver job: %s ", traceback.format_exc()) + raise e + # ========================= submit trainer job ========================= + try: + ret = client.BatchV1Api(api_client=api_client).create_namespaced_job( + namespace, + paddlejob.new_trainer_job()) + except ApiException, e: + logging.error("error submitting trainer job: %s" % traceback.format_exc()) + raise e + return ret + + def delete_job(self, jobname, username): + namespace = utils.email_escape(username) + api_client = utils.get_user_api_client(username) + if not jobname: + return utils.simple_response(500, "must specify jobname") + # FIXME: options needed: grace_period_seconds, orphan_dependents, preconditions + # FIXME: cascade delteing + delete_status = [] + # delete job + trainer_name = jobname + "-trainer" + try: + u_status = client.BatchV1Api(api_client=api_client)\ + .delete_namespaced_job(trainer_name, namespace, {}) + except ApiException, e: + logging.error("error deleting job: %s, %s", jobname, str(e)) + delete_status.append(str(e)) + + # delete job pods + try: + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job=%s"%jobname) + for i in job_pod_list.items: + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) + except ApiException, e: + logging.error("error deleting job pod: %s", str(e)) + delete_status.append(str(e)) + + # delete pserver rs + pserver_name = jobname + "-pserver" + try: + u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ + .delete_namespaced_replica_set(pserver_name, namespace, {}) + except ApiException, e: + logging.error("error deleting pserver: %s" % str(e)) + delete_status.append(str(e)) + + # delete pserver pods + try: + # pserver replica set has label with jobname + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job-pserver=%s"%jobname) + for i in job_pod_list.items: + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) + except ApiException, e: + logging.error("error deleting pserver pods: %s" % str(e)) + delete_status.append(str(e)) + + # delete master rs + master_name = jobname + "-master" + try: + u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ + .delete_namespaced_replica_set(master_name, namespace, {}) + except ApiException, e: + logging.error("error deleting master: %s" % str(e)) + # just ignore deleting master failed, we do not set up master process + # without fault tolerant mode + #delete_status.append(str(e)) + + # delete master pods + try: + # master replica set has label with jobname + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job-master=%s"%jobname) + for i in job_pod_list.items: + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) + except ApiException, e: + logging.error("error deleting master pods: %s" % str(e)) + # just ignore deleting master failed, we do not set up master process + # without fault tolerant mode + #delete_status.append(str(e)) + + if len(delete_status) > 0: + retcode = 500 + else: + retcode = 200 + return retcode, delete_status + + def get_pservers(self, username): + namespace = utils.email_escape(username) + api_instance = client.ExtensionsV1beta1Api(api_client=utils.get_user_api_client(username)) + return api_instance.list_namespaced_replica_set(namespace).to_dict() + + def get_logs(self, jobname, num_lines, worker, username): + def _get_pod_log(api_client, namespace, pod_name, num_lines): + try: + if num_lines: + pod_log = client.CoreV1Api(api_client=api_client)\ + .read_namespaced_pod_log( + pod_name, namespace, tail_lines=int(num_lines)) + else: + pod_log = client.CoreV1Api(api_client=api_client)\ + .read_namespaced_pod_log(i.metadata.name, namespace) + return pod_log + except ApiException, e: + return str(e) + + namespace = utils.email_escape(username) + api_client = utils.get_user_api_client(username) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + total_job_log = "" + if not worker: + for i in job_pod_list.items: + total_job_log = "".join((total_job_log, + "==========================%s==========================" % i.metadata.name)) + pod_log = _get_pod_log(api_client, namespace, i.metadata.name, num_lines) + total_job_log = "\n".join((total_job_log, pod_log)) + else: + total_job_log = _get_pod_log(api_client, namespace, worker, num_lines) + return total_job_log + + def get_workers(self, jobname, username): + namespace = utils.email_escape(username) + job_pod_list = None + api_client = utils.get_user_api_client(username) + if not jobname: + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace) + else: + selector = "paddle-job=%s"%jobname + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, label_selector=selector) + return job_pod_list.to_dict() + + def get_quotas(self, username): + namespace = utils.email_escape(username) + api_client = utils.get_user_api_client(username) + quota_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_resource_quota(namespace) + return quota_list.to_dict() \ No newline at end of file diff --git a/paddlecloud/paddlejob/cloudprovider/utils.py b/paddlecloud/paddlejob/cloudprovider/utils.py new file mode 100644 index 00000000..e35ddb6e --- /dev/null +++ b/paddlecloud/paddlejob/cloudprovider/utils.py @@ -0,0 +1,52 @@ +import kubernetes +from kubernetes import client, config +from kubernetes.client.rest import ApiException +import os +# FIXME(typhoonzero): still need to import settings +from django.conf import settings + +def email_escape(email): + """ + Escape email to a safe string of kubernetes namespace + """ + safe_email = email.replace("@", "-") + safe_email = safe_email.replace(".", "-") + safe_email = safe_email.replace("_", "-") + return safe_email + +def get_user_api_client(username): + """ + Update kubernetes client to use current logined user's crednetials + """ + + conf_obj = kubernetes.client.Configuration() + conf_obj.host = settings.K8S_HOST + conf_obj.ssl_ca_cert = os.path.join(settings.CA_PATH) + conf_obj.cert_file = os.path.join(settings.USER_CERTS_PATH, username, "%s.pem"%username) + conf_obj.key_file = os.path.join(settings.USER_CERTS_PATH, username, "%s-key.pem"%username) + api_client = kubernetes.client.ApiClient(config=conf_obj) + return api_client + +def get_admin_api_client(): + """ + Update kubernetes client to use admin user to create namespace and authorizations + """ + + conf_obj = kubernetes.client.Configuration() + conf_obj.host = settings.K8S_HOST + conf_obj.ssl_ca_cert = os.path.join(settings.CA_PATH) + conf_obj.cert_file = os.path.join(settings.USER_CERTS_PATH, "admin.pem") + conf_obj.key_file = os.path.join(settings.USER_CERTS_PATH, "admin-key.pem") + api_client = kubernetes.client.ApiClient(config=conf_obj) + return api_client + +def user_certs_exist(username): + """ + Return True if the user's certs already generated. User's keys are of pairs. + """ + has_cert = os.path.isfile(os.path.join(settings.USER_CERTS_PATH, username, "%s.pem"%username)) + has_key = os.path.isfile(os.path.join(settings.USER_CERTS_PATH, username, "%s-key.pem"%username)) + if has_cert and has_key: + return True + else: + return False diff --git a/paddlecloud/paddlejob/volume.py b/paddlecloud/paddlejob/cloudprovider/volume.py similarity index 100% rename from paddlecloud/paddlejob/volume.py rename to paddlecloud/paddlejob/cloudprovider/volume.py diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py index 9f781337..61f4d023 100644 --- a/paddlecloud/paddlejob/paddle_job.py +++ b/paddlecloud/paddlejob/paddle_job.py @@ -1,101 +1,122 @@ import kubernetes from kubernetes import client, config import os -__all__ = ["PaddleJob"] + +from specs import spec_master, spec_pserver, spec_trainer + DEFAULT_PADDLE_PORT=7164 DEFAULT_MASTER_PORT=8080 DEFAULT_ETCD_PORT=2379 -class PaddleJob(object): +class UniversionedAPI(object): """ - PaddleJob + Base defination for Paddle Cloud API fields. """ - def __init__(self, - name, - job_package, - parallelism, - cpu, - memory, - pservers, - pscpu, - psmemory, - topology, - entry, - image, - passes, - gpu=0, - volumes=[], - registry_secret=None, - envs = {}, - fault_tolerant=False, - etcd_image="quay.io/coreos/etcd:v3.2.1"): - - self._ports_num=1 - self._ports_num_for_sparse=1 - self._num_gradient_servers=parallelism - - self._name = name - self._job_package = job_package - self._parallelism = parallelism - self._cpu = cpu - self._gpu = gpu - self._memory = memory - self._pservers = pservers - self._pscpu = pscpu - self._psmemory = psmemory - self._topology = topology - self._entry = entry - self._image = image - self._volumes = volumes - self._registry_secret = registry_secret - self._passes = passes - self._usr_envs = envs - # master resources are static - self._mastercpu = 1 - self._mastermemory = "300Mi" - # use new pserver for tolerant - self._fault_tolerant = fault_tolerant - self._etcd_image = etcd_image - - @property - def pservers(self): - return self._pservers - - @property - def parallelism(self): - return self._parallelism - - @property - def runtime_image(self): - return self._image - - def _get_master_name(self): - return "%s-master" % self._name - - def _get_pserver_name(self): - return "%s-pserver" % self._name + required = { + "name": "The name of the job.", + "job_package": "A folder containing job programs.", + "parallelism": "Number of trainers to langch.", + "cpu": "CPU resource for each trainer.", + "memory": "memory resource for each trainer.", + "pservers": "Number of pservers to langch.", + "pscpu": "CPU resource for each pserver.", + "psmemory": "Memory resouce for each pserver.", + "topology": "Paddle V1 config file.", + "entry": "Command to run for training.", + "dc": "Datacenter specs" + } + optional = { + "image": "Docker image to use", + "passes": "Number of passes to run.", + "gpu": "Number of GPU for each trainer.", + "fault_tolerant": "Whether use new fault tolerant mode.", + "volumes": "Mount data volumes to pod.", + "registry_secret": "secret for reading registry.", + "envs": "Envs for all pods", + "etcd_image": "Docker image of etcd." + } + optional_defaults = { + "image": "", + "passes": 1, + "gpu": 0, + "fault_tolerant": False, + "volumes": [], + "registry_secret": "", + "envs": {}, + "etcd_image": "quay.io/coreos/etcd:v3.2.1" + } + # do not expose to user attributes. + internal = { + "ports_num": "ports_num argument for trainer.", + "ports_num_for_sparse": "ports_num_for_sparse argument for trainer.", + "mastercpu": "master process cpu resource.", + "mastermemory": "master process memory resource." + } + # internal_defaults may be changed during setup. + internal_defaults = { + "ports_num": 1, + "ports_num_for_sparse": 1, + "mastercpu": 1, + "mastermemory": "300Mi", + "num_gradient_servers": 1 + } + +class APIV1(UniversionedAPI): + """ + For v1 implementation + """ + pass - def _get_trainer_name(self): - return "%s-trainer" % self._name +class PaddleJob(object): + """ + PaddleJob Abstraction. + A job can be submited to any cluster environment + using one submit engine. + """ + def __init__(self, **kwargs): + self.apiv1 = APIV1() + # setup required + for k in self.apiv1.required: + if k not in kwargs: + raise Exception("Field required: %s" % k) + setattr(self, k, kwargs[k]) + for k in self.apiv1.optional: + if k in kwargs: + setattr(self, k, kwargs[k]) + else: + setattr(self, k, self.apiv1.optional_defaults[k]) + for k in self.apiv1.internal: + setattr(self, k, self.apiv1.internal_defaults[k]) + + self.num_gradient_servers = self.parallelism + + def get_master_name(self): + return "%s-master" % self.name + + def get_pserver_name(self): + return "%s-pserver" % self.name + + def get_trainer_name(self): + return "%s-trainer" % self.name def get_env(self): envs = [] - envs.append({"name":"PADDLE_JOB_NAME", "value":self._name}) - envs.append({"name":"TRAINERS", "value":str(self._parallelism)}) - envs.append({"name":"PSERVERS", "value":str(self._pservers)}) - envs.append({"name":"TOPOLOGY", "value":self._topology}) - envs.append({"name":"ENTRY", "value":self._entry}) - envs.append({"name":"TRAINER_PACKAGE", "value":self._job_package}) + envs.append({"name":"PADDLE_JOB_NAME", "value":self.name}) + envs.append({"name":"TRAINERS", "value":str(self.parallelism)}) + envs.append({"name":"PSERVERS", "value":str(self.pservers)}) + envs.append({"name":"TOPOLOGY", "value":self.topology}) + envs.append({"name":"ENTRY", "value":self.entry}) + envs.append({"name":"TRAINER_PACKAGE", "value":self.job_package}) envs.append({"name":"PADDLE_INIT_PORT", "value":str(DEFAULT_PADDLE_PORT)}) - if self._gpu > 0: - envs.append({"name":"PADDLE_INIT_TRAINER_COUNT", "value":str(self._gpu)}) + if self.gpu > 0: + envs.append({"name":"PADDLE_INIT_TRAINER_COUNT", "value":str(self.gpu)}) else: - envs.append({"name":"PADDLE_INIT_TRAINER_COUNT", "value":str(self._cpu)}) - envs.append({"name":"PADDLE_INIT_PORTS_NUM", "value":str(self._ports_num)}) - envs.append({"name":"PADDLE_INIT_PORTS_NUM_FOR_SPARSE", "value":str(self._ports_num_for_sparse)}) - envs.append({"name":"PADDLE_INIT_NUM_GRADIENT_SERVERS", "value":str(self._num_gradient_servers)}) - envs.append({"name":"PADDLE_INIT_NUM_PASSES", "value":str(self._passes)}) - if self._gpu: + envs.append({"name":"PADDLE_INIT_TRAINER_COUNT", "value":str(self.cpu)}) + envs.append({"name":"PADDLE_INIT_PORTS_NUM", "value":str(self.ports_num)}) + envs.append({"name":"PADDLE_INIT_PORTS_NUM_FOR_SPARSE", "value":str(self.ports_num_for_sparse)}) + envs.append({"name":"PADDLE_INIT_NUM_GRADIENT_SERVERS", "value":str(self.num_gradient_servers)}) + envs.append({"name":"PADDLE_INIT_NUM_PASSES", "value":str(self.passes)}) + if self.gpu: envs.append({"name":"PADDLE_INIT_USE_GPU", "value":str("1")}) # HACK: add nvidia lib LD_LIBRARY_PATH for all pods envs.append({"name":"LD_LIBRARY_PATH", "value":"/usr/local/nvidia/lib64"}) @@ -103,203 +124,69 @@ def get_env(self): envs.append({"name":"PADDLE_INIT_USE_GPU", "value":str("0")}) envs.append({"name":"NAMESPACE", "valueFrom":{ "fieldRef":{"fieldPath":"metadata.namespace"}}}) - if self._usr_envs: - for k, v in self._usr_envs.items(): + if self.envs: + for k, v in self.envs.items(): envs.append({"name": k, "value": v}) return envs - def _get_pserver_container_ports(self): + def get_pserver_container_ports(self): ports = [] port = DEFAULT_PADDLE_PORT - for i in xrange(self._ports_num + self._ports_num_for_sparse): + for i in xrange(self.ports_num + self.ports_num_for_sparse): ports.append({"containerPort":port, "name":"jobport-%d" % i}) port += 1 return ports - def _get_master_container_ports(self): + def get_master_container_ports(self): ports = [] port = DEFAULT_MASTER_PORT ports.append({"containerPort": DEFAULT_MASTER_PORT, "name":"master-port"}) ports.append({"containerPort": DEFAULT_ETCD_PORT, "name":"etcd-port"}) return ports - def _get_master_labels(self): - return {"paddle-job-master": self._name} + def get_master_labels(self): + return {"paddle-job-master": self.name} - def _get_pserver_labels(self): - return {"paddle-job-pserver": self._name} + def get_pserver_labels(self): + return {"paddle-job-pserver": self.name} - def _get_master_entrypoint(self): + def get_master_entrypoint(self): return ["paddle_k8s", "start_master"] - def _get_pserver_entrypoint(self): - if not self._fault_tolerant: + def get_pserver_entrypoint(self): + if not self.fault_tolerant: return ["paddle_k8s", "start_pserver"] else: return ["paddle_k8s", "start_new_pserver"] - def _get_trainer_entrypoint(self): - if self._entry: - if self._fault_tolerant: + def get_trainer_entrypoint(self): + if self.entry: + if self.fault_tolerant: return ["paddle_k8s", "start_new_trainer"] return ["paddle_k8s", "start_trainer", "v2"] return ["paddle_k8s", "start_trainer", "v1"] - def _get_trainer_labels(self): - return {"paddle-job": self._name} + def get_trainer_labels(self): + return {"paddle-job": self.name} - def _get_trainer_volumes(self): + def get_trainer_volumes(self): volumes = [] - for item in self._volumes: + for item in self.volumes: volumes.append(item["volume"]) return volumes - def _get_trainer_volume_mounts(self): + def get_trainer_volume_mounts(self): volume_mounts = [] - for item in self._volumes: + for item in self.volumes: volume_mounts.append(item["volume_mount"]) return volume_mounts - + def new_master_job(self): - """ - return: Master ReplicaSet - """ - rs = { - "apiVersion": "extensions/v1beta1", - "kind": "ReplicaSet", - "metadata":{ - "name": self._get_master_name(), - }, - "spec":{ - "replicas": 1, # NOTE: always 1 replica of master - "template": { - "metadata": { - "labels": self._get_master_labels() - }, - "spec": { - # mount trainer volumes to dispatch datasets - "volumes": self._get_trainer_volumes(), - "containers":[{ - "name": self._name, - "image": self._image, - "ports": self._get_master_container_ports(), - "env": self.get_env(), - "volumeMounts": self._get_trainer_volume_mounts(), - "command": self._get_master_entrypoint(), - "resources": { - "requests": { - "memory": str(self._mastermemory), - "cpu": str(self._mastercpu) - }, - "limits": { - "memory": str(self._mastermemory), - "cpu": str(self._mastercpu) - } - } - }, { - "name": self._name + "-etcd", - "image": self._etcd_image, - "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-initial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"], - "env": [{ - "name": "POD_IP", - "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}} - }] + return spec_master.get_spec_master(self) - }] - } - } - } - } - return rs + def new_pserver_job(self): + return spec_pserver.get_spec_pserver(self) def new_trainer_job(self): - """ - return: Trainer job, it's a Kubernetes Job - """ - job = { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": { - "name": self._get_trainer_name(), - }, - "spec": { - "parallelism": self._parallelism, - "completions": self._parallelism, - "template": { - "metadata":{ - "labels": self._get_trainer_labels() - }, - "spec": { - "volumes": self._get_trainer_volumes(), - "containers":[{ - "name": "trainer", - "image": self._image, - "imagePullPolicy": "Always", - "command": self._get_trainer_entrypoint(), - "env": self.get_env(), - "volumeMounts": self._get_trainer_volume_mounts(), - "resources": { - "requests": { - "memory": str(self._memory), - "cpu": str(self._cpu) - }, - "limits": { - "memory": str(self._memory), - "cpu" : str(self._cpu*1.5) - } - } - }], - "restartPolicy": "Never" - } - } - } - } - if self._registry_secret: - job["spec"]["template"]["spec"].update({"imagePullSecrets": [{"name": self._registry_secret}]}) - if self._gpu > 0: - job["spec"]["template"]["spec"]["containers"][0]["resources"]["limits"]["alpha.kubernetes.io/nvidia-gpu"] = str(self._gpu) - return job - def new_pserver_job(self): - """ - return: PServer job, it's a Kubernetes ReplicaSet - """ - rs = { - "apiVersion": "extensions/v1beta1", - "kind": "ReplicaSet", - "metadata":{ - "name": self._get_pserver_name(), - }, - "spec":{ - "replicas": self._pservers, - "template": { - "metadata": { - "labels": self._get_pserver_labels() - }, - "spec": { - "volumes": self._get_trainer_volumes(), - "containers":[{ - "name": self._name, - "image": self._image, - "ports": self._get_pserver_container_ports(), - "env": self.get_env(), - "volumeMounts": self._get_trainer_volume_mounts(), - "command": self._get_pserver_entrypoint(), - "resources": { - "requests": { - "memory": str(self._psmemory), - "cpu": str(self._pscpu) - }, - "limits": { - "memory": str(self._psmemory), - "cpu": str(self._pscpu*1.5) - } - } - }] - } - } - } - } - if self._registry_secret: - rs["spec"]["template"]["spec"].update({"imagePullSecrets": [{"name": self._registry_secret}]}) - return rs + return spec_trainer.get_spec_trainer(self) \ No newline at end of file diff --git a/paddlecloud/paddlejob/specs/__init__.py b/paddlecloud/paddlejob/specs/__init__.py new file mode 100644 index 00000000..152d6962 --- /dev/null +++ b/paddlecloud/paddlejob/specs/__init__.py @@ -0,0 +1 @@ +__all__ = ["spec_pserver", "spec_master", "spec_trainer"] \ No newline at end of file diff --git a/paddlecloud/paddlejob/specs/spec_master.py b/paddlecloud/paddlejob/specs/spec_master.py new file mode 100644 index 00000000..a3c34995 --- /dev/null +++ b/paddlecloud/paddlejob/specs/spec_master.py @@ -0,0 +1,53 @@ +def get_spec_master(paddlejob): + return { + "apiVersion": "extensions/v1beta1", + "kind": "ReplicaSet", + "metadata":{ + "name": paddlejob.get_master_name(), + }, + "spec":{ + "replicas": 1, # NOTE: always 1 replica of master + "template": { + "metadata": { + "labels": paddlejob.get_master_labels() + }, + "spec": { + # mount trainer volumes to dispatch datasets + "volumes": paddlejob.get_trainer_volumes(), + "containers":[{ + "name": paddlejob.name, + "image": paddlejob.image, + "ports": paddlejob.get_master_container_ports(), + "env": paddlejob.get_env(), + "volumeMounts": paddlejob.get_trainer_volume_mounts(), + "command": paddlejob.get_master_entrypoint(), + "resources": { + "requests": { + "memory": str(paddlejob.mastermemory), + "cpu": str(paddlejob.mastercpu) + }, + "limits": { + "memory": str(paddlejob.mastermemory), + "cpu": str(paddlejob.mastercpu) + } + } + }, { + "name": paddlejob.name + "-etcd", + "image": paddlejob.etcd_image, + "command": ["etcd", "-name", "etcd0", + "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", + "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", + "-initial-advertise-peer-urls", "http://$(POD_IP):2380", + "-listen-peer-urls", "http://0.0.0.0:2380", + "-initial-cluster", "etcd0=http://$(POD_IP):2380", + "-initial-cluster-state", "new"], + "env": [{ + "name": "POD_IP", + "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}} + }] + + }] + } + } + } + } \ No newline at end of file diff --git a/paddlecloud/paddlejob/specs/spec_pserver.py b/paddlecloud/paddlejob/specs/spec_pserver.py new file mode 100644 index 00000000..ba932dc9 --- /dev/null +++ b/paddlecloud/paddlejob/specs/spec_pserver.py @@ -0,0 +1,40 @@ +def get_spec_pserver(paddlejob): + rs = { + "apiVersion": "extensions/v1beta1", + "kind": "ReplicaSet", + "metadata":{ + "name": paddlejob.get_pserver_name(), + }, + "spec":{ + "replicas": paddlejob.pservers, + "template": { + "metadata": { + "labels": paddlejob.get_pserver_labels() + }, + "spec": { + "volumes": paddlejob.get_trainer_volumes(), + "containers":[{ + "name": paddlejob.name, + "image": paddlejob.image, + "ports": paddlejob.get_pserver_container_ports(), + "env": paddlejob.get_env(), + "volumeMounts": paddlejob.get_trainer_volume_mounts(), + "command": paddlejob.get_pserver_entrypoint(), + "resources": { + "requests": { + "memory": str(paddlejob.psmemory), + "cpu": str(paddlejob.pscpu) + }, + "limits": { + "memory": str(paddlejob.psmemory), + "cpu": str(paddlejob.pscpu * 1.5) + } + } + }] + } + } + } + } + if paddlejob.registry_secret: + rs["spec"]["template"]["spec"].update({"imagePullSecrets": [{"name": paddlejob.registry_secret}]}) + return rs \ No newline at end of file diff --git a/paddlecloud/paddlejob/specs/spec_trainer.py b/paddlecloud/paddlejob/specs/spec_trainer.py new file mode 100644 index 00000000..c0b9ccad --- /dev/null +++ b/paddlecloud/paddlejob/specs/spec_trainer.py @@ -0,0 +1,44 @@ +def get_spec_trainer(paddlejob): + job = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": paddlejob.get_trainer_name(), + }, + "spec": { + "parallelism": paddlejob.parallelism, + "completions": paddlejob.parallelism, + "template": { + "metadata":{ + "labels": paddlejob.get_trainer_labels() + }, + "spec": { + "volumes": paddlejob.get_trainer_volumes(), + "containers":[{ + "name": "trainer", + "image": paddlejob.image, + "imagePullPolicy": "Always", + "command": paddlejob.get_trainer_entrypoint(), + "env": paddlejob.get_env(), + "volumeMounts": paddlejob.get_trainer_volume_mounts(), + "resources": { + "requests": { + "memory": str(paddlejob.memory), + "cpu": str(paddlejob.cpu) + }, + "limits": { + "memory": str(paddlejob.memory), + "cpu" : str(paddlejob.cpu * 1.5) + } + } + }], + "restartPolicy": "Never" + } + } + } + } + if paddlejob.registry_secret: + job["spec"]["template"]["spec"].update({"imagePullSecrets": [{"name": paddlejob.registry_secret}]}) + if paddlejob.gpu > 0: + job["spec"]["template"]["spec"]["containers"][0]["resources"]["limits"]["alpha.kubernetes.io/nvidia-gpu"] = str(paddlejob.gpu) + return job \ No newline at end of file diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index e73e716d..588020d7 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -15,11 +15,13 @@ import utils import notebook.utils import logging -import volume import os import copy from notebook.models import FilePublish import uuid +from cloudprovider.k8s_provider import K8sProvider +from paddle_job import PaddleJob + def file_publish_view(request): """ @@ -80,33 +82,9 @@ class JobsView(APIView): permission_classes = (permissions.IsAuthenticated,) def get(self, request, format=None): - """ - List all jobs - """ username = request.user.username - namespace = notebook.utils.email_escape(username) - api_instance = client.BatchV1Api(api_client=notebook.utils.get_user_api_client(username)) - job_list = api_instance.list_namespaced_job(namespace) - # NOTE: when job is deleted, some pods of the job will be at "Terminating" status - # for a while, which may cause submit fail. Find all pods that are still "Terminating". - user_pod_list = client.CoreV1Api(api_client=notebook.utils.get_user_api_client(username)).list_namespaced_pod(namespace) - terminating_jobs = [] - for pod in user_pod_list.items: - jobname = "" - if not pod.metadata.labels: - continue - if "paddle-job" in pod.metadata.labels: - jobname = pod.metadata.labels["paddle-job"] - elif "paddle-job-master" in pod.metadata.labels: - jobname = pod.metadata.labels["paddle-job-master"] - elif "paddle-job-pserver" in pod.metadata.labels: - jobname = pod.metadata.labels["paddle-job-pserver"] - if pod.metadata.deletion_timestamp and jobname: - if jobname not in terminating_jobs: - terminating_jobs.append(jobname) - # NOTE: put it in the original dict for backward compability - ret_dict = copy.deepcopy(job_list.to_dict()) - ret_dict["terminating"] = terminating_jobs + p = K8sProvider() + ret_dict = p.get_jobs(username) return Response(ret_dict) def post(self, request, format=None): @@ -114,64 +92,15 @@ def post(self, request, format=None): Submit the PaddlePaddle job """ username = request.user.username - namespace = notebook.utils.email_escape(username) obj = json.loads(request.body) topology = obj.get("topology", "") entry = obj.get("entry", "") - fault_tolerant = obj.get("faulttolerant", False) - api_client = notebook.utils.get_user_api_client(username) if not topology and not entry: return utils.simple_response(500, "no topology or entry specified") if not obj.get("datacenter"): return utils.simple_response(500, "no datacenter specified") cfgs = {} dc = obj.get("datacenter") - - volumes = [] - for k, cfg in settings.DATACENTERS.items(): - if k != dc and k != "public": - continue - fstype = cfg["fstype"] - if fstype == settings.FSTYPE_CEPHFS: - if k == "public": - mount_path = cfg["mount_path"] % dc - cephfs_path = cfg["cephfs_path"] - else: - mount_path = cfg["mount_path"] % (dc, username) - cephfs_path = cfg["cephfs_path"] % username - volumes.append(volume.get_volume_config( - fstype = fstype, - name = k.replace("_", "-"), - monitors_addr = cfg["monitors_addr"], - secret = cfg["secret"], - user = cfg["user"], - mount_path = mount_path, - cephfs_path = cephfs_path, - admin_key = cfg["admin_key"], - read_only = cfg.get("read_only", False) - )) - elif fstype == settings.FSTYPE_HOSTPATH: - if k == "public": - mount_path = cfg["mount_path"] % dc - host_path = cfg["host_path"] - else: - mount_path = cfg["mount_path"] % (dc, username) - host_path = cfg["host_path"] % username - - volumes.append(volume.get_volume_config( - fstype = fstype, - name = k.replace("_", "-"), - mount_path = mount_path, - host_path = host_path - )) - else: - pass - registry_secret = obj.get("registry", None) - if not registry_secret: - registry_secret = settings.JOB_DOCKER_IMAGE.get("registry_secret", None) - # get user specified image - job_image = obj.get("image", None) - gpu_count = obj.get("gpu", 0) # jobPackage validation: startwith /pfs # NOTE: job packages are uploaded to /pfs/[dc]/home/[user]/jobs/[jobname] job_name = obj.get("name", "paddle-cluster-job") @@ -179,49 +108,13 @@ def post(self, request, format=None): logging.info("current package: %s", package_in_pod) # package must be ready before submit a job - current_package_path = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH) - if not os.path.exists(current_package_path): - current_package_path = package_in_pod.replace("/pfs/%s/home/%s"%(dc, username), settings.STORAGE_PATH) - if not os.path.exists(current_package_path): - return utils.error_message_response("package not exist in cloud: %s"%current_package_path) - logging.info("current package in pod: %s", current_package_path) - - # checkout GPU quota - # TODO(Yancey1989) We should move this to Kubernetes - if 'GPU_QUOTA' in dir(settings) and int(obj.get('gpu', '0')) > 0: - gpu_usage = 0 - pods = client.CoreV1Api(api_client=api_client).list_namespaced_pod(namespace=namespace) - for pod in pods.items: - # only statistics trainer GPU resource, pserver does not use GPU - if pod.metadata.labels and 'paddle-job' in pod.metadata.labels and \ - pod.status.phase == 'Running': - gpu_usage += int(pod.spec.containers[0].resources.limits.get('alpha.kubernetes.io/nvidia-gpu', '0')) - if username in settings.GPU_QUOTA: - gpu_quota = settings.GPU_QUOTA[username]['limit'] - else: - gpu_quota = settings.GPU_QUOTA['DEFAULT']['limit'] - gpu_available = gpu_quota - gpu_usage - gpu_request = int(obj.get('gpu', 0)) * int(obj.get('parallelism', 1)) - print 'gpu available: %d, gpu request: %d' % (gpu_available, gpu_request) - if gpu_available < gpu_request: - return utils.error_message_response("You don't have enought GPU quota," + \ - "request: %d, usage: %d, limit: %d" % (gpu_request, gpu_usage, gpu_quota)) - - # use default images - if not job_image : - if gpu_count > 0: - job_image = settings.JOB_DOCKER_IMAGE["image_gpu"] - else: - job_image = settings.JOB_DOCKER_IMAGE["image"] - - # add Nvidia lib volume if training with GPU - if gpu_count > 0: - volumes.append(volume.get_volume_config( - fstype = settings.FSTYPE_HOSTPATH, - name = "nvidia-libs", - mount_path = "/usr/local/nvidia/lib64", - host_path = settings.NVIDIA_LIB_PATH - )) + package_path_4test = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH) + if not os.path.exists(package_path_4test): + package_path_4test = package_in_pod.replace("/pfs/%s/home/%s"%(dc, username), settings.STORAGE_PATH) + if not os.path.exists(package_path_4test): + return utils.error_message_response("package not exist in cloud: %s"%package_path_4test) + logging.info("current package in pod: %s", package_path_4test) + envs = {} envs.update({"PADDLE_CLOUD_CURRENT_DATACENTER": dc}) envs.update({"PADDLE_CLOUD_USERNAME": username}) @@ -238,42 +131,21 @@ def post(self, request, format=None): topology = topology, entry = entry, gpu = obj.get("gpu", 0), - image = job_image, + image = obj.get("image", None), passes = obj.get("passes", 1), - registry_secret = registry_secret, - volumes = volumes, + registry_secret = obj.get("registry", None), + volumes = [], envs = envs, - fault_tolerant = fault_tolerant, - etcd_image = settings.ETCD_IMAGE + fault_tolerant = obj.get("faulttolerant", False), + etcd_image = settings.ETCD_IMAGE, + dc = dc ) # ========== submit master ReplicaSet if using fault_tolerant feature == - # FIXME: alpha features in separate module - if fault_tolerant: - try: - ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( - namespace, - paddle_job.new_master_job()) - except ApiException, e: - logging.error("error submitting master job: %s", e) - return utils.simple_response(500, str(e)) - # ========================= submit pserver job ========================= + p = K8sProvider() try: - ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( - namespace, - paddle_job.new_pserver_job()) - except ApiException, e: - logging.error("error submitting pserver job: %s ", e) - return utils.simple_response(500, str(e)) - # ========================= submit trainer job ========================= - try: - ret = client.BatchV1Api(api_client=api_client).create_namespaced_job( - namespace, - paddle_job.new_trainer_job()) - except ApiException, e: - logging.error("error submitting trainer job: %s" % e) - return utils.simple_response(500, str(e)) - - # TODO(typhoonzero): stop master and pservers when job finish or fails + p.submit_job(paddle_job, username) + except Exception, e: + return utils.error_message_response(str(e)) return utils.simple_response(200, "") @@ -282,89 +154,11 @@ def delete(self, request, format=None): Kill a job """ username = request.user.username - namespace = notebook.utils.email_escape(username) obj = json.loads(request.body) jobname = obj.get("jobname") - api_client = notebook.utils.get_user_api_client(username) - if not jobname: - return utils.simple_response(500, "must specify jobname") - # FIXME: options needed: grace_period_seconds, orphan_dependents, preconditions - # FIXME: cascade delteing - delete_status = [] - # delete job - trainer_name = jobname + "-trainer" - try: - u_status = client.BatchV1Api(api_client=api_client)\ - .delete_namespaced_job(trainer_name, namespace, {}) - except ApiException, e: - logging.error("error deleting job: %s, %s", jobname, str(e)) - delete_status.append(str(e)) - - # delete job pods - try: - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace, - label_selector="paddle-job=%s"%jobname) - for i in job_pod_list.items: - u_status = client.CoreV1Api(api_client=api_client)\ - .delete_namespaced_pod(i.metadata.name, namespace, {}) - except ApiException, e: - logging.error("error deleting job pod: %s", str(e)) - delete_status.append(str(e)) - - # delete pserver rs - pserver_name = jobname + "-pserver" - try: - u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ - .delete_namespaced_replica_set(pserver_name, namespace, {}) - except ApiException, e: - logging.error("error deleting pserver: %s" % str(e)) - delete_status.append(str(e)) - - # delete pserver pods - try: - # pserver replica set has label with jobname - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace, - label_selector="paddle-job-pserver=%s"%jobname) - for i in job_pod_list.items: - u_status = client.CoreV1Api(api_client=api_client)\ - .delete_namespaced_pod(i.metadata.name, namespace, {}) - except ApiException, e: - logging.error("error deleting pserver pods: %s" % str(e)) - delete_status.append(str(e)) - - # delete master rs - master_name = jobname + "-master" - try: - u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ - .delete_namespaced_replica_set(master_name, namespace, {}) - except ApiException, e: - logging.error("error deleting master: %s" % str(e)) - # just ignore deleting master failed, we do not set up master process - # without fault tolerant mode - #delete_status.append(str(e)) - - # delete master pods - try: - # master replica set has label with jobname - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace, - label_selector="paddle-job-master=%s"%jobname) - for i in job_pod_list.items: - u_status = client.CoreV1Api(api_client=api_client)\ - .delete_namespaced_pod(i.metadata.name, namespace, {}) - except ApiException, e: - logging.error("error deleting master pods: %s" % str(e)) - # just ignore deleting master failed, we do not set up master process - # without fault tolerant mode - #delete_status.append(str(e)) - - if len(delete_status) > 0: - retcode = 500 - else: - retcode = 200 - return utils.simple_response(retcode, "\n".join(delete_status)) + p = K8sProvider() + retcode, status = p.delete_job(jobname, username) + return utils.simple_response(retcode, "\n".join(status)) class PserversView(APIView): permission_classes = (permissions.IsAuthenticated,) @@ -374,84 +168,37 @@ def get(self, request, format=None): List all pservers """ username = request.user.username - namespace = notebook.utils.email_escape(username) - api_instance = client.ExtensionsV1beta1Api(api_client=notebook.utils.get_user_api_client(username)) - pserver_rs_list = api_instance.list_namespaced_replica_set(namespace) - return Response(pserver_rs_list.to_dict()) - + p = K8sProvider() + return Response(p.get_pservers(username)) class LogsView(APIView): permission_classes = (permissions.IsAuthenticated,) def get(self, request, format=None): - """ - Get logs for jobs - """ - def _get_pod_log(api_client, namespace, pod_name, num_lines): - try: - if num_lines: - pod_log = client.CoreV1Api(api_client=api_client)\ - .read_namespaced_pod_log( - pod_name, namespace, tail_lines=int(num_lines)) - else: - pod_log = client.CoreV1Api(api_client=api_client)\ - .read_namespaced_pod_log(i.metadata.name, namespace) - return pod_log - except ApiException, e: - return str(e) - username = request.user.username - namespace = notebook.utils.email_escape(username) - api_client = notebook.utils.get_user_api_client(username) jobname = request.query_params.get("jobname") num_lines = request.query_params.get("n") worker = request.query_params.get("w") - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) - total_job_log = "" - if not worker: - for i in job_pod_list.items: - total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name)) - pod_log = _get_pod_log(api_client, namespace, i.metadata.name, num_lines) - total_job_log = "\n".join((total_job_log, pod_log)) - else: - total_job_log = _get_pod_log(api_client, namespace, worker, num_lines) + + total_job_log = K8sProvider().get_logs(jobname, num_lines, worker, username) return utils.simple_response(200, total_job_log) class WorkersView(APIView): permission_classes = (permissions.IsAuthenticated,) def get(self, request, format=None): - """ - Get logs for jobs - """ username = request.user.username - namespace = notebook.utils.email_escape(username) jobname = request.query_params.get("jobname") - job_pod_list = None - api_client = notebook.utils.get_user_api_client(username) - if not jobname: - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace) - else: - selector = "paddle-job=%s"%jobname - job_pod_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_pod(namespace, label_selector=selector) - return Response(job_pod_list.to_dict()) + ret = K8sProvider().get_workers(jobname, username) + return Response(ret) class QuotaView(APIView): permission_classes = (permissions.IsAuthenticated,) def get(self, request, format=None): - """ - Get user quotas - """ username = request.user.username - namespace = notebook.utils.email_escape(username) - api_client = notebook.utils.get_user_api_client(username) - quota_list = client.CoreV1Api(api_client=api_client)\ - .list_namespaced_resource_quota(namespace) - return Response(quota_list.to_dict()) + ret = K8sProvider().get_quotas(username) + return Response(ret) class GetUserView(APIView): permission_classes = (permissions.IsAuthenticated,) From ed225422c1187095b75da470e177af125ddea392 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 20 Sep 2017 16:40:42 +0800 Subject: [PATCH 5/5] try fix ci --- paddlecloud/paddlejob/registry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paddlecloud/paddlejob/registry.py b/paddlecloud/paddlejob/registry.py index ba92bdfb..93411619 100644 --- a/paddlecloud/paddlejob/registry.py +++ b/paddlecloud/paddlejob/registry.py @@ -13,7 +13,6 @@ import utils import notebook.utils import logging -import volume import os import base64