Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SacessOptimizer: expose more hyperparameters #1459

Merged
merged 8 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pypesto/optimize/ess/refset.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def initialize_random(
self,
n_diverse: int,
):
"""Create initial reference set from random parameters.
"""Create an initial reference set from random parameters.

Sample ``n_diverse`` random points, populate half of the RefSet using
the best solutions and fill the rest with random points.
Expand All @@ -90,7 +90,7 @@ def initialize_random(
self.initialize_from_array(x_diverse=x_diverse, fx_diverse=fx_diverse)

def initialize_from_array(self, x_diverse: np.array, fx_diverse: np.array):
"""Create initial reference set using the provided points.
"""Create an initial reference set using the provided points.

Populate half of the RefSet using the best given solutions and fill the
rest with a random selection from the remaining points.
Expand Down Expand Up @@ -174,7 +174,8 @@ def resize(self, new_dim: int):
If the dimension does not change, do nothing.
If size is decreased, drop entries from the end (i.e., the worst
values, assuming it is sorted). If size is increased, the new
entries are filled with randomly and the refset is sorted.
entries are filled with randomly sampled parameters and the refset is
sorted.

NOTE: Any attributes are just truncated or filled with zeros.
"""
Expand Down
164 changes: 120 additions & 44 deletions pypesto/optimize/ess/sacess.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Self-adaptive cooperative enhanced scatter search (SACESS)."""
from __future__ import annotations

import itertools
import logging
Expand All @@ -11,7 +12,7 @@
from multiprocessing import get_context
from multiprocessing.managers import SyncManager
from pathlib import Path
from typing import Any, Callable, Optional, Union
from typing import Any, Callable
from uuid import uuid1
from warnings import warn

Expand Down Expand Up @@ -62,13 +63,14 @@ class SacessOptimizer:

def __init__(
self,
num_workers: Optional[int] = None,
ess_init_args: Optional[list[dict[str, Any]]] = None,
num_workers: int | None = None,
ess_init_args: list[dict[str, Any]] | None = None,
max_walltime_s: float = np.inf,
sacess_loglevel: int = logging.INFO,
ess_loglevel: int = logging.WARNING,
tmpdir: Union[Path, str] = None,
tmpdir: Path | str = None,
mp_start_method: str = "spawn",
options: SacessOptions = None,
):
"""Construct.

Expand Down Expand Up @@ -110,6 +112,8 @@ def __init__(
mp_start_method:
The start method for the multiprocessing context.
See :mod:`multiprocessing` for details.
options:
Further optimizer hyperparameters.
"""
if (num_workers is None and ess_init_args is None) or (
num_workers is not None and ess_init_args is not None
Expand Down Expand Up @@ -138,10 +142,11 @@ def __init__(
self._tmpdir = Path(f"SacessOptimizerTemp-{str(uuid1())[:8]}")
self._tmpdir = Path(self._tmpdir).absolute()
self._tmpdir.mkdir(parents=True, exist_ok=True)
self.histories: Optional[
list["pypesto.history.memory.MemoryHistory"]
] = None
self.histories: list[
pypesto.history.memory.MemoryHistory
] | None = None
self.mp_ctx = get_context(mp_start_method)
self.options = options or SacessOptions()

def minimize(
self,
Expand Down Expand Up @@ -212,6 +217,7 @@ def minimize(
shmem_manager=shmem_manager,
ess_options=ess_init_args,
dim=problem.dim,
options=self.options,
)
# create workers
workers = [
Expand All @@ -225,6 +231,7 @@ def minimize(
tmp_result_file=SacessWorker.get_temp_result_filename(
worker_idx, self._tmpdir
),
options=self.options,
)
for worker_idx, ess_kwargs in enumerate(ess_init_args)
]
Expand Down Expand Up @@ -344,23 +351,23 @@ class SacessManager:
more promising the respective worker is considered)
_worker_comms: Number of communications received from the individual
workers
_rejections: Number of rejected solutions received from workers since last
adaptation of ``_rejection_threshold``.
_rejections: Number of rejected solutions received from workers since the
last adaptation of ``_rejection_threshold``.
_rejection_threshold: Threshold for relative objective improvements that
incoming solutions have to pass to be accepted
_rejection_threshold_min: ``_rejection_threshold`` will be reduced (halved)
if too few solutions are accepted. This value is the lower limit for
``_rejection_threshold``.
_lock: Lock for accessing shared state.
_logger: A logger instance
_options: Further optimizer hyperparameters.
"""

def __init__(
self,
shmem_manager: SyncManager,
ess_options: list[dict[str, Any]],
dim: int,
options: SacessOptions = None,
):
self._options = options or SacessOptions()
self._num_workers = len(ess_options)
self._ess_options = [shmem_manager.dict(o) for o in ess_options]
self._best_known_fx = shmem_manager.Value("d", np.inf)
Expand All @@ -370,8 +377,9 @@ def __init__(
# [PenasGon2017]_ p.9 is 0.1.
# However, their implementation uses 0.1 *percent*. I assume this is a
# mistake in the paper.
self._rejection_threshold = shmem_manager.Value("d", 0.001)
self._rejection_threshold_min = 0.001
self._rejection_threshold = shmem_manager.Value(
"d", self._options.manager_initial_rejection_threshold
)

# scores of the workers, ordered by worker-index
# initial score is the worker index
Expand Down Expand Up @@ -433,7 +441,7 @@ def submit_solution(
np.isfinite(fx)
and not np.isfinite(self._best_known_fx.value)
)
# avoid division by 0. just accept any improvement if best
# avoid division by 0. just accept any improvement if the best
# known value is 0.
or (self._best_known_fx.value == 0 and fx < 0)
or (
Expand Down Expand Up @@ -475,12 +483,12 @@ def submit_solution(
f"(threshold: {self._rejection_threshold.value}) "
f"(total rejections: {self._rejections.value})."
)
# adapt acceptance threshold if too many solutions have been
# rejected
# adapt the acceptance threshold if too many solutions have
# been rejected
if self._rejections.value >= self._num_workers:
self._rejection_threshold.value = min(
self._rejection_threshold.value / 2,
self._rejection_threshold_min,
self._options.manager_minimum_rejection_threshold,
)
self._logger.debug(
"Lowered acceptance threshold to "
Expand All @@ -507,9 +515,6 @@ class SacessWorker:
to the manager.
_ess_kwargs: ESSOptimizer options for this worker (may get updated during
the self-adaptive step).
_acceptance_threshold: Minimum relative improvement of the objective
compared to the best known value to be eligible for submission to the
Manager.
_n_sent_solutions: Number of solutions sent to the Manager.
_max_walltime_s: Walltime limit.
_logger: A Logger instance.
Expand All @@ -527,15 +532,14 @@ def __init__(
loglevel: int = logging.INFO,
ess_loglevel: int = logging.WARNING,
tmp_result_file: str = None,
options: SacessOptions = None,
):
self._manager = manager
self._worker_idx = worker_idx
self._best_known_fx = np.inf
self._n_received_solutions = 0
self._neval = 0
self._ess_kwargs = ess_kwargs
# Default value from original SaCeSS implementation
self._acceptance_threshold = 0.0001
self._n_sent_solutions = 0
self._max_walltime_s = max_walltime_s
self._start_time = None
Expand All @@ -544,6 +548,7 @@ def __init__(
self._logger = None
self._tmp_result_file = tmp_result_file
self._refset = None
self._options = options or SacessOptions()

def run(
self,
Expand Down Expand Up @@ -618,6 +623,7 @@ def run(
exit_flag=ess.exit_flag,
)
self._manager._result_queue.put(worker_result)
self._logger.debug(f"Final configuration: {self._ess_kwargs}")
ess._report_final()

def _setup_ess(self, startpoint_method: StartpointMethod) -> ESSOptimizer:
Expand Down Expand Up @@ -665,15 +671,18 @@ def _cooperate(self):
self.replace_solution(self._refset, x=recv_x, fx=recv_fx)

def _maybe_adapt(self, problem: Problem):
"""Perform adaptation step.
"""Perform the adaptation step if needed.

Update ESS settings if conditions are met.
"""
# Update ESS settings if we received way more solutions than we sent
# Magic numbers from [PenasGon2017]_ algorithm 5
# Note: [PenasGon2017]_ Algorithm 5 uses AND in the following
# condition, but the accompanying implementation uses OR.
if (
self._n_received_solutions > 10 * self._n_sent_solutions + 20
and self._neval > problem.dim * 5000
self._n_received_solutions
> self._options.adaptation_sent_coeff * self._n_sent_solutions
+ self._options.adaptation_sent_offset
or self._neval > problem.dim * self._options.adaptation_min_evals
):
self._ess_kwargs = self._manager.reconfigure_worker(
self._worker_idx
Expand All @@ -693,17 +702,17 @@ def maybe_update_best(self, x: np.array, fx: float):
f"Worker {self._worker_idx} maybe sending solution {fx}. "
f"best known: {self._best_known_fx}, "
f"rel change: {rel_change:.4g}, "
f"threshold: {self._acceptance_threshold}"
f"threshold: {self._options.worker_acceptance_threshold}"
)

# solution improves best value by at least a factor of ...
# solution improves the best value by at least a factor of ...
if (
(np.isfinite(fx) and not np.isfinite(self._best_known_fx))
or (self._best_known_fx == 0 and fx < 0)
or (
fx < self._best_known_fx
and abs((self._best_known_fx - fx) / fx)
> self._acceptance_threshold
> self._options.worker_acceptance_threshold
)
):
self._logger.debug(
Expand Down Expand Up @@ -738,7 +747,7 @@ def replace_solution(refset: RefSet, x: np.array, fx: float):
refset.attributes["cooperative_solution"]
)
).size == 0:
# the attribute exists, but no member is marked as cooperative
# the attribute exists, but no member is marked as the cooperative
# solution. this may happen if we shrink the refset.
cooperative_solution_idx = np.argmax(refset.fx)

Expand Down Expand Up @@ -767,9 +776,7 @@ def _keep_going(self):
return True

@staticmethod
def get_temp_result_filename(
worker_idx: int, tmpdir: Union[str, Path]
) -> str:
def get_temp_result_filename(worker_idx: int, tmpdir: str | Path) -> str:
return str(Path(tmpdir, f"sacess-{worker_idx:02d}_tmp.h5").absolute())


Expand All @@ -786,7 +793,7 @@ def _run_worker(
# different random seeds per process
np.random.seed((os.getpid() * int(time.time() * 1000)) % 2**32)

# Forward log messages to logging process
# Forward log messages to the logging process
h = logging.handlers.QueueHandler(log_process_queue)
worker._logger = logging.getLogger(multiprocessing.current_process().name)
worker._logger.addHandler(h)
Expand All @@ -797,11 +804,9 @@ def _run_worker(
def get_default_ess_options(
num_workers: int,
dim: int,
local_optimizer: Union[
bool,
"pypesto.optimize.Optimizer",
Callable[..., "pypesto.optimize.Optimizer"],
] = True,
local_optimizer: bool
| pypesto.optimize.Optimizer
| Callable[..., pypesto.optimize.Optimizer] = True,
) -> list[dict]:
"""Get default ESS settings for (SA)CESS.

Expand Down Expand Up @@ -1017,8 +1022,8 @@ class SacessFidesFactory:

def __init__(
self,
fides_options: Optional[dict[str, Any]] = None,
fides_kwargs: Optional[dict[str, Any]] = None,
fides_options: dict[str, Any] | None = None,
fides_kwargs: dict[str, Any] | None = None,
):
if fides_options is None:
fides_options = {}
Expand All @@ -1038,7 +1043,7 @@ def __init__(

def __call__(
self, max_walltime_s: int, max_eval: int
) -> "pypesto.optimize.FidesOptimizer":
) -> pypesto.optimize.FidesOptimizer:
"""Create a :class:`FidesOptimizer` instance."""

from fides.constants import Options as FidesOptions
Expand Down Expand Up @@ -1085,5 +1090,76 @@ class SacessWorkerResult:
fx: float
n_eval: int
n_iter: int
history: "pypesto.history.memory.MemoryHistory"
history: pypesto.history.memory.MemoryHistory
exit_flag: ESSExitFlag


@dataclass
class SacessOptions:
    """Container for :class:`SacessOptimizer` hyperparameters.

    Attributes
    ----------
    manager_initial_rejection_threshold, manager_minimum_rejection_threshold:
        Initial and minimum threshold for relative objective improvements that
        incoming solutions have to pass to be accepted. If the number of
        rejected solutions exceeds the number of workers, the threshold is
        halved until it reaches ``manager_minimum_rejection_threshold``.

    worker_acceptance_threshold:
        Minimum relative improvement of the objective compared to the best
        known value to be eligible for submission to the Manager.

    adaptation_min_evals, adaptation_sent_offset, adaptation_sent_coeff:
        Hyperparameters that control when the workers will adapt their
        settings based on the performance of the other workers.

        The adaptation step is performed if any of the following conditions
        is met (the reference implementation combines them with OR, not AND):

        * The number of function evaluations since the last solution was sent
          to the manager exceeds ``adaptation_min_evals`` times the number of
          optimization parameters.

        * The number of solutions received by the worker since the last
          solution it sent to the manager exceeds
          ``adaptation_sent_coeff * n_sent_solutions + adaptation_sent_offset``,
          where ``n_sent_solutions`` is the number of solutions sent to the
          manager by the given worker.
    """

    manager_initial_rejection_threshold: float = 0.001
    manager_minimum_rejection_threshold: float = 0.001

    # Default value from the original SaCeSS implementation
    worker_acceptance_threshold: float = 0.0001

    # Magic numbers for adaptation, taken from [PenasGon2017]_ algorithm 5
    adaptation_min_evals: int = 5000
    adaptation_sent_offset: int = 20
    adaptation_sent_coeff: int = 10

    def __post_init__(self):
        """Validate hyperparameters.

        Raises
        ------
        ValueError
            If any hyperparameter is negative.
        """
        # All hyperparameters share the same constraint (non-negativity),
        # so validate them uniformly; the error messages match the
        # per-attribute checks this replaces.
        for attr in (
            "adaptation_min_evals",
            "adaptation_sent_offset",
            "adaptation_sent_coeff",
            "manager_initial_rejection_threshold",
            "manager_minimum_rejection_threshold",
            "worker_acceptance_threshold",
        ):
            if getattr(self, attr) < 0:
                raise ValueError(f"{attr} must be non-negative.")
Loading