diff --git a/botorch/acquisition/multi_objective/hypervolume_knowledge_gradient.py b/botorch/acquisition/multi_objective/hypervolume_knowledge_gradient.py new file mode 100644 index 0000000000..08d9c9edd0 --- /dev/null +++ b/botorch/acquisition/multi_objective/hypervolume_knowledge_gradient.py @@ -0,0 +1,560 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + + +""" +The hypervolume knowledge gradient acquisition function (HVKG). + +References: + +.. [Daulton2023hvkg] + S. Daulton, M. Balandat, E. Bakshy. Hypervolume Knowledge Gradient: A + Lookahead Approach for Multi-Objective Bayesian Optimization with Partial + Information. Proceedings of the 40th International Conference on Machine + Learning, 2023. +""" + +from copy import deepcopy +from typing import Any, Callable, Dict, List, Optional, Tuple, Type + +import torch +from botorch import settings +from botorch.acquisition.acquisition import ( + AcquisitionFunction, + OneShotAcquisitionFunction, +) + +from botorch.acquisition.cost_aware import CostAwareUtility +from botorch.acquisition.decoupled import DecoupledAcquisitionFunction +from botorch.acquisition.knowledge_gradient import ProjectedAcquisitionFunction +from botorch.acquisition.multi_objective.monte_carlo import ( + MultiObjectiveMCAcquisitionFunction, + qExpectedHypervolumeImprovement, +) +from botorch.acquisition.multi_objective.objective import MCMultiOutputObjective +from botorch.exceptions.errors import UnsupportedError +from botorch.models.deterministic import PosteriorMeanModel +from botorch.models.model import Model +from botorch.sampling.base import MCSampler +from botorch.sampling.list_sampler import ListSampler +from botorch.sampling.normal import SobolQMCNormalSampler +from botorch.sampling.stochastic_samplers import StochasticSampler +from botorch.utils.multi_objective.box_decompositions.non_dominated import ( + FastNondominatedPartitioning, +) +from botorch.utils.transforms import match_batch_shape, t_batch_mode_transform +from torch import Tensor + + +class qHypervolumeKnowledgeGradient( + DecoupledAcquisitionFunction, + MultiObjectiveMCAcquisitionFunction, + OneShotAcquisitionFunction, +): + """Batch Hypervolume Knowledge Gradient using one-shot optimization. + + This computes the batch Hypervolume Knowledge Gradient using fantasies for + the outer expectation and MC-sampling for the inner expectation. + + In addition to the design variables, the input `X` also includes variables + for the optimal designs for each of the fantasy models (Note this is + `N x N_pareto` optimal designs). For a fixed number of fantasies, all points + in `X` can be optimized in a "one-shot" fashion. + """ + + def __init__( + self, + model: Model, + ref_point: Tensor, + num_fantasies: int = 32, + num_pareto: int = 10, + sampler: Optional[ListSampler] = None, + objective: Optional[MCMultiOutputObjective] = None, + inner_sampler: Optional[MCSampler] = None, + X_evaluation_mask: Optional[List[Tensor]] = None, + X_pending: Optional[Tensor] = None, + X_pending_evaluation_mask: Optional[Tensor] = None, + current_value: Optional[Tensor] = None, + use_posterior_mean: bool = True, + cost_aware_utility: Optional[CostAwareUtility] = None, + **kwargs: Any, + ) -> None: + r"""q-Hypervolume Knowledge Gradient. + + Args: + model: A fitted model. Must support fantasizing. + ref_point: A `m`-dim tensor containing the reference point. 
+            num_fantasies: The number of fantasy points to use. More fantasy
+                points result in a better approximation, at the expense of
+                memory and wall time. Unused if `sampler` is specified.
+            num_pareto: The number of pareto optimal designs to consider.
+            sampler: The sampler used to sample fantasy observations. Optional
+                if `num_fantasies` is specified.
+            objective: The objective under which the samples are evaluated. If
+                `None`, then the analytic posterior mean is used. Otherwise, the
+                objective is MC-evaluated (using inner_sampler).
+            inner_sampler: The sampler used for inner sampling. Ignored if the
+                objective is `None`.
+            X_evaluation_mask: A `q x m`-dim tensor of booleans indicating which
+                objective(s) each of the `q` points should be evaluated on.
+            X_pending: A `n' x d`-dim Tensor of design points that have
+                been submitted for function evaluation but have not yet
+                been evaluated.
+            X_pending_evaluation_mask: A `n' x m`-dim tensor of booleans indicating which
+                objective(s) each of the `n'` pending points are being evaluated on.
+            current_value: The current value, i.e. the expected best objective
+                given the observed points `D`. If omitted, forward will not
+                return the actual KG value, but the expected best objective
+                given the data set `D u X`.
+            use_posterior_mean: A boolean indicating whether to optimize the
+                hypervolume of the posterior mean or whether to optimize the
+                expected hypervolume. See [Daulton2023hvkg]_ for details.
+            cost_aware_utility: A CostAwareUtility specifying the cost function for
+                evaluating `X` on the objectives indicated by `evaluation_mask`.
+
+        """
+        if sampler is None:
+            # base samples should be fixed for joint optimization over X, X_fantasies
+            samplers = [
+                SobolQMCNormalSampler(
+                    sample_shape=torch.Size([num_fantasies]),
+                    resample=False,
+                    collapse_batch_dims=True,
+                )
+                for _ in range(ref_point.shape[0])
+            ]
+            sampler = ListSampler(*samplers)
+        else:
+            sample_shape = sampler.samplers[0].sample_shape
+            if sample_shape != torch.Size([num_fantasies]):
+                raise ValueError(
+                    f"The sampler shape must match num_fantasies={num_fantasies}."
+                )
+        super().__init__(model=model, X_evaluation_mask=X_evaluation_mask)
+
+        if inner_sampler is None:
+            inner_sampler = SobolQMCNormalSampler(
+                sample_shape=torch.Size([32]), resample=False, collapse_batch_dims=True
+            )
+        if current_value is None and cost_aware_utility is not None:
+            raise UnsupportedError(
+                "Cost-aware HVKG requires current_value to be specified."
+            )
+        self.register_buffer("ref_point", ref_point)
+        self.sampler = sampler
+        self.objective = objective
+        self.set_X_pending(
+            X_pending=X_pending, X_pending_evaluation_mask=X_pending_evaluation_mask
+        )
+        self.inner_sampler = inner_sampler
+        self.num_fantasies = num_fantasies
+        self.num_pareto = num_pareto
+        self.num_pseudo_points = num_fantasies * num_pareto
+        self.current_value = current_value
+        self.use_posterior_mean = use_posterior_mean
+        self.cost_aware_utility = cost_aware_utility
+        self._cost_sampler = None
+
+    @property
+    def cost_sampler(self):
+        if self._cost_sampler is None:
+            # Note: Using the deepcopy here is essential. Removing this poses a
+            # problem if the base model and the cost model have a different number
+            # of outputs or test points (this would be caused by expand), as this
+            # would trigger re-sampling the base samples in the fantasy sampler.
+            # By cloning the sampler here, the right thing will happen if the
+            # sizes are compatible; if they are not, this will result in
+            # samples being drawn using different base samples, but it will at
+            # least avoid changing the state of the fantasy sampler.
+            self._cost_sampler = deepcopy(self.sampler)
+        return self._cost_sampler
+
+    @t_batch_mode_transform()
+    def forward(self, X: Tensor) -> Tensor:
+        r"""Evaluate qHypervolumeKnowledgeGradient on the candidate set `X`.
+
+        Args:
+            X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
+                `q + num_fantasies` design points each. We split this X tensor
+                into two parts in the `q` dimension (`dim=-2`). The first `q`
+                are the q-batch of design points and the last num_fantasies are
+                the current solutions of the inner optimization problem.
+
+                `X_fantasies = X[..., -num_fantasies:, :]`
+                `X_fantasies.shape = b x num_fantasies x d`
+
+                `X_actual = X[..., :-num_fantasies, :]`
+                `X_actual.shape = b x q x d`
+
+        Returns:
+            A Tensor of shape `b`. For t-batch b, the q-KG value of the design
+                `X_actual[b]` is averaged across the fantasy models, where
+                `X_fantasies[b, i]` is chosen as the final selection for the
+                `i`-th fantasy model.
+                NOTE: If `current_value` is not provided, then this is not the
+                true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
+                maximized at fixed `X_actual[b]`.
+        """
+        X_actual, X_fantasies = _split_hvkg_fantasy_points(
+            X=X, n_f=self.num_fantasies, num_pareto=self.num_pareto
+        )
+
+        # construct evaluation_mask
+        evaluation_mask = self.construct_evaluation_mask(X=X_actual)
+        # We only concatenate X_pending into the X part after splitting
+        if self.X_pending is not None:
+            X_actual = torch.cat(
+                [X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
+            )
+
+        # construct the fantasy model of shape `num_fantasies x b`
+        # note: for decoupled, cost-aware (e.g. not async), we technically
+        # want to make sure to copy the base samples here, so that the same fantasies are used
+        # for X_pending on the left and right of the KG terms
+        fantasy_model = self.model.fantasize(
+            X=X_actual,
+            sampler=self.sampler,
+            observation_noise=True,
+            evaluation_mask=evaluation_mask,
+        )
+
+        # get the value function
+        value_function = _get_hv_value_function(
+            model=fantasy_model,
+            ref_point=self.ref_point,
+            objective=self.objective,
+            sampler=self.inner_sampler,
+            use_posterior_mean=self.use_posterior_mean,
+            num_pareto=self.num_pareto,
+            dim=X_actual.shape[-1],
+        )
+
+        # make sure to propagate gradients to the fantasy model train inputs
+        with settings.propagate_grads(True):
+            # X_fantasies is num_pseudo_points x batch_shape x 1 x d
+            # Reshape it into num_fantasies x batch_shape x num_pareto x d
+            shape = torch.Size(
+                [
+                    self.num_fantasies,
+                    *X_fantasies.shape[1:-2],
+                    self.num_pareto,
+                    X_fantasies.shape[-1],
+                ]
+            )
+            values = value_function(X=X_fantasies.reshape(shape))  # num_fantasies x b
+
+        if self.current_value is not None:
+            values = values - self.current_value
+
+        if self.cost_aware_utility is not None:
+            values = self.cost_aware_utility(
+                X=X_actual,
+                # cost-weighting relies on nonnegative deltas
+                deltas=values.clamp_min(0.0),
+                sampler=self.cost_sampler,
+                X_evaluation_mask=self.X_evaluation_mask,
+            )
+
+        # return average over the fantasy samples
+        return values.mean(dim=0)
+
+    def get_augmented_q_batch_size(self, q: int) -> int:
+        r"""Get augmented q batch size for one-shot optimization.
+
+        Args:
+            q: The number of candidates to consider jointly.
+
+        Returns:
+            The augmented size for one-shot optimization (including variables
+            parameterizing the fantasy solutions).
+        """
+        return q + self.num_pseudo_points
+
+    def extract_candidates(self, X_full: Tensor) -> Tensor:
+        r"""We only return X as the set of candidates post-optimization.
+
+        Args:
+            X_full: A `b x (q + num_fantasies) x d`-dim Tensor with `b`
+                t-batches of `q + num_fantasies` design points each.
+
+        Returns:
+            A `b x q x d`-dim Tensor with `b` t-batches of `q` design points each.
+        """
+        return X_full[..., : -self.num_pseudo_points, :]
+
+
+class qMultiFidelityHypervolumeKnowledgeGradient(qHypervolumeKnowledgeGradient):
+    r"""Batch Hypervolume Knowledge Gradient for multi-fidelity optimization.
+
+    See [Daulton2023hvkg]_ for details.
+
+    A version of `qHypervolumeKnowledgeGradient` that supports multi-fidelity optimization
+    via a `CostAwareUtility` and the `project` and `expand` operators. If none
+    of these are set, this acquisition function reduces to `qHypervolumeKnowledgeGradient`.
+    Through `valfunc_cls` and `valfunc_argfac`, this can be changed into a custom
+    multi-fidelity acquisition function.
+    """
+
+    def __init__(
+        self,
+        model: Model,
+        ref_point: Tensor,
+        target_fidelities: Dict[int, float],
+        num_fantasies: int = 32,
+        num_pareto: int = 10,
+        sampler: Optional[MCSampler] = None,
+        objective: Optional[MCMultiOutputObjective] = None,
+        inner_sampler: Optional[MCSampler] = None,
+        X_pending: Optional[Tensor] = None,
+        X_evaluation_mask: Optional[Tensor] = None,
+        X_pending_evaluation_mask: Optional[Tensor] = None,
+        current_value: Optional[Tensor] = None,
+        cost_aware_utility: Optional[CostAwareUtility] = None,
+        project: Callable[[Tensor], Tensor] = lambda X: X,
+        expand: Optional[Callable[[Tensor], Tensor]] = None,
+        valfunc_cls: Optional[Type[AcquisitionFunction]] = None,
+        valfunc_argfac: Optional[Callable[[Model], Dict[str, Any]]] = None,
+        use_posterior_mean: bool = True,
+        **kwargs: Any,
+    ) -> None:
+        r"""Multi-Fidelity q-Hypervolume Knowledge Gradient (one-shot optimization).
+
+        Args:
+            model: A fitted model. Must support fantasizing.
+            ref_point: A `m`-dim tensor containing the reference point.
+            num_fantasies: The number of fantasy points to use. More fantasy
+                points result in a better approximation, at the expense of
+                memory and wall time. Unused if `sampler` is specified.
+            num_pareto: The number of pareto optimal designs to consider.
+            sampler: The sampler used to sample fantasy observations. Optional
+                if `num_fantasies` is specified.
+            objective: The objective under which the samples are evaluated. If
+                `None`, then the analytic posterior mean is used. Otherwise, the
+                objective is MC-evaluated (using inner_sampler).
+            inner_sampler: The sampler used for inner sampling. Ignored if the
+                objective is `None`.
+            X_evaluation_mask: A `q x m`-dim tensor of booleans indicating which
+                objective(s) each of the `q` points should be evaluated on.
+            X_pending: A `n' x d`-dim Tensor of design points that have
+                been submitted for function evaluation but have not yet
+                been evaluated.
+            X_pending_evaluation_mask: A `n' x m`-dim tensor of booleans indicating which
+                objective(s) each of the `n'` pending points are being evaluated on.
+            current_value: The current value, i.e. the expected best objective
+                given the observed points `D`. If omitted, forward will not
+                return the actual KG value, but the expected best objective
+                given the data set `D u X`.
+            use_posterior_mean: A boolean indicating whether to optimize the
+                hypervolume of the posterior mean or whether to optimize the
+                expected hypervolume. See [Daulton2023hvkg]_ for details.
+            cost_aware_utility: A CostAwareUtility specifying the cost function for
+                evaluating `X` on the objectives indicated by `evaluation_mask`.
+            project: A callable mapping a `batch_shape x q x d` tensor of design
+                points to a tensor with shape `batch_shape x q_term x d` projected
+                to the desired target set (e.g. the target fidelities in case of
+                multi-fidelity optimization). For the basic case, `q_term = q`.
+            expand: A callable mapping a `batch_shape x q x d` input tensor to
+                a `batch_shape x (q + q_e)' x d`-dim output tensor, where the
+                `q_e` additional points in each q-batch correspond to
+                additional ("trace") observations.
+            valfunc_cls: An acquisition function class to be used as the terminal
+                value function.
+            valfunc_argfac: An argument factory, i.e. callable that maps a `Model`
+                to a dictionary of kwargs for the terminal value function (e.g.
+                `best_f` for `ExpectedImprovement`).
+        """
+
+        super().__init__(
+            model=model,
+            ref_point=ref_point,
+            num_fantasies=num_fantasies,
+            num_pareto=num_pareto,
+            sampler=sampler,
+            objective=objective,
+            inner_sampler=inner_sampler,
+            X_evaluation_mask=X_evaluation_mask,
+            X_pending=X_pending,
+            X_pending_evaluation_mask=X_pending_evaluation_mask,
+            current_value=current_value,
+            use_posterior_mean=use_posterior_mean,
+            cost_aware_utility=cost_aware_utility,
+        )
+        self.project = project
+        if expand is not None:
+            raise NotImplementedError("Trace observations are not currently supported.")
+        self.expand = lambda X: X
+        self.valfunc_cls = valfunc_cls
+        self.valfunc_argfac = valfunc_argfac
+        self.target_fidelities = target_fidelities
+
+    @t_batch_mode_transform()
+    def forward(self, X: Tensor) -> Tensor:
+        r"""Evaluate qMultiFidelityHypervolumeKnowledgeGradient on the candidate set `X`.
+
+        Args:
+            X: A `b x (q + num_fantasies) x d` Tensor with `b` t-batches of
+                `q + num_fantasies` design points each. We split this X tensor
+                into two parts in the `q` dimension (`dim=-2`). The first `q`
+                are the q-batch of design points and the last num_fantasies are
+                the current solutions of the inner optimization problem.
+
+                `X_fantasies = X[..., -num_fantasies:, :]`
+                `X_fantasies.shape = b x num_fantasies x d`
+
+                `X_actual = X[..., :-num_fantasies, :]`
+                `X_actual.shape = b x q x d`
+
+                In addition, `X` may be augmented with fidelity parameters as
+                part of the `d`-dimension. Projecting fidelities to the target
+                fidelity is handled by `project`.
+
+        Returns:
+            A Tensor of shape `b`. For t-batch b, the q-KG value of the design
+                `X_actual[b]` is averaged across the fantasy models, where
+                `X_fantasies[b, i]` is chosen as the final selection for the
+                `i`-th fantasy model.
+                NOTE: If `current_value` is not provided, then this is not the
+                true KG value of `X_actual[b]`, and `X_fantasies[b, : ]` must be
+                maximized at fixed `X_actual[b]`.
+ """ + X_actual, X_fantasies = _split_hvkg_fantasy_points( + X=X, n_f=self.num_fantasies, num_pareto=self.num_pareto + ) + + # construct evaluation_mask + evaluation_mask = self.construct_evaluation_mask(X=X_actual) + + # We only concatenate X_pending into the X part after splitting + if self.X_pending is not None: + X_actual = torch.cat( + [X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2 + ) + + # construct the fantasy model of shape `num_fantasies x b` + fantasy_model = self.model.fantasize( + X=X_actual, + sampler=self.sampler, + observation_noise=True, + evaluation_mask=evaluation_mask, + ) + # get the value function + value_function = _get_hv_value_function( + model=fantasy_model, + ref_point=self.ref_point, + objective=self.objective, + sampler=self.inner_sampler, + project=self.project, + valfunc_cls=self.valfunc_cls, + valfunc_argfac=self.valfunc_argfac, + use_posterior_mean=self.use_posterior_mean, + num_pareto=self.num_pareto, + dim=X_actual.shape[-1], + ) + + # make sure to propagate gradients to the fantasy model train inputs + with settings.propagate_grads(True): + # X_fantasies is num_pseudo_points x batch_shape x 1 x d + # Reshape it into num_fantasies x batch_shape x num_pareto x d + shape = torch.Size( + [ + self.num_fantasies, + *X_fantasies.shape[1:-2], + self.num_pareto, + X_fantasies.shape[-1], + ] + ) + values = value_function(X=X_fantasies.reshape(shape)) # num_fantasies x b + if self.current_value is not None: + values = values - self.current_value + + if self.cost_aware_utility is not None: + values = self.cost_aware_utility( + X=X_actual, + # cost-weighting relies on nonnegative deltas + deltas=values.clamp_min(0.0), + sampler=self.cost_sampler, + X_evaluation_mask=self.X_evaluation_mask, + ) + + # return average over the fantasy samples + return values.mean(dim=0) + + +def _get_hv_value_function( + model: Model, + ref_point: Tensor, + dim: int, + num_pareto: int, + objective: Optional[MCMultiOutputObjective] = None, + sampler: Optional[MCSampler] = None, + project: Optional[Callable[[Tensor], Tensor]] = None, + valfunc_cls: Optional[Type[AcquisitionFunction]] = None, + valfunc_argfac: Optional[Callable[[Model], Dict[str, Any]]] = None, + use_posterior_mean: bool = False, + hv_weights: Optional[Tensor] = None, +) -> AcquisitionFunction: + r"""Construct value function (i.e. inner acquisition function). + This is a method for computing hypervolume. 
+    """
+    if use_posterior_mean:
+        model = PosteriorMeanModel(model=model)
+        sampler = StochasticSampler(sample_shape=torch.Size([1]))  # dummy sampler
+    base_value_function = qExpectedHypervolumeImprovement(
+        model=model,
+        ref_point=ref_point,
+        partitioning=FastNondominatedPartitioning(
+            ref_point=ref_point,
+            Y=torch.empty(
+                (0, ref_point.shape[0]),
+                dtype=ref_point.dtype,
+                device=ref_point.device,
+            ),
+        ),  # create empty partitioning
+        sampler=sampler,
+        objective=objective,
+    )
+    # ProjectedAcquisitionFunction requires this
+    base_value_function.posterior_transform = None
+
+    if project is None:
+        return base_value_function
+    else:
+        return ProjectedAcquisitionFunction(
+            base_value_function=base_value_function,
+            project=project,
+        )
+
+
+def _split_hvkg_fantasy_points(
+    X: Tensor, n_f: int, num_pareto: int
+) -> Tuple[Tensor, Tensor]:
+    r"""Split a one-shot HV-KG optimization input into actual and fantasy points.
+
+    Args:
+        X: A `batch_shape x (q + n_f*num_pareto) x d`-dim tensor of actual and fantasy
+            points.
+
+    Returns:
+        2-element tuple containing
+
+        - A `batch_shape x q x d`-dim tensor `X_actual` of input candidates.
+        - A `n_f x batch_shape x num_pareto x d`-dim tensor `X_fantasies` of fantasy
+            points, where `X_fantasies[i, batch_idx]` is the i-th fantasy point
+            associated with the batch indexed by `batch_idx`.
+    """
+    if n_f * num_pareto > X.size(-2):
+        raise ValueError(
+            f"n_f*num_pareto ({n_f*num_pareto}) must be less than the q-batch dimension of X ({X.size(-2)})"
+        )
+    split_sizes = [X.size(-2) - n_f * num_pareto, n_f * num_pareto]
+    X_actual, X_fantasies = torch.split(X, split_sizes, dim=-2)
+    # X_fantasies is b x n_f * num_pareto x d, needs to be n_f x b x num_pareto x d
+    # reshape into num_fantasies x b x num_pareto x d
+    new_shape = torch.Size(
+        [n_f, *X_fantasies.shape[:-2], num_pareto, X_fantasies.shape[-1]]
+    )
+    X_fantasies = X_fantasies.reshape(new_shape)
+    # n_f x b x num_pareto x d
+    return X_actual, X_fantasies
diff --git a/sphinx/source/acquisition.rst b/sphinx/source/acquisition.rst
index c5e8cd5a5c..8c757c1766 100644
--- a/sphinx/source/acquisition.rst
+++ b/sphinx/source/acquisition.rst
@@ -74,6 +74,11 @@ Multi-Objective Analytic Acquisition Functions
     :members:
     :exclude-members: MultiObjectiveAnalyticAcquisitionFunction
 
+Multi-Objective Hypervolume Knowledge Gradient Acquisition Functions
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+.. automodule:: botorch.acquisition.multi_objective.hypervolume_knowledge_gradient
+    :members:
+
 Multi-Objective Joint Entropy Search Acquisition Functions
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 .. automodule:: botorch.acquisition.multi_objective.joint_entropy_search
diff --git a/test/acquisition/multi_objective/test_hypervolume_knowledge_gradient.py b/test/acquisition/multi_objective/test_hypervolume_knowledge_gradient.py
new file mode 100644
index 0000000000..20cd940382
--- /dev/null
+++ b/test/acquisition/multi_objective/test_hypervolume_knowledge_gradient.py
@@ -0,0 +1,354 @@
+#!/usr/bin/env python3
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+ +from itertools import product +from unittest import mock + +import torch +from botorch.acquisition.cost_aware import InverseCostWeightedUtility +from botorch.acquisition.multi_objective.hypervolume_knowledge_gradient import ( + _get_hv_value_function, + qHypervolumeKnowledgeGradient, + qMultiFidelityHypervolumeKnowledgeGradient, +) +from botorch.acquisition.multi_objective.objective import GenericMCMultiOutputObjective +from botorch.exceptions.errors import UnsupportedError +from botorch.models.deterministic import GenericDeterministicModel +from botorch.models.gp_regression import SingleTaskGP +from botorch.models.model_list_gp_regression import ModelListGP +from botorch.sampling.list_sampler import ListSampler +from botorch.sampling.normal import SobolQMCNormalSampler +from botorch.utils.multi_objective.box_decompositions.dominated import ( + DominatedPartitioning, +) +from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior + + +NO = "botorch.models.model_list_gp_regression.ModelListGP.num_outputs" + + +class TestHypervolumeKnowledgeGradient(BotorchTestCase): + def test_initialization(self): + tkwargs = {"device": self.device} + for dtype, acqf_class in product( + (torch.float, torch.double), + (qHypervolumeKnowledgeGradient, qMultiFidelityHypervolumeKnowledgeGradient), + ): + tkwargs["dtype"] = dtype + X = torch.rand(4, 3, **tkwargs) + Y1 = torch.rand(4, 1, **tkwargs) + Y2 = torch.rand(4, 1, **tkwargs) + m1 = SingleTaskGP(X, Y1) + m2 = SingleTaskGP(X, Y2) + model = ModelListGP(m1, m2) + ref_point = torch.zeros(2, **tkwargs) + # test sampler is None + if acqf_class == qMultiFidelityHypervolumeKnowledgeGradient: + mf_kwargs = {"target_fidelities": {-1: 1.0}} + else: + mf_kwargs = {} + acqf = acqf_class(model=model, ref_point=ref_point, **mf_kwargs) + + self.assertIsInstance(acqf.sampler, ListSampler) + self.assertEqual(acqf.sampler.samplers[0].sample_shape, torch.Size([32])) + # test ref point + self.assertTrue(torch.equal(acqf.ref_point, ref_point)) + # test sampler is not None + sampler = ListSampler( + SobolQMCNormalSampler(sample_shape=torch.Size([4])), + SobolQMCNormalSampler(sample_shape=torch.Size([4])), + ) + with self.assertRaisesRegex( + ValueError, "The sampler shape must match num_fantasies=32." 
+ ): + acqf_class( + model=model, ref_point=ref_point, sampler=sampler, **mf_kwargs + ) + acqf = acqf_class( + model=model, + ref_point=ref_point, + num_fantasies=4, + num_pareto=8, + sampler=sampler, + use_posterior_mean=False, + **mf_kwargs + ) + self.assertEqual(acqf.num_fantasies, 4) + self.assertEqual(acqf.num_pareto, 8) + self.assertEqual(acqf.num_pseudo_points, 32) + self.assertFalse(acqf.use_posterior_mean) + self.assertIsInstance(acqf.inner_sampler, SobolQMCNormalSampler) + self.assertEqual(acqf.inner_sampler.sample_shape, torch.Size([32])) + self.assertIsNone(acqf._cost_sampler) + # test objective + mc_objective = GenericMCMultiOutputObjective(lambda Y: 2 * Y) + acqf = acqf_class( + model=model, ref_point=ref_point, objective=mc_objective, **mf_kwargs + ) + self.assertIs(acqf.objective, mc_objective) + # test X_pending + X_pending = torch.rand(2, 3, **tkwargs) + acqf = acqf_class( + model=model, ref_point=ref_point, X_pending=X_pending, **mf_kwargs + ) + self.assertTrue(torch.equal(acqf.X_pending, X_pending)) + # test X_pending_evaluation_mask + X_pending_evaluation_mask = torch.eye(2, device=self.device).bool() + acqf = acqf_class( + model=model, + ref_point=ref_point, + X_pending=X_pending, + X_pending_evaluation_mask=X_pending_evaluation_mask, + **mf_kwargs + ) + self.assertTrue( + torch.equal(acqf.X_pending_evaluation_mask, X_pending_evaluation_mask) + ) + # test cost aware utility + cost_model = GenericDeterministicModel( + lambda X: torch.ones(X.shape[:-1], 2, **tkwargs) + ) + cost_aware_utility = InverseCostWeightedUtility(cost_model=cost_model) + with self.assertRaisesRegex( + UnsupportedError, + "Cost-aware HVKG requires current_value to be specified.", + ): + acqf_class( + model=model, + ref_point=ref_point, + cost_aware_utility=cost_aware_utility, + **mf_kwargs + ) + acqf = acqf_class( + model=model, + ref_point=ref_point, + cost_aware_utility=cost_aware_utility, + current_value=0.0, + **mf_kwargs + ) + self.assertEqual(acqf.current_value, 0.0) + self.assertIs(acqf.cost_aware_utility, cost_aware_utility) + + if acqf_class is qMultiFidelityHypervolumeKnowledgeGradient: + # test default + x = torch.rand(5, 3, **tkwargs) + self.assertTrue(torch.equal(acqf.project(x), x)) + # test expand raises exception + with self.assertRaisesRegex( + NotImplementedError, + "Trace observations are not currently supported.", + ): + acqf_class( + model=model, + ref_point=ref_point, + expand=lambda X: X, + **mf_kwargs + ) + + def test_evaluate_q_hvkg(self): + # Stop gap measure to avoid test failures on Ampere devices + # TODO: Find an elegant way of disallowing tf32 for botorch/gpytorch + # without blanket-disallowing it for all of torch. 
+ torch.backends.cuda.matmul.allow_tf32 = False + tkwargs = {"device": self.device} + num_pareto = 3 + for dtype, acqf_class in product( + (torch.float, torch.double), + (qHypervolumeKnowledgeGradient, qMultiFidelityHypervolumeKnowledgeGradient), + ): + tkwargs["dtype"] = dtype + # basic test + n_f = 4 + mean = torch.rand(n_f, num_pareto, 2, **tkwargs) + variance = torch.rand(n_f, num_pareto, 2, **tkwargs) + mfm = MockModel(MockPosterior(mean=mean, variance=variance)) + ref_point = torch.zeros(2, **tkwargs) + models = [ + SingleTaskGP(torch.rand(2, 1, **tkwargs), torch.rand(2, 1, **tkwargs)), + SingleTaskGP(torch.rand(4, 1, **tkwargs), torch.rand(4, 1, **tkwargs)), + ] + model = ModelListGP(*models) + if acqf_class == qMultiFidelityHypervolumeKnowledgeGradient: + mf_kwargs = {"target_fidelities": {-1: 1.0}} + else: + mf_kwargs = {} + + with mock.patch.object( + ModelListGP, "fantasize", return_value=mfm + ) as patch_f: + with mock.patch(NO, new_callable=mock.PropertyMock) as mock_num_outputs: + mock_num_outputs.return_value = 2 + + qHVKG = acqf_class( + model=model, + num_fantasies=n_f, + ref_point=ref_point, + num_pareto=num_pareto, + **mf_kwargs + ) + X = torch.rand(n_f * num_pareto + 1, 1, **tkwargs) + val = qHVKG(X) + patch_f.assert_called_once() + cargs, ckwargs = patch_f.call_args + self.assertEqual(ckwargs["X"].shape, torch.Size([1, 1, 1])) + expected_hv = ( + DominatedPartitioning(Y=mean, ref_point=ref_point) + .compute_hypervolume() + .mean() + ) + self.assertAllClose(val, expected_hv, atol=1e-4) + self.assertTrue( + torch.equal(qHVKG.extract_candidates(X), X[..., : -n_f * num_pareto, :]) + ) + # batched evaluation + b = 2 + mean = torch.rand(n_f, b, num_pareto, 2, **tkwargs) + variance = torch.rand(n_f, b, num_pareto, 2, **tkwargs) + mfm = MockModel(MockPosterior(mean=mean, variance=variance)) + X = torch.rand(b, n_f * num_pareto + 1, 1, **tkwargs) + with mock.patch.object( + ModelListGP, "fantasize", return_value=mfm + ) as patch_f: + with mock.patch(NO, new_callable=mock.PropertyMock) as mock_num_outputs: + mock_num_outputs.return_value = 2 + qHVKG = acqf_class( + model=model, + num_fantasies=n_f, + ref_point=ref_point, + num_pareto=num_pareto, + **mf_kwargs + ) + val = qHVKG(X) + patch_f.assert_called_once() + cargs, ckwargs = patch_f.call_args + self.assertEqual(ckwargs["X"].shape, torch.Size([b, 1, 1])) + expected_hv = ( + DominatedPartitioning( + Y=mean.view(-1, num_pareto, 2), ref_point=ref_point + ) + .compute_hypervolume() + .view(n_f, b) + .mean(dim=0) + ) + self.assertAllClose(val, expected_hv, atol=1e-4) + self.assertTrue( + torch.equal(qHVKG.extract_candidates(X), X[..., : -n_f * num_pareto, :]) + ) + # pending points and current value + X_pending = torch.rand(2, 1, **tkwargs) + X_pending_evaluation_mask = torch.eye(2, device=self.device).bool() + X_evaluation_mask = torch.tensor( + [[False, True]], dtype=torch.bool, device=self.device + ) + mean = torch.rand(n_f, num_pareto, 2, **tkwargs) + variance = torch.rand(n_f, num_pareto, 2, **tkwargs) + mfm = MockModel(MockPosterior(mean=mean, variance=variance)) + current_value = torch.rand(1, **tkwargs) + X = torch.rand(n_f * num_pareto + 1, 1, **tkwargs) + with mock.patch.object( + ModelListGP, "fantasize", return_value=mfm + ) as patch_f: + with mock.patch(NO, new_callable=mock.PropertyMock) as mock_num_outputs: + mock_num_outputs.return_value = 2 + qHVKG = acqf_class( + model=model, + num_fantasies=n_f, + X_pending=X_pending, + X_pending_evaluation_mask=X_pending_evaluation_mask, + X_evaluation_mask=X_evaluation_mask, + 
current_value=current_value, + ref_point=ref_point, + num_pareto=num_pareto, + **mf_kwargs + ) + val = qHVKG(X) + patch_f.assert_called_once() + expected_eval_mask = torch.cat( + [X_evaluation_mask, X_pending_evaluation_mask], dim=0 + ) + cargs, ckwargs = patch_f.call_args + print(ckwargs) + self.assertEqual(ckwargs["X"].shape, torch.Size([1, 3, 1])) + self.assertTrue( + torch.equal(ckwargs["evaluation_mask"], expected_eval_mask) + ) + expected_hv = ( + DominatedPartitioning(Y=mean, ref_point=ref_point) + .compute_hypervolume() + .mean(dim=0) + ) + + expected = (expected_hv.mean() - current_value).reshape([]) + self.assertAllClose(val, expected, atol=1e-4) + self.assertTrue( + torch.equal(qHVKG.extract_candidates(X), X[..., : -n_f * num_pareto, :]) + ) + # test objective (inner MC sampling) + objective = GenericMCMultiOutputObjective(lambda Y, X: 2 * Y) + samples = torch.randn(n_f, 1, num_pareto, 2, **tkwargs) + mfm = MockModel(MockPosterior(samples=samples)) + X = torch.rand(n_f * num_pareto + 1, 1, **tkwargs) + with mock.patch.object( + ModelListGP, "fantasize", return_value=mfm + ) as patch_f: + with mock.patch(NO, new_callable=mock.PropertyMock) as mock_num_outputs: + mock_num_outputs.return_value = 2 + qHVKG = acqf_class( + model=model, + num_fantasies=n_f, + objective=objective, + ref_point=ref_point, + num_pareto=num_pareto, + use_posterior_mean=False, + **mf_kwargs + ) + val = qHVKG(X) + patch_f.assert_called_once() + cargs, ckwargs = patch_f.call_args + self.assertEqual(ckwargs["X"].shape, torch.Size([1, 1, 1])) + expected_hv = ( + DominatedPartitioning( + Y=objective(samples).view(-1, num_pareto, 2), ref_point=ref_point + ) + .compute_hypervolume() + .view(n_f, 1) + .mean(dim=0) + ) + self.assertAllClose(val, expected_hv, atol=1e-4) + self.assertTrue( + torch.equal(qHVKG.extract_candidates(X), X[..., : -n_f * num_pareto, :]) + ) + + # test mfkg + if acqf_class == qMultiFidelityHypervolumeKnowledgeGradient: + mean = torch.rand(n_f, num_pareto, 2, **tkwargs) + variance = torch.rand(n_f, num_pareto, 2, **tkwargs) + mfm = MockModel(MockPosterior(mean=mean, variance=variance)) + current_value = torch.rand(1, **tkwargs) + X = torch.rand(n_f * num_pareto + 1, 1, **tkwargs) + with mock.patch( + "botorch.acquisition.multi_objective.hypervolume_knowledge_gradient._get_hv_value_function", + wraps=_get_hv_value_function, + ) as mock_get_value_func: + with mock.patch.object( + ModelListGP, "fantasize", return_value=mfm + ) as patch_f: + with mock.patch( + NO, new_callable=mock.PropertyMock + ) as mock_num_outputs: + mock_num_outputs.return_value = 2 + qHVKG = acqf_class( + model=model, + num_fantasies=n_f, + current_value=current_value, + ref_point=ref_point, + num_pareto=num_pareto, + **mf_kwargs + ) + val = qHVKG(X) + self.assertIsNotNone( + mock_get_value_func.call_args_list[0][1]["project"] + )
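Illustrative usage sketch (not part of the patch): the snippet below only shows how the new qHypervolumeKnowledgeGradient might be wired to a toy two-objective ModelListGP and evaluated on a randomly drawn one-shot input of the augmented size. The data, sizes, and hyperparameters are placeholder values chosen to keep the sketch cheap; in practice the joint candidate/fantasy tensor would be optimized (e.g. with botorch.optim.optimize_acqf) rather than sampled at random.

import torch

from botorch.acquisition.multi_objective.hypervolume_knowledge_gradient import (
    qHypervolumeKnowledgeGradient,
)
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.model_list_gp_regression import ModelListGP

# Toy two-objective training data (placeholder values).
train_X = torch.rand(8, 2, dtype=torch.double)
model = ModelListGP(
    SingleTaskGP(train_X, torch.rand(8, 1, dtype=torch.double)),
    SingleTaskGP(train_X, torch.rand(8, 1, dtype=torch.double)),
)

acqf = qHypervolumeKnowledgeGradient(
    model=model,
    ref_point=torch.zeros(2, dtype=torch.double),  # reference point in outcome space
    num_fantasies=4,  # small values keep the sketch cheap
    num_pareto=2,
)

# One-shot input: q actual candidates plus num_fantasies * num_pareto fantasy solutions.
q_aug = acqf.get_augmented_q_batch_size(q=1)  # 1 + 4 * 2 = 9
X_full = torch.rand(q_aug, 2, dtype=torch.double)
value = acqf(X_full)  # HVKG value of the t-batch (minus current_value, if it was given)
X_cand = acqf.extract_candidates(X_full)  # the actual design point(s) to evaluate next

In a full optimization loop one would instead optimize the joint `q_aug x d` tensor jointly and then call extract_candidates on the optimizer's output to recover the design points to evaluate.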