Source code for cbx.dynamics.pso

import numpy as np
from typing import Union
#from scipy.special import logsumexp

from .pdyn import CBXDynamic

#%% PSO
class PSO(CBXDynamic):
    r"""Particle Swarm Optimization class

    This class implements the PSO algorithm as described in [1]_ and [2]_. The algorithm is a
    particle dynamic that is used to minimize the objective function :math:`f(x)`.

    Parameters
    ----------
    f : objective
        The objective function :math:`f(x)` of the system.
    x : array_like, shape (N, d)
        The initial positions of the particles. For a system of :math:`N` particles, the i-th row
        ``x[i,:]`` represents the position :math:`x_i` of the i-th particle.
    y : array_like, shape (N, d)
        The initial historical best positions of the particles. For a system of :math:`N` particles,
        the i-th row ``y[i,:]`` represents (an approximation of) the historical best position
        :math:`y_i` of the i-th particle.
    v : array_like, shape (N, d)
        The initial velocities of the particles. For a system of :math:`N` particles, the i-th row
        ``v[i,:]`` represents the velocity :math:`v_i` of the i-th particle.
    dt : float, optional
        The time-step parameter :math:`dt` of the system. The default is 0.1.
    alpha : float, optional
        The heat parameter :math:`\alpha` of the system. The default is 1.0.
    m : float, optional
        The inertia :math:`m` of the system. The default is 0.001.
    gamma : float, optional
        The friction coefficient :math:`\gamma` of the system. The default is ``1 - m``.
    lamda : float, optional
        The decay parameter :math:`\lambda` of the system. The default is 1.0.
    noise : noise_model, optional
        The noise model that is used to compute the noise vector. The default is ``normal_noise(dt=0.1)``.
    sigma : float, optional
        The parameter :math:`\sigma` of the noise model. The default is 1.0.
    lamda_memory : float, optional
        The decay parameter :math:`\lambda_{\text{memory}}` of the memory drift. The default is 0.4.
    sigma_memory : float, optional
        The parameter :math:`\sigma_{\text{memory}}` of the memory noise. The default is
        ``lamda_memory * sigma``.

    References
    ----------
    .. [1] Grassi, S. & Pareschi, L. (2021). From particle swarm optimization to consensus based
           optimization: stochastic modeling and mean-field limit. Math. Models Methods Appl. Sci.,
           31(8):1625–1657.
    .. [2] Huang, H., Qiu, J. & Riedl, K. (2023). On the global convergence of particle swarm
           optimization methods. Appl. Math. Optim., 88(2):30.
    """

    def __init__(self, f,
                 m: float = 0.001,
                 gamma: Union[float, None] = None,
                 lamda_memory: float = 0.4,
                 sigma_memory: Union[float, None] = None,
                 **kwargs) -> None:
        super().__init__(f, **kwargs)

        self.m = m
        self.gamma = 1 - m if gamma is None else gamma
        self.lamda_memory = lamda_memory

        # init velocities of particles
        self.v = np.zeros(self.x.shape)
        # init historical best positions of particles
        self.y = self.copy(self.x)

        # by default, the memory noise strength is coupled to the memory drift strength
        self.sigma_memory = self.lamda_memory * self.sigma if sigma_memory is None else sigma_memory

        # initial energies of the particles
        self.energy = self.f(self.x)
        self.num_f_eval += np.ones(self.M, dtype=int) * self.N  # update number of function evaluations
    def pre_step(self,):
        # save old positions
        self.x_old = self.copy(self.x)
        # save old historical best positions
        self.y_old = self.copy(self.y)
        # save old velocities
        self.v_old = self.copy(self.v)

        # set new batch indices
        self.set_batch_idx()
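    # Note: ``set_batch_idx`` is defined in the base class ``CBXDynamic``; judging from its use in
    # ``inner_step`` below, it presumably refreshes the ``consensus_idx`` and ``particle_idx``
    # attributes, which select the particles that enter the consensus computation and the particles
    # that are updated in the current iteration.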
    def inner_step(self,) -> None:
        r"""Performs one step of the PSO algorithm.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        mind = self.consensus_idx
        ind = self.particle_idx

        # first update: consensus point and drifts
        self.consensus = self.compute_consensus(self.y[mind], self.energy[mind])
        consensus_drift = self.x[ind] - self.consensus
        memory_drift = self.x[ind] - self.y[ind]

        # perform noise steps
        # **NOTE**: noise always uses the ``drift`` attribute of the dynamic.
        # We first use the consensus drift and then the memory drift.
        self.drift = consensus_drift
        self.s_consensus = self.sigma * self.noise()
        self.drift = memory_drift
        self.s_memory = self.sigma_memory * self.noise()

        # dynamics update
        # velocities of particles
        self.v[ind] = (
            self.m * self.dt * self.v[ind] +
            self.correction(self.lamda * self.dt * consensus_drift) +
            self.lamda_memory * self.dt * memory_drift +
            self.s_consensus + self.s_memory) / (self.m + self.gamma * self.dt)

        # positions of particles
        self.x[ind] = self.x[ind] + self.dt * self.v[ind]

        # evaluation of the objective function on the updated particles
        energy_new = self.f(self.x[ind])
        self.num_f_eval += np.ones(self.M, dtype=int) * self.x[ind].shape[-2]  # update number of function evaluations

        # historical best positions of particles: overwrite y where the new position improves the energy
        energy_expand = tuple([Ellipsis] + [None for _ in range(self.x.ndim - 2)])
        self.y[ind] = self.y[ind] + ((self.energy > energy_new)[energy_expand]) * (self.x[ind] - self.y[ind])
        self.energy = np.minimum(self.energy, energy_new)
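    # Note on the historical-best update above: ``(self.energy > energy_new)`` is a boolean mask
    # marking the particles whose new position improves on their previous best energy. The
    # ``energy_expand`` index appends ``x.ndim - 2`` trailing axes so that, e.g., a mask of shape
    # (M, N) broadcasts against positions of shape (M, N, d); where the mask is True, the historical
    # best ``y`` is replaced by the new position, and ``self.energy`` keeps the running per-particle
    # minimum via ``np.minimum``.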
    def compute_consensus(self, x_batch, energy):
        r"""Computes the consensus point as the weighted mean of the particles.

        Parameters
        ----------
        x_batch : array_like
            The particle positions over which the consensus is computed.
        energy : array_like
            The energies corresponding to ``x_batch``.

        Returns
        -------
        c : array_like
            The consensus point.
        """
        c, _ = self._compute_consensus(energy, self.x[self.consensus_idx], self.alpha[self.active_runs_idx, :])
        return c
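
# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the library): in standard CBO/PSO
# formulations [1]_, the consensus point is the Gibbs-weighted mean
#
#     c = sum_i exp(-alpha * f(x_i)) * x_i / sum_j exp(-alpha * f(x_j)),
#
# evaluated via a log-sum-exp for numerical stability (cf. the commented-out
# ``logsumexp`` import at the top of this module). The helper below is only an
# assumption about what ``_compute_consensus`` computes; the function name,
# shapes, and the toy objective are hypothetical and serve as a minimal,
# stand-alone demonstration.
if __name__ == "__main__":
    from scipy.special import logsumexp

    def gibbs_consensus(energy, x, alpha):
        # energy: shape (M, N), x: shape (M, N, d), alpha: shape (M, 1)
        log_w = -alpha * energy                                   # log-weights
        log_w = log_w - logsumexp(log_w, axis=-1, keepdims=True)  # normalize in log-space
        weights = np.exp(log_w)                                   # shape (M, N)
        return (weights[..., None] * x).sum(axis=-2, keepdims=True)  # shape (M, 1, d)

    rng = np.random.default_rng(0)
    x = rng.normal(size=(1, 5, 2))       # M=1 run, N=5 particles, d=2
    energy = (x ** 2).sum(axis=-1)       # toy quadratic objective, shape (1, 5)
    c = gibbs_consensus(energy, x, alpha=np.array([[10.0]]))
    # for large alpha the consensus concentrates on the lowest-energy particle
    print(c, x[0, energy[0].argmin()])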