-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathpysot.coco
269 lines (233 loc) · 8.84 KB
/
pysot.coco
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
The pySOT backend. Does black box optimization using pySOT.
"""
import sys
import numpy as np
# patch in abc.ABC if it doesn't already exist
import abc
if not hasattr(abc, "ABC"):
    # This interpreter's abc module has no ABC helper class, so build a
    # stand-in whose metaclass is ABCMeta and attach it to the module for
    # any later code that looks up abc.ABC.
    class ABC:
        __metaclass__ = abc.ABCMeta
    abc.ABC = ABC
from pySOT.optimization_problems.optimization_problem import OptimizationProblem
from pySOT.experimental_design import (
ExperimentalDesign,
LatinHypercube,
SymmetricLatinHypercube,
TwoFactorial,
)
from pySOT.surrogate import (
RBFInterpolant,
GPRegressor,
)
from pySOT.surrogate.kernels import (
CubicKernel,
LinearKernel,
)
from pySOT.surrogate.tails import (
ConstantTail,
LinearTail,
)
from pySOT.strategy import (
SRBFStrategy,
EIStrategy,
DYCORSStrategy,
LCBStrategy,
)
from bbopt.util import sorted_items
from bbopt.backends.util import StandardBackend
# Utilities:
class EmptyExperimentalDesign(ExperimentalDesign):
    """Experimental design that never contributes any initial points."""

    # number of points this design reports: always zero
    num_pts = 0

    def __init__(self, dim):
        """Store the problem dimensionality.

        Only ``dim`` is recorded; the base-class initializer is not called.
        """
        self.dim = dim

    def generate_points(self, lb=None, ub=None, int_var=None):
        """Return an array with zero rows and ``self.dim`` columns.

        ``lb``, ``ub`` and ``int_var`` are accepted but not used.
        """
        zero_rows_shape = (0, self.dim)
        return np.empty(zero_rows_shape)
class BBoptOptimizationProblem(OptimizationProblem):
    """pySOT optimization problem built from BBopt parameter definitions.

    Every parameter becomes one coordinate of the search space. Coordinates
    are ordered as yielded by ``sorted_items(params)``; that order is recorded
    in ``self.names`` and is shared by ``lb``, ``ub``, ``int_var``,
    ``cont_var``, ``eval`` and ``get_points_values``.
    """

    def __init__(self, params):
        """Translate BBopt params into bounds and variable-kind indices.

        Args:
            params: mapping of parameter name to a ``(func, args, kwargs)``
                triple. Supported shapes are ``"choice"`` with
                ``(choices,)``, ``"randrange"`` with ``(start, stop, step)``
                and ``"uniform"`` with ``(start, stop)``. ``kwargs`` is
                unpacked but not used here.

        Raises:
            TypeError: if a parameter matches none of the supported shapes.
        """
        self.params = params
        self.dim = len(params)
        self.got_values = None

        # lower and upper bounds; len must match self.dim
        self.lb = []
        self.ub = []

        # lists of indices that are discrete vs. continuous
        self.int_var = []
        self.cont_var = []

        # coordinate order (names) and, for discrete parameters, the sequence
        # of actual values that the integer coordinate indexes into
        self.names = []
        self.choices = {}

        for i, (name, (func, args, kwargs)) in enumerate(sorted_items(params)):
            self.names.append(name)
            match func, args:
                case "choice", (choices,):
                    # optimize over the index into choices
                    self.lb.append(0)
                    self.ub.append(len(choices) - 1)
                    self.int_var.append(i)
                    self.choices[name] = choices
                case "randrange", (start, stop, step):
                    # optimize over the index into the equivalent range
                    rng = range(start, stop, step)
                    self.lb.append(0)
                    self.ub.append(len(rng) - 1)
                    self.int_var.append(i)
                    self.choices[name] = rng
                case "uniform", (start, stop):
                    self.lb.append(start)
                    self.ub.append(stop)
                    self.cont_var.append(i)
            else:
                raise TypeError(f"insufficiently specified parameter {name}")

        # convert the accumulated lists into numpy arrays
        self.lb |>= np.array
        self.ub |>= np.array
        self.int_var |>= np.array
        self.cont_var |>= np.array

    def \eval(self, xs):
        """Set got_values to the given values.

        Args:
            xs: one coordinate per entry of ``self.names``, in that order.
                It is first passed to the inherited ``__check_input__``
                (presumably a shape/bounds check - confirm against pySOT).

        Discrete coordinates are mapped back from their index to the actual
        choice; nothing is returned.
        """
        self.__check_input__(xs)
        self.got_values = {}
        for i, name in enumerate(self.names):
            val = xs[i]
            match {==name: choices, **_} in self.choices:
                # discrete parameter: the coordinate must be an integral index
                assert val == int(val), val
                val = choices[int(val)]
            self.got_values[name] = val

    def get_points_values(self, new_data, new_losses):
        """Convert data and losses into pySOT-compatible points and values.

        Args:
            new_data: iterable of dicts mapping parameter name to value.
            new_losses: iterable of losses, paired with ``new_data`` by zip.

        Returns:
            A ``(points, values)`` pair of numpy arrays. Each point lists its
            coordinates in ``self.names`` order, so that columns line up with
            ``lb``/``ub``; discrete values are replaced by their index in the
            stored choices.

        Raises:
            KeyError: if an example lacks one of the problem's parameters.
            ValueError: if a discrete value is not among its choices.
        """
        points = []
        values = []
        for ex_dict, loss in zip(new_data, new_losses):
            pt = []
            # Walk the problem's own coordinate order rather than the dict's
            # iteration order, which need not match self.names.
            for name in self.names:
                val = ex_dict[name]
                match {==name: choices, **_} in self.choices:
                    pt.append(choices.index(val))
                else:
                    pt.append(val)
            points.append(pt)
            values.append(loss)
        return np.array(points), np.array(values)
# Backend:
class PySOTBackend(StandardBackend):
    """Backend that performs black box optimization through pySOT."""
    backend_name = "pySOT"
    implemented_funcs = (
        "choice",
        "randrange",
        "uniform",
    )

    # replaced by a pySOT strategy instance in setup_backend
    strategy = None

    @override
    def setup_backend(
        self,
        params,
        strategy="SRBF",
        surrogate="RBF",
        design=None,
    ):
        """Build the optimization problem, design, surrogate and strategy.

        Args:
            params: BBopt parameter definitions, handed to
                BBoptOptimizationProblem.
            strategy: "SRBF", "EI", "DYCORS", "LCB", or a callable taking the
                strategy keyword arguments.
            surrogate: "RBF", "GP", or a callable taking dim/lb/ub keywords.
            design: None, "latin_hypercube", "symmetric_latin_hypercube",
                "two_factorial", or a callable taking a dim keyword.

        Raises:
            TypeError: if design, surrogate or strategy is not recognized.
        """
        self.opt_problem = BBoptOptimizationProblem(params)
        dim = self.opt_problem.dim

        # experimental design
        if design is None:
            self.exp_design = EmptyExperimentalDesign(dim=dim)
        elif design == "latin_hypercube":
            self.exp_design = LatinHypercube(num_pts=2 * (dim + 1), dim=dim)
        elif design == "symmetric_latin_hypercube":
            self.exp_design = SymmetricLatinHypercube(num_pts=2 * (dim + 1), dim=dim)
        elif design == "two_factorial":
            self.exp_design = TwoFactorial(dim=dim)
        elif callable(design):
            self.exp_design = design(dim=dim)
        else:
            raise TypeError(f"unknown experimental design {design!r}")

        # surrogate model
        surrogate_kwargs = {
            "dim": dim,
            "lb": self.opt_problem.lb,
            "ub": self.opt_problem.ub,
        }
        if surrogate == "RBF":
            # without an experimental design a linear kernel with a constant
            # tail is used; otherwise a cubic kernel with a linear tail
            no_design = design is None
            rbf_kernel = LinearKernel() if no_design else CubicKernel()
            rbf_tail = ConstantTail(dim) if no_design else LinearTail(dim)
            self.surrogate = RBFInterpolant(
                kernel=rbf_kernel,
                tail=rbf_tail,
                **surrogate_kwargs,
            )
        elif surrogate == "GP":
            self.surrogate = GPRegressor(**surrogate_kwargs)
        elif callable(surrogate):
            self.surrogate = surrogate(**surrogate_kwargs)
        else:
            raise TypeError(f"unknown surrogate {surrogate!r}")

        # optimization strategy
        strategy_kwargs = {
            "max_evals": sys.maxsize,
            "opt_prob": self.opt_problem,
            "exp_design": self.exp_design,
            "surrogate": self.surrogate,
            "asynchronous": True,
            "batch_size": 1,
        }
        if strategy == "SRBF":
            self.strategy = SRBFStrategy(**strategy_kwargs)
        elif strategy == "EI":
            self.strategy = EIStrategy(**strategy_kwargs)
        elif strategy == "DYCORS":
            self.strategy = DYCORSStrategy(**strategy_kwargs)
        elif strategy == "LCB":
            self.strategy = LCBStrategy(**strategy_kwargs)
        elif callable(strategy):
            self.strategy = strategy(**strategy_kwargs)
        else:
            raise TypeError(f"unknown strategy {strategy!r}")

    @override
    def tell_data(self, new_data, new_losses):
        """Feed new examples straight into the strategy and its surrogate.

        Each example is appended to the strategy's X/_X and fX/_fX arrays and
        added to the surrogate, one row at a time.
        """
        points, values = self.opt_problem.get_points_values(new_data, new_losses)
        num_new = points.shape[0]
        for row in range(num_new):
            new_x = np.copy(points[row, :])
            new_fx = values[row]
            self.strategy.X = np.vstack((self.strategy.X, new_x))
            self.strategy._X = np.vstack((self.strategy._X, new_x))
            self.strategy.fX = np.vstack((self.strategy.fX, new_fx))
            self.strategy._fX = np.vstack((self.strategy._fX, new_fx))
            # the strategy must be using the very surrogate object held here
            assert self.surrogate is self.strategy.surrogate, (self.surrogate, self.strategy.surrogate)
            self.surrogate.add_points(new_x, new_fx)

    @override
    def get_next_values(self):
        """Ask the strategy for proposals until one requests an evaluation.

        Returns:
            The dict of parameter values recorded by the optimization
            problem's eval for the proposed point.
        """
        assert self.strategy._X.shape[0] > 0, self.strategy._X
        assert self.surrogate.num_pts > 0, self.surrogate.num_pts
        while True:
            proposal = self.strategy.propose_action()
            assert proposal, proposal
            action = proposal.action
            if action == "eval":
                # record the proposed point as named values
                self.opt_problem.eval(*proposal.args)
                # NOTE(review): presumably undoes the strategy's pending
                # bookkeeping, since the proposal is neither accepted nor
                # rejected here - confirm against pySOT.
                self.strategy.pending_evals -= 1
                self.strategy.remove_pending(proposal.args[0])
                break
            if action == "terminate":
                proposal.accept()
            else:
                proposal.reject()
        assert self.opt_problem.got_values is not None, "pySOT optimization produced no values"
        return self.opt_problem.got_values
# Registered names:
# Register the backend class itself, plus the lowercase alias "pysot".
PySOTBackend.register()
PySOTBackend.register_alias("pysot")
# strategy-based, default design algs
# (each name is paired with strategy/surrogate keywords of setup_backend;
# design is left at its default)
PySOTBackend.register_alg("stochastic_radial_basis_function", strategy="SRBF", surrogate="RBF")
PySOTBackend.register_alg("expected_improvement", strategy="EI", surrogate="GP")
PySOTBackend.register_alg("DYCORS", strategy="DYCORS", surrogate="RBF")
PySOTBackend.register_alg("lower_confidence_bound", strategy="LCB", surrogate="GP")
# design-based, default strategy algs
# (only the design keyword of setup_backend is given; strategy and surrogate
# are left at their defaults)
PySOTBackend.register_alg("latin_hypercube", design="latin_hypercube")
PySOTBackend.register_alg("symmetric_latin_hypercube", design="symmetric_latin_hypercube")
PySOTBackend.register_alg("two_factorial", design="two_factorial")
# register meta alg
# NOTE(review): presumably covers every alg registered above, so this call
# should stay last - confirm against register_meta_for_all_algs.
PySOTBackend.register_meta_for_all_algs("any_pysot")