Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "submit trainingjobs". #526

Merged
merged 9 commits into from
Dec 18, 2017
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 26 additions & 106 deletions go/paddlectl/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,23 @@ package paddlectl

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"os"
"path"
"strings"

"k8s.io/client-go/pkg/api/v1"

paddlejob "github.com/PaddlePaddle/cloud/go/api"
"github.com/PaddlePaddle/cloud/go/utils/config"
kubeutil "github.com/PaddlePaddle/cloud/go/utils/kubeutil"
"github.com/PaddlePaddle/cloud/go/utils/restclient"
"github.com/golang/glog"
"github.com/google/subcommands"
apiresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
invalidJobName = "jobname can not contain '.' or '_'"
invalidJobName = "jobname can not contain '.' or '_'"
trainingjobsPath = "/api/v1/trainingjobs/"
)

// Config is global config object for paddlectl commandline
Expand Down Expand Up @@ -52,9 +49,6 @@ type SubmitCmd struct {
MaxInstance int `json:"maxInstance"`
MinInstance int `json:"minInstance"`

// TODO(gongwb): init config in memory.
KubeConfig string `json:"kubeconfig"`

// TODO(gongwb): create from yaml.
}

Expand All @@ -72,84 +66,8 @@ func (*SubmitCmd) Usage() string {
`
}

func (p *SubmitCmd) getTrainer() *paddlejob.TrainerSpec {
return &paddlejob.TrainerSpec{
Entrypoint: p.Entry,
Workspace: getJobPfsPath(p.Jobpackage, p.Jobname),
MinInstance: p.MinInstance,
MaxInstance: p.MaxInstance,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(p.CPU), apiresource.DecimalSI),
"memory": apiresource.MustParse(p.Memory),
},
Requests: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(p.CPU), apiresource.DecimalSI),
"memory": apiresource.MustParse(p.Memory),
},
},
}
}

func (p *SubmitCmd) getPserver() *paddlejob.PserverSpec {
return &paddlejob.PserverSpec{
// TODO(gongwb):Pserver can be auto-scaled?
MinInstance: p.Pservers,
MaxInstance: p.Pservers,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(p.PSCPU), apiresource.DecimalSI),
"memory": apiresource.MustParse(p.PSMemory),
},
Requests: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(p.PSCPU), apiresource.DecimalSI),
"memory": apiresource.MustParse(p.PSMemory),
},
},
}
}

func (p *SubmitCmd) getMaster() *paddlejob.MasterSpec {
return &paddlejob.MasterSpec{}
}

// GetTrainingJob get's paddlejob.TrainingJob struct filed by Submitcmd paramters.
func (p *SubmitCmd) GetTrainingJob() *paddlejob.TrainingJob {
t := paddlejob.TrainingJob{
metav1.TypeMeta{
Kind: "TrainingJob",
APIVersion: "paddlepaddle.org/v1",
},
metav1.ObjectMeta{
Name: p.Jobname,
Namespace: kubeutil.NameEscape(Config.ActiveConfig.Username),
},

// General job attributes.
paddlejob.TrainingJobSpec{
Image: p.Image,

// TODO(gongwb): init them?

FaultTolerant: p.FaultTolerant,
Passes: p.Passes,

Trainer: *p.getTrainer(),
Pserver: *p.getPserver(),
Master: *p.getMaster(),
},
paddlejob.TrainingJobStatus{},
}

if glog.V(3) {
glog.Infof("GetTrainingJob: %s\n", t)
}
return &t
}

// SetFlags registers subcommands flags.
func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&p.KubeConfig, "kubeconfig", "", "Kubernetes config.")
f.StringVar(&p.Jobname, "jobname", "paddle-cluster-job", "Cluster job name.")
f.IntVar(&p.CPU, "cpu", 1, "CPU resource each trainer will use. Defaults to 1.")
f.IntVar(&p.GPU, "gpu", 0, "GPU resource each trainer will use. Defaults to 0.")
Expand Down Expand Up @@ -229,41 +147,43 @@ func putFilesToPfs(jobPackage, jobName string) error {
return nil
}

func (s *Submitter) getKubeConfig() (string, error) {
kubeconfig := s.args.KubeConfig
if _, err := os.Stat(kubeconfig); err != nil {
return "", fmt.Errorf("can't access kubeconfig '%s' error: %v", kubeconfig, err)
func (s *Submitter) createJobs() error {
jsonString, err := json.Marshal(s.args)
if err != nil {
return err
}

return kubeconfig, nil
}

// Submit current job.
func (s *Submitter) Submit(jobPackage string, jobName string) error {
if err := checkJobName(jobName); err != nil {
apiPath := Config.ActiveConfig.Endpoint + trainingjobsPath
respBody, err := restclient.PostCall(apiPath, jsonString)
if err != nil {
return err
}

if err := putFilesToPfs(jobPackage, jobName); err != nil {
var respObj interface{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change to:

var respObj map[string]string

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.Thanks.

if err = json.Unmarshal(respBody, &respObj); err != nil {
return err
}

kubeconfig, err := s.getKubeConfig()
if err != nil {
return err
// FIXME: Return an error if error message is not empty. Use response code instead.
errMsg := respObj.(map[string]interface{})["msg"].(string)
if len(errMsg) > 0 {
return errors.New(errMsg)
}

client, clientset, err := kubeutil.CreateClient(kubeconfig)
if err != nil {
glog.Infof("Submitting job: %s\n", s.args.Jobname)
return nil
}

// Submit current job.
func (s *Submitter) Submit(jobPackage string, jobName string) error {
if err := checkJobName(jobName); err != nil {
return err
}

namespace := kubeutil.NameEscape(Config.ActiveConfig.Username)
if err := kubeutil.FindNamespace(clientset, namespace); err != nil {
if err := putFilesToPfs(jobPackage, jobName); err != nil {
return err
}

return kubeutil.CreateTrainingJob(client, namespace, s.args.GetTrainingJob())
return s.createJobs()
}

func checkJobName(jobName string) error {
Expand Down
14 changes: 7 additions & 7 deletions k8s/minikube/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
},
"handlers": {
"mail_admins": {
"level": "ERROR",
"level": "INFO",
"filters": ["require_debug_false"],
"class": "django.utils.log.AdminEmailHandler"
},
Expand All @@ -208,12 +208,12 @@
"loggers": {
"": {
"handlers": ["stdout"],
"level": "ERROR",
"level": "INFO",
"propagate": True,
},
"django.request": {
"handlers": ["mail_admins"],
"level": "ERROR",
"level": "INFO",
"propagate": True,
},
}
Expand Down Expand Up @@ -270,14 +270,14 @@
#}
#for HostPath example:
DATACENTERS = {
"datacenter":{
"testpcloud":{
"fstype": "hostpath",
"host_path": "<yourpath>/users",
"host_path": "<yourpath>/%s",
"mount_path": "/pfs/%s/home/%s/" # mount_path % ( dc, username )
}
}
#FSTYPE_CEPHFS = "cephfs"
#FSTYPE_HOSTPATH = "hostpath"
FSTYPE_CEPHFS = "cephfs"
FSTYPE_HOSTPATH = "hostpath"
#DATACENTERS = {
# "meiyan":{
# "fstype": FSTYPE_CEPHFS,
Expand Down
1 change: 1 addition & 0 deletions paddlecloud/paddlecloud/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
url(r'^api/sample/$', notebook.views.SampleView.as_view()),

url(r"^api/v1/jobs/", paddlejob.views.JobsView.as_view()),
url(r"^api/v1/trainingjobs/", paddlejob.views.TrainingJobsView.as_view()),
url(r"^api/v1/pservers/", paddlejob.views.PserversView.as_view()),
url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()),
url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()),
Expand Down
44 changes: 41 additions & 3 deletions paddlecloud/paddlejob/cloudprovider/k8s_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import utils
import volume
import json

# FIXME(typhoonzero): need a base class to define the interfaces?
class K8sProvider:
Expand Down Expand Up @@ -91,7 +92,7 @@ def __setup_volumes(self, paddlejob, username):
pass
paddlejob.volumes = volumes

def submit_job(self, paddlejob, username):
def _valid_and_fill(self, paddlejob, username):
namespace = utils.email_escape(username)
api_client = utils.get_user_api_client(username)
self.__setup_volumes(paddlejob, username)
Expand All @@ -102,18 +103,20 @@ def submit_job(self, paddlejob, username):
paddlejob.image = settings.JOB_DOCKER_IMAGE["image_gpu"]
else:
paddlejob.image = settings.JOB_DOCKER_IMAGE["image"]

# jobPackage validation: startwith /pfs
# NOTE: job packages are uploaded to /pfs/[dc]/home/[user]/jobs/[jobname]
package_in_pod = os.path.join("/pfs/%s/home/%s"%(paddlejob.dc, username), "jobs", paddlejob.name)

logging.info("current package: %s", package_in_pod)
logging.info("valid_and_fill: current package: %s", package_in_pod)
# package must be ready before submit a job
current_package_path = package_in_pod.replace("/pfs/%s/home"%paddlejob.dc, settings.STORAGE_PATH)
if not os.path.exists(current_package_path):
current_package_path = package_in_pod.replace("/pfs/%s/home/%s"%(paddlejob.dc, username), settings.STORAGE_PATH)
if not os.path.exists(current_package_path):
raise Exception("package not exist in cloud: %s"%current_package_path)
logging.info("current package in pod: %s", current_package_path)
logging.info("valid_and_fill: current package in pod: %s", current_package_path)

# GPU quota management
# TODO(Yancey1989) We should move this to Kubernetes
if 'GPU_QUOTA' in dir(settings) and int(paddlejob.gpu) > 0:
Expand Down Expand Up @@ -143,6 +146,12 @@ def submit_job(self, paddlejob, username):
mount_path = "/usr/local/nvidia/lib64",
host_path = settings.NVIDIA_LIB_PATH
))

def submit_job(self, paddlejob, username):
self._valid_and_fill(paddlejob, username)

namespace = utils.email_escape(username)
api_client = utils.get_user_api_client(username)
# ========== submit master ReplicaSet if using fault_tolerant feature ==
# FIXME: alpha features in separate module
if paddlejob.fault_tolerant:
Expand Down Expand Up @@ -171,6 +180,34 @@ def submit_job(self, paddlejob, username):
raise e
return ret

def _create_traingingjobs(self, body, username):
namespace = utils.email_escape(username)
api_client = utils.get_user_api_client(username)
resource_path = '/apis/paddlepaddle.org/v1/namespaces/' + namespace + '/trainingjobs'
header_params = {}
header_params['Accept'] = api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = api_client.select_header_content_type(['*/*'])

(resp, code, header) = api_client.call_api(
resource_path, 'POST', {'namespace': namespace}, {}, header_params, body, [], _preload_content=False)

return json.loads(resp.data.decode('utf-8'))

def submit_trainingjobs(self, paddlejob, username):
self._valid_and_fill(paddlejob, username)

job = paddlejob.new_trainingjobs()
resp = self._create_traingingjobs(job, username)

logging.info(str(resp))

def delete_trainingjobs(self, paddlejob, username):
api_client = utils.get_user_api_client(username)
resp = client.ExtensionsV1beta1Api().\
delete_third_party_resource("TrainingJobs", body=kubernetes.client.V1DeleteOptions())
print("ThirdPartyResource delete")
print(str(resp))

def delete_job(self, jobname, username):
namespace = utils.email_escape(username)
api_client = utils.get_user_api_client(username)
Expand Down Expand Up @@ -254,6 +291,7 @@ def delete_job(self, jobname, username):
retcode = 200
return retcode, delete_status


def get_pservers(self, username):
namespace = utils.email_escape(username)
api_instance = client.ExtensionsV1beta1Api(api_client=utils.get_user_api_client(username))
Expand Down
5 changes: 4 additions & 1 deletion paddlecloud/paddlejob/paddle_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from kubernetes import client, config
import os

from specs import spec_master, spec_pserver, spec_trainer
from specs import spec_master, spec_pserver, spec_trainer, spec_trainingjob

DEFAULT_PADDLE_PORT=7164
DEFAULT_MASTER_PORT=8080
Expand Down Expand Up @@ -192,3 +192,6 @@ def new_pserver_job(self):

def new_trainer_job(self):
return spec_trainer.get_spec_trainer(self)

def new_trainingjobs(self):
return spec_trainingjob.get_trainingjob(self)
Loading