Skip to content

Commit

Permalink
feat: Support bayesianoptimization
Browse files Browse the repository at this point in the history
Signed-off-by: Ce Gao <gaoce@caicloud.io>
  • Loading branch information
gaocegege committed May 29, 2019
1 parent 2bc89ed commit b7602a2
Show file tree
Hide file tree
Showing 22 changed files with 1,041 additions and 20 deletions.
8 changes: 8 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Image for the Katib bayesian-optimization suggestion service (v1alpha2).
FROM python:3

# Copy the whole repo; the service imports pkg/... via PYTHONPATH below.
ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/bayesianoptimization/v1alpha2
RUN pip install --no-cache-dir -r requirements.txt
# Expose both the repo root and the generated gRPC stubs on the import path.
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/api/v1alpha2/python

ENTRYPOINT ["python", "main.py"]
14 changes: 14 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
- start the service

```
python suggestion/bayesian/main.py
```

- start the testing client

```
python suggestion/test_client.py
```

Note:
the testing client uses [Franke's function](http://www.sfu.ca/~ssurjano/franke2d.html) as the black-box objective; the maximum of Franke's function is approximately 1.22.
Empty file.
25 changes: 25 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import grpc
from concurrent import futures

import time

from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesian_service import BayesianService

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
DEFAULT_PORT = "0.0.0.0:6789"

def serve(port=DEFAULT_PORT):
    """Start the bayesian-optimization Suggestion gRPC server and block.

    Args:
        port: "host:port" string to bind an insecure listener on.
              Defaults to DEFAULT_PORT for backward compatibility.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    api_pb2_grpc.add_SuggestionServicer_to_server(BayesianService(), server)
    server.add_insecure_port(port)
    print("Listening...")
    server.start()
    try:
        # grpc serves requests on background threads; keep the main thread
        # alive until the process is interrupted.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == "__main__":
    serve()
9 changes: 9 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
grpcio
duecredit
cloudpickle==0.5.6
numpy>=1.13.3
scikit-learn>=0.19.0
scipy>=0.19.1
forestci
protobuf
googleapis-common-protos
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Deployment for the Katib bayesian-optimization suggestion service (v1alpha2).
# Pods carry the labels app=katib, component=suggestion-bayesianoptimization;
# the companion Service must select on exactly these labels.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  replicas: 1
  template:
    metadata:
      name: katib-suggestion-bayesianoptimization
      labels:
        app: katib
        component: suggestion-bayesianoptimization
    spec:
      containers:
      - name: katib-suggestion-bayesianoptimization
        image: katib/v1alpha2/suggestion-bayesianoptimization
        ports:
        # gRPC port the suggestion server listens on (see main.py DEFAULT_PORT).
        - name: api
          containerPort: 6789
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# ClusterIP Service in front of the bayesian-optimization suggestion pods.
apiVersion: v1
kind: Service
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  type: ClusterIP
  ports:
  - port: 6789
    protocol: TCP
    name: api
  selector:
    app: katib
    # Bug fix: the selector previously used component=katib-bayesianoptimization,
    # which matches no pods — the deployment's pod template is labeled
    # component=suggestion-bayesianoptimization.
    component: suggestion-bayesianoptimization
209 changes: 209 additions & 0 deletions pkg/suggestion/v1alpha2/bayesian_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import random
import string

import grpc
import numpy as np

from pkg.api.v1alpha2.python import api_pb2
from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesianoptimization.src.bayesian_optimization_algorithm import BOAlgorithm
from pkg.suggestion.v1alpha2.bayesianoptimization.src.algorithm_manager import AlgorithmManager
import logging
from logging import getLogger, StreamHandler, INFO, DEBUG


class BayesianService(api_pb2_grpc.SuggestionServicer):
    """Suggestion service that produces trials via bayesian optimization.

    Queries the katib-manager gRPC service for the experiment spec, the
    completed-trial history, and algorithm settings, then asks BOAlgorithm
    for the next parameter assignments.
    """

    def __init__(self, logger=None):
        """
        :param logger: optional pre-configured logger; when None a
                       stderr logger at INFO level is created.
        """
        # Address/port of the katib-manager service queried for experiment
        # metadata, trial history and algorithm settings.
        self.manager_addr = "katib-manager"
        self.manager_port = 6789
        if logger is None:
            self.logger = getLogger(__name__)
            FORMAT = '%(asctime)-15s StudyID %(studyid)s %(message)s'
            logging.basicConfig(format=FORMAT)
            handler = StreamHandler()
            handler.setLevel(INFO)
            self.logger.setLevel(INFO)
            self.logger.addHandler(handler)
            # Avoid double logging through the root logger's handler.
            self.logger.propagate = False
        else:
            self.logger = logger

    def _get_experiment(self, name):
        """Fetch the Experiment object named *name* from katib-manager."""
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            exp = client.GetExperiment(
                api_pb2.GetExperimentRequest(experiment_name=name), 10)
            return exp.experiment

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.

        Returns a GetSuggestionsReply with `request.request_number` trials.
        """
        service_params = self.parseParameters(request.experiment_name)
        experiment = self._get_experiment(request.experiment_name)
        X_train, y_train = self.getEvalHistory(
            request.experiment_name,
            experiment.spec.objective.objective_metric_name,
            service_params["burn_in"])

        algo_manager = AlgorithmManager(
            experiment_name=request.experiment_name,
            experiment=experiment,
            X_train=X_train,
            y_train=y_train,
            logger=self.logger,
        )

        lowerbound = np.array(algo_manager.lower_bound)
        upperbound = np.array(algo_manager.upper_bound)
        # NOTE(review): request.study_id looks like a v1alpha1 leftover on a
        # v1alpha2 request — confirm the field exists on GetSuggestionsRequest.
        self.logger.debug("lowerbound: %r", lowerbound,
                          extra={"StudyID": request.study_id})
        self.logger.debug("upperbound: %r", upperbound,
                          extra={"StudyID": request.study_id})
        alg = BOAlgorithm(
            dim=algo_manager.dim,
            N=int(service_params["N"]),
            lowerbound=lowerbound,
            upperbound=upperbound,
            X_train=algo_manager.X_train,
            y_train=algo_manager.y_train,
            mode=service_params["mode"],
            trade_off=service_params["trade_off"],
            # todo: support length_scale with array type
            length_scale=service_params["length_scale"],
            noise=service_params["noise"],
            nu=service_params["nu"],
            kernel_type=service_params["kernel_type"],
            n_estimators=service_params["n_estimators"],
            max_features=service_params["max_features"],
            model_type=service_params["model_type"],
            logger=self.logger,
        )
        trials = []
        x_next_list = alg.get_suggestion(request.request_number)
        for x_next in x_next_list:
            x_next = x_next.squeeze()
            self.logger.debug("xnext: %r ", x_next, extra={
                "StudyID": request.study_id})
            # Convert the raw optimizer vector back into named assignments.
            x_next = algo_manager.parse_x_next(x_next)
            x_next = algo_manager.convert_to_dict(x_next)
            trials.append(api_pb2.Trial(
                spec=api_pb2.TrialSpec(
                    experiment_name=request.experiment_name,
                    parameter_assignments=api_pb2.TrialSpec.ParameterAssignments(
                        assignments=[
                            api_pb2.ParameterAssignment(
                                name=x["name"],
                                value=str(x["value"]),
                            ) for x in x_next
                        ]
                    )
                )
            ))
        return api_pb2.GetSuggestionsReply(
            trials=trials
        )

    def getEvalHistory(self, experiment_name, obj_name, burn_in):
        """Collect (parameters, objective) pairs from completed trials.

        Returns empty training data until more than *burn_in* trials have
        completed, so the algorithm keeps random-sampling during burn-in.
        """
        x_train = []
        y_train = []
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            trialsrep = client.GetTrialList(api_pb2.GetTrialListRequest(
                experiment_name=experiment_name
            ))
            for t in trialsrep.trials:
                # condition == 2 presumably means "Succeeded" — confirm
                # against the TrialStatus condition enum in api_pb2.
                if t.status.condition == 2:
                    gwfrep = client.GetObservationLog(
                        api_pb2.GetObservationLogRequest(
                            trial_name=t.name,
                            metric_name=obj_name))
                    w = gwfrep.observation_log
                    for ml in w.metrics_logs:
                        if ml.name == obj_name:
                            # Use the last reported value of the objective.
                            y_train.append(float(ml.values[-1].value))
                            x_train.append(w.parameter_set)
                            break
        self.logger.info("%d completed trials are found.",
                         len(x_train), extra={"Experiment": experiment_name})
        if len(x_train) <= burn_in:
            x_train = []
            y_train = []
            self.logger.info("Trials will be sampled until %d trials for burn-in are completed.",
                             burn_in, extra={"Experiment": experiment_name})
        else:
            self.logger.debug("Completed trials: %r", x_train,
                              extra={"Experiment": experiment_name})

        return x_train, y_train

    def parseParameters(self, experiment_name):
        """Fetch algorithm extra settings and merge them over the defaults.

        Invalid values are logged and the default is kept.
        """
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        params = []
        with api_pb2.beta_create_Manager_stub(channel) as client:
            gsprep = client.GetAlgorithmExtraSettings(
                api_pb2.GetAlgorithmExtraSettingsRequest(param_id=experiment_name), 10)
            params = gsprep.extra_algorithm_settings

        parsed_service_params = {
            "N": 100,
            "model_type": "gp",
            "max_features": "auto",
            "length_scale": 0.5,
            "noise": 0.0005,
            "nu": 1.5,
            "kernel_type": "matern",
            "n_estimators": 50,
            "mode": "pi",
            "trade_off": 0.01,
            "trial_hist": "",
            "burn_in": 10,
        }
        modes = ["pi", "ei"]
        model_types = ["gp", "rf"]
        kernel_types = ["matern", "rbf"]

        for param in params:
            if param.name in parsed_service_params:
                if param.name in ("length_scale", "noise", "nu", "trade_off"):
                    try:
                        parsed_service_params[param.name] = float(param.value)
                    except ValueError:
                        self.logger.warning(
                            "Parameter must be float for %s: %s back to default value", param.name, param.value)
                elif param.name in ("N", "n_estimators", "burn_in"):
                    try:
                        parsed_service_params[param.name] = int(param.value)
                    except ValueError:
                        self.logger.warning(
                            "Parameter must be int for %s: %s back to default value", param.name, param.value)
                # Bug fix: the three branches below previously had inverted
                # conditions — they stored only *invalid* values and warned
                # on valid ones.
                elif param.name == "kernel_type":
                    if param.value in kernel_types:
                        parsed_service_params[param.name] = param.value
                    else:
                        self.logger.warning(
                            "Unknown Parameter for %s: %s back to default value", param.name, param.value)
                elif param.name == "mode":
                    if param.value in modes:
                        parsed_service_params[param.name] = param.value
                    else:
                        self.logger.warning(
                            "Unknown Parameter for %s: %s back to default value", param.name, param.value)
                elif param.name == "model_type":
                    if param.value in model_types:
                        parsed_service_params[param.name] = param.value
                    else:
                        self.logger.warning(
                            "Unknown Parameter for %s: %s back to default value", param.name, param.value)
                # NOTE(review): "max_features" and "trial_hist" are accepted
                # keys but never parsed from settings — confirm intended.
            else:
                self.logger.warning("Unknown Parameter name: %s ", param.name)

        return parsed_service_params
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
""" module for acquisition function"""
import numpy as np
from scipy.stats import norm


class AcquisitionFunc:
    """
    Class for acquisition function with options for expected improvement,
    probability of improvement, or lower confident bound.
    """

    def __init__(self, model, current_optimal, mode="ei", trade_off=0.01):
        """
        :param model: regression model exposing predict(X) -> (mean, std, variance)
        :param current_optimal: best objective value observed so far
        :param mode: pi: probability of improvement, ei: expected improvement, lcb: lower confident bound
        :param trade_off: a parameter to control the trade off between exploiting and exploring
        """
        self.model = model
        self.current_optimal = current_optimal
        self.mode = mode
        self.trade_off = trade_off

    def compute(self, X_test):
        """Evaluate the acquisition function at X_test.

        Returns (acquisition, mean, variance), the last two squeezed of
        singleton dimensions on the normal path.
        """
        y_mean, y_std, y_variance = self.model.predict(X_test)

        # Bug fix: guard against (near-)zero predictive std *before* dividing.
        # The original computed z first (divide-by-zero) and tested
        # `y_std.any() < 0.000001`, which compares a bool with a float and
        # only fired when every std was exactly zero.
        if self.mode == "ei" and np.any(y_std < 0.000001):
            return 0, y_mean, y_variance

        z = (y_mean - self.current_optimal - self.trade_off) / y_std

        if self.mode == "ei":
            result = y_std * (z * norm.cdf(z) + norm.pdf(z))
        elif self.mode == "pi":
            result = norm.cdf(z)
        else:
            # lower confident bound (negated for maximization of acquisition)
            result = - (y_mean - self.trade_off * y_std)
        return np.squeeze(result), np.squeeze(y_mean), np.squeeze(y_variance)
Loading

0 comments on commit b7602a2

Please sign in to comment.