Source code for empulse.optimizers.generation

from collections.abc import Callable, Generator, Iterable, Sequence
from datetime import datetime

import numpy as np
from joblib import Parallel, delayed
from numpy.typing import NDArray
from scipy.optimize import OptimizeResult
from sklearn.utils import check_random_state

MIN_POP_SIZE: int = 10
FEATURE_TO_POP_SIZE_RATIO: int = 10


[docs] class Generation: """ A single generation of a Real-coded Genetic Algorithm (RGA). Read more in the :ref:`User Guide <proflogit>`. Parameters ---------- population_size : int or None, default=None Number of individuals in the population. If ``None``, population size is set to ``10 * n_features``. crossover_rate : float, default=0.8 Probability of crossover. Must be in [0, 1]. mutation_rate : float, default=0.1 Probability of mutation. Must be in [0, 1]. elitism : float, default=0.05 Fraction of the population that transferred to the next generation without change. Must be in [0, 1]. verbose : bool, default=False If ``True``, print status messages. logging_fn : callable, default=print Function to use for logging. random_state : int or None, default=None Random seed. n_jobs : int or None, default=1 Number of jobs to run in parallel. If ``-1``, use all available processors. If ``None``, use 1 processor. Attributes ---------- name : str Name of the optimizer. population : ndarray, shape (population_size, n_dim) Current population. population_size : int Number of individuals in the population. crossover_rate : float Probability of crossover. mutation_rate : float Probability of mutation. elitism : int The number of individuals of the population that are transferred to the next generation without change. verbose : bool If ``True``, print status messages. logging_fn : callable Function to use for logging. rng : RandomState Random state object. n_jobs : int Number of jobs to run in parallel. If ``-1``, use all available processors. If ``None``, use 1 processor. fx_best : list List of best fitness values. fitness : ndarray, shape (population_size,) Fitness values of the current population. result : OptimizeResult Result of the optimization. lower_bounds : ndarray, shape (n_dim,) Lower bounds of the search space. upper_bounds : ndarray, shape (n_dim,) Upper bounds of the search space. delta_bounds : ndarray, shape (n_dim,) Difference between upper and lower bounds. n_dim : int Number of dimensions. _n_mating_pairs : int Number of mating pairs. elite_pool : list List of elite individuals. """ def __init__( self, population_size: int | None = None, crossover_rate: float = 0.8, mutation_rate: float = 0.1, elitism: float = 0.05, verbose: bool = False, logging_fn: Callable[[str], None] = print, random_state: int | None = None, n_jobs: int | None = 1, ): super().__init__() self.name = 'RGA' self.population: NDArray[np.float64] = np.empty(0) if population_size is not None: if not isinstance(population_size, int): raise TypeError('`pop_size` must be an int.') if population_size < MIN_POP_SIZE: raise ValueError(f'`pop_size` must be >= {MIN_POP_SIZE}, got {population_size}.') if population_size is None: population_size = -1 self.population_size = population_size if not 0.0 <= crossover_rate <= 1.0: raise ValueError('`crossover_rate` must be in [0, 1].') self.crossover_rate = crossover_rate if not 0.0 <= mutation_rate <= 1.0: raise ValueError('`mutation_rate` must be in [0, 1].') self.mutation_rate = mutation_rate if not 0.0 <= elitism <= 1.0: raise ValueError('`elitism` must be in [0, 1].') self.elitism_fraction: float = elitism self.elitism = 0 # determined later self.verbose = verbose self.logging_fn = logging_fn # Get random state object self.rng = check_random_state(random_state) self.n_jobs = n_jobs # Attributes self._n_mating_pairs: int | None = None self.elite_pool: list[tuple[NDArray[np.float64], np.float64]] = [] # individual, fitness self.fx_best: list[np.float64] = [] self.fitness: NDArray[np.float64] = np.empty(0) self.result = OptimizeResult(success=False, nfev=0, nit=0, fun=np.inf, x=None) # type: ignore[call-arg] self.lower_bounds: NDArray[np.float64] = np.asarray(0.0) self.upper_bounds: NDArray[np.float64] = np.asarray(0.0) self.delta_bounds: NDArray[np.float64] = np.asarray(0.0) self.n_dim: int = 0
[docs] def optimize( self, objective: Callable[[NDArray[np.float64]], float], bounds: list[tuple[float, float]] ) -> Generator['Generation', None, None]: """ Optimize the objective function. Parameters ---------- objective : Callable Objective function to optimize. Should be of signature ``objective(weights) -> float``. bounds : list[tuple[float, float]] List of tuples of lower and upper bounds for each weight. Yields ------ self : Generation Current instance of the optimizer. """ # Check bounds bounds = list(bounds) if not all( isinstance(t, tuple) and len(t) == 2 and isinstance(t[0], int | float) and isinstance(t[1], int | float) for t in bounds ): raise ValueError('`bounds` must be a sequence of tuples of two numbers (lower_bound, upper_bound).') array_bounds: NDArray[np.float64] = np.asarray(bounds, dtype=np.float64).T self.lower_bounds = array_bounds[0] self.upper_bounds = array_bounds[1] if self.lower_bounds is None or self.upper_bounds is None: raise ValueError('`lower_bounds` and `upper_bounds` are None.') self.delta_bounds = np.fabs(self.upper_bounds - self.lower_bounds) self.n_dim = len(bounds) # Check population size if self.population_size <= 0: self.population_size = self.n_dim * FEATURE_TO_POP_SIZE_RATIO self.elitism = int(max(1, round(self.population_size * self.elitism_fraction))) self._n_mating_pairs = int(self.population_size / 2) # Constant for crossover self.fitness = np.empty(self.population_size) * np.nan self.population = self._generate_population() self._evaluate(objective) self._update_elite_pool() if self.verbose: self._log_start() while True: yield self if self.verbose: self._log_progress() self._select() self._crossover() self._mutate() self._evaluate(objective) self._insert_elites() # survivor selection: overlapping-generation model self._update_elite_pool()
def _generate_population(self) -> NDArray[np.float64]: if self.n_dim is None: raise ValueError('`n_dim` must be set.') if self.population_size is None: raise ValueError('`population_size` must be set.') population = self.rng.rand(self.population_size, self.n_dim) return self.lower_bounds + population * self.delta_bounds # type: ignore def _evaluate(self, objective: Callable[[NDArray[np.float64]], float]) -> None: if self.population_size is None: raise ValueError('`population_size` must be set.') fitness_values = Parallel(n_jobs=self.n_jobs)( delayed(self._update_fitness)(objective, ix) for ix in range(self.population_size) ) self.fitness = np.asarray(fitness_values) def _update_fitness(self, objective: Callable[[NDArray[np.float64]], float], index: int) -> float: fitness_value = float(self.fitness[index]) if np.isnan(fitness_value): self.result.nfev += 1 # type: ignore[attr-defined] return objective(self.population[index]) else: return fitness_value def _crossover(self) -> None: """Perform local arithmetic crossover.""" # Make iterator for pairs match_parents = ( rnd_pair for rnd_pair in self.rng.choice(self.population_size, (self._n_mating_pairs, 2), replace=False) ) # Crossover parents for ix1, ix2 in match_parents: if self.rng.uniform() < self.crossover_rate: parent1 = self.population[ix1] # Pass-by-ref parent2 = self.population[ix2] w = self.rng.uniform(size=self.n_dim) child1 = w * parent1 + (1 - w) * parent2 child2 = w * parent2 + (1 - w) * parent1 self.population[ix1] = child1 self.population[ix2] = child2 self.fitness[ix1] = np.nan self.fitness[ix2] = np.nan def _mutate(self) -> None: """Perform uniform random mutation.""" if self.population_size is None: raise ValueError('`population_size` must be set.') if self.n_dim is None: raise ValueError('`n_dim` must be set.') for ix in range(self.population_size): if self.rng.uniform() < self.mutation_rate: mutant = self.population[ix] # inplace rnd_gene = self.rng.choice(self.n_dim) rnd_val = self.rng.uniform( low=self.lower_bounds[rnd_gene], high=self.upper_bounds[rnd_gene], ) mutant[rnd_gene] = rnd_val self.fitness[ix] = np.nan def _select(self) -> None: """Perform linear scaling selection.""" fitness_values = np.copy(self.fitness) min_fitness = float(np.min(fitness_values)) avg_fitness = float(np.mean(fitness_values)) max_fitness = float(np.max(fitness_values)) # Linear scaling if min_fitness < 0: fitness_values -= min_fitness min_fitness = 0 if min_fitness > (2 * avg_fitness - max_fitness): denominator = max_fitness - avg_fitness a = avg_fitness / (denominator if denominator != 0 else 1e-10) b = a * (max_fitness - 2 * avg_fitness) else: denominator = avg_fitness - min_fitness a = avg_fitness / (denominator if denominator != 0 else 1e-10) b = -min_fitness * a scaled_fitness = np.abs(a * fitness_values + b) # Normalize if (normalization_factor := np.sum(scaled_fitness)) == 0: relative_fitness = np.ones(self.population_size) / self.population_size # Uniform distribution else: relative_fitness = scaled_fitness / normalization_factor # Select individuals select_ix = self.rng.choice( self.population_size, size=self.population_size, replace=True, p=relative_fitness, ) self.population = self.population[select_ix] self.fitness = self.fitness[select_ix] def _get_sorted_non_nan_ix(self) -> list[tuple[int, float]]: """Get indices sorted according to non-nan fitness values.""" non_nan_fx = ((ix, fx) for ix, fx in enumerate(self.fitness) if ~np.isnan(fx)) sorted_list = sorted(non_nan_fx, key=lambda t: t[1]) return sorted_list def _insert_elites(self) -> None: """Update population by replacing the worst solutions of the current with the ones from the elite pool.""" if any(np.isnan(fx) for fx in self.fitness): sorted_fx = self._get_sorted_non_nan_ix() worst_ix: Iterable[int] = [t[0] for t in sorted_fx][: self.elitism] else: worst_ix = np.argsort(self.fitness)[: self.elitism] for i, ix in enumerate(worst_ix): elite, fitness_elite = self.elite_pool[i] self.population[ix] = elite self.fitness[ix] = fitness_elite def _update_elite_pool(self) -> None: if any(np.isnan(fx) for fx in self.fitness): sorted_fx = self._get_sorted_non_nan_ix() elite_ix: Sequence[int] = [t[0] for t in sorted_fx][-self.elitism :] else: elite_ix = list(np.argsort(self.fitness)[-self.elitism :]) self.elite_pool = [(self.population[ix].copy(), self.fitness[ix]) for ix in elite_ix] # Append best solution self.fx_best.append(self.fitness[elite_ix[-1]]) self.result.x = self.population[elite_ix[-1]] # type: ignore[attr-defined] self.result.fun = self.fx_best[-1] # type: ignore[attr-defined] self.result.nit = len(self.fx_best) # type: ignore[attr-defined] def _log_start(self) -> None: self.logging_fn( '# --- {} ({}) --- #'.format( self.name, datetime.now().strftime('%a %b %d %H:%M:%S'), # noqa: DTZ005 ) ) def _log_progress(self) -> None: status_msg = f'Iter = {self.result.nit:5d}; nfev = {self.result.nfev:6d}; fx = {self.fx_best[-1]:.4f}' self.logging_fn(status_msg) def _log_end(self, stop_time: float) -> None: self.logging_fn(self.result) # type: ignore[arg-type] self.logging_fn(f'# --- {self.name} ({stop_time}) --- #')