Dynamics#

One of the core components in the CBX package are dynamics, which are used to represent different variants of consensus-based algorithms. Each dynamic inherits from the base class CBXDynamic, which implements the core functionality for consensus-based algorithms. This base class itself inherits from ParticleDynamic, which implements functionality specific to particle-based algorithms. The design choice here was to divide between principles common to iterative particle-based algorithms and principles specific to consensus-based algorithms.

Optimization with dynamics#

In the simplest case, when selecting a specific dynamic and aiming to optimize a function, you can utilize the function optimize:

>>> from cbx.dynamics import CBO
>>> dyn = CBO(lambda x:x**2, d=1)
>>> dyn.optimize()

This function executes the optimization process until a specified termination criterion is met. In scenarios where parameter control via a scheduler is necessary, the function includes the keyword argument sched (for more details, we refer to Schedulers).

Each dynamic implements a step method that delineates the update in each iteration (see The step method). If you wish to run a dynamic at a lower level, directly managing each iteration, you can define a custom for-loop:

>>> from cbx.dynamics import CBXDynamic
>>> dyn = CBXDynamic(lambda x:x**2, d=1)
>>> for i in range(10):
>>>     dyn.step()

Modelling the ensemble#

All particle-based methods operate on an ensemble of points \(x = (x^1, \ldots, x^N) \in \mathcal{X}^N\). Here, we opt to model the ensemble as an array, assuming \(\mathcal{X} = \mathbb{R}^d\), hence representing it as an \(N \times d\) array. Note, that \(d=(d_1,\ldots,d_s)\) can be a tuple of additional dimensions, which can be convenient for certain spaces \(\mathcal{X}\). In most cases, we assume this array is a numpy array. However, it’s often straightforward to use other objects, like torch tensors instead.

For multiple runs of a particle-based scheme, creating \(M \in \mathbb{N}\) instances of a dynamic and running them is a typical approach. This process can be parallelized at an external level, by creating \(M\) copies of the dynamic. However, in scenarios where, for instance, all parameters remain fixed across runs, it can be efficient to represent the different runs directly on the array level. We therefore model an ensemble as an

\[M\times N\times d_1\times \ldots \times d_s\]

array, which allows us to employ parallelization on the array level. The value of \(M\) defaults to 1, therefore, it is important to keep in mind that an ensemble has three dimensions.

Note

One might ask what the difference between a dynamic with the array structure (M,N,d) and a dynamic with the structure (1,M*N,d) is. The difference becomes visible, whenever particles interact across their ensemble. E.g., in the first case, when the consensus is computed, we compute it separately for each \(m\in\{1,\ldots,M\}\) sub-runs,

\[c(x)^{m} = \frac{\sum_{n=1}^N x^{m,n}\ \exp(-\alpha\ f(x^{m,n}))}{\sum_{n=1}^N \exp(-\alpha\ f(x^{m,n}))},\]

whereas in the second case, we have one single ensemble with \(M\cdot N\) particles and therefore compute a single consensus point

\[\tilde c(x) = \frac{\sum_{m=1}^M \sum_{n=1}^N x^{m,n}\ \exp(-\alpha\ f(x^{m,n}))}{\sum_{m=1}^M \sum_{n=1}^N \exp(-\alpha\ f(x^{m,n}))}.\]

The ensemble can be accessed via the attribute dyn.x of each dynamic.

>>> from cbx.dynamics import ParticleDynamic
>>> dyn = ParticleDynamic(lambda x:x, d=1, N=5)
>>> print(dyn.x.shape)
(1, 5, 1)

If the initial ensemble is not specified, then it is initialized randomly uniform within the bounds given by dyn.x_min and dyn.x_max. In this case the dimension of the problem \(d\) can not be inferred and therefore have to specified:

>>> from cbx.dynamics import ParticleDynamic
>>> dyn = ParticleDynamic(lambda x:x)
RuntimeError: If the inital partical system is not given, the dimension d must be specified!

However, one can specify the initial ensemble directly, in which case the dimension \(d\) can be inferred from the shape of the array:

>>> import numpy as np
>>> from cbx.dynamics import ParticleDynamic
>>> dyn = ParticleDynamic(lambda x:x.sum(-1), x=np.ones((2,5,1)))
>>> print(dyn.x.shape)
(2, 5, 1)

The objective function#

A key element of each particle dynamic is the objective function \(f(x)\). This function has to be specified by the user. A priori, one assumes that it is a map \(f: \mathbb{R}^d \to \mathbb{R}\). However, we need to evaluate the objective on the whole ensemble. The naive approach here, would be to loop over all indices \(m=1, \ldots, M, n=1, \ldots, N\) and evaluate \(f(x^{m,n})\) separately. However, this is not efficient and since the objective evaluation might happen a lot, it is better to evaluate the objective on the whole array at once. Therefore, we need to ensure that objective function dyn.f can be evaluated on an array of shape \(M\times N\times d\) and we always think of maps

\[\mathbb{R}^{M\times N\times d} \to \mathbb{R}^{M\times N}.\]

I.e., in terms of dimensionality an application of dyn.f strips away the dimension of the original problem \(\mathcal{X}=\mathbb{R}^d\) and keeps the structure given by \(M\times N\).

However, there might be cases where the user specifies an objective function that only works within the original interpretation, i.e., \(f: \mathbb{R}^d \to \mathbb{R}\), as in the following example:

>>> import numpy as np
>>> def f(x):
>>>     return abs(x[0] + x[1])
>>> x = np.ones((3,4,2))
>>> print(f(x).shape)
(4, 2)

In the above example the array x yields \(M=3, N=4\) and \(d=2\), therefore the output must be of shape \(3\times 4\). However, since f as defined above only works on the single particle level, the shape of the output and therefore also the application is wrong. Let’s see how the situation changes when we use the above f as an objective for a dynamic:

>>> import numpy as np
>>> from cbx.dynamics import ParticleDynamic
>>> def f(x):
>>>     return abs(x[0] + x[1])
>>>
>>> dyn = ParticleDynamic(f, x=np.ones((3,4,2)))
>>> print(dyn.f(x).shape)
(3, 4)

We observe that the objective function dyn.f now returns an array of shape \(M\times N\). This is due to the fact that an objective is promoted to the class cbx_objective, which handles the evaluation on the array level. By default, we assume that the specified function, only works on the single particle level, which is expressed in the keyword argument f_dim='1D' of the class ParticleDynamic. If your function works on single-run ensembles of shape \(N\times d\), you can specify f_dim='2D' and respectively if it works on multi-run ensembles of shape \(M\times N\times d\) you can specify f_dim=3D. If you specify the latter, the objective function is not modified or wrapped, but is directly used for the dynamic:

>>> import numpy as np
>>> from cbx.dynamics import ParticleDynamic
>>>
>>> def f(x):
>>>     return abs(x[...,0] + x[...,1])
>>>
>>> dyn0 = ParticleDynamic(f, x=np.ones((2,5,2)))
>>> dyn1 = ParticleDynamic(f, x=np.ones((2,5,2)), f_dim='3D')
>>>
>>> print(dyn0.f(np.ones((3,4,2))).shape)
>>> print(dyn1.f(np.ones((3,4,2))).shape)
>>> print(dyn0.f is f)
>>> print(dyn1.f is f)
(3, 4)
(3, 4)
False
True

Here, we observe that the dynamic directly uses the specified objective function for f_dim='3D'. For more complicated functions, one can also inherit from cbx_objective.

Note

When inheriting from cbx_objective, the method __call__ should not be overwritten as it is used internally to update the number of evaluation. Instead, the actual function call should be implemented in the method apply(self, x).

>>> import numpy as np
>>> from cbx.dynamics import ParticleDynamic
>>> from cbx.utils.objective_handling import cbx_objective
>>> class objective(cbx_objective):
>>>     def __init__(self, a=1.0):
>>>         super().__init__()
>>>         self.a = a
>>>     def apply(self, x):
>>>         return self.a * x[...,0] + x[...,1]

The step method#

At the heart of every iterative method is the actual update that is performed. Each dynamic encodes this update in the method inner_step. For example, the standard CBO class CBO implements the following update:

def inner_step(self,) -> None:
    self.consensus, energy = self.compute_consensus() # update consensus point
    self.energy[self.consensus_idx] = energy # update energy only at the active indices
    self.drift = self.x[self.particle_idx] - self.consensus # compute drift
    self.s = self.sigma * self.noise() # compute noise

    #  update particle positions
    self.x[self.particle_idx] = (
        self.x[self.particle_idx] -
        self.correction(self.lamda * self.dt * self.drift) +
        self.s)

In the simplest case, where we use isotropic noise and no correction, this basically implements the update

\[x^i \gets x^i - \lambda\, dt\, (x_i - c_\alpha(x)) + \sigma\, \sqrt{dt} |x^i - c_\alpha(x)| \xi^i\]

with an additional correction step on the drift. If you want to implement a custom update, you need to overwrite this method in an inherited class. Additionally, there might be certain procedures that should happen before or after each iteration. These can be implemented in the method pre_step and post_step. For example the base dynamic class CBO, saves the position of the old ensemble before each iteration:

def pre_step(self,) -> None:
    self.x_old = self.copy_particles(self.x)

After each inner step, the base class updates the best particles (both of the current ensemble and the best of the whole iteration), performs the tracking step (see Tracking and history), performs an optional post processing step (e.g., clip the particles within a valid range) and most importantly, increments the iteration counter:

def post_step(self) -> None:
    if hasattr(self, 'x_old'):
        self.update_diff = np.linalg.norm(self.x - self.x_old, axis=(-2,-1))/self.N

    self.update_best_cur_particle()
    self.update_best_particle()
    self.track()
    self.process_particles()

    self.it+=1

The main step method, which actually used in the iteration is the defined as

def step(self):
    self.pre_step()
    self.inner_step()
    self.post_step()

Noise methods and how to customize them#

In the update step of consensus based methods, diffusion is modeled by the addition of noise, which is scaled by a factor dependent on the iteration. Here, it is very convenient to assume that we can compute the noise, given full information about the dynamic. Therefore, the callable that implements the specific noise needs to accept the dynamic as an argument. This function is then saved in the attribute noise_callable. The function that is called during the iteration noise is defined as follows:

def noise(self):
    return self.noise_callable(self)

You can specify the noise as keyword argument of the class CBXDynamic. This can be a string from the following list:

  • noise = 'anistropic': anistropic noise (see anistropic_noise),

  • noise = 'isotropic': isotropic noise (see isotropic_noise),

  • noise = 'covariance': covariance noise (see covariance_noise).

    >>> from cbx.dynamics import CBXDynamic
    >>> dyn = CBXDynamic(lambda x:x, d=1, noise='isotropic')
    

Alternatively, you can define a custom callable and specify it to be used as the noise_callable:

>>> from cbx.dynamics import CBXDynamic
>>> def my_noise(dyn):
>>>     print('This is my custom noise')
>>> dyn = CBXDynamic(lambda x:x, d=1, noise=my_noise)
>>> dyn.noise()
>>> print(dyn.noise_callable is my_noise)
This is my custom noise
True

Note

The function noise does not take any arguments, other than self.

Correction steps#

In the original CBO paper it is proposed to perform a correction step on the drift in each iteration. From a technical point of view the mechanics here are very similar to how the noise is implemented. The following methods can be specified as keyword argument of the class CBXDynamic:

  • correction = 'none': no correction (see no_correction),

  • correction = 'heavi_side': Heaviside correction (see heavi_side_correction),

  • correction = 'heavi_side_reg': Heaviside correction with regularization (see heavi_side_correction_reg).

As in the case for the noise, this first sets the function correction_callable of the dynamic class. The actual correction is then defined as follows:

def correction(self, x):
    return self.correction_callable(self, x)

Note

The function correction additionally takes x as an argument.

You can also use a custom callable and specify it to be used as the correction_callable:

>>> from cbx.dynamics import CBXDynamic
>>> def my_correction(dyn, x):
>>>     print('This is my custom correction')
>>> dyn = CBXDynamic(lambda x:x, d=1, correction=my_correction)
>>> dyn.correction(dyn.x)
>>> print(dyn.correction_callable is my_correction)
This is my custom correction

Termination criteria#

You can specify different termination criteria for your CBO algorithm, by passing the list term_criteria to the class CBXDynamic. Each entry in this list is a callable that accepts the dynamic as an argument and returns a numpy boolean array of size (M,). The i-th entry specifies if the i-th run should be terminated. The function terminate checks all the termination criteria and then saves the result in the array dyn.active_runs_idx that saves the inidces of the original M runs that are still active. This allows to terminate some of the runs, while others continue. The function terminate returns True, only if the array dyn.active_runs_idx is empty.

Note

We check whether to terminate the run. Therefore, False means a certain check is not meant and the run should continue. True means the check is meant, and the run should be stopped.

As termination callables you can use, e.g., the pre-defined routines in cbx.utils.termination. For convenience, you can specify the max iteration criterion directly as a keyword argument CBXDynamic(..., max_it=10).

Tracking and history#

During the iteration, we can save different values in the dictionary dyn.history. You can specify, which values to track, with the dictonary track_args. In the follwing we specify possible keys:

track_args={...,'save_int': <int>}#

The value save_int specifiy the interval in which the values should be tracked.

track_args={...,names=[....,'x']}#

Specifies, that the particles dyn.x should be tracked after each step. In that case the entry in the history dyn.history['x'] is a basic list of arrays of shape (M, N, d).

track_args={...,names=[....,'update_norm']}#

Specifies, that the norm of the difference between the old and the new ensemble should be tracked. The values are saved in dyn.history['update_norm'] which is a list of arrays of shape (M,).

track_args={...,names=[....,'energy']}#

Specifies, that the best energy in each iteration should be tracked. The values are saved in dyn.history['energy'] which is a list of arrays of shape (M,).

track_args={...,names=[....,'consensus']}#

Specifies, that the consensus points should be tracked. They are saved in dyn.history['consensus'] which is a list of arrays of shape (M, d). This is only available in the subclass CBXDynamic cbx.dynamics.CBXDynamic.

track_args={...,names=[....,'drift']}#

Specifies, that the drift vectors should be tracked. They are saved in dyn.history['drift'] which is a list of arrays of shape (M, d). This is only available in the subclass CBXDynamic cbx.dynamics.CBXDynamic.

track_args={...,names=[....,'drift_mean']}#

Specifies that the mean of the drift vectors should be tracked. It is saved in dyn.history['drift_mean'] which is a list of arrays of shape (M, d).

track_args={...,extra_tracks=[<track>]}#

The list extra_tracks allows you to specify additional functions that perform custom tracking routines. The instances should be a class that implement the following functions

  • init_history: Here you initialize the value in dyn.history, e.g., you can initialize an array or list to store the values in.

  • update: This performs the tracking after each update.

    >>> from cbx.dynamics import CBXDynamic
    >>> class MyCustomTrack:
    >>>     def init_history(dyn):
    >>>         print('Initializing my custom track')
    >>>         dyn.history['my_custom_track'] = []
    >>>     def update(dyn):
    >>>         print('Updating my custom track')
    >>>         dyn.history['my_custom_track'].append(dyn.x.min(axis=-1))
    >>> dyn = CBXDynamic(lambda x:x, d=1, track_args={'extra_tracks':[MyCustomTrack]})
    >>> dyn.step()
    >>> print(dyn.history['my_custom_track'])
    Initializing my custom track
    Updating my custom track
    [0]
    

Batching#

As proposed in [1] it is common to perform only batch updates across the ensemble. In order to specify batching in a cbx class you can use the keyword argument CBXDynamic(...,batch_args=batch_args), where batch_args is a dictionary with the following keys:

  • 'batch_partial': If True the consensus and particle indices are the same. If False the particle indices are an Ellipsis.

  • 'batch_size': The size of the batch.

  • 'seed': The seed for the random number generator.

  • 'var': The resampling variant.

We explain the mechanism and the behavior of these arguments below.

Note

Here, and in the following this batching should not be confused with the batching of a objective function. If your objective function is given as a sum over many functions, it might make sense to batch the evaluation of this function. However, the batching over the ensemble is conceptually different.

The base class CBXDynamic implements the function set_batch_idx. If it is called it sets the following attributes

  • dyn.consensus_idx: the indices used to computed the consensus point,

  • dyn.particle_idx: the indices updated in each step.

The keyword argument batch_partial decides how consensus and particle indices relate to each other:

  • batch_partial=True: the consensus and particle indices are the same.

  • batch_partial=False: each particle is updated from the partially computed consensus and therefore, the particle indices are an Ellipsis.

The attribute dyn.consenus_idx is a tuple of array indices such that we can directly use it for array indexing:

>>> import numpy as np
>>> from cbx.dynamics import CBXDynamic
>>> dyn = CBXDynamic(lambda x:x, M=4, N=5, d=1, batch_args={'size':2})
>>> dyn.set_batch_idx()
>>> print(dyn.consensus_idx)
>>> print(dyn.x[dyn.consensus_idx].shape)
(array([[0, 0],
        [1, 1],
        [2, 2],
        [3, 3]]),
 array([[1, 3],
        [1, 3],
        [0, 1],
        [2, 0]]),
 Ellipsis)
 (4, 2, 1)

The first entry, allows for convenient broadcasting in the run dimension, this array \(M\in\N_0^{M\times\text{batch_size}}\) is deterministic and defined as

\[M_{m, n} := n.\]

The second entry stores the indices of the particles that belong to the current batch. This array has the same shape as the previous one and randomly selects indices in the range 0 to N-1, independently across each run. In the best the indices are unique within a single sub-run.

Resampling strategies#

As described in [1] in certain situations it is useful to add noise independent of the current ensemble, or even resample the whole ensemble. In order to perform such a operation we can use the function resample, which accepts a list of callables. Similar to the terimination callables, they also accept the dynamic as an argument and return a list of indices, of where to resample. The resampling can be performed as a part of the post processing:

>>> from cbx.dynamics import CBXDynamic
>>> def always_resample_in_run_zero(dyn):
>>>     return [0]
>>> resampling = cbx.resampling.resampling([always_resample_in_run_zero])
>>> dyn = CBXDynamic(lambda x:x.sum(axis=-1), M=4, N=5, d=1, post_process = lambda dyn: resampling(dyn))
>>> dyn.post_process()
Resampled in runs [0]

We refer to resample for available resampling functions.

References#