diff --git a/pyswarms/backend/operators.py b/pyswarms/backend/operators.py index 18db8d1e..57308a4e 100644 --- a/pyswarms/backend/operators.py +++ b/pyswarms/backend/operators.py @@ -13,12 +13,11 @@ # Import modules import numpy as np -from scipy.spatial import cKDTree # Create a logger logger = logging.getLogger(__name__) -def update_pbest(swarm): +def compute_pbest(swarm): """Takes a swarm instance and updates the personal best scores You can use this method to update your personal best positions. @@ -71,97 +70,7 @@ def update_pbest(swarm): else: return (new_pbest_pos, new_pbest_cost) -def update_gbest(swarm): - """Updates the global best given the cost and the position - - This method takes the current pbest_pos and pbest_cost, then returns - the minimum cost and position from the matrix. It should be used in - tandem with an if statement - - .. code-block:: python - - import pyswarms.backend as P - from pyswarms.backend.swarms import Swarm - - my_swarm = P.create_swarm(n_particles, dimensions) - - # If the minima of the pbest_cost is less than the best_cost - if np.min(pbest_cost) < best_cost: - # Update best_cost and position - swarm.best_pos, swarm.best_cost = P.update_gbest(my_swarm) - - Parameters - ---------- - swarm : pyswarms.backend.swarm.Swarm - a Swarm instance - - Returns - ------- - numpy.ndarray - Best position of shape :code:`(n_dimensions, )` - float - Best cost - """ - try: - best_pos = swarm.pbest_pos[np.argmin(swarm.pbest_cost)] - best_cost = np.min(swarm.pbest_cost) - except AttributeError: - msg = 'Please pass a Swarm class. You passed {}'.format(type(swarm)) - logger.error(msg) - raise - else: - return (best_pos, best_cost) - -def update_gbest_neighborhood(swarm, p, k): - """Updates the global best using a neighborhood approach - - This uses the cKDTree method from :code:`scipy` to obtain the nearest - neighbours - - Parameters - ---------- - swarm : pyswarms.backend.swarms.Swarm - a Swarm instance - k : int - number of neighbors to be considered. Must be a - positive integer less than :code:`n_particles` - p: int {1,2} - the Minkowski p-norm to use. 1 is the - sum-of-absolute values (or L1 distance) while 2 is - the Euclidean (or L2) distance. - - Returns - ------- - numpy.ndarray - Best position of shape :code:`(n_dimensions, )` - float - Best cost - """ - try: - # Obtain the nearest-neighbors for each particle - tree = cKDTree(swarm.position) - _, idx = tree.query(swarm.position, p=p, k=k) - - # Map the computed costs to the neighbour indices and take the - # argmin. If k-neighbors is equal to 1, then the swarm acts - # independently of each other. - if k == 1: - # The minimum index is itself, no mapping needed. - best_neighbor = swarm.pbest_cost[idx][:, np.newaxis].argmin(axis=1) - else: - idx_min = swarm.pbest_cost[idx].argmin(axis=1) - best_neighbor = idx[np.arange(len(idx)), idx_min] - # Obtain best cost and position - best_cost = np.min(swarm.pbest_cost[best_neighbor]) - best_pos = swarm.pbest_pos[np.argmin(swarm.pbest_cost[best_neighbor])] - except AttributeError: - msg = 'Please pass a Swarm class. You passed {}'.format(type(swarm)) - logger.error(msg) - raise - else: - return (best_pos, best_cost) - -def update_velocity(swarm, clamp): +def compute_velocity(swarm, clamp): """Updates the velocity matrix This method updates the velocity matrix using the best and current @@ -225,7 +134,7 @@ def update_velocity(swarm, clamp): else: return updated_velocity -def update_position(swarm, bounds): +def compute_position(swarm, bounds): """Updates the position matrix This method updates the position matrix given the current position and diff --git a/pyswarms/backend/topology/__init__.py b/pyswarms/backend/topology/__init__.py new file mode 100644 index 00000000..eef1b585 --- /dev/null +++ b/pyswarms/backend/topology/__init__.py @@ -0,0 +1,16 @@ +""" +The :code:`pyswarms.backend.topology` contains various topologies that dictate +particle behavior. These topologies implement three methods: + - compute_best_particle(): gets the position and cost of the best particle in the swarm + - update_velocity(): updates the velocity-matrix depending on the topology. + - update_position(): updates the position-matrix depending on the topology. +""" + +from .star import Star +from .ring import Ring + + +__all__ = [ + "Star", + "Ring" +] \ No newline at end of file diff --git a/pyswarms/backend/topology/base.py b/pyswarms/backend/topology/base.py new file mode 100644 index 00000000..f5287507 --- /dev/null +++ b/pyswarms/backend/topology/base.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +""" +Base class for Topologies +""" + +class Topology(object): + + def __init__(self, **kwargs): + """Initializes the class""" + pass + + def compute_gbest(self, swarm): + """Computes the best particle of the swarm and returns the cost and + position""" + raise NotImplementedError("Topology::compute_gbest()") + + def compute_position(self, swarm): + """Updates the swarm's position-matrix""" + raise NotImplementedError("Topology::compute_position()") + + def compute_velocity(self, swarm): + """Updates the swarm's velocity-matrix""" + raise NotImplementedError("Topology::compute_velocity()") \ No newline at end of file diff --git a/pyswarms/backend/topology/ring.py b/pyswarms/backend/topology/ring.py new file mode 100644 index 00000000..c6cbe899 --- /dev/null +++ b/pyswarms/backend/topology/ring.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- + +""" +A Ring Network Topology + +This class implements a star topology where all particles are connected in a +ring-like fashion. This social behavior is often found in LocalBest PSO +optimizers. +""" + +# Import from stdlib +import logging + +# Import modules +import numpy as np +from scipy.spatial import cKDTree + +# Import from package +from .. import operators as ops +from .base import Topology + +# Create a logger +logger = logging.getLogger(__name__) + +class Ring(Topology): + + def __init__(self): + super(Ring, self).__init__() + + def compute_gbest(self, swarm, p, k): + """Updates the global best using a neighborhood approach + + This uses the cKDTree method from :code:`scipy` to obtain the nearest + neighbours + + Parameters + ---------- + swarm : pyswarms.backend.swarms.Swarm + a Swarm instance + k : int + number of neighbors to be considered. Must be a + positive integer less than :code:`n_particles` + p: int {1,2} + the Minkowski p-norm to use. 1 is the + sum-of-absolute values (or L1 distance) while 2 is + the Euclidean (or L2) distance. + + Returns + ------- + numpy.ndarray + Best position of shape :code:`(n_dimensions, )` + float + Best cost + """ + try: + # Obtain the nearest-neighbors for each particle + tree = cKDTree(swarm.position) + _, idx = tree.query(swarm.position, p=p, k=k) + + # Map the computed costs to the neighbour indices and take the + # argmin. If k-neighbors is equal to 1, then the swarm acts + # independently of each other. + if k == 1: + # The minimum index is itself, no mapping needed. + best_neighbor = swarm.pbest_cost[idx][:, np.newaxis].argmin(axis=1) + else: + idx_min = swarm.pbest_cost[idx].argmin(axis=1) + best_neighbor = idx[np.arange(len(idx)), idx_min] + # Obtain best cost and position + best_cost = np.min(swarm.pbest_cost[best_neighbor]) + best_pos = swarm.pbest_pos[np.argmin(swarm.pbest_cost[best_neighbor])] + except AttributeError: + msg = 'Please pass a Swarm class. You passed {}'.format(type(swarm)) + logger.error(msg) + raise + else: + return (best_pos, best_cost) + + def compute_velocity(self, swarm, clamp): + """Computes the velocity matrix + + This method updates the velocity matrix using the best and current + positions of the swarm. The velocity matrix is computed using the + cognitive and social terms of the swarm. + + A sample usage can be seen with the following: + + .. code-block :: python + + import pyswarms.backend as P + from pyswarms.swarms.backend import Swarm + from pyswarms.backend.topology import Star + + my_swarm = P.create_swarm(n_particles, dimensions) + my_topology = Ring() + + for i in range(iters): + # Inside the for-loop + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + + Parameters + ---------- + swarm : pyswarms.backend.swarms.Swarm + a Swarm instance + clamp : tuple of floats (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum velocity + and the second entry is the maximum velocity. It + sets the limits for velocity clamping. + + Returns + ------- + numpy.ndarray + Updated velocity matrix + """ + return ops.compute_velocity(swarm, clamp) + + def compute_position(self, swarm, bounds): + """Updates the position matrix + + This method updates the position matrix given the current position and + the velocity. If bounded, it waives updating the position. + + Parameters + ---------- + swarm : pyswarms.backend.swarms.Swarm + a Swarm instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. + + Returns + ------- + numpy.ndarray + New position-matrix + """ + return ops.compute_position(swarm, bounds) \ No newline at end of file diff --git a/pyswarms/backend/topology/star.py b/pyswarms/backend/topology/star.py new file mode 100644 index 00000000..459d2685 --- /dev/null +++ b/pyswarms/backend/topology/star.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- + +""" +A Star Network Topology + +This class implements a star topology where all particles are connected to +one another. This social behavior is often found in GlobalBest PSO +optimizers. +""" + +# Import from stdlib +import logging + +# Import modules +import numpy as np + +# Import from package +from .. import operators as ops +from .base import Topology + +# Create a logger +logger = logging.getLogger(__name__) + +class Star(Topology): + + def __init__(self): + super(Star, self).__init__() + + def compute_gbest(self, swarm): + """Obtains the global best cost and position based on a star topology + + This method takes the current pbest_pos and pbest_cost, then returns + the minimum cost and position from the matrix. It should be used in + tandem with an if statement + + .. code-block:: python + + import pyswarms.backend as P + from pyswarms.backend.swarms import Swarm + from pyswarm.backend.topology import Star + + my_swarm = P.create_swarm(n_particles, dimensions) + my_topology = Star() + + # If the minima of the pbest_cost is less than the best_cost + if np.min(pbest_cost) < best_cost: + # Update best_cost and position + swarm.best_pos, swarm.best_cost = my_topology.compute_best_particle(my_swarm) + + Parameters + ---------- + swarm : pyswarms.backend.swarm.Swarm + a Swarm instance + + Returns + ------- + numpy.ndarray + Best position of shape :code:`(n_dimensions, )` + float + Best cost + """ + try: + best_pos = swarm.pbest_pos[np.argmin(swarm.pbest_cost)] + best_cost = np.min(swarm.pbest_cost) + except AttributeError: + msg = 'Please pass a Swarm class. You passed {}'.format(type(swarm)) + logger.error(msg) + raise + else: + return (best_pos, best_cost) + + def compute_velocity(self, swarm, clamp): + """Computes the velocity matrix + + This method updates the velocity matrix using the best and current + positions of the swarm. The velocity matrix is computed using the + cognitive and social terms of the swarm. + + A sample usage can be seen with the following: + + .. code-block :: python + + import pyswarms.backend as P + from pyswarms.swarms.backend import Swarm + from pyswarms.backend.topology import Star + + my_swarm = P.create_swarm(n_particles, dimensions) + my_topology = Star() + + for i in range(iters): + # Inside the for-loop + my_swarm.velocity = my_topology.update_velocity(my_swarm, clamp) + + Parameters + ---------- + swarm : pyswarms.backend.swarms.Swarm + a Swarm instance + clamp : tuple of floats (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum velocity + and the second entry is the maximum velocity. It + sets the limits for velocity clamping. + + Returns + ------- + numpy.ndarray + Updated velocity matrix + """ + return ops.compute_velocity(swarm, clamp) + + def compute_position(self, swarm, bounds): + """Updates the position matrix + + This method updates the position matrix given the current position and + the velocity. If bounded, it waives updating the position. + + Parameters + ---------- + swarm : pyswarms.backend.swarms.Swarm + a Swarm instance + bounds : tuple of :code:`np.ndarray` or list (default is :code:`None`) + a tuple of size 2 where the first entry is the minimum bound while + the second entry is the maximum bound. Each array must be of shape + :code:`(dimensions,)`. + + Returns + ------- + numpy.ndarray + New position-matrix + """ + return ops.compute_position(swarm, bounds) \ No newline at end of file diff --git a/pyswarms/single/global_best.py b/pyswarms/single/global_best.py index 34bea573..7b28e288 100644 --- a/pyswarms/single/global_best.py +++ b/pyswarms/single/global_best.py @@ -63,7 +63,8 @@ # Import from package from ..base import SwarmOptimizer -from ..backend.operators import (update_pbest, update_gbest, update_velocity, update_position) +from ..backend.operators import compute_pbest +from ..backend.topology import Star from ..utils.console_utils import cli_print, end_report @@ -112,6 +113,8 @@ def __init__(self, n_particles, dimensions, options, self.assertions() # Initialize the resettable attributes self.reset() + # Initialize the topology + self.top = Star() def optimize(self, objective_func, iters, print_step=1, verbose=1): """Optimizes the swarm for a number of iterations. @@ -139,11 +142,11 @@ def optimize(self, objective_func, iters, print_step=1, verbose=1): # Compute cost for current position and personal best self.swarm.current_cost = objective_func(self.swarm.position) self.swarm.pbest_cost = objective_func(self.swarm.pbest_pos) - self.swarm.pbest_pos, self.swarm.pbest_cost = update_pbest(self.swarm) + self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) best_cost_yet_found = self.swarm.best_cost # Get minima of pbest and check if it's less than gbest if np.min(self.swarm.pbest_cost) < self.swarm.best_cost: - self.swarm.best_pos, self.swarm.best_cost = update_gbest(self.swarm) + self.swarm.best_pos, self.swarm.best_cost = self.top.compute_gbest(self.swarm) # Print to console if i % print_step == 0: cli_print('Iteration %s/%s, cost: %s' % @@ -161,8 +164,8 @@ def optimize(self, objective_func, iters, print_step=1, verbose=1): if np.abs(self.swarm.best_cost - best_cost_yet_found) < relative_measure: break # Perform velocity and position updates - self.swarm.velocity = update_velocity(self.swarm, self.velocity_clamp) - self.swarm.position = update_position(self.swarm, self.bounds) + self.swarm.velocity = self.top.compute_velocity(self.swarm, self.velocity_clamp) + self.swarm.position = self.top.compute_position(self.swarm, self.bounds) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() final_best_pos = self.swarm.best_pos.copy() diff --git a/pyswarms/single/local_best.py b/pyswarms/single/local_best.py index 8279aa56..5a645909 100644 --- a/pyswarms/single/local_best.py +++ b/pyswarms/single/local_best.py @@ -72,7 +72,8 @@ # Import from package from ..base import SwarmOptimizer -from ..backend.operators import (update_pbest, update_gbest_neighborhood, update_position, update_velocity) +from ..backend.operators import compute_pbest +from ..backend.topology import Ring from ..utils.console_utils import cli_print, end_report @@ -152,6 +153,8 @@ def __init__(self, n_particles, dimensions, options, bounds=None, self.assertions() # Initialize the resettable attributes self.reset() + # Initialize the topology + self.top = Ring() def optimize(self, objective_func, iters, print_step=1, verbose=1): """Optimizes the swarm for a number of iterations. @@ -180,10 +183,12 @@ def optimize(self, objective_func, iters, print_step=1, verbose=1): # Compute cost for current position and personal best self.swarm.current_cost = objective_func(self.swarm.position) self.swarm.pbest_cost = objective_func(self.swarm.pbest_pos) - self.swarm.pbest_pos, self.swarm.pbest_cost = update_pbest(self.swarm) + self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) best_cost_yet_found = np.min(self.swarm.best_cost) # Update gbest from neighborhood - self.swarm.best_pos, self.swarm.best_cost = update_gbest_neighborhood(self.swarm, self.p, self.k) + self.swarm.best_pos, self.swarm.best_cost = self.top.compute_gbest(self.swarm, + self.p, + self.k) # Print to console if i % print_step == 0: cli_print('Iteration %s/%s, cost: %s' % @@ -201,8 +206,8 @@ def optimize(self, objective_func, iters, print_step=1, verbose=1): if np.abs(self.swarm.best_cost - best_cost_yet_found) < relative_measure: break # Perform position velocity update - self.swarm.velocity = update_velocity(self.swarm, self.velocity_clamp) - self.swarm.position = update_position(self.swarm, self.bounds) + self.swarm.velocity = self.top.compute_velocity(self.swarm, self.velocity_clamp) + self.swarm.position = self.top.compute_position(self.swarm, self.bounds) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() final_best_pos = self.swarm.best_pos.copy() diff --git a/tests/backend/test_operators.py b/tests/backend/test_operators.py index de667ee5..ccaa8517 100644 --- a/tests/backend/test_operators.py +++ b/tests/backend/test_operators.py @@ -9,46 +9,27 @@ import pyswarms.backend as P -def test_update_pbest_return_values(swarm): - """Test if update_pbest() gives the expected return values""" +def test_compute_pbest_return_values(swarm): + """Test if compute_pbest() gives the expected return values""" expected_cost = np.array([1,2,2]) expected_pos = np.array([[1,2,3], [4,5,6], [1,1,1]]) - pos, cost = P.update_pbest(swarm) + pos, cost = P.compute_pbest(swarm) assert (pos == expected_pos).all() assert (cost == expected_cost).all() -def test_update_gbest_return_values(swarm): - """Test if update_gbest() gives the expected return values""" - expected_cost = 1 - expected_pos = np.array([1,2,3]) - pos, cost = P.update_gbest(swarm) - assert cost == expected_cost - assert (pos == expected_pos).all() - @pytest.mark.parametrize('clamp', [None, (0,1), (-1,1)]) -def test_update_velocity_return_values(swarm, clamp): - """Test if update_velocity() gives the expected shape and range""" - v = P.update_velocity(swarm, clamp) +def test_compute_velocity_return_values(swarm, clamp): + """Test if compute_velocity() gives the expected shape and range""" + v = P.compute_velocity(swarm, clamp) assert v.shape == swarm.position.shape if clamp is not None: assert (clamp[0] <= v).all() and (clamp[1] >= v).all() @pytest.mark.parametrize('bounds', [None, ([-5,-5,-5],[5,5,5]), ([-10, -10, -10],[10, 10, 10])]) -def test_update_position_return_values(swarm, bounds): - """Test if update_position() gives the expected shape and range""" - p = P.update_position(swarm, bounds) +def test_compute_position_return_values(swarm, bounds): + """Test if compute_position() gives the expected shape and range""" + p = P.compute_position(swarm, bounds) assert p.shape == swarm.velocity.shape if bounds is not None: assert (bounds[0] <= p).all() and (bounds[1] >= p).all() - -@pytest.mark.parametrize('k', [1,2,3]) -@pytest.mark.parametrize('p', [1,2]) -def test_update_gbest_neighborhood(swarm, p, k): - """Test if update_gbest_neighborhood gives the expected return values""" - pos, cost = P.update_gbest_neighborhood(swarm, p=p, k=k) - expected_pos = np.array([1,2,3]) - expected_cost = 1 - print('k={} p={}, pos={} cost={}'.format(k,p,pos,cost)) - assert (pos == expected_pos).all() - assert cost == expected_cost \ No newline at end of file diff --git a/tests/backend/topology/__init__.py b/tests/backend/topology/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/backend/topology/conftest.py b/tests/backend/topology/conftest.py new file mode 100644 index 00000000..40228fc2 --- /dev/null +++ b/tests/backend/topology/conftest.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Fixtures for tests""" + +# Import modules +import pytest +import numpy as np + +# Import from package +from pyswarms.backend.swarms import Swarm + +@pytest.fixture +def swarm(): + """A contrived instance of the Swarm class at a certain timestep""" + attrs_at_t = { + 'position' : np.array([[5,5,5], [3,3,3], [1,1,1]]), + 'velocity' : np.array([[1,1,1], [1,1,1], [1,1,1]]), + 'current_cost' : np.array([2,2,2]), + 'pbest_cost' : np.array([1,2,3]), + 'pbest_pos' : np.array([[1,2,3], [4,5,6], [7,8,9]]), + 'best_cost' : 1, + 'best_pos' : np.array([1,1,1]), + 'behavior' : {'c1' : 0.5, 'c2': 1, 'w': 2} + } + return Swarm(**attrs_at_t) + diff --git a/tests/backend/topology/test_ring.py b/tests/backend/topology/test_ring.py new file mode 100644 index 00000000..bebf71e8 --- /dev/null +++ b/tests/backend/topology/test_ring.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Import modules +import pytest +import numpy as np + +# Import from package +from pyswarms.backend.topology import Ring + + +@pytest.mark.parametrize('k', [1,2,3]) +@pytest.mark.parametrize('p', [1,2]) +def test_update_gbest_neighborhood(swarm, p, k): + """Test if update_gbest_neighborhood gives the expected return values""" + topology = Ring() + pos, cost = topology.compute_gbest(swarm, p=p, k=k) + expected_pos = np.array([1,2,3]) + expected_cost = 1 + assert (pos == expected_pos).all() + assert cost == expected_cost + +@pytest.mark.parametrize('clamp', [None, (0,1), (-1,1)]) +def test_compute_velocity_return_values(swarm, clamp): + """Test if compute_velocity() gives the expected shape and range""" + topology = Ring() + v = topology.compute_velocity(swarm, clamp) + assert v.shape == swarm.position.shape + if clamp is not None: + assert (clamp[0] <= v).all() and (clamp[1] >= v).all() + +@pytest.mark.parametrize('bounds', [None, ([-5,-5,-5],[5,5,5]), + ([-10, -10, -10],[10, 10, 10])]) +def test_compute_position_return_values(swarm, bounds): + """Test if compute_position() gives the expected shape and range""" + topology = Ring() + p = topology.compute_position(swarm, bounds) + assert p.shape == swarm.velocity.shape + if bounds is not None: + assert (bounds[0] <= p).all() and (bounds[1] >= p).all() diff --git a/tests/backend/topology/test_star.py b/tests/backend/topology/test_star.py new file mode 100644 index 00000000..affd6770 --- /dev/null +++ b/tests/backend/topology/test_star.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Import modules +import pytest +import numpy as np + +# Import from package +from pyswarms.backend.topology import Star + + +def test_compute_gbest_return_values(swarm): + """Test if compute_gbest() gives the expected return values""" + topology = Star() + expected_cost = 1 + expected_pos = np.array([1,2,3]) + pos, cost = topology.compute_gbest(swarm) + assert cost == expected_cost + assert (pos == expected_pos).all() + +@pytest.mark.parametrize('clamp', [None, (0,1), (-1,1)]) +def test_compute_velocity_return_values(swarm, clamp): + """Test if compute_velocity() gives the expected shape and range""" + topology = Star() + v = topology.compute_velocity(swarm, clamp) + assert v.shape == swarm.position.shape + if clamp is not None: + assert (clamp[0] <= v).all() and (clamp[1] >= v).all() + +@pytest.mark.parametrize('bounds', [None, ([-5,-5,-5],[5,5,5]), + ([-10, -10, -10],[10, 10, 10])]) +def test_compute_position_return_values(swarm, bounds): + """Test if compute_position() gives the expected shape and range""" + topology = Star() + p = topology.compute_position(swarm, bounds) + assert p.shape == swarm.velocity.shape + if bounds is not None: + assert (bounds[0] <= p).all() and (bounds[1] >= p).all() \ No newline at end of file