diff --git a/botorch/acquisition/input_constructors.py b/botorch/acquisition/input_constructors.py index 6e6f7dcd62..f4ac50fd9c 100644 --- a/botorch/acquisition/input_constructors.py +++ b/botorch/acquisition/input_constructors.py @@ -78,6 +78,7 @@ from botorch.acquisition.preference import AnalyticExpectedUtilityOfBestOption from botorch.acquisition.risk_measures import RiskMeasureMCObjective from botorch.acquisition.utils import ( + compute_best_feasible_objective, expand_trace_observations, get_optimal_samples, project_to_target_fidelity, @@ -457,7 +458,9 @@ def construct_inputs_qEI( X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, best_f: Optional[Union[float, Tensor]] = None, - **kwargs: Any, + constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, + eta: Union[Tensor, float] = 1e-3, + **ignored: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qExpectedImprovement` constructor. @@ -473,7 +476,15 @@ def construct_inputs_qEI( sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. best_f: Threshold above (or below) which improvement is defined. - kwargs: Not used. + constraints: A list of constraint callables which map a Tensor of posterior + samples of dimension `sample_shape x batch-shape x q x m`-dim to a + `sample_shape x batch-shape x q`-dim Tensor. The associated constraints + are considered satisfied if the output is less than zero. + eta: Temperature parameter(s) governing the smoothness of the sigmoid + approximation to the constraint indicators. For more details, on this + parameter, see the docs of `compute_smoothed_constraint_indicator`. + ignored: Not used. + Returns: A dict mapping kwarg names of the constructor to values. """ @@ -489,9 +500,11 @@ def construct_inputs_qEI( training_data=training_data, objective=objective, posterior_transform=posterior_transform, + constraints=constraints, + model=model, ) - return {**base_inputs, "best_f": best_f} + return {**base_inputs, "best_f": best_f, "constraints": constraints, "eta": eta} @acqf_input_constructor(qNoisyExpectedImprovement) @@ -505,7 +518,9 @@ def construct_inputs_qNEI( X_baseline: Optional[Tensor] = None, prune_baseline: Optional[bool] = True, cache_root: Optional[bool] = True, - **kwargs: Any, + constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, + eta: Union[Tensor, float] = 1e-3, + **ignored: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qNoisyExpectedImprovement` constructor. @@ -527,7 +542,14 @@ def construct_inputs_qNEI( prune_baseline: If True, remove points in `X_baseline` that are highly unlikely to be the best point. This can significantly improve performance and is generally recommended. - kwargs: Not used. + constraints: A list of constraint callables which map a Tensor of posterior + samples of dimension `sample_shape x batch-shape x q x m`-dim to a + `sample_shape x batch-shape x q`-dim Tensor. The associated constraints + are considered satisfied if the output is less than zero. + eta: Temperature parameter(s) governing the smoothness of the sigmoid + approximation to the constraint indicators. For more details, on this + parameter, see the docs of `compute_smoothed_constraint_indicator`. + ignored: Not used. Returns: A dict mapping kwarg names of the constructor to values. 
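Usage sketch (editor's addition, not part of the patch): with the new `constraints` and `eta` arguments, the qEI input constructor can be driven roughly as below. The names `model` (a fitted two-output GP) and `dataset` (a SupervisedDataset for it) are hypothetical stand-ins.

    import torch
    from botorch.acquisition.input_constructors import get_acqf_input_constructor
    from botorch.acquisition.monte_carlo import qExpectedImprovement
    from botorch.acquisition.objective import LinearMCObjective

    constructor = get_acqf_input_constructor(qExpectedImprovement)
    kwargs = constructor(
        model=model,             # hypothetical fitted two-output model
        training_data=dataset,   # hypothetical SupervisedDataset
        objective=LinearMCObjective(weights=torch.tensor([1.0, 0.0])),
        # feasible iff the second outcome is below 0.5 (negative output => feasible)
        constraints=[lambda samples: samples[..., 1] - 0.5],
        eta=1e-3,
    )
    acqf = qExpectedImprovement(**kwargs)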
@@ -553,6 +575,8 @@ def construct_inputs_qNEI( "X_baseline": X_baseline, "prune_baseline": prune_baseline, "cache_root": cache_root, + "constraints": constraints, + "eta": eta, } @@ -566,7 +590,9 @@ def construct_inputs_qPI( sampler: Optional[MCSampler] = None, tau: float = 1e-3, best_f: Optional[Union[float, Tensor]] = None, - **kwargs: Any, + constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, + eta: Union[Tensor, float] = 1e-3, + **ignored: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qProbabilityOfImprovement` constructor. @@ -588,13 +614,26 @@ def construct_inputs_qPI( best_f: The best objective value observed so far (assumed noiseless). Can be a `batch_shape`-shaped tensor, which in case of a batched model specifies potentially different values for each element of the batch. - kwargs: Not used. + constraints: A list of constraint callables which map a Tensor of posterior + samples of dimension `sample_shape x batch-shape x q x m`-dim to a + `sample_shape x batch-shape x q`-dim Tensor. The associated constraints + are considered satisfied if the output is less than zero. + eta: Temperature parameter(s) governing the smoothness of the sigmoid + approximation to the constraint indicators. For more details, on this + parameter, see the docs of `compute_smoothed_constraint_indicator`. + ignored: Not used. + Returns: A dict mapping kwarg names of the constructor to values. """ if best_f is None: - best_f = get_best_f_mc(training_data=training_data, objective=objective) - + best_f = get_best_f_mc( + training_data=training_data, + objective=objective, + posterior_transform=posterior_transform, + constraints=constraints, + model=model, + ) base_inputs = _construct_inputs_mc_base( model=model, objective=objective, @@ -603,7 +642,13 @@ def construct_inputs_qPI( X_pending=X_pending, ) - return {**base_inputs, "tau": tau, "best_f": best_f} + return { + **base_inputs, + "tau": tau, + "best_f": best_f, + "constraints": constraints, + "eta": eta, + } @acqf_input_constructor(qUpperConfidenceBound) @@ -615,7 +660,7 @@ def construct_inputs_qUCB( X_pending: Optional[Tensor] = None, sampler: Optional[MCSampler] = None, beta: float = 0.2, - **kwargs: Any, + **ignored: Any, ) -> Dict[str, Any]: r"""Construct kwargs for the `qUpperConfidenceBound` constructor. @@ -631,7 +676,7 @@ def construct_inputs_qUCB( sampler: The sampler used to draw base samples. If omitted, uses the acquisition functions's default sampler. beta: Controls tradeoff between mean and standard deviation in UCB. - kwargs: Not used. + ignored: Not used. Returns: A dict mapping kwarg names of the constructor to values. 
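Note (editor's addition, not part of the patch): the `eta` temperature referenced in the docstrings above governs the sigmoid used by `compute_smoothed_constraint_indicator`. As a rough single-constraint illustration, assuming the relaxation has the form sigmoid(-c / eta) for a constraint value c, smaller eta gives a sharper approximation of the hard indicator 1[c < 0]:

    import torch

    c = torch.tensor([-0.05, 0.0, 0.05])  # negative values are feasible
    for eta in (1e-1, 1e-2, 1e-3):
        # approaches [1, 0.5, 0] as eta shrinks; at c == 0 the value is 0.5,
        # matching the assertion in the new test_constraint_indicators test below
        print(eta, torch.sigmoid(-c / eta))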
@@ -1083,18 +1128,28 @@ def get_best_f_mc( training_data: MaybeDict[SupervisedDataset], objective: Optional[MCAcquisitionObjective] = None, posterior_transform: Optional[PosteriorTransform] = None, + constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, + model: Optional[Model] = None, ) -> Tensor: if isinstance(training_data, dict) and not _field_is_shared( training_data, fieldname="X" ): raise NotImplementedError("Currently only block designs are supported.") + X_baseline = _get_dataset_field( + training_data, + fieldname="X", + transform=lambda field: field(), + assert_shared=True, + first_only=True, + ) + Y = _get_dataset_field( training_data, fieldname="Y", transform=lambda field: field(), join_rule=lambda field_tensors: torch.cat(field_tensors, dim=-1), - ) + ) # batch_shape x n x d if posterior_transform is not None: # retain the original tensor dimension since objective expects explicit @@ -1111,7 +1166,16 @@ def get_best_f_mc( "acquisition functions)." ) objective = IdentityMCObjective() - return objective(Y).max(-1).values + obj = objective(Y, X=X_baseline) # batch_shape x n + return compute_best_feasible_objective( + samples=Y, + obj=obj, + constraints=constraints, + model=model, + objective=objective, + posterior_transform=posterior_transform, + X_baseline=X_baseline, + ) def optimize_objective( diff --git a/botorch/acquisition/monte_carlo.py b/botorch/acquisition/monte_carlo.py index f0bd2be915..c366fdcb0d 100644 --- a/botorch/acquisition/monte_carlo.py +++ b/botorch/acquisition/monte_carlo.py @@ -27,7 +27,6 @@ from typing import Any, Callable, List, Optional, Protocol, Tuple, Union import torch -from botorch import acquisition from botorch.acquisition.acquisition import AcquisitionFunction, MCSamplerMixin from botorch.acquisition.cached_cholesky import CachedCholeskyMCAcquisitionFunction from botorch.acquisition.objective import ( @@ -36,7 +35,10 @@ MCAcquisitionObjective, PosteriorTransform, ) -from botorch.acquisition.utils import prune_inferior_points +from botorch.acquisition.utils import ( + compute_best_feasible_objective, + prune_inferior_points, +) from botorch.exceptions.errors import UnsupportedError from botorch.models.model import Model from botorch.sampling.base import MCSampler @@ -591,7 +593,8 @@ def _get_samples_and_objectives(self, X: Tensor) -> Tuple[Tensor, Tensor]: return samples, obj def _compute_best_feasible_objective(self, samples: Tensor, obj: Tensor) -> Tensor: - """ + r"""Computes best feasible objective value from samples. + Args: samples: `sample_shape x batch_shape x q x m`-dim posterior samples. obj: A `sample_shape x batch_shape x q`-dim Tensor of MC objective values. @@ -599,38 +602,15 @@ def _compute_best_feasible_objective(self, samples: Tensor, obj: Tensor) -> Tens Returns: A `sample_shape x batch_shape x 1`-dim Tensor of best feasible objectives. """ - if self._constraints is not None: - # is_feasible is sample_shape x batch_shape x q - is_feasible = compute_smoothed_constraint_indicator( - constraints=self._constraints, samples=samples, eta=self._eta - ) - is_feasible = is_feasible > 0.5 # due to smooth approximation - if is_feasible.any(): - obj = torch.where(is_feasible, obj, -torch.inf) - else: # if there are no feasible observations, estimate a lower - # bound on the objective by sampling convex combinations of X_baseline. 
- convex_weights = torch.rand( - 32, - self.X_baseline.shape[-2], - dtype=self.X_baseline.dtype, - device=self.X_baseline.device, - ) - weights_sum = convex_weights.sum(dim=0, keepdim=True) - convex_weights = convex_weights / weights_sum - # infeasible cost M is such that -M < min_x f(x), thus - # 0 < min_x f(x) - (-M), so we should take -M as a lower - # bound on the best feasible objective - return -acquisition.utils.get_infeasible_cost( - X=convex_weights @ self.X_baseline, - model=self.model, - objective=self.objective, - posterior_transform=self.posterior_transform, - ).expand(*obj.shape[:-1], 1) - - # we don't need to differentiate through X_baseline for now, so taking - # the regular max over the n points to get best_f is fine - with torch.no_grad(): - return obj.amax(dim=-1, keepdim=True) + return compute_best_feasible_objective( + samples=samples, + obj=obj, + constraints=self._constraints, + model=self.model, + objective=self.objective, + posterior_transform=self.posterior_transform, + X_baseline=self.X_baseline, + ) class qProbabilityOfImprovement(SampleReducingMCAcquisitionFunction): diff --git a/botorch/acquisition/utils.py b/botorch/acquisition/utils.py index 595bd6750c..a683896840 100644 --- a/botorch/acquisition/utils.py +++ b/botorch/acquisition/utils.py @@ -32,6 +32,7 @@ FastNondominatedPartitioning, NondominatedPartitioning, ) +from botorch.utils.objective import compute_feasibility_indicator from botorch.utils.sampling import optimize_posterior_samples from botorch.utils.transforms import is_fully_bayesian from torch import Tensor @@ -100,10 +101,19 @@ def get_acquisition_function( ) # instantiate and return the requested acquisition function if acquisition_function_name in ("qEI", "qPI"): - obj = objective( - model.posterior(X_observed, posterior_transform=posterior_transform).mean + # Since these are the non-noisy variants, use the posterior mean at the observed + # inputs directly to compute the best feasible value without sampling. 
+ Y = model.posterior(X_observed, posterior_transform=posterior_transform).mean + obj = objective(samples=Y, X=X_observed) + best_f = compute_best_feasible_objective( + samples=Y, + obj=obj, + constraints=constraints, + model=model, + objective=objective, + posterior_transform=posterior_transform, + X_baseline=X_observed, ) - best_f = obj.max(dim=-1).values if acquisition_function_name == "qEI": return monte_carlo.qExpectedImprovement( model=model, @@ -112,6 +122,8 @@ def get_acquisition_function( objective=objective, posterior_transform=posterior_transform, X_pending=X_pending, + constraints=constraints, + eta=eta, ) elif acquisition_function_name == "qPI": return monte_carlo.qProbabilityOfImprovement( @@ -122,6 +134,8 @@ def get_acquisition_function( posterior_transform=posterior_transform, X_pending=X_pending, tau=kwargs.get("tau", 1e-3), + constraints=constraints, + eta=eta, ) elif acquisition_function_name == "qNEI": return monte_carlo.qNoisyExpectedImprovement( @@ -134,6 +148,8 @@ def get_acquisition_function( prune_baseline=kwargs.get("prune_baseline", True), marginalize_dim=kwargs.get("marginalize_dim"), cache_root=kwargs.get("cache_root", True), + constraints=constraints, + eta=eta, ) elif acquisition_function_name == "qSR": return monte_carlo.qSimpleRegret( @@ -213,6 +229,113 @@ def get_acquisition_function( ) +def compute_best_feasible_objective( + samples: Tensor, + obj: Tensor, + constraints: Optional[List[Callable[[Tensor], Tensor]]], + model: Optional[Model] = None, + objective: Optional[MCAcquisitionObjective] = None, + posterior_transform: Optional[PosteriorTransform] = None, + X_baseline: Optional[Tensor] = None, + infeasible_obj: Optional[Tensor] = None, +) -> Tensor: + """Computes the largest `obj` value that is feasible under the `constraints`. If + `constraints` is None, returns the best unconstrained objective value. + + When no feasible observations exist and `infeasible_obj` is not `None`, returns + `infeasible_obj` (potentially reshaped). When no feasible observations exist and + `infeasible_obj` is `None`, uses `model`, `objective`, `posterior_transform`, and + `X_baseline` to infer and return an `infeasible_obj` `M` s.t. `M < min_x f(x)`. + + Args: + samples: `(sample_shape) x batch_shape x q x m`-dim posterior samples. + obj: A `(sample_shape) x batch_shape x q`-dim Tensor of MC objective values. + constraints: A list of constraint callables which map posterior samples to + a scalar. The associated constraint is considered satisfied if this + scalar is less than zero. + model: A Model, only required when there are no feasible observations. + objective: An MCAcquisitionObjective, only optionally used when there are no + feasible observations. + posterior_transform: A PosteriorTransform, only optionally used when there are + no feasible observations. + X_baseline: A `batch_shape x d`-dim Tensor of baseline points, only required + when there are no feasible observations. + infeasible_obj: A Tensor to be returned when no feasible points exist. + + Returns: + A `(sample_shape) x batch_shape x 1`-dim Tensor of best feasible objectives. 
+ """ + if constraints is None: # unconstrained case + # we don't need to differentiate through X_baseline for now, so taking + # the regular max over the n points to get best_f is fine + with torch.no_grad(): + return obj.amax(dim=-1, keepdim=True) + + is_feasible = compute_feasibility_indicator( + constraints=constraints, samples=samples + ) # sample_shape x batch_shape x q + if is_feasible.any(): + obj = torch.where(is_feasible, obj, -torch.inf) + with torch.no_grad(): + return obj.amax(dim=-1, keepdim=True) + + elif infeasible_obj is not None: + return infeasible_obj.expand(*obj.shape[:-1], 1) + + else: + if model is None: + raise ValueError( + "Must specify `model` when no feasible observation exists." + ) + if X_baseline is None: + raise ValueError( + "Must specify `X_baseline` when no feasible observation exists." + ) + return _estimate_objective_lower_bound( + model=model, + objective=objective, + posterior_transform=posterior_transform, + X=X_baseline, + ).expand(*obj.shape[:-1], 1) + + +def _estimate_objective_lower_bound( + model: Model, + objective: Optional[MCAcquisitionObjective], + posterior_transform: Optional[PosteriorTransform], + X: Tensor, +) -> Tensor: + """Estimates a lower bound on the objective values by evaluating the model at convex + combinations of `X`, returning the 6-sigma lower bound of the computed statistics. + + Args: + model: A fitted model. + objective: An MCAcquisitionObjective with `m` outputs. + posterior_transform: A PosteriorTransform. + X: A `n x d`-dim Tensor of design points from which to draw convex combinations. + + Returns: + A `m`-dimensional Tensor of lower bounds of the objectives. + """ + convex_weights = torch.rand( + 32, + X.shape[-2], + dtype=X.dtype, + device=X.device, + ) + weights_sum = convex_weights.sum(dim=0, keepdim=True) + convex_weights = convex_weights / weights_sum + # infeasible cost M is such that -M < min_x f(x), thus + # 0 < min_x f(x) - (-M), so we should take -M as a lower + # bound on the best feasible objective + return -get_infeasible_cost( + X=convex_weights @ X, + model=model, + objective=objective, + posterior_transform=posterior_transform, + ) + + def get_infeasible_cost( X: Tensor, model: Model, diff --git a/botorch/utils/objective.py b/botorch/utils/objective.py index 0738812287..5e8db010e5 100644 --- a/botorch/utils/objective.py +++ b/botorch/utils/objective.py @@ -95,14 +95,37 @@ def apply_constraints_nonnegative_soft( return obj.clamp_min(0).mul(w) # Enforce non-negativity of obj, apply constraints. +def compute_feasibility_indicator( + constraints: Optional[List[Callable[[Tensor], Tensor]]], + samples: Tensor, +) -> Tensor: + r"""Computes the feasibility of a list of constraints given posterior samples. + + Args: + constraints: A list of callables, each mapping a batch_shape x q x m`-dim Tensor + to a `batch_shape x q`-dim Tensor, where negative values imply feasibility. + samples: A batch_shape x q x m`-dim Tensor of posterior samples. + + Returns: + A `batch_shape x q`-dim tensor of Boolean feasibility values. 
+ """ + ind = torch.ones(samples.shape[:-1], dtype=torch.bool, device=samples.device) + if constraints is not None: + for constraint in constraints: + ind = ind.logical_and(constraint(samples) < 0) + return ind + + def compute_smoothed_constraint_indicator( constraints: List[Callable[[Tensor], Tensor]], samples: Tensor, eta: Union[Tensor, float], ) -> Tensor: - r"""Computes the feasibility indicator of a list of constraints given posterior - samples, using a sigmoid to smoothly approximate the feasibility indicator - of each individual constraint to ensure differentiability and high gradient signal. + r"""Computes the smoothed feasibility indicator of a list of constraints. + + Given posterior samples, using a sigmoid to smoothly approximate the feasibility + indicator of each individual constraint to ensure differentiability and high + gradient signal. Args: constraints: A list of callables, each mapping a Tensor of size `b x q x m` diff --git a/test/acquisition/test_input_constructors.py b/test/acquisition/test_input_constructors.py index dce2e5e5e7..e2a59d34ad 100644 --- a/test/acquisition/test_input_constructors.py +++ b/test/acquisition/test_input_constructors.py @@ -139,13 +139,13 @@ def test_get_best_f_mc(self): best_f = get_best_f_mc(training_data=self.blockX_multiY, objective=obj) multi_Y = torch.cat([d.Y() for d in self.blockX_multiY.values()], dim=-1) - best_f_expected = (multi_Y @ obj.weights).max() + best_f_expected = (multi_Y @ obj.weights).amax(dim=-1, keepdim=True) self.assertEqual(best_f, best_f_expected) post_tf = ScalarizedPosteriorTransform(weights=torch.ones(2)) best_f = get_best_f_mc( training_data=self.blockX_multiY, posterior_transform=post_tf ) - best_f_expected = (multi_Y.sum(dim=-1)).max() + best_f_expected = (multi_Y.sum(dim=-1)).amax(dim=-1, keepdim=True) self.assertEqual(best_f, best_f_expected) @mock.patch("botorch.acquisition.input_constructors.optimize_acqf") @@ -350,6 +350,9 @@ def test_construct_inputs_qEI(self): self.assertIsNone(kwargs["objective"]) self.assertIsNone(kwargs["X_pending"]) self.assertIsNone(kwargs["sampler"]) + self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) X_pending = torch.rand(2, 2) objective = LinearMCObjective(torch.rand(2)) kwargs = c( @@ -362,6 +365,9 @@ def test_construct_inputs_qEI(self): self.assertTrue(torch.equal(kwargs["objective"].weights, objective.weights)) self.assertTrue(torch.equal(kwargs["X_pending"], X_pending)) self.assertIsNone(kwargs["sampler"]) + self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) multi_Y = torch.cat([d.Y() for d in self.blockX_multiY.values()], dim=-1) best_f_expected = objective(multi_Y).max() self.assertEqual(kwargs["best_f"], best_f_expected) @@ -386,6 +392,10 @@ def test_construct_inputs_qNEI(self): self.assertIsNone(kwargs["sampler"]) self.assertTrue(kwargs["prune_baseline"]) self.assertTrue(torch.equal(kwargs["X_baseline"], self.blockX_blockY[0].X())) + self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) + with self.assertRaisesRegex(ValueError, "Field `X` must be shared"): c(model=mock_model, training_data=self.multiX_multiY) X_baseline = torch.rand(2, 2) @@ -401,6 +411,9 @@ def test_construct_inputs_qNEI(self): self.assertIsNone(kwargs["sampler"]) self.assertFalse(kwargs["prune_baseline"]) self.assertTrue(torch.equal(kwargs["X_baseline"], X_baseline)) + 
self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) def test_construct_inputs_qPI(self): c = get_acqf_input_constructor(qProbabilityOfImprovement) @@ -411,6 +424,9 @@ def test_construct_inputs_qPI(self): self.assertIsNone(kwargs["X_pending"]) self.assertIsNone(kwargs["sampler"]) self.assertEqual(kwargs["tau"], 1e-3) + self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) X_pending = torch.rand(2, 2) objective = LinearMCObjective(torch.rand(2)) kwargs = c( @@ -425,6 +441,9 @@ def test_construct_inputs_qPI(self): self.assertTrue(torch.equal(kwargs["X_pending"], X_pending)) self.assertIsNone(kwargs["sampler"]) self.assertEqual(kwargs["tau"], 1e-2) + self.assertIsNone(kwargs["constraints"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertTrue(kwargs["eta"] < 1) multi_Y = torch.cat([d.Y() for d in self.blockX_multiY.values()], dim=-1) best_f_expected = objective(multi_Y).max() self.assertEqual(kwargs["best_f"], best_f_expected) diff --git a/test/acquisition/test_utils.py b/test/acquisition/test_utils.py index 41b4384e85..4fdcf33487 100644 --- a/test/acquisition/test_utils.py +++ b/test/acquisition/test_utils.py @@ -5,6 +5,7 @@ # LICENSE file in the root directory of this source tree. import itertools +import math from unittest import mock import torch @@ -19,6 +20,7 @@ ScalarizedPosteriorTransform, ) from botorch.acquisition.utils import ( + compute_best_feasible_objective, expand_trace_observations, get_acquisition_function, get_infeasible_cost, @@ -59,12 +61,15 @@ def setUp(self): self.qmc = True self.ref_point = [0.0, 0.0] self.mo_objective = DummyMCMultiOutputObjective() - self.Y = torch.tensor([[1.0, 2.0]]) + self.Y = torch.tensor([[1.0, 2.0]]) # (2 x 1)-dim multi-objective outcomes self.seed = 1 @mock.patch(f"{monte_carlo.__name__}.qExpectedImprovement") def test_GetQEI(self, mock_acqf): - self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2))) + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) acqf = get_acquisition_function( acquisition_function_name="qEI", model=self.model, @@ -84,6 +89,8 @@ def test_GetQEI(self, mock_acqf): objective=self.objective, posterior_transform=None, X_pending=self.X_pending, + constraints=None, + eta=1e-3, ) # test batched model self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) @@ -124,10 +131,50 @@ def test_GetQEI(self, mock_acqf): ) self.assertEqual(mock_acqf.call_args[-1]["best_f"].item(), -1.0) + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + + acqf = get_acquisition_function( + acquisition_function_name="qEI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_feasible_f = compute_best_feasible_objective( + samples=mean, + obj=self.objective(mean), + constraints=constraints, + model=self.model, + objective=self.objective, + X_baseline=self.X_observed, + ) + mock_acqf.assert_called_with( + model=self.model, + best_f=best_feasible_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + 
X_pending=self.X_pending, + constraints=constraints, + eta=eta, + ) + @mock.patch(f"{monte_carlo.__name__}.qProbabilityOfImprovement") def test_GetQPI(self, mock_acqf): # basic test - self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2))) + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) acqf = get_acquisition_function( acquisition_function_name="qPI", model=self.model, @@ -147,6 +194,8 @@ def test_GetQPI(self, mock_acqf): posterior_transform=None, X_pending=self.X_pending, tau=1e-3, + constraints=None, + eta=1e-3, ) args, kwargs = mock_acqf.call_args self.assertEqual(args, ()) @@ -196,9 +245,54 @@ def test_GetQPI(self, mock_acqf): ) self.assertTrue(acqf == mock_acqf.return_value) + # with constraints + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_feasible_f = compute_best_feasible_objective( + samples=mean, + obj=self.objective(mean), + constraints=constraints, + model=self.model, + objective=self.objective, + X_baseline=self.X_observed, + ) + mock_acqf.assert_called_with( + model=self.model, + best_f=best_feasible_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + tau=1e-3, + constraints=constraints, + eta=eta, + ) + @mock.patch(f"{monte_carlo.__name__}.qNoisyExpectedImprovement") def test_GetQNEI(self, mock_acqf): # basic test + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) acqf = get_acquisition_function( acquisition_function_name="qNEI", model=self.model, @@ -256,6 +350,38 @@ def test_GetQNEI(self, mock_acqf): self.assertEqual(sampler.seed, 2) self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + + acqf = get_acquisition_function( + acquisition_function_name="qNEI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_with( + model=self.model, + X_baseline=self.X_observed, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + prune_baseline=True, + marginalize_dim=0, + cache_root=True, + constraints=constraints, + eta=eta, + ) + @mock.patch(f"{monte_carlo.__name__}.qSimpleRegret") def test_GetQSR(self, mock_acqf): # basic test @@ -575,7 +701,83 @@ def test_GetUnknownAcquisitionFunction(self): ) -class TestGetInfeasibleCost(BotorchTestCase): +class 
TestConstraintUtils(BotorchTestCase): + def test_compute_best_feasible_objective(self): + for dtype in (torch.float, torch.double): + with self.subTest(dtype=dtype): + tkwargs = {"dtype": dtype, "device": self.device} + n = 5 + X = torch.arange(n, **tkwargs).view(-1, 1) + means = torch.arange(n, **tkwargs).view(-1, 1) + samples = means + variances = torch.tensor( + [0.09, 0.25, 0.36, 0.25, 0.09], **tkwargs + ).view(-1, 1) + mm = MockModel( + MockPosterior(mean=means, variance=variances, samples=samples) + ) + + # testing all feasible points + obj = means.squeeze(-1) + constraints = [lambda samples: -torch.ones_like(samples[..., 0])] + best_f = compute_best_feasible_objective( + samples=means, obj=obj, constraints=constraints + ) + self.assertAllClose(best_f, obj.amax(dim=-1, keepdim=True)) + + # testing with some infeasible points + con_cutoff = 3.0 + best_f = compute_best_feasible_objective( + samples=means, + obj=obj, + constraints=[ + lambda samples: samples[..., 0] - (con_cutoff + 1 / 2) + ], + ) + # only first three points are feasible + self.assertAllClose(best_f, torch.tensor([con_cutoff], **tkwargs)) + + # testing with no feasible points and infeasible obj + infeasible_obj = torch.tensor(torch.pi, **tkwargs) + best_f = compute_best_feasible_objective( + samples=means, + obj=obj, + constraints=[lambda X: torch.ones_like(X[..., 0])], + infeasible_obj=infeasible_obj, + ) + self.assertAllClose(best_f, infeasible_obj.unsqueeze(0)) + + # testing with no feasible points and not infeasible obj + def objective(Y, X): + return Y.squeeze(-1) - 5.0 + + best_f = compute_best_feasible_objective( + samples=means, + obj=obj, + constraints=[lambda X: torch.ones_like(X[..., 0])], + model=mm, + X_baseline=X, + objective=objective, + ) + self.assertAllClose( + best_f, -get_infeasible_cost(X=X, model=mm, objective=objective) + ) + + with self.assertRaisesRegex(ValueError, "Must specify `model`"): + best_f = compute_best_feasible_objective( + samples=means, + obj=obj, + constraints=[lambda X: torch.ones_like(X[..., 0])], + X_baseline=X, + ) + with self.assertRaisesRegex(ValueError, "Must specify `X_baseline`"): + best_f = compute_best_feasible_objective( + samples=means, + obj=obj, + constraints=[lambda X: torch.ones_like(X[..., 0])], + model=mm, + ) + def test_get_infeasible_cost(self): for dtype in (torch.float, torch.double): tkwargs = {"dtype": dtype, "device": self.device} diff --git a/test/utils/test_objective.py b/test/utils/test_objective.py index 3975d6682c..ee35bf08e5 100644 --- a/test/utils/test_objective.py +++ b/test/utils/test_objective.py @@ -7,6 +7,10 @@ import torch from botorch.utils import apply_constraints, get_objective_weights_transform +from botorch.utils.objective import ( + compute_feasibility_indicator, + compute_smoothed_constraint_indicator, +) from botorch.utils.testing import BotorchTestCase from torch import Tensor @@ -153,7 +157,7 @@ def test_apply_constraints_multi_output(self): self.assertAllClose(obj, samples.clamp_min(-1.0) * 0.5 - 1.0) # nonnegative objective, one constraint, eta = 0 obj = samples - with self.assertRaises(ValueError): + with self.assertRaisesRegex(ValueError, "eta must be positive"): apply_constraints( obj=obj, constraints=[zeros_f], @@ -168,7 +172,7 @@ def test_apply_constraints_wrong_eta_dim(self): tkwargs["dtype"] = dtype samples = torch.rand(3, 2, **tkwargs) obj = samples.clone() - with self.assertRaises(ValueError): + with self.assertRaisesRegex(ValueError, "Number of provided constraints"): obj = apply_constraints( obj=obj, 
constraints=[zeros_f, zeros_f], @@ -176,7 +180,7 @@ def test_apply_constraints_wrong_eta_dim(self): infeasible_cost=0.0, eta=torch.tensor([0.1]).to(**tkwargs), ) - with self.assertRaises(ValueError): + with self.assertRaisesRegex(ValueError, "Number of provided constraints"): obj = apply_constraints( obj=obj, constraints=[zeros_f, zeros_f], @@ -185,6 +189,47 @@ def test_apply_constraints_wrong_eta_dim(self): eta=torch.tensor([0.1, 0.1, 0.3]).to(**tkwargs), ) + def test_constraint_indicators(self): + # nonnegative objective, one constraint + samples = torch.randn(1) + ind = compute_feasibility_indicator(constraints=[zeros_f], samples=samples) + self.assertAllClose(ind, torch.zeros_like(ind)) + self.assertEqual(ind.dtype, torch.bool) + + smoothed_ind = compute_smoothed_constraint_indicator( + constraints=[zeros_f], samples=samples, eta=1e-3 + ) + self.assertAllClose(smoothed_ind, ones_f(samples) / 2) + + # two constraints + samples = torch.randn(1) + smoothed_ind = compute_smoothed_constraint_indicator( + constraints=[zeros_f, zeros_f], + samples=samples, + eta=1e-3, + ) + self.assertAllClose(smoothed_ind, ones_f(samples) * 0.5 * 0.5) + + # feasible + samples = torch.randn(1) + ind = compute_feasibility_indicator( + constraints=[minus_one_f], + samples=samples, + ) + self.assertAllClose(ind, torch.ones_like(ind)) + + smoothed_ind = compute_smoothed_constraint_indicator( + constraints=[minus_one_f], samples=samples, eta=1e-3 + ) + self.assertTrue((smoothed_ind > 3 / 4).all()) + + with self.assertRaisesRegex(ValueError, "Number of provided constraints"): + compute_smoothed_constraint_indicator( + constraints=[zeros_f, zeros_f], + samples=samples, + eta=torch.tensor([0.1], device=self.device), + ) + class TestGetObjectiveWeightsTransform(BotorchTestCase): def test_NoWeights(self):
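Standalone sketch (editor's addition, mirroring the new unit test above, not part of the patch) of the `compute_best_feasible_objective` helper:

    import torch
    from botorch.acquisition.utils import compute_best_feasible_objective

    samples = torch.arange(5, dtype=torch.double).view(-1, 1)  # n x m outcomes, m=1
    obj = samples.squeeze(-1)                                   # n objective values
    # feasible iff the outcome is below 3.5, so only the first four points qualify
    constraints = [lambda Y: Y[..., 0] - 3.5]
    best_f = compute_best_feasible_objective(
        samples=samples, obj=obj, constraints=constraints
    )
    # best_f == tensor([3.], dtype=torch.float64): the largest feasible objective.
    # If no points were feasible, `model` and `X_baseline` (or `infeasible_obj`)
    # would be required so the helper can fall back to the estimated lower bound.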