Merge pull request #385 from PaddlePaddle/controller
[WIP]Autoscaling Controller
typhoonzero authored Oct 21, 2017
2 parents f92c7ac + a2d1adc commit d5816c5
Showing 33 changed files with 2,560 additions and 15 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
@@ -3,16 +3,15 @@ matrix:
- language: go
go: 1.8.x
sudo: required
before_script:
- eval "$(GIMME_GO_VERSION=1.8.3 gimme)"
install:
- go get -u github.com/golang/lint/golint
- curl https://glide.sh/get | bash
- sudo pip install pre-commit
script:
- |
bash .tools/check_style.sh
RESULT=$?; if [ $RESULT -eq 0 ]; then true; else false; fi;
- - cd go && bash .tools/gen_config.sh && glide install && go test $(glide novendor)
+ - cd go && bash .tools/gen_config.sh && glide install --strip-vendor && go test $(glide novendor)
- language: python
python: 2.7
sudo: required
53 changes: 53 additions & 0 deletions doc/autoscale_example/autoscale.md
@@ -0,0 +1,53 @@
# Run an Autoscaling job on your local machine

This document shows an example of running two jobs on a local Kubernetes cluster and observing their scaling status.

## Prerequisites

- [install minikube](https://kubernetes.io/docs/tasks/tools/install-minikube/)
- [install kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)

## Run a local Autoscaling job

1. Start a local minikube cluster.

```bash
minikube start --kubernetes-version v1.6.4
```

1. Run the following commands to create a sample training workspace and data.

```bash
mkdir /path/to/workspace
cp $REPO_PATH/doc/autoscale_example/*.py /path/to/workspace
mkdir -p /path/to/workspace/data/
cp -r $REPO_PATH/doc/autoscale_example/uci_housing/ /path/to/workspace/data/
```
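
After copying, the workspace should contain the example scripts and the
dataset directory, roughly like this (the shard names below are assumptions
based on the globs used by the training scripts):

```bash
ls /path/to/workspace
# convert.py  train_ft.py  train_mnist_ft.py  data/
ls /path/to/workspace/data/uci_housing/
# RecordIO shards, e.g. uci_housing_train-*
```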

1. Mount the workspace folder into Minikube:

```bash
minikube mount /path/to/workspace:/workspace
```

The `minikube mount` command will block, so start a new terminal to
continue the tutorial.
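
To double-check the mount, list the folder from inside the Minikube VM:

```bash
minikube ssh "ls /workspace"
```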

1. Start the controller and an example job:

```bash
cd $REPO_PATH/k8s/controller
kubectl create -f controller.yaml
kubectl create -f trainingjob_resource.yaml
kubectl create -f autoscale_job/
kubectl get pods
```
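
Besides the pods, you can inspect the `TrainingJob` resource and the
controller itself. A hedged example, assuming the third-party resource is
registered under the plural name `trainingjobs` and the controller pod name
contains `controller`:

```bash
# List the custom TrainingJob resources created from autoscale_job/
kubectl get trainingjobs

# Follow the controller logs to watch its scaling decisions
kubectl logs -f $(kubectl get pods -o name | grep controller | head -n 1)
```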

1. Start another job to simulate cluster load; you can then observe the
scaling process using `kubectl get pods`:

```bash
kubectl create -f autoscale_load/
kubectl get pods
```
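
Instead of polling by hand, you can stream pod changes; trainer pods should
be added or removed as the controller reacts to the simulated load:

```bash
kubectl get pods --watch
```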
17 changes: 17 additions & 0 deletions doc/autoscale_example/convert.py
@@ -0,0 +1,17 @@
import os


def convert(output_path, name):
    # Import the dataset module (e.g. paddle.v2.dataset.uci_housing) and
    # write its converted RecordIO files under <output_path>/<name>.
    mod = __import__("paddle.v2.dataset." + name, fromlist=[''])
    path = os.path.join(output_path, name)
    mod.convert(path)


if __name__ == '__main__':
    for name in ['uci_housing']:
        convert("./data", name)
70 changes: 70 additions & 0 deletions doc/autoscale_example/train_ft.py
@@ -0,0 +1,70 @@
import paddle.v2 as paddle
import os
import gzip
from paddle.v2.reader.creator import cloud_reader
import paddle.v2.dataset.uci_housing as uci_housing

etcd_ip = os.getenv("ETCD_IP")
etcd_endpoint = "http://" + etcd_ip + ":2379"
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))

def main():
# init
paddle.init()

# network config
x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
y_predict = paddle.layer.fc(input=x,
param_attr=paddle.attr.Param(name='w', learning_rate=1e-3),
size=1,
act=paddle.activation.Linear(),
bias_attr=paddle.attr.Param(name='b', learning_rate=1e-3))
y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
cost = paddle.layer.square_error_cost(input=y_predict, label=y)

# create parameters
parameters = paddle.parameters.create(cost)

# create optimizer
optimizer = paddle.optimizer.Momentum(momentum=0, learning_rate=2e-4)

trainer = paddle.trainer.SGD(
cost=cost,
parameters=parameters,
update_equation=optimizer,
is_local=False,
pserver_spec=etcd_endpoint,
use_etcd=True)

feeding = {'x': 0, 'y': 1}

# event_handler to print training and testing info
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0:
print "Pass %d, Batch %d, Cost %f" % (
event.pass_id, event.batch_id, event.cost)

if isinstance(event, paddle.event.EndPass):
result = trainer.test(
reader=paddle.batch(uci_housing.test(), batch_size=2),
feeding=feeding)
print "Test %d, Cost %f" % (event.pass_id, result.cost)
if trainer_id == 0:  # only trainer 0 snapshots the parameters
with gzip.open("fit-a-line_pass_%05d.tar.gz" % event.pass_id,
"w") as f:
parameters.to_tar(f)
# training
trainer.train(
reader=paddle.batch(
paddle.reader.shuffle(cloud_reader(
["/workspace/data/uci_housing/uci_housing_train-*"],
etcd_endpoint), buf_size=500),
batch_size=2),
feeding=feeding,
event_handler=event_handler,
num_passes=30)


if __name__ == '__main__':
main()
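
The script reads its cluster configuration from environment variables that
are normally injected by the Kubernetes job spec; a hedged manual run would
look like this (the values are placeholders):

```bash
export ETCD_IP=<etcd-service-ip>     # placeholder; injected by the job spec in the cluster
export PADDLE_INIT_TRAINER_ID=0
python train_ft.py
```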
130 changes: 130 additions & 0 deletions doc/autoscale_example/train_mnist_ft.py
@@ -0,0 +1,130 @@
from PIL import Image
import numpy as np
import paddle.v2 as paddle
import paddle.v2.dataset.common as common
import paddle.v2.dataset.mnist as mnist
from paddle.v2.reader.creator import cloud_reader
import os
import sys
import glob
import pickle


# NOTE: adjust these globs if your converted MNIST data lives in a different path.
TRAIN_FILES_PATH = "/workspace/data/mnist/minist_train-*"
TEST_FILES_PATH = "/workspace/data/mnist/minist_test-*"

etcd_ip = os.getenv("ETCD_IP")
etcd_endpoint = "http://" + etcd_ip + ":2379"
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))


def softmax_regression(img):
predict = paddle.layer.fc(
input=img, size=10, act=paddle.activation.Softmax())
return predict


def multilayer_perceptron(img):
# The first fully-connected layer
hidden1 = paddle.layer.fc(input=img, size=128, act=paddle.activation.Relu())
# The second fully-connected layer and the corresponding activation function
hidden2 = paddle.layer.fc(
input=hidden1, size=64, act=paddle.activation.Relu())
# The third fully-connected layer, note that the hidden size should be 10,
# which is the number of unique digits
predict = paddle.layer.fc(
input=hidden2, size=10, act=paddle.activation.Softmax())
return predict


def convolutional_neural_network(img):
# first conv layer
conv_pool_1 = paddle.networks.simple_img_conv_pool(
input=img,
filter_size=5,
num_filters=20,
num_channel=1,
pool_size=2,
pool_stride=2,
act=paddle.activation.Relu())
# second conv layer
conv_pool_2 = paddle.networks.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
num_channel=20,
pool_size=2,
pool_stride=2,
act=paddle.activation.Relu())
# fully-connected layer
predict = paddle.layer.fc(
input=conv_pool_2, size=10, act=paddle.activation.Softmax())
return predict


def main():
paddle.init()

# define network topology
images = paddle.layer.data(
name='pixel', type=paddle.data_type.dense_vector(784))
label = paddle.layer.data(
name='label', type=paddle.data_type.integer_value(10))

# Here we can build the prediction network in different ways. Please
# choose one by uncommenting the corresponding line.
# predict = softmax_regression(images)
# predict = multilayer_perceptron(images)
predict = convolutional_neural_network(images)

cost = paddle.layer.classification_cost(input=predict, label=label)

parameters = paddle.parameters.create(cost)

optimizer = paddle.optimizer.Momentum(
learning_rate=0.1 / 128.0,
momentum=0.9,
regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128))

trainer = paddle.trainer.SGD(
cost=cost,
parameters=parameters,
update_equation=optimizer,
is_local=False,
pserver_spec=etcd_endpoint,
use_etcd=True)

def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0:
print "Pass %d, Batch %d, Cost %f, %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics)
if isinstance(event, paddle.event.EndPass):
result = trainer.test(
reader=paddle.batch(
mnist.test(),
batch_size=2))
print "Test with Pass %d, Cost %f, %s\n" % (
event.pass_id, result.cost, result.metrics)

trainer.train(
reader=paddle.batch(
cloud_reader([TRAIN_FILES_PATH], etcd_endpoint),
batch_size=128),
event_handler=event_handler,
num_passes=30)

if __name__ == '__main__':
usage = "usage: python train_mnist_ft.py train"
if len(sys.argv) != 2:
print usage
exit(1)

if trainer_id == -1 or not etcd_ip:
    print "no cloud environment found, this script must run on the cloud"
exit(1)

if sys.argv[1] == "train":
main()
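
As with `train_ft.py`, the environment variables are normally injected by the
trainer job spec; a hedged manual invocation would be:

```bash
export ETCD_IP=<etcd-service-ip>     # placeholder
export PADDLE_INIT_TRAINER_ID=0
python train_mnist_ft.py train
```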

Binary file not shown.
Binary file not shown.
3 changes: 2 additions & 1 deletion docker/k8s_tools.py
@@ -27,7 +27,8 @@ def wait_pods_running(label_selector, desired):
print "label selector: %s, desired: %s" % (label_selector, desired)
while True:
count = count_pods_by_phase(label_selector, 'Running')
- if count == int(desired):
+ # NOTE: the job may have been scaled up, so more pods than desired can be running.
+ if count >= int(desired):
break
print 'current cnt: %d sleep for 5 seconds...' % count
time.sleep(5)
8 changes: 8 additions & 0 deletions go/api/deepcopy_gen.sh
@@ -0,0 +1,8 @@
#!/bin/bash

# FIXME: run this script before building when using api >= 1.7;
# api 1.6 is not compatible with this deep-copy code generation.

go get -u k8s.io/gengo
go build -o /tmp/deepcopy-gen k8s.io/gengo/examples/deepcopy-gen
/tmp/deepcopy-gen -i github.com/PaddlePaddle/cloud/go/api -O zz_generated.deepcopy 2> /dev/null
50 changes: 50 additions & 0 deletions go/api/register.go
@@ -0,0 +1,50 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

package api

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
clientgoapi "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
)

// ConfigureClient sets up the required fields that the k8s REST client needs.
func ConfigureClient(config *rest.Config) {
groupversion := schema.GroupVersion{
Group: "paddlepaddle.org",
Version: "v1",
}

config.GroupVersion = &groupversion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: clientgoapi.Codecs}

schemeBuilder := runtime.NewSchemeBuilder(
func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
groupversion,
&TrainingJob{},
&TrainingJobList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
)
return nil
})
schemeBuilder.AddToScheme(clientgoapi.Scheme)
}
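
Once the TrainingJob third-party resource exists in the cluster, the group
and version configured above are reachable under the standard API path. A
hedged way to inspect it from outside the controller (the plural resource
name is an assumption):

```bash
# Proxy the API server locally, then query the custom API group.
kubectl proxy --port=8001 &
curl http://127.0.0.1:8001/apis/paddlepaddle.org/v1/namespaces/default/trainingjobs
```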