From 50f6e159f0cc50f906a1c2202ddbfe63a212c1ac Mon Sep 17 00:00:00 2001 From: Aaron <39431903+whzup@users.noreply.github.com> Date: Sat, 9 Feb 2019 04:47:50 +0100 Subject: [PATCH] Add BoundaryHandler and VelocityHandler (#238) Implements the new `BoundaryHandler` and `VelocityHandler` classes that deal with the issue of particles that don't move in optimisations with boundary conditions. This is achieved by using different strategies that have been described in [Hel10](https://opus4.kobv.de/opus4-fau/frontdoor/index/index/year/2010/docId/1328) (P. 28). These strategies include: - Nearest bound: Reset particle to the nearest boundary - Random: Reset the particle to a random position within the bounds - Shrink: Shrink the velocity such that the particle touches the boundary instead of surpassing it - Intermediate: Reset the particle to a value between its former location and the bound - Resample: Calculate new velocities until the next particle position is within the bounds - Periodic: Tile the space with the same search space over and over - Reflective: Mirror the position of the particle at the surpassed boundary Resolves #237 --- docs/api/_pyswarms.backend.rst | 1 + docs/api/pyswarms.backend.rst | 15 +- docs/api/pyswarms.handlers.rst | 17 + pyswarms/backend/__init__.py | 3 +- pyswarms/backend/handlers.py | 536 +++++++++++++++++++++ pyswarms/backend/operators.py | 64 ++- pyswarms/backend/topology/pyramid.py | 31 +- pyswarms/backend/topology/random.py | 36 +- pyswarms/backend/topology/ring.py | 36 +- pyswarms/backend/topology/star.py | 32 +- pyswarms/discrete/binary.py | 12 +- pyswarms/single/general_optimizer.py | 20 +- pyswarms/single/global_best.py | 15 +- pyswarms/single/local_best.py | 21 +- tests/backend/conftest.py | 54 +++ tests/backend/test_handlers.py | 181 +++++++ tests/backend/test_operators.py | 32 +- tests/backend/topology/test_pyramid.py | 4 +- tests/backend/topology/test_random.py | 13 +- tests/backend/topology/test_ring.py | 13 +- tests/backend/topology/test_von_neumann.py | 13 +- 21 files changed, 1033 insertions(+), 116 deletions(-) create mode 100644 docs/api/pyswarms.handlers.rst create mode 100644 pyswarms/backend/handlers.py create mode 100644 tests/backend/test_handlers.py diff --git a/docs/api/_pyswarms.backend.rst b/docs/api/_pyswarms.backend.rst index ddae95c1..e8ac4e25 100644 --- a/docs/api/_pyswarms.backend.rst +++ b/docs/api/_pyswarms.backend.rst @@ -9,6 +9,7 @@ as GlobalBestPSO and LocalBestPSO were built using the backend module. .. toctree:: pyswarms.backend + pyswarms.handlers pyswarms.topology pyswarms.swarms diff --git a/docs/api/pyswarms.backend.rst b/docs/api/pyswarms.backend.rst index 73be7107..2432743c 100644 --- a/docs/api/pyswarms.backend.rst +++ b/docs/api/pyswarms.backend.rst @@ -5,20 +5,27 @@ You can import all the native helper methods in this package using the command: .. code-block:: python - - import pyswarms.backend as P + + import pyswarms.backend as P Then call the methods found in each module. Note that these methods interface with the Swarm class provided in the :mod:`pyswarms.backend.swarms` module. pyswarms.backend.generators module ------------------------------------ +---------------------------------- .. automodule:: pyswarms.backend.generators :members: +pyswarms.backend.handlers module +-------------------------------- + +.. automodule:: pyswarms.backend.handlers + :members: + pyswarms.backend.operators module ---------------------------------- .. automodule:: pyswarms.backend.operators - :members: \ No newline at end of file + :members: + diff --git a/docs/api/pyswarms.handlers.rst b/docs/api/pyswarms.handlers.rst new file mode 100644 index 00000000..8e896109 --- /dev/null +++ b/docs/api/pyswarms.handlers.rst @@ -0,0 +1,17 @@ +pyswarms.handlers package +========================= + +This package implements different handling strategies for the optimiziation +using boundary conditions. The strategies help avoiding that particles +leave the defined search space. There are two :code:`Handler` classes +that provide these functionalities, the :class:`BoundaryHandler` and the +:class:`VelocityHandler`. + +pyswarms.handlers class +----------------------- + +.. automodule:: pyswarms.backend.handlers + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__, __call__ diff --git a/pyswarms/backend/__init__.py b/pyswarms/backend/__init__.py index 425d3450..92811332 100644 --- a/pyswarms/backend/__init__.py +++ b/pyswarms/backend/__init__.py @@ -5,7 +5,8 @@ """ from .generators import * +from .handlers import * from .operators import * from .swarms import * -__all__ = ["generators", "operators", "swarms"] +__all__ = ["generators", "handlers", "operators", "swarms"] diff --git a/pyswarms/backend/handlers.py b/pyswarms/backend/handlers.py new file mode 100644 index 00000000..348869f7 --- /dev/null +++ b/pyswarms/backend/handlers.py @@ -0,0 +1,536 @@ +""" +Handlers + +This module provides Handler classes for the position as well as the velocity +of particles. This is necessary when boundary conditions are imposed on the PSO +algorithm. Particles that do not stay inside these boundary conditions have to +be handled by either adjusting their position after they left the bounded +search space or adjusting their velocity when it would position them outside +the search space. In particular, this approach is important if the optimium of +a function is near the boundaries. +For the following documentation let :math:`x_{i, t, d}` be the :math:`d` th +coordinate of the particle :math:`i` 's position vector at the time :math:`t`, +:math:`lb` the vector of the lower boundaries and :math:`ub` the vector of the +upper boundaries. +The algorithms in this module are adapted from [SH2010] + +[SH2010] Sabine Helwig, "Particle Swarms for Constrained Optimization", +PhD thesis, Friedrich-Alexander Universität Erlangen-Nürnberg, 2010. +""" + +import inspect +import logging + +import numpy as np + +from ..utils.reporter import Reporter + + +class HandlerMixin(object): + """ A HandlerMixing class + + This class offers some basic functionality for the Handlers. + """ + + def _merge_dicts(self, *dict_args): + """Backward-compatible helper method to combine two dicts""" + result = {} + for dictionary in dict_args: + result.update(dictionary) + return result + + def _out_of_bounds(self, position, bounds): + """Helper method to find indices of out-of-bound positions + + This method finds the indices of the particles that are out-of-bound. + """ + lb, ub = bounds + greater_than_bound = np.nonzero(position > ub) + lower_than_bound = np.nonzero(position < lb) + return (lower_than_bound, greater_than_bound) + + def _get_all_strategies(self): + """Helper method to automatically generate a dict of strategies""" + return { + k: v + for k, v in inspect.getmembers(self, predicate=inspect.isroutine) + if not k.startswith(("__", "_")) + } + + +class BoundaryHandler(HandlerMixin): + def __init__(self, strategy): + """ A BoundaryHandler class + + This class offers a way to handle boundary conditions. It contains + methods to repair particle positions outside of the defined boundaries. + Following strategies are available for the handling: + + * Nearest: + Reposition the particle to the nearest bound. + * Random: + Reposition the particle randomly in between the bounds. + * Shrink: + Shrink the velocity of the particle such that it lands on the + bounds. + * Reflective: + Mirror the particle position from outside the bounds to inside the + bounds. + * Intermediate: + Reposition the particle to the midpoint between its current + position on the bound surpassing axis and the bound itself. This + only adjusts the axes that surpass the boundaries. + + The BoundaryHandler can be called as a function to use the strategy + that is passed at initialization to repair boundary issues. An example + for the usage: + + .. code-block :: python + + from pyswarms.backend import operators as op + from pyswarms.backend.handlers import BoundaryHandler + + bh = BoundaryHandler(strategy="reflective") + ops.compute_position(swarm, bounds, handler=bh) + + By passing the handler, the :func:`compute_position()` function now has + the ability to reset the particles by calling the :code:`BoundaryHandler` + inside. + + Attributes + ---------- + strategy : str + The strategy to use. To see all available strategies, + call :code:`BoundaryHandler.strategies` + """ + self.strategy = strategy + self.strategies = self._get_all_strategies() + self.rep = Reporter(logger=logging.getLogger(__name__)) + self.memory = None + + def __call__(self, position, bounds, **kwargs): + """Apply the selected strategy to the position-matrix given the bounds + + Parameters + ---------- + position : np.ndarray + The swarm position to be handled + bounds : tuple of :code:`np.ndarray` or list + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)` + kwargs : dict + + Returns + ------- + numpy.ndarray + the adjusted positions of the swarm + """ + try: + new_position = self.strategies[self.strategy]( + position, bounds, **kwargs + ) + except KeyError: + message = "Unrecognized strategy: {}. Choose one among: " + str( + [strat for strat in self.strategies.keys()] + ) + self.rep.logger.exception(message.format(self.strategy)) + raise + else: + return new_position + + def nearest(self, position, bounds, **kwargs): + r"""Set position to nearest bound + + This method resets particles that exceed the bounds to the nearest + available boundary. For every axis on which the coordiantes of the particle + surpasses the boundary conditions the coordinate is set to the respective + bound that it surpasses. + The following equation describes this strategy: + + .. math:: + + x_{i, t, d} = \begin{cases} + lb_d & \quad \text{if }x_{i, t, d} < lb_d \\ + ub_d & \quad \text{if }x_{i, t, d} > ub_d \\ + x_{i, t, d} & \quad \text{otherwise} + \end{cases} + + """ + lb, ub = bounds + bool_greater = position > ub + bool_lower = position < lb + new_pos = np.where(bool_lower, lb, position) + new_pos = np.where(bool_greater, ub, new_pos) + return new_pos + + def reflective(self, position, bounds, **kwargs): + r"""Reflect the particle at the boundary + + This method reflects the particles that exceed the bounds at the respective + boundary. This means that the amount that the component which is orthogonal to + the exceeds the boundary is mirrored at the boundary. The reflection is repeated + until the position of the particle is within the boundaries. The following + algorithm describes the behaviour of this strategy: + + .. math:: + :nowrap: + + \begin{gather*} + \text{while } x_{i, t, d} \not\in \left[lb_d,\,ub_d\right] \text{ do the + following:} + x_{i, t, d} = \begin{cases} + 2\cdot lb_d - x_{i, t, d} & \quad \text{if } x_{i, + t, d} < lb_d + 2\cdot ub_d - x_{i, t, d} & \quad \text{if } x_{i, + t, d} > ub_d + x_{i, t, d} & \quad \text{otherwise} + \end{cases} + \end{gather*} + """ + lb, ub = bounds + lower_than_bound, greater_than_bound = self._out_of_bounds( + position, bounds + ) + new_pos = position + while lower_than_bound[0].size != 0 or greater_than_bound[0].size != 0: + if lower_than_bound[0].size > 0: + new_pos[lower_than_bound] = 2 * lb[lower_than_bound[1]] - \ + new_pos[lower_than_bound] + if greater_than_bound[0].size > 0: + new_pos[greater_than_bound] = 2 * ub[greater_than_bound[1]] - \ + new_pos[greater_than_bound] + lower_than_bound, greater_than_bound = self._out_of_bounds( + new_pos, bounds + ) + + return new_pos + + def shrink(self, position, bounds, **kwargs): + r"""Set the particle to the boundary + + This method resets particles that exceed the bounds to the intersection + of its previous velocity and the boundary. This can be imagined as shrinking + the previous velocity until the particle is back in the valid search space. + Let :math:`\sigma_{i, t, d}` be the :math:`d` th shrinking value of the + :math:`i` th particle at the time :math:`t` and :math:`v_{i, t}` the velocity + of the :math:`i` th particle at the time :math:`t`. Then the new position is + computed by the following equation: + + .. math:: + :nowrap: + + \begin{gather*} + \mathbf{x}_{i, t} = \mathbf{x}_{i, t-1} + \sigma_{i, t} \mathbf{v}_{i, t} \\ + \\ + \text{with} \\ + \\ + \sigma_{i, t, d} = \begin{cases} + \frac{lb_d-x_{i, t-1, d}}{v_{i, t, d}} & \quad \text{if } x_{i, t, d} < lb_d \\ + \frac{ub_d-x_{i, t-1, d}}{v_{i, t, d}} & \quad \text{if } x_{i, t, d} > ub_d \\ + 1 & \quad \text{otherwise} + \end{cases} \\ + \\ + \text{and} \\ + \\ + \sigma_{i, t} = \min_{d=1...n} \sigma_{i, t, d} + \\ + \end{gather*} + + """ + if self.memory is None: + new_pos = position + self.memory = new_pos + else: + lb, ub = bounds + lower_than_bound, greater_than_bound = self._out_of_bounds( + position, bounds + ) + velocity = position - self.memory + # Create a coefficient matrix + sigma = np.tile(1.0, position.shape) + sigma[lower_than_bound] = ( + lb[lower_than_bound[1]] - self.memory[lower_than_bound] + ) / velocity[lower_than_bound] + sigma[greater_than_bound] = ( + ub[greater_than_bound[1]] - self.memory[greater_than_bound] + ) / velocity[greater_than_bound] + min_sigma = np.amin(sigma, axis=1) + new_pos = position + new_pos[lower_than_bound[0]] = ( + self.memory[lower_than_bound[0]] + + np.multiply( + min_sigma[lower_than_bound[0]], + velocity[lower_than_bound[0]].T, + ).T + ) + new_pos[greater_than_bound[0]] = ( + self.memory[greater_than_bound[0]] + + np.multiply( + min_sigma[greater_than_bound[0]], + velocity[greater_than_bound[0]].T, + ).T + ) + self.memory = new_pos + return new_pos + + def random(self, position, bounds, **kwargs): + """Set position to random location + + This method resets particles that exeed the bounds to a random position + inside the boundary conditions. + """ + lb, ub = bounds + lower_than_bound, greater_than_bound = self._out_of_bounds( + position, bounds + ) + # Set indices that are greater than bounds + new_pos = position + new_pos[greater_than_bound[0]] = np.array( + [ + np.array([u - l for u, l in zip(ub, lb)]) + * np.random.random_sample((position.shape[1],)) + + lb + ] + ) + new_pos[lower_than_bound[0]] = np.array( + [ + np.array([u - l for u, l in zip(ub, lb)]) + * np.random.random_sample((position.shape[1],)) + + lb + ] + ) + return new_pos + + def intermediate(self, position, bounds, **kwargs): + r"""Set the particle to an intermediate position + + This method resets particles that exceed the bounds to an intermediate + position between the boundary and their earlier position. Namely, it changes + the coordinate of the out-of-bounds axis to the middle value between the + previous position and the boundary of the axis. + The following equation describes this strategy: + + .. math:: + + x_{i, t, d} = \begin{cases} + \frac{1}{2} \left (x_{i, t-1, d} + lb_d \right) & \quad \text{if }x_{i, t, d} < lb_d \\ + \frac{1}{2} \left (x_{i, t-1, d} + ub_d \right) & \quad \text{if }x_{i, t, d} > ub_d \\ + x_{i, t, d} & \quad \text{otherwise} + \end{cases} + + """ + if self.memory is None: + new_pos = position + self.memory = new_pos + else: + lb, ub = bounds + lower_than_bound, greater_than_bound = self._out_of_bounds( + position, bounds + ) + new_pos = position + new_pos[lower_than_bound] = 0.5 * ( + self.memory[lower_than_bound] + lb[lower_than_bound[1]] + ) + new_pos[greater_than_bound] = 0.5 * ( + self.memory[greater_than_bound] + ub[greater_than_bound[1]] + ) + self.memory = new_pos + return new_pos + + def periodic(self, position, bounds, **kwargs): + r"""Sets the particles a periodic fashion + + This method resets the particles that exeed the bounds by using the + modulo function to cut down the position. This creates a virtual, + periodic plane which is tiled with the search space. + The following equation describtes this strategy: + + .. math:: + :nowrap: + + \begin{gather*} + x_{i, t, d} = \begin{cases} + ub_d - (lb_d - x_{i, t, d}) \mod s_d & \quad \text{if }x_{i, t, d} < lb_d \\ + lb_d + (x_{i, t, d} - ub_d) \mod s_d & \quad \text{if }x_{i, t, d} > ub_d \\ + x_{i, t, d} & \quad \text{otherwise} + \end{cases}\\ + \\ + \text{with}\\ + \\ + s_d = |ub_d - lb_d| + \end{gather*} + + """ + lb, ub = bounds + lower_than_bound, greater_than_bound = self._out_of_bounds( + position, bounds + ) + bound_d = np.tile( + np.abs(np.array(ub) - np.array(lb)), (position.shape[0], 1) + ) + ub = np.tile(ub, (position.shape[0], 1)) + lb = np.tile(lb, (position.shape[0], 1)) + new_pos = position + if lower_than_bound[0].size != 0 and lower_than_bound[1].size != 0: + new_pos[lower_than_bound] = ub[lower_than_bound] - np.mod( + (lb[lower_than_bound] - new_pos[lower_than_bound]), + bound_d[lower_than_bound], + ) + if greater_than_bound[0].size != 0 and greater_than_bound[1].size != 0: + new_pos[greater_than_bound] = lb[greater_than_bound] + np.mod( + (new_pos[greater_than_bound] - ub[greater_than_bound]), + bound_d[greater_than_bound], + ) + return new_pos + + +class VelocityHandler(HandlerMixin): + def __init__(self, strategy): + """ A VelocityHandler class + + This class offers a way to handle velocities. It contains + methods to repair the velocities of particles that exceeded the + defined boundaries. Following strategies are available for the handling: + + * Unmodified: + Returns the unmodified velocites. + * Adjust + Returns the velocity that is adjusted to be the distance between the current + and the previous position. + * Invert + Inverts and shrinks the velocity by the factor :code:`-z`. + * Zero + Sets the velocity of out-of-bounds particles to zero. + + """ + self.strategy = strategy + self.strategies = self._get_all_strategies() + self.rep = Reporter(logger=logging.getLogger(__name__)) + self.memory = None + + def __call__(self, velocity, clamp, **kwargs): + """Apply the selected strategy to the velocity-matrix given the bounds + + Parameters + ---------- + velocity : np.ndarray + The swarm position to be handled + clamp : tuple of :code:`np.ndarray` or list + a tuple of size 2 where the first entry is the minimum clamp while + the second entry is the maximum clamp. Each array must be of shape + :code:`(dimensions,)` + kwargs : dict + + Returns + ------- + numpy.ndarray + the adjusted positions of the swarm + """ + try: + new_position = self.strategies[self.strategy]( + velocity, clamp, **kwargs + ) + except KeyError: + message = "Unrecognized strategy: {}. Choose one among: " + str( + [strat for strat in self.strategies.keys()] + ) + self.rep.logger.exception(message.format(self.strategy)) + raise + else: + return new_position + + def __apply_clamp(self, velocity, clamp): + """Helper method to apply a clamp to a velocity vector""" + clamped_vel = velocity + min_velocity, max_velocity = clamp + lower_than_clamp = clamped_vel <= min_velocity + greater_than_clamp = clamped_vel >= max_velocity + clamped_vel = np.where(lower_than_clamp, min_velocity, clamped_vel) + clamped_vel = np.where(greater_than_clamp, max_velocity, clamped_vel) + return clamped_vel + + def unmodified(self, velocity, clamp=None, **kwargs): + """Leaves the velocity unchanged""" + if clamp is None: + new_vel = velocity + else: + if clamp is not None: + new_vel = self.__apply_clamp(velocity, clamp) + return new_vel + + def adjust(self, velocity, clamp=None, **kwargs): + r"""Adjust the velocity to the new position + + The velocity is adjusted such that the following equation holds: + .. math:: + + \mathbf{v_{i,t}} = \mathbf{x_{i,t}} - \mathbf{x_{i,t-1}} + + .. note:: + This method should only be used in combination with a position handling + operation. + + """ + try: + if self.memory is None: + new_vel = velocity + self.memory = kwargs["position"] + else: + new_vel = kwargs["position"] - self.memory + self.memory = kwargs["position"] + if clamp is not None: + new_vel = self.__apply_clamp(new_vel, clamp) + except KeyError: + self.rep.logger.exception("Keyword 'position' missing") + raise + else: + return new_vel + + def invert(self, velocity, clamp=None, **kwargs): + r"""Invert the velocity if the particle is out of bounds + + The velocity is inverted and shrinked. The shrinking is determined by the + kwarg :code:`z`. The default shrinking factor is :code:`0.5`. For all + velocities whose particles are out of bounds the following equation is + applied: + .. math:: + + \mathbf{v_{i,t}} = -z\mathbf{v_{i,t}} + """ + try: + # Default for the shrinking factor + if "z" not in kwargs: + z = 0.5 + else: + z = kwargs["z"] + lower_than_bound, greater_than_bound = self._out_of_bounds( + kwargs["position"], kwargs["bounds"] + ) + new_vel = velocity + new_vel[lower_than_bound[0]] = (-z) * new_vel[lower_than_bound[0]] + new_vel[greater_than_bound[0]] = (-z) * new_vel[ + greater_than_bound[0] + ] + if clamp is not None: + new_vel = self.__apply_clamp(new_vel, clamp) + except KeyError: + self.rep.logger.exception("Keyword 'position' or 'bounds' missing") + raise + else: + return new_vel + + def zero(self, velocity, clamp=None, **kwargs): + """Set velocity to zero if the particle is out of bounds""" + try: + lower_than_bound, greater_than_bound = self._out_of_bounds( + kwargs["position"], kwargs["bounds"] + ) + new_vel = velocity + new_vel[lower_than_bound[0]] = np.zeros(velocity.shape[1]) + new_vel[greater_than_bound[0]] = np.zeros(velocity.shape[1]) + except KeyError: + self.rep.logger.exception("Keyword 'position' or 'bounds' missing") + raise + else: + return new_vel diff --git a/pyswarms/backend/operators.py b/pyswarms/backend/operators.py index 2ff104e3..47902703 100644 --- a/pyswarms/backend/operators.py +++ b/pyswarms/backend/operators.py @@ -15,6 +15,8 @@ import numpy as np from ..utils.reporter import Reporter +from .handlers import BoundaryHandler, VelocityHandler + rep = Reporter(logger=logging.getLogger(__name__)) @@ -76,25 +78,27 @@ def compute_pbest(swarm): return (new_pbest_pos, new_pbest_cost) -def compute_velocity(swarm, clamp): +def compute_velocity(swarm, clamp, vh, bounds=None): """Update the velocity matrix This method updates the velocity matrix using the best and current positions of the swarm. The velocity matrix is computed using the - cognitive and social terms of the swarm. + cognitive and social terms of the swarm. The velocity is handled + by a :code:`VelocityHandler`. A sample usage can be seen with the following: .. code-block :: python import pyswarms.backend as P - from pyswarms.swarms.backend import Swarm + from pyswarms.swarms.backend import Swarm, VelocityHandler my_swarm = P.create_swarm(n_particles, dimensions) + my_vh = VelocityHandler(strategy="invert") for i in range(iters): # Inside the for-loop - my_swarm.velocity = update_velocity(my_swarm, clamp) + my_swarm.velocity = compute_velocity(my_swarm, clamp, my_vh, bounds) Parameters ---------- @@ -104,6 +108,13 @@ def compute_velocity(swarm, clamp): a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh : pyswarms.backend.handlers.VelocityHandler + a VelocityHandler object with a specified handling strategy. + For further information see :mod:`pyswarms.backend.handlers`. + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. Returns ------- @@ -129,15 +140,10 @@ def compute_velocity(swarm, clamp): ) # Compute temp velocity (subject to clamping if possible) temp_velocity = (w * swarm.velocity) + cognitive + social + updated_velocity = vh( + temp_velocity, clamp, position=swarm.position, bounds=bounds + ) - if clamp is None: - updated_velocity = temp_velocity - else: - min_velocity, max_velocity = clamp - mask = np.logical_and( - temp_velocity >= min_velocity, temp_velocity <= max_velocity - ) - updated_velocity = np.where(~mask, swarm.velocity, temp_velocity) except AttributeError: rep.logger.exception( "Please pass a Swarm class. You passed {}".format(type(swarm)) @@ -150,11 +156,24 @@ def compute_velocity(swarm, clamp): return updated_velocity -def compute_position(swarm, bounds): +def compute_position(swarm, bounds, bh): """Update the position matrix This method updates the position matrix given the current position and - the velocity. If bounded, it waives updating the position. + the velocity. If bounded, the positions are handled by a :code:`BoundaryHandler` + instance. + + .. code-block :: python + + import pyswarms.backend as P + from pyswarms.swarms.backend import Swarm, VelocityHandler + + my_swarm = P.create_swarm(n_particles, dimensions) + my_bh = BoundaryHandler(strategy="intermediate") + + for i in range(iters): + # Inside the for-loop + my_swarm.position = compute_position(my_swarm, bounds, my_bh) Parameters ---------- @@ -164,6 +183,9 @@ def compute_position(swarm, bounds): a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh : pyswarms.backend.handlers.BoundaryHandler + a BoundaryHandler object with a specified handling strategy + For further information see :mod:`pyswarms.backend.handlers`. Returns ------- @@ -175,18 +197,8 @@ def compute_position(swarm, bounds): temp_position += swarm.velocity if bounds is not None: - lb, ub = bounds - min_bounds = np.repeat( - np.array(lb)[np.newaxis, :], swarm.n_particles, axis=0 - ) - max_bounds = np.repeat( - np.array(ub)[np.newaxis, :], swarm.n_particles, axis=0 - ) - mask = np.all(min_bounds <= temp_position, axis=1) * np.all( - temp_position <= max_bounds, axis=1 - ) - mask = np.repeat(mask[:, np.newaxis], swarm.dimensions, axis=1) - temp_position = np.where(~mask, swarm.position, temp_position) + temp_position = bh(temp_position, bounds) + position = temp_position except AttributeError: rep.logger.exception( diff --git a/pyswarms/backend/topology/pyramid.py b/pyswarms/backend/topology/pyramid.py index 3fa5e489..91f82e2f 100644 --- a/pyswarms/backend/topology/pyramid.py +++ b/pyswarms/backend/topology/pyramid.py @@ -14,6 +14,7 @@ from scipy.spatial import Delaunay from .. import operators as ops +from ..handlers import BoundaryHandler, VelocityHandler from ...utils.reporter import Reporter from .base import Topology @@ -105,7 +106,13 @@ def compute_gbest(self, swarm, **kwargs): else: return (best_pos, best_cost) - def compute_velocity(self, swarm, clamp=None): + def compute_velocity( + self, + swarm, + clamp=None, + vh=VelocityHandler(strategy="unmodified"), + bounds=None, + ): """Compute the velocity matrix This method updates the velocity matrix using the best and current @@ -117,15 +124,18 @@ def compute_velocity(self, swarm, clamp=None): .. code-block :: python import pyswarms.backend as P - from pyswarms.swarms.backend import Swarm + from pyswarms.backend.swarm import Swarm + from pyswarms.backend.handlers import VelocityHandler from pyswarms.backend.topology import Pyramid my_swarm = P.create_swarm(n_particles, dimensions) my_topology = Pyramid(static=False) + my_vh = VelocityHandler(strategy="zero") for i in range(iters): # Inside the for-loop - my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp, my_vh, + bounds=bounds) Parameters ---------- @@ -135,15 +145,23 @@ def compute_velocity(self, swarm, clamp=None): a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh : pyswarms.backend.handlers.VelocityHandler + a VelocityHandler instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. Returns ------- numpy.ndarray Updated velocity matrix """ - return ops.compute_velocity(swarm, clamp) + return ops.compute_velocity(swarm, clamp, vh, bounds=bounds) - def compute_position(self, swarm, bounds=None): + def compute_position( + self, swarm, bounds=None, bh=BoundaryHandler(strategy="periodic") + ): """Update the position matrix This method updates the position matrix given the current position and @@ -157,10 +175,11 @@ def compute_position(self, swarm, bounds=None): a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh : a BoundaryHandler instance Returns ------- numpy.ndarray New position-matrix """ - return ops.compute_position(swarm, bounds) + return ops.compute_position(swarm, bounds, bh) diff --git a/pyswarms/backend/topology/random.py b/pyswarms/backend/topology/random.py index adf9a859..017b69a4 100644 --- a/pyswarms/backend/topology/random.py +++ b/pyswarms/backend/topology/random.py @@ -15,6 +15,7 @@ from scipy.sparse.csgraph import connected_components, dijkstra from .. import operators as ops +from ..handlers import BoundaryHandler, VelocityHandler from ...utils.reporter import Reporter from .base import Topology @@ -95,7 +96,13 @@ def compute_gbest(self, swarm, k, **kwargs): else: return (best_pos, best_cost) - def compute_velocity(self, swarm, clamp=None): + def compute_velocity( + self, + swarm, + clamp=None, + vh=VelocityHandler(strategy="unmodified"), + bounds=None, + ): """Compute the velocity matrix This method updates the velocity matrix using the best and current @@ -107,15 +114,18 @@ def compute_velocity(self, swarm, clamp=None): .. code-block :: python import pyswarms.backend as P - from pyswarms.swarms.backend import Swarm + from pyswarms.backend.swarm import Swarm + from pyswarms.backend.handlers import VelocityHandler from pyswarms.backend.topology import Random my_swarm = P.create_swarm(n_particles, dimensions) my_topology = Random(static=False) + my_vh = VelocityHandler(strategy="zero") for i in range(iters): # Inside the for-loop - my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp, my_vh, + bounds) Parameters ---------- @@ -125,15 +135,23 @@ def compute_velocity(self, swarm, clamp=None): a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh : pyswarms.backend.handlers.VelocityHandler + a VelocityHandler instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. Returns ------- numpy.ndarray Updated velocity matrix """ - return ops.compute_velocity(swarm, clamp) + return ops.compute_velocity(swarm, clamp, vh, bounds=bounds) - def compute_position(self, swarm, bounds=None): + def compute_position( + self, swarm, bounds=None, bh=BoundaryHandler(strategy="periodic") + ): """Update the position matrix This method updates the position matrix given the current position and @@ -147,13 +165,15 @@ def compute_position(self, swarm, bounds=None): a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh : pyswarms.backend.handlers.BoundaryHandler + a BoundaryHandler instance Returns ------- numpy.ndarray New position-matrix """ - return ops.compute_position(swarm, bounds) + return ops.compute_position(swarm, bounds, bh) def __compute_neighbors(self, swarm, k): """Helper method to compute the adjacency matrix of the topology @@ -171,8 +191,8 @@ def __compute_neighbors(self, swarm, k): * neighbor_matrix : The matrix of randomly generated neighbors. This matrix is a matrix of shape :code:`(swarm.n_particles, k)`: - with randomly generated elements. It's used - to create connections in the adj_matrix. + with randomly generated elements. It is used + to create connections in the :code:`adj_matrix`. * dist_matrix : The distance matrix computed with Dijkstra's algorithm. It is used to determine where the graph needs edges to change it to a connected diff --git a/pyswarms/backend/topology/ring.py b/pyswarms/backend/topology/ring.py index 5a7c9fc7..573e0fae 100644 --- a/pyswarms/backend/topology/ring.py +++ b/pyswarms/backend/topology/ring.py @@ -17,6 +17,7 @@ from scipy.spatial import cKDTree from .. import operators as ops +from ..handlers import BoundaryHandler, VelocityHandler from ...utils.reporter import Reporter from .base import Topology @@ -64,7 +65,9 @@ def compute_gbest(self, swarm, p, k, **kwargs): if (self.static and self.neighbor_idx is None) or not self.static: # Obtain the nearest-neighbors for each particle tree = cKDTree(swarm.position) - _, self.neighbor_idx = tree.query(swarm.position, p=p, k=k) + _, self.neighbor_idx = tree.query( + swarm.position, p=p, k=k + ) # Map the computed costs to the neighbour indices and take the # argmin. If k-neighbors is equal to 1, then the swarm acts @@ -89,7 +92,13 @@ def compute_gbest(self, swarm, p, k, **kwargs): else: return (best_pos, best_cost) - def compute_velocity(self, swarm, clamp=None): + def compute_velocity( + self, + swarm, + clamp=None, + vh=VelocityHandler(strategy="unmodified"), + bounds=None, + ): """Compute the velocity matrix This method updates the velocity matrix using the best and current @@ -101,15 +110,18 @@ def compute_velocity(self, swarm, clamp=None): .. code-block :: python import pyswarms.backend as P - from pyswarms.swarms.backend import Swarm + from pyswarms.backend.swarm import Swarm + from pyswarms.backend.handlers import VelocityHandler from pyswarms.backend.topology import Ring my_swarm = P.create_swarm(n_particles, dimensions) my_topology = Ring(static=False) + my_vh = VelocityHandler(strategy="invert") for i in range(iters): # Inside the for-loop - my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp, my_vh, + bounds) Parameters ---------- @@ -119,15 +131,23 @@ def compute_velocity(self, swarm, clamp=None): a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh : pyswarms.backend.handlers.VelocityHandler + a VelocityHandler instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. Returns ------- numpy.ndarray Updated velocity matrix """ - return ops.compute_velocity(swarm, clamp) + return ops.compute_velocity(swarm, clamp, vh, bounds) - def compute_position(self, swarm, bounds=None): + def compute_position( + self, swarm, bounds=None, bh=BoundaryHandler(strategy="periodic") + ): """Update the position matrix This method updates the position matrix given the current position and @@ -141,10 +161,12 @@ def compute_position(self, swarm, bounds=None): a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh : pyswarms.backend.handlers.BoundaryHandler + a BoundaryHandler instance Returns ------- numpy.ndarray New position-matrix """ - return ops.compute_position(swarm, bounds) + return ops.compute_position(swarm, bounds, bh) diff --git a/pyswarms/backend/topology/star.py b/pyswarms/backend/topology/star.py index 8fc7b094..0a166b47 100644 --- a/pyswarms/backend/topology/star.py +++ b/pyswarms/backend/topology/star.py @@ -16,6 +16,7 @@ import numpy as np from .. import operators as ops +from ..handlers import BoundaryHandler, VelocityHandler from ...utils.reporter import Reporter from .base import Topology @@ -78,7 +79,13 @@ def compute_gbest(self, swarm, **kwargs): else: return (best_pos, best_cost) - def compute_velocity(self, swarm, clamp=None): + def compute_velocity( + self, + swarm, + clamp=None, + vh=VelocityHandler(strategy="unmodified"), + bounds=None, + ): """Compute the velocity matrix This method updates the velocity matrix using the best and current @@ -90,15 +97,18 @@ def compute_velocity(self, swarm, clamp=None): .. code-block :: python import pyswarms.backend as P - from pyswarms.swarms.backend import Swarm + from pyswarms.backend.swarm import Swarm + from pyswarms.backend.handlers import VelocityHandler from pyswarms.backend.topology import Star my_swarm = P.create_swarm(n_particles, dimensions) my_topology = Star() + my_vh = VelocityHandler(strategy="adjust") for i in range(iters): # Inside the for-loop - my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp, my_vh, + bounds) Parameters ---------- @@ -108,15 +118,23 @@ def compute_velocity(self, swarm, clamp=None): a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh : pyswarms.backend.handlers.VelocityHandler + a VelocityHandler instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. Returns ------- numpy.ndarray Updated velocity matrix """ - return ops.compute_velocity(swarm, clamp) + return ops.compute_velocity(swarm, clamp, vh, bounds=bounds) - def compute_position(self, swarm, bounds=None): + def compute_position( + self, swarm, bounds=None, bh=BoundaryHandler(strategy="periodic") + ): """Update the position matrix This method updates the position matrix given the current position and @@ -130,10 +148,12 @@ def compute_position(self, swarm, bounds=None): a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh : pyswarms.backend.handlers.BoundaryHandler + a BoundaryHandler instance Returns ------- numpy.ndarray New position-matrix """ - return ops.compute_position(swarm, bounds) + return ops.compute_position(swarm, bounds, bh) diff --git a/pyswarms/discrete/binary.py b/pyswarms/discrete/binary.py index 9adc337d..0526f334 100644 --- a/pyswarms/discrete/binary.py +++ b/pyswarms/discrete/binary.py @@ -60,6 +60,7 @@ from ..backend.operators import compute_pbest from ..backend.topology import Ring +from ..backend.handlers import BoundaryHandler, VelocityHandler from ..base import DiscreteSwarmOptimizer from ..utils.reporter import Reporter @@ -72,6 +73,7 @@ def __init__( options, init_pos=None, velocity_clamp=None, + vh_strategy="unmodified", ftol=-np.inf, ): """Initialize the swarm @@ -105,6 +107,9 @@ def __init__( a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh_strategy : String + a strategy for the handling of the velocity of out-of-bounds particles. + Only the "unmodified" and the "adjust" strategies are allowed. ftol : float relative error in objective_func(best_pos) acceptable for convergence @@ -127,6 +132,7 @@ def __init__( self.reset() # Initialize the topology self.top = Ring(static=False) + self.vh = VelocityHandler(strategy=vh_strategy) self.name = __name__ def optimize(self, objective_func, iters, fast=False, **kwargs): @@ -158,7 +164,6 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): lvl=logging.INFO, ) - self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) for i in self.rep.pbar(iters, self.name): if not fast: sleep(0.01) @@ -166,6 +171,9 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): self.swarm.current_cost = objective_func( self.swarm.position, **kwargs ) + self.swarm.pbest_cost = objective_func( + self.swarm.pbest_pos, **kwargs + ) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest( self.swarm ) @@ -194,7 +202,7 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): break # Perform position velocity update self.swarm.velocity = self.top.compute_velocity( - self.swarm, self.velocity_clamp + self.swarm, self.velocity_clamp, self.vh ) self.swarm.position = self._compute_position(self.swarm) # Obtain the final best_cost and the final best_position diff --git a/pyswarms/single/general_optimizer.py b/pyswarms/single/general_optimizer.py index 069c8c87..b12a0e91 100644 --- a/pyswarms/single/general_optimizer.py +++ b/pyswarms/single/general_optimizer.py @@ -65,6 +65,7 @@ from ..backend.operators import compute_pbest from ..backend.topology import Topology +from ..backend.handlers import BoundaryHandler, VelocityHandler from ..base import SwarmOptimizer from ..utils.reporter import Reporter @@ -77,7 +78,9 @@ def __init__( options, topology, bounds=None, + bh_strategy="periodic", velocity_clamp=None, + vh_strategy="unmodified", center=1.00, ftol=-np.inf, init_pos=None, @@ -137,10 +140,14 @@ def __init__( a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh_strategy : String + a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple (default is :code:`None`) a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh_strategy : String + a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol : float @@ -170,6 +177,8 @@ def __init__( raise TypeError("Parameter `topology` must be a Topology object") else: self.top = topology + self.bh = BoundaryHandler(strategy=bh_strategy) + self.vh = VelocityHandler(strategy=vh_strategy) self.name = __name__ def optimize(self, objective_func, iters, fast=False, **kwargs): @@ -202,12 +211,11 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): "Optimize for {} iters with {}".format(iters, self.options), lvl=logging.INFO, ) - - self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) for i in self.rep.pbar(iters, self.name): # Compute cost for current position and personal best # fmt: off self.swarm.current_cost = objective_func(self.swarm.position, **kwargs) + self.swarm.pbest_cost = objective_func(self.swarm.pbest_pos, **kwargs) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) best_cost_yet_found = self.swarm.best_cost # fmt: on @@ -234,16 +242,14 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): break # Perform velocity and position updates self.swarm.velocity = self.top.compute_velocity( - self.swarm, self.velocity_clamp + self.swarm, self.velocity_clamp, self.vh, self.bounds ) self.swarm.position = self.top.compute_position( - self.swarm, self.bounds + self.swarm, self.bounds, self.bh ) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() - final_best_pos = self.swarm.position[ - self.swarm.pbest_cost.argmin() - ].copy() + final_best_pos = self.swarm.position[self.swarm.pbest_cost.argmin()].copy() # Write report in log and return final cost and position self.rep.log( "Optimization finished | best cost: {}, best pos: {}".format( diff --git a/pyswarms/single/global_best.py b/pyswarms/single/global_best.py index d0e7dc6a..0e30b128 100644 --- a/pyswarms/single/global_best.py +++ b/pyswarms/single/global_best.py @@ -64,6 +64,7 @@ from ..backend.operators import compute_pbest from ..backend.topology import Star +from ..backend.handlers import BoundaryHandler, VelocityHandler from ..base import SwarmOptimizer from ..utils.reporter import Reporter @@ -75,7 +76,9 @@ def __init__( dimensions, options, bounds=None, + bh_strategy="periodic", velocity_clamp=None, + vh_strategy="unmodified", center=1.00, ftol=-np.inf, init_pos=None, @@ -101,10 +104,14 @@ def __init__( a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh_strategy : String + a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple (default is :code:`None`) a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh_strategy : String + a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol : float @@ -131,6 +138,8 @@ def __init__( self.reset() # Initialize the topology self.top = Star() + self.bh = BoundaryHandler(strategy=bh_strategy) + self.vh = VelocityHandler(strategy=vh_strategy) self.name = __name__ def optimize(self, objective_func, iters, fast=False, **kwargs): @@ -162,13 +171,13 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): lvl=logging.INFO, ) - self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) for i in self.rep.pbar(iters, self.name): if not fast: sleep(0.01) # Compute cost for current position and personal best # fmt: off self.swarm.current_cost = objective_func(self.swarm.position, **kwargs) + self.swarm.pbest_cost = objective_func(self.swarm.pbest_pos, **kwargs) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) # Set best_cost_yet_found for ftol best_cost_yet_found = self.swarm.best_cost @@ -193,10 +202,10 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): break # Perform velocity and position updates self.swarm.velocity = self.top.compute_velocity( - self.swarm, self.velocity_clamp + self.swarm, self.velocity_clamp, self.vh, self.bounds ) self.swarm.position = self.top.compute_position( - self.swarm, self.bounds + self.swarm, self.bounds, self.bh ) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() diff --git a/pyswarms/single/local_best.py b/pyswarms/single/local_best.py index 9c7adfe9..268bb9c1 100644 --- a/pyswarms/single/local_best.py +++ b/pyswarms/single/local_best.py @@ -73,6 +73,7 @@ from ..backend.operators import compute_pbest from ..backend.topology import Ring +from ..backend.handlers import BoundaryHandler, VelocityHandler from ..base import SwarmOptimizer from ..utils.reporter import Reporter @@ -84,7 +85,9 @@ def __init__( dimensions, options, bounds=None, + bh_strategy="periodic", velocity_clamp=None, + vh_strategy="unmodified", center=1.00, ftol=-np.inf, init_pos=None, @@ -102,10 +105,14 @@ def __init__( a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. + bh_strategy : String + a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple (default is :code:`(0,1)`) a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. + vh_strategy : String + a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol : float @@ -155,6 +162,8 @@ def __init__( self.reset() # Initialize the topology self.top = Ring(static=static) + self.bh = BoundaryHandler(strategy=bh_strategy) + self.vh = VelocityHandler(strategy=vh_strategy) self.name = __name__ def optimize(self, objective_func, iters, fast=False, **kwargs): @@ -186,7 +195,6 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): lvl=logging.INFO, ) - self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) for i in self.rep.pbar(iters, self.name): if not fast: sleep(0.01) @@ -194,6 +202,9 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): self.swarm.current_cost = objective_func( self.swarm.position, **kwargs ) + self.swarm.pbest_cost = objective_func( + self.swarm.pbest_pos, **kwargs + ) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest( self.swarm ) @@ -221,16 +232,14 @@ def optimize(self, objective_func, iters, fast=False, **kwargs): break # Perform position velocity update self.swarm.velocity = self.top.compute_velocity( - self.swarm, self.velocity_clamp + self.swarm, self.velocity_clamp, self.vh, self.bounds ) self.swarm.position = self.top.compute_position( - self.swarm, self.bounds + self.swarm, self.bounds, self.bh ) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() - final_best_pos = self.swarm.position[ - self.swarm.pbest_cost.argmin(axis=0) - ].copy() + final_best_pos = self.swarm.position[self.swarm.pbest_cost.argmin(axis=0)].copy() # Write report in log and return final cost and position self.rep.log( "Optimization finished | best cost: {}, best pos: {}".format( diff --git a/tests/backend/conftest.py b/tests/backend/conftest.py index 90b19e6b..2d4baf56 100644 --- a/tests/backend/conftest.py +++ b/tests/backend/conftest.py @@ -26,3 +26,57 @@ def swarm(): "options": {"c1": 0.5, "c2": 1, "w": 2}, } return Swarm(**attrs_at_t) + +@pytest.fixture +def bounds(): + bounds_ = (np.array([2, 3, 1]), np.array([4, 7, 8])) + return bounds_ + +@pytest.fixture +def clamp(): + clamp_ = (np.array([2, 3, 1]), np.array([4, 7, 8])) + return clamp_ + +@pytest.fixture +def positions_inbound(): + pos_ = np.array([[3.3, 4.4, 2.3], + [3.7, 5.2, 7.0], + [2.5, 6.8, 2.3], + [2.1, 6.9, 4.7], + [2.7, 3.2, 3.5], + [2.5, 5.1, 1.2] + ]) + return pos_ + +@pytest.fixture +def positions_out_of_bound(): + pos_ = np.array([[5.3, 4.4, 2.3], + [3.7, 9.2, 7.0], + [8.5, 0.8, 2.3], + [2.1, 6.9, 0.7], + [2.7, 9.2, 3.5], + [1.5, 5.1, 9.2] + ]) + return pos_ + +@pytest.fixture +def velocities_inbound(): + pos_ = np.array([[3.3, 4.4, 2.3], + [3.7, 5.2, 7.0], + [2.5, 6.8, 2.3], + [2.1, 6.9, 4.7], + [2.7, 3.2, 3.5], + [2.5, 5.1, 1.2] + ]) + return pos_ + +@pytest.fixture +def velocities_out_of_bound(): + pos_ = np.array([[5.3, 4.4, 2.3], + [3.7, 9.2, 7.0], + [8.5, 0.8, 2.3], + [2.1, 6.9, 0.7], + [2.7, 9.2, 3.5], + [1.5, 5.1, 9.2] + ]) + return pos_ diff --git a/tests/backend/test_handlers.py b/tests/backend/test_handlers.py new file mode 100644 index 00000000..33852efa --- /dev/null +++ b/tests/backend/test_handlers.py @@ -0,0 +1,181 @@ +import pytest +import inspect +import numpy as np + +from pyswarms.backend.handlers import ( + BoundaryHandler, + VelocityHandler, + HandlerMixin, +) +import pyswarms.backend.handlers as h + +bh_strategies = [name for name, _ in inspect.getmembers(h.BoundaryHandler(""), + predicate=inspect.ismethod) if not name.startswith(("__", "_"))] +vh_strategies = [name for name, _ in inspect.getmembers(h.VelocityHandler(""), + predicate=inspect.ismethod) if not name.startswith(("__", "_"))] + +def test_out_of_bounds(bounds, positions_inbound, positions_out_of_bound): + hm = HandlerMixin() + out_of_bounds = hm._out_of_bounds + idx_inbound = out_of_bounds(positions_inbound, bounds) + idx_out_of_bounds = out_of_bounds(positions_out_of_bound, bounds) + + print(bh_strategies) + expected_idx = ( + (np.array([2, 3, 5]), np.array([1, 2, 0])), + (np.array([0, 1, 2, 3, 4, 5]), np.array([0, 1, 1, 2, 1, 0])), + ) + assert np.ravel(idx_inbound[0]).size == 0 + assert np.ravel(idx_inbound[1]).size == 0 + assert ( + np.ravel(idx_out_of_bounds[0]).all() == np.ravel(expected_idx[0]).all() + ) + assert ( + np.ravel(idx_out_of_bounds[1]).all() == np.ravel(expected_idx[1]).all() + ) + +@pytest.mark.parametrize( + "strategy", bh_strategies +) +def test_bound_handling( + bounds, positions_inbound, positions_out_of_bound, strategy +): + bh = BoundaryHandler(strategy=strategy) + # Test if it doesn't handle inbound positions + inbound_handled = bh(positions_inbound, bounds) + assert inbound_handled.all() == positions_inbound.all() + + # Test if all particles are handled to a position inside the boundaries + outbound_handled = bh(positions_out_of_bound, bounds) + lower_than_bound = outbound_handled >= bounds[0] + greater_than_bound = outbound_handled <= bounds[1] + assert lower_than_bound.all() + assert greater_than_bound.all() + +def test_nearest_strategy(bounds, positions_inbound, positions_out_of_bound): + bh = BoundaryHandler(strategy="nearest") + # TODO Add strategy specific tests + + +def test_reflective_strategy( + bounds, positions_inbound, positions_out_of_bound +): + bh = BoundaryHandler(strategy="reflective") + pass + # TODO Add strategy specific tests + + +def test_shrink_strategy(bounds, positions_inbound, positions_out_of_bound): + bh = BoundaryHandler(strategy="shrink") + # TODO Add strategy specific tests + + +def test_random_strategy(bounds, positions_inbound, positions_out_of_bound): + bh = BoundaryHandler(strategy="random") + # TODO Add strategy specific tests + + +def test_intermediate_strategy( + bounds, positions_inbound, positions_out_of_bound +): + bh = BoundaryHandler(strategy="intermediate") + # TODO Add strategy specific tests + + +def test_periodic_strategy(bounds, positions_inbound, positions_out_of_bound): + bh = BoundaryHandler(strategy="periodic") + # TODO Add strategy specific tests + +def assert_clamp( +clamp, +velocities_inbound, +velocities_out_of_bound, +positions_inbound, +positions_out_of_bound, +vh, +bounds=None, +): +# Test if it doesn't handle inclamp velocities + inbound_handled = vh( + velocities_inbound, clamp, position=positions_inbound, bounds=bounds + ) + assert inbound_handled.all() == velocities_inbound.all() + + # Test if all particles are handled to a velocity inside the clamp + outbound_handled = vh( + velocities_out_of_bound, + clamp, + position=positions_out_of_bound, + bounds=bounds, + ) + lower_than_clamp = outbound_handled < clamp[0] + greater_than_clamp = outbound_handled > clamp[1] + assert not lower_than_clamp.all() + assert not greater_than_clamp.all() + + + + +def test_unmodified_strategy( + clamp, velocities_inbound, velocities_out_of_bound +): + vh = VelocityHandler(strategy="unmodified") + inbound_handled = vh(velocities_inbound, clamp) + outbound_handled = vh(velocities_out_of_bound, clamp) + assert inbound_handled.all() == velocities_inbound.all() + assert outbound_handled.all() == velocities_out_of_bound.all() + + +def test_adjust_strategy( + clamp, + velocities_inbound, + velocities_out_of_bound, + positions_inbound, + positions_out_of_bound, +): + vh = VelocityHandler(strategy="adjust") + assert_clamp( + clamp, + velocities_inbound, + velocities_out_of_bound, + positions_inbound, + positions_out_of_bound, + vh, + ) + # TODO Add strategy specific tests + pass + + +def test_invert_strategy( + clamp, + velocities_inbound, + velocities_out_of_bound, + positions_inbound, + positions_out_of_bound, + bounds, +): + vh = VelocityHandler(strategy="invert") + assert_clamp( + clamp, + velocities_inbound, + velocities_out_of_bound, + positions_inbound, + positions_out_of_bound, + vh, + bounds=bounds, + ) + # TODO Add strategy specific tests + pass + + +def test_zero_strategy( + clamp, + velocities_inbound, + velocities_out_of_bound, + positions_inbound, + positions_out_of_bound, + bounds, +): + vh = VelocityHandler(strategy="zero") + # TODO Add strategy specific tests + pass diff --git a/tests/backend/test_operators.py b/tests/backend/test_operators.py index 224dcc04..94e6dd6b 100644 --- a/tests/backend/test_operators.py +++ b/tests/backend/test_operators.py @@ -7,6 +7,7 @@ # Import from pyswarms import pyswarms.backend as P +from pyswarms.backend.handlers import BoundaryHandler, VelocityHandler class TestComputePbest(object): @@ -33,24 +34,29 @@ class TestComputeVelocity(object): @pytest.mark.parametrize("clamp", [None, (0, 1), (-1, 1)]) def test_return_values(self, swarm, clamp): """Test if method gives the expected shape and range""" - v = P.compute_velocity(swarm, clamp) + vh = VelocityHandler(strategy="unmodified") + v = P.compute_velocity(swarm, clamp, vh) assert v.shape == swarm.position.shape if clamp is not None: assert (clamp[0] <= v).all() and (clamp[1] >= v).all() @pytest.mark.parametrize("swarm", [0, (1, 2, 3)]) - def test_input_swarm(self, swarm): + @pytest.mark.parametrize("vh_strat", ["unmodified", "zero", "invert", "adjust"]) + def test_input_swarm(self, swarm, vh_strat): """Test if method raises AttributeError with wrong swarm""" + vh = VelocityHandler(strategy=vh_strat) with pytest.raises(AttributeError): - P.compute_velocity(swarm, clamp=(0, 1)) + P.compute_velocity(swarm, clamp=(0, 1), vh=vh) @pytest.mark.parametrize("options", [{"c1": 0.5, "c2": 0.3}]) - def test_missing_kwargs(self, swarm, options): + @pytest.mark.parametrize("vh_strat", ["unmodified", "zero", "invert", "adjust"]) + def test_missing_kwargs(self, swarm, options, vh_strat): """Test if method raises KeyError with missing kwargs""" + vh = VelocityHandler(strategy=vh_strat) with pytest.raises(KeyError): swarm.options = options clamp = (0, 1) - P.compute_velocity(swarm, clamp) + P.compute_velocity(swarm, clamp, vh) class TestComputePosition(object): @@ -60,15 +66,21 @@ class TestComputePosition(object): "bounds", [None, ([-5, -5, -5], [5, 5, 5]), ([-10, -10, -10], [10, 10, 10])], ) - def test_return_values(self, swarm, bounds): + @pytest.mark.parametrize("bh_strat", ["nearest", "random"]) + def test_return_values(self, swarm, bounds, bh_strat): """Test if method gives the expected shape and range""" - p = P.compute_position(swarm, bounds) + bh = BoundaryHandler(strategy=bh_strat) + p = P.compute_position(swarm, bounds, bh) assert p.shape == swarm.velocity.shape if bounds is not None: assert (bounds[0] <= p).all() and (bounds[1] >= p).all() - + @pytest.mark.parametrize("swarm", [0, (1, 2, 3)]) - def test_input_swarm(self, swarm): + @pytest.mark.parametrize("bh_strat", ["nearest", "random", "shrink", + "intermediate"]) + def test_input_swarm(self, swarm, bh_strat): """Test if method raises AttributeError with wrong swarm""" + bh = BoundaryHandler(strategy=bh_strat) with pytest.raises(AttributeError): - P.compute_position(swarm, bounds=([-5, -5], [5, 5])) + P.compute_position(swarm, bounds=([-5, -5], [5, 5]), bh=bh) + diff --git a/tests/backend/topology/test_pyramid.py b/tests/backend/topology/test_pyramid.py index 02cd131e..eb3e0b16 100644 --- a/tests/backend/topology/test_pyramid.py +++ b/tests/backend/topology/test_pyramid.py @@ -28,9 +28,7 @@ def test_compute_gbest_return_values( """Test if compute_gbest() gives the expected return values""" topo = topology(static=static) expected_cost = 1.0002528364353296 - expected_pos = np.array( - [9.90438476e-01, 2.50379538e-03, 1.87405987e-05] - ) + expected_pos = np.array([9.90438476e-01, 2.50379538e-03, 1.87405987e-05]) pos, cost = topo.compute_gbest(swarm, **options) assert cost == pytest.approx(expected_cost) assert pos[np.argmin(cost)] == pytest.approx(expected_pos) diff --git a/tests/backend/topology/test_random.py b/tests/backend/topology/test_random.py index 8c675036..50ef3235 100644 --- a/tests/backend/topology/test_random.py +++ b/tests/backend/topology/test_random.py @@ -29,17 +29,12 @@ def test_compute_gbest_return_values( """Test if update_gbest_neighborhood gives the expected return values""" topo = topology(static=static) expected_cost = 1.0002528364353296 - expected_pos = np.array( - [9.90438476e-01, 2.50379538e-03, 1.87405987e-05] - ) - expected_pos_2 = np.array( - [9.98033031e-01, 4.97392619e-03, 3.07726256e-03] - ) + expected_pos = np.array([9.90438476e-01, 2.50379538e-03, 1.87405987e-05]) + expected_pos_2 = np.array([9.98033031e-01, 4.97392619e-03, 3.07726256e-03]) pos, cost = topo.compute_gbest(swarm, **options) assert cost == pytest.approx(expected_cost) - assert (pos[np.argmin(cost)] == pytest.approx(expected_pos)) or ( - pos[np.argmin(cost)] == pytest.approx(expected_pos_2) - ) + assert ((pos[np.argmin(cost)] == pytest.approx(expected_pos)) or + (pos[np.argmin(cost)] == pytest.approx(expected_pos_2))) @pytest.mark.parametrize("static", [True, False]) @pytest.mark.parametrize("k", [1, 2]) diff --git a/tests/backend/topology/test_ring.py b/tests/backend/topology/test_ring.py index 8d55c81c..03fa8948 100644 --- a/tests/backend/topology/test_ring.py +++ b/tests/backend/topology/test_ring.py @@ -30,13 +30,8 @@ def test_compute_gbest_return_values(self, swarm, topology, p, k, static): topo = topology(static=static) pos, cost = topo.compute_gbest(swarm, p=p, k=k) expected_cost = 1.0002528364353296 - expected_pos = np.array( - [9.90438476e-01, 2.50379538e-03, 1.87405987e-05] - ) - expected_pos_2 = np.array( - [9.98033031e-01, 4.97392619e-03, 3.07726256e-03] - ) + expected_pos = np.array([9.90438476e-01, 2.50379538e-03, 1.87405987e-05]) + expected_pos_2 = np.array([9.98033031e-01, 4.97392619e-03, 3.07726256e-03]) assert cost == pytest.approx(expected_cost) - assert (pos[np.argmin(cost)] == pytest.approx(expected_pos)) or ( - pos[np.argmin(cost)] == pytest.approx(expected_pos_2) - ) + assert ((pos[np.argmin(cost)] == pytest.approx(expected_pos)) or + (pos[np.argmin(cost)] == pytest.approx(expected_pos_2))) diff --git a/tests/backend/topology/test_von_neumann.py b/tests/backend/topology/test_von_neumann.py index 7331189b..7e3eed53 100644 --- a/tests/backend/topology/test_von_neumann.py +++ b/tests/backend/topology/test_von_neumann.py @@ -27,17 +27,12 @@ def test_update_gbest_neighborhood(self, swarm, topology, p, r): """Test if update_gbest_neighborhood gives the expected return values""" topo = topology() pos, cost = topo.compute_gbest(swarm, p=p, r=r) - expected_pos = np.array( - [9.90438476e-01, 2.50379538e-03, 1.87405987e-05] - ) - expected_pos_2 = np.array( - [9.98033031e-01, 4.97392619e-03, 3.07726256e-03] - ) + expected_pos = np.array([9.90438476e-01, 2.50379538e-03, 1.87405987e-05]) + expected_pos_2 = np.array([9.98033031e-01, 4.97392619e-03, 3.07726256e-03]) expected_cost = 1.0002528364353296 assert cost == pytest.approx(expected_cost) - assert (pos[np.argmin(cost)] == pytest.approx(expected_pos)) or ( - pos[np.argmin(cost)] == pytest.approx(expected_pos_2) - ) + assert ((pos[np.argmin(cost)] == pytest.approx(expected_pos)) or + (pos[np.argmin(cost)] == pytest.approx(expected_pos_2))) @pytest.mark.parametrize("m", [i for i in range(3)]) @pytest.mark.parametrize("n", [i for i in range(3)])