Source code for easy_vic_build.tools.calibrate_func.algorithm_NSGAII

# code: utf-8
# author: Xudong Zheng
# email: z786909151@163.com

"""NSGA-II base implementation for calibration workflows.

This module defines :class:`NSGAII_Base`, a reusable DEAP-based NSGA-II runner
with checkpoint persistence, generation history tracking, and optional Pareto
front plotting.
"""

import os
import pickle
import random
import matplotlib.pyplot as plt
import numpy as np

from deap import algorithms, base, creator, tools
from tqdm import *
from copy import deepcopy

from ... import logger
from ..decoractors import clock_decorator


[docs] class NSGAII_Base: """ A class that implements the NSGA-II (Non-dominated Sorting Genetic Algorithm II) for multi-objective optimization. This class provides methods for setting up the genetic algorithm, evaluating individuals, applying genetic operators (crossover, mutation), selecting the next generation, and saving/loading the algorithm state. Attributes ---------- popSize : int The population size for each generation. maxGen : int The maximum number of generations to run the algorithm. save_path : str The path to save and load the algorithm state. history : list A history of the population and non-dominated fronts at each generation. current_generation : int The current generation number. initial_population : list The initial population before any genetic operators are applied. population : list The current population of individuals. Methods ------- __init__(algParams, save_path): Initializes the NSGA-II algorithm with the given parameters and attempts to load the state. get_obs(): Placeholder function to get the observed values (to be designed for specific use cases). get_sim(): Placeholder function to get the simulated values (to be designed for specific use cases). set_algorithm_params(popSize, maxGen, cxProb, mutateProb): Sets the parameters for the genetic algorithm (population size, max generations, crossover probability, and mutation probability). createFitness(): Creates the fitness function for individuals. createInd(): Creates an individual representation (a list). samplingInd(): Samples a new individual by generating random values for its elements. registerInd(): Registers the individual sampling function with the toolbox. registerPop(): Registers the population initialization function with the toolbox. evaluate(ind): A placeholder function for evaluating an individual's fitness (to be customized for specific use cases). registerEvaluate(): Registers the evaluation function with the toolbox. evaluatePop(population): Evaluates the fitness of the entire population. operatorMate(parent1, parent2): Defines the crossover operation for mating two individuals (using a two-point crossover). operatorMutate(ind): Defines the mutation operation for an individual (using a bit-flip mutation). operatorSelect(population): Defines the selection operation (using tournament selection). registerOperators(): Registers the genetic operators (mate, mutate, and select) with the toolbox. apply_genetic_operators(offspring): Applies the genetic operators (crossover and mutation) to the offspring. select_next_generation(combined): Selects the next generation by sorting individuals based on Pareto dominance and applying crowding distance. print_results(population): Prints the results of the best individual from the final population. load_state(): Loads the algorithm state from the specified save path (if a saved state exists). save_state(): Saves the current algorithm state (current generation, population, and history) to the specified save path. run(): Runs the NSGA-II algorithm for the specified number of generations, applying genetic operators and selecting the next generation. """
[docs] def __init__( self, algParams={"popSize": 40, "maxGen": 250, "cxProb": 0.7, "mutateProb": 0.2}, save_path="checkpoint.pkl", ): """ Initializes the NSGA-II algorithm with the given parameters. Parameters ---------- algParams : dict Dictionary containing the algorithm parameters: - popSize: The population size for each generation. - maxGen: The maximum number of generations to run the algorithm. - cxProb: The crossover probability. - mutateProb: The mutation probability. save_path : str, optional The path to save and load the algorithm state (default is "checkpoint.pkl"). """ # set algorithm params self.popSize = algParams["popSize"] self.maxGen = algParams["maxGen"] self.toolbox = base.Toolbox() self.set_algorithm_params(**algParams) # create self.createFitness() self.createInd() # register self.registerInd() self.registerPop() self.registerEvaluate() self.registerOperators() # set initial variables self.history = [] self.current_generation = 0 self.initial_population = None # set save path self.save_path = save_path # try to load state (if exist) self.load_state()
# ----------------------------- # set algorithm parameters # -----------------------------
[docs] def set_algorithm_params( self, popSize=None, maxGen=None, cxProb=None, mutateProb=None, **kwargs ): """ Sets the parameters for the genetic algorithm. Parameters ---------- popSize : int, optional The population size for each generation (default is 40). maxGen : int, optional The maximum number of generations to run the algorithm (default is 250). cxProb : float, optional The crossover probability (default is 0.7). mutateProb : float, optional The mutation probability (default is 0.2). """ self.toolbox.popSize = 40 if not popSize else popSize self.toolbox.maxGen = 250 if not maxGen else maxGen self.toolbox.cxProb = 0.7 if not cxProb else cxProb self.toolbox.mutateProb = 0.2 if not mutateProb else mutateProb
# ----------------------------- # User should define these # -----------------------------
[docs] def get_obs(self): """ Placeholder function to get observed values (to be designed for specific use cases). Returns ------- None """ self.obs = 0
[docs] def get_sim(self): """ Placeholder function to get simulated values (to be designed for specific use cases). Returns ------- None """ self.sim = 0
[docs] def createFitness(self): """Creates the fitness function for individuals.""" creator.create("Fitness", base.Fitness, weights=(-1.0,))
[docs] def createInd(self): """Creates an individual representation (a list).""" creator.create("Individual", list, fitness=creator.Fitness)
[docs] def samplingInd(self): """ Samples a new individual by generating random values for its elements. Returns ------- Individual A new individual sampled with random elements. """ # example: generate 5 elements/params in each Ind ind_elements = [random.uniform(-10, 10) for _ in range(5)] return creator.Individual(ind_elements)
[docs] def evaluate(self, ind): """ A placeholder function for evaluating an individual's fitness. Parameters ---------- ind : Individual The individual to evaluate. Returns ------- tuple A tuple containing the fitness values. """ x, y = ind return (x**2 + y**2,)
# ----------------------------- # NSGAII operator # -----------------------------
[docs] @staticmethod def operatorMate(parent1, parent2): """ Defines the crossover operation for mating two individuals. Parameters ---------- parent1 : Individual The first parent individual. parent2 : Individual The second parent individual. Returns ------- tuple A tuple containing the offspring resulting from the crossover. """ # parent is ind kwargs = {} return tools.cxTwoPoint(parent1, parent2, **kwargs)
[docs] @staticmethod def operatorMutate(ind): """ Defines the mutation operation for an individual. Parameters ---------- ind : Individual The individual to mutate. Returns ------- tuple A tuple containing the mutated individual. """ kwargs = {} return tools.mutFlipBit(ind, kwargs)
[docs] @staticmethod def operatorSelect(population): """ Defines the selection operation for choosing individuals from the population. Parameters ---------- population : list of Individual The population from which to select individuals. Returns ------- list A list of selected individuals. """ kwargs = {} return tools.selTournament(population, **kwargs)
# ----------------------------- # Registering DEAP components # -----------------------------
[docs] def registerInd(self): """Registers the individual sampling function with the toolbox.""" self.toolbox.register("individual", self.samplingInd)
[docs] def registerPop(self): """Registers the population initialization function with the toolbox.""" self.toolbox.register( "population", tools.initRepeat, list, self.toolbox.individual )
[docs] def registerEvaluate(self): """Registers the evaluation function with the toolbox.""" self.toolbox.register("evaluate", self.evaluate)
[docs] def registerOperators(self): """Registers the genetic operators (mate, mutate, and select) with the toolbox.""" self.toolbox.register("mate", self.operatorMate) self.toolbox.register("mutate", self.operatorMutate) self.toolbox.register("select", self.operatorSelect)
# ----------------------------- # Generation in NSGAII # -----------------------------
[docs] def evaluatePop(self, population): """ Evaluates the fitness of the entire population. Parameters ---------- population : list of Individual The population to evaluate. """ fitnesses = list(map(self.toolbox.evaluate, population)) for ind, fit in zip(population, fitnesses): ind.fitness.values = fit
[docs] def apply_genetic_operators(self, offspring): """ Applies the genetic operators (crossover and mutation) to the offspring. Parameters ---------- offspring : list of Individual The offspring to apply the genetic operators to. """ # it can be implemented by algorithms.varAnd # crossover for child1, child2 in zip(offspring[::2], offspring[1::2]): if random.random() < self.toolbox.cxProb: self.toolbox.mate(child1, child2) del child1.fitness.values del child2.fitness.values # mutate for mutant in offspring: if random.random() < self.toolbox.mutateProb: self.toolbox.mutate(mutant) del mutant.fitness.values
[docs] def select_next_generation(self, combined): """ Selects the next generation from the combined population and offspring. Parameters ---------- combined : list of Individual The combined population of parents and offspring. Returns ------- list The selected next generation. """ fronts = tools.sortNondominated(combined, len(combined), first_front_only=False) next_generation = [] for front in fronts: if len(next_generation) + len(front) <= self.popSize: next_generation.extend(front) else: # cal crowding tools.emo.assignCrowdingDist(front) front.sort(key=lambda ind: ind.fitness.crowding_dist, reverse=True) next_generation.extend(front[: self.popSize - len(next_generation)]) break return next_generation
# ----------------------------- # Save & Load # -----------------------------
[docs] def load_state(self): """ Loads the algorithm state from the specified save path if a saved state exists. """ if os.path.exists(self.save_path): with open(self.save_path, "rb") as f: state = pickle.load(f) self.current_generation = state["current_generation"] self.initial_population = state["initial_population"] self.population = state["population"] self.history = state["history"] else: self.population = self.toolbox.population(n=self.popSize) self.initial_population = self.population[:]
[docs] def save_state(self): """ Saves the current algorithm state (current generation, population, and history) to the specified save path. """ state = { "current_generation": self.current_generation, "population": deepcopy(self.population), "initial_population": deepcopy(self.initial_population), "history": deepcopy(self.history), } with open(self.save_path, "wb") as f: pickle.dump(state, f)
# ----------------------------- # Print and Plot # -----------------------------
[docs] def print_results(self, population): """ Prints the results of the best individual from the final population. Parameters ---------- population : list of Individual The final population. """ best_ind = tools.selBest(population, k=1)[0] logger.info("best_ind:", best_ind) logger.info("fitness:", best_ind.fitness.values)
[docs] def plot_front_pairwise(self, population, front, gen, names_plot=None, plot_dir="pareto_progress", transform_func=None): """Plot pairwise objective scatter for full population and first front. Parameters ---------- population : list Population to visualize (typically parent + offspring). front : list First non-dominated front. gen : int Generation index used in figure title. names_plot : list of str, optional Objective axis labels. If ``None``, labels are generated as ``obj1``, ``obj2``, ... plot_dir : str, optional Directory used to store figure output. transform_func : callable, optional Optional transformation applied to objective arrays before plotting. """ # check plot_dir if not os.path.exists(plot_dir): os.makedirs(plot_dir) # default names if not provided n_obj = len(population[0].fitness.values) if names_plot is None: names_plot = [f"obj{i+1}" for i in range(n_obj)] # extract population and front values pop_vals = np.array([ind.fitness.values for ind in population]) front_vals = np.array([ind.fitness.values for ind in front]) all_vals = np.vstack([pop_vals, front_vals]) valid_mask = (all_vals != -9999.0) & (all_vals != 9999.0) all_vals_clean = all_vals[valid_mask].reshape(-1, all_vals.shape[1]) if transform_func is not None: pop_vals = transform_func(pop_vals) front_vals = transform_func(front_vals) # create subplots f, axes = plt.subplots(nrows=n_obj, ncols=n_obj, figsize=(2*n_obj, 2*n_obj), gridspec_kw={"wspace": 0.25, "hspace": 0.25, "left":0.1, "right": 0.95, "bottom": 0.1, "top": 0.95} ) for i in range(n_obj): for j in range(n_obj): ax = axes[i, j] ax.scatter(pop_vals[:, j], pop_vals[:, i], color='gray', s=10, alpha=0.7, zorder=5) ax.scatter(front_vals[:, j], front_vals[:, i], color='red', s=15, zorder=10) # set axis labels if i == n_obj - 1: ax.set_xlabel(names_plot[j], fontdict={'weight':'bold'}) if j == 0: ax.set_ylabel(names_plot[i], fontdict={'weight':'bold'}) x_min, x_max = np.percentile(all_vals_clean[:, j], [2, 100]) y_min, y_max = np.percentile(all_vals_clean[:, i], [2, 100]) pad_x = (x_max - x_min) * 0.05 pad_y = (y_max - y_min) * 0.05 ax.set_xlim((x_min-pad_x, x_max+pad_x)) ax.set_ylim((y_min-pad_y, y_max+pad_y)) plt.suptitle(f'Generation {gen}', fontsize=14, weight='bold') plt.tight_layout(rect=[0, 0, 1, 0.97]) plt.savefig(os.path.join(plot_dir, f'NSGAII_process.png')) plt.close()
# ----------------------------- # Run # -----------------------------
[docs] @clock_decorator(print_arg_ret=False) def run( self, plot_progress=False, plot_dir="pareto_progress", names_plot=None, transform_func=None ): """ Runs the NSGA-II algorithm for the specified number of generations. This method applies genetic operators, evaluates individuals, selects the next generation, and stores the results. Parameters ---------- plot_progress : bool, optional Whether to save a pairwise Pareto plot each generation. plot_dir : str, optional Output directory for progress plots. names_plot : list of str, optional Objective labels passed to :meth:`plot_front_pairwise`. transform_func : callable, optional Optional transform function passed to :meth:`plot_front_pairwise`. Returns ------- list The final population after all generations. """ # evaluate population self.evaluatePop(self.population) # loop for generations start_gen = self.current_generation for gen in tqdm( range(start_gen, self.maxGen), desc="loop for NSGAII generation", colour="green", ): # current generation self.current_generation = gen # generate offspring offspring = self.toolbox.select(self.population, self.popSize) offspring = list(map(self.toolbox.clone, offspring)) # apply_genetic_operators and evaluate it self.apply_genetic_operators(offspring) self.evaluatePop(offspring) # combine population and offspring combined = self.population + offspring # sortNondominated to get fronts (first_front_only=True, only get the first front, else all front, namely fronts) fronts = tools.sortNondominated( combined, len(combined), first_front_only=False ) # calculate crowding distance only for first front for history first_front = fronts[0] tools.emo.assignCrowdingDist(first_front) # save history (population and front) self.history.append({ "population": deepcopy(self.population), "combined_population": deepcopy(combined), "fronts": deepcopy(fronts), "first_front": deepcopy(first_front), }) # save state at the end of each gen self.save_state() # plot if plot_progress: self.plot_front_pairwise( combined, first_front, gen, names_plot=names_plot, plot_dir=plot_dir, transform_func=transform_func ) # update population: select next generation self.population[:] = self.select_next_generation(combined) self.print_results(self.population) return self.population