# coding: utf-8
# author: Xudong Zheng
# email: z786909151@163.com
"""NSGA-II base implementation for calibration workflows.
This module defines :class:`NSGAII_Base`, a reusable DEAP-based NSGA-II runner
with checkpoint persistence, generation history tracking, and optional Pareto
front plotting.
"""
import os
import pickle
import random
import matplotlib.pyplot as plt
import numpy as np
from deap import algorithms, base, creator, tools
from tqdm import *
from copy import deepcopy
from ... import logger
from ..decoractors import clock_decorator
[docs]
class NSGAII_Base:
"""
A class that implements the NSGA-II (Non-dominated Sorting Genetic Algorithm II) for multi-objective optimization.
This class provides methods for setting up the genetic algorithm, evaluating individuals,
applying genetic operators (crossover, mutation), selecting the next generation, and saving/loading the algorithm state.
Attributes
----------
popSize : int
The population size for each generation.
maxGen : int
The maximum number of generations to run the algorithm.
save_path : str
The path to save and load the algorithm state.
history : list
A history of the population and non-dominated fronts at each generation.
current_generation : int
The current generation number.
initial_population : list
The initial population before any genetic operators are applied.
population : list
The current population of individuals.
Methods
-------
__init__(algParams, save_path):
Initializes the NSGA-II algorithm with the given parameters and attempts to load the state.
get_obs():
Placeholder function to get the observed values (to be designed for specific use cases).
get_sim():
Placeholder function to get the simulated values (to be designed for specific use cases).
set_algorithm_params(popSize, maxGen, cxProb, mutateProb):
Sets the parameters for the genetic algorithm (population size, max generations, crossover probability, and mutation probability).
createFitness():
Creates the fitness function for individuals.
createInd():
Creates an individual representation (a list).
samplingInd():
Samples a new individual by generating random values for its elements.
registerInd():
Registers the individual sampling function with the toolbox.
registerPop():
Registers the population initialization function with the toolbox.
evaluate(ind):
A placeholder function for evaluating an individual's fitness (to be customized for specific use cases).
registerEvaluate():
Registers the evaluation function with the toolbox.
evaluatePop(population):
Evaluates the fitness of the entire population.
operatorMate(parent1, parent2):
Defines the crossover operation for mating two individuals (using a two-point crossover).
operatorMutate(ind):
Defines the mutation operation for an individual (using a bit-flip mutation).
operatorSelect(population):
Defines the selection operation (using tournament selection).
registerOperators():
Registers the genetic operators (mate, mutate, and select) with the toolbox.
apply_genetic_operators(offspring):
Applies the genetic operators (crossover and mutation) to the offspring.
select_next_generation(combined):
Selects the next generation by sorting individuals based on Pareto dominance and applying crowding distance.
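print_results(population):
Logs the best individual of the final population together with its fitness values.
plot_front_pairwise(population, front, gen, names_plot, plot_dir, transform_func):
Saves a pairwise objective scatter plot of a population and its first non-dominated front.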
load_state():
Loads the algorithm state from the specified save path (if a saved state exists).
save_state():
Saves the current algorithm state (current generation, population, and history) to the specified save path.
run():
Runs the NSGA-II algorithm for the specified number of generations, applying genetic operators and selecting the next generation.
"""
[docs]
def __init__(
    self,
    algParams=None,
    save_path="checkpoint.pkl",
):
    """
    Initializes the NSGA-II algorithm with the given parameters.

    The constructor builds the DEAP toolbox, creates the fitness and
    individual classes, registers sampling / evaluation / genetic operators
    and finally calls :meth:`load_state`, which either restores a saved
    checkpoint from ``save_path`` or samples a fresh population.

    Parameters
    ----------
    algParams : dict, optional
        Dictionary containing the algorithm parameters. Missing keys (or
        ``None`` for the whole dictionary) fall back to the defaults:

        - popSize: The population size for each generation (default 40).
        - maxGen: The maximum number of generations to run (default 250).
        - cxProb: The crossover probability (default 0.7).
        - mutateProb: The mutation probability (default 0.2).
    save_path : str, optional
        The path to save and load the algorithm state (default is "checkpoint.pkl").
    """
    # A ``None`` default avoids sharing one mutable dict between all instances.
    if algParams is None:
        algParams = {}

    # set algorithm params; ``.get`` keeps a partial dict from raising KeyError
    self.popSize = algParams.get("popSize", 40)
    self.maxGen = algParams.get("maxGen", 250)
    self.toolbox = base.Toolbox()
    self.set_algorithm_params(**algParams)

    # create the DEAP Fitness / Individual classes
    self.createFitness()
    self.createInd()

    # register sampling, evaluation and genetic operators on the toolbox
    self.registerInd()
    self.registerPop()
    self.registerEvaluate()
    self.registerOperators()

    # set initial variables (may be overwritten by load_state below)
    self.history = []
    self.current_generation = 0
    self.initial_population = None

    # set save path
    self.save_path = save_path

    # try to load state (if exist); otherwise a new population is sampled
    self.load_state()
# -----------------------------
# set algorithm parameters
# -----------------------------
[docs]
def set_algorithm_params(
    self, popSize=None, maxGen=None, cxProb=None, mutateProb=None, **kwargs
):
    """
    Sets the parameters for the genetic algorithm on ``self.toolbox``.

    Only ``None`` is treated as "not provided". Explicit falsy values such as
    ``cxProb=0.0`` (disable crossover) or ``maxGen=0`` are stored as given
    instead of being silently replaced by the defaults.

    Parameters
    ----------
    popSize : int, optional
        The population size for each generation (default is 40).
    maxGen : int, optional
        The maximum number of generations to run the algorithm (default is 250).
    cxProb : float, optional
        The crossover probability (default is 0.7).
    mutateProb : float, optional
        The mutation probability (default is 0.2).
    **kwargs
        Accepted so that a parameter dictionary with extra keys can be
        unpacked into this call; the extra entries are ignored here.
    """
    self.toolbox.popSize = 40 if popSize is None else popSize
    self.toolbox.maxGen = 250 if maxGen is None else maxGen
    self.toolbox.cxProb = 0.7 if cxProb is None else cxProb
    self.toolbox.mutateProb = 0.2 if mutateProb is None else mutateProb
# -----------------------------
# User should define these
# -----------------------------
[docs]
def get_obs(self):
    """
    Placeholder hook for loading the observed values.

    The base implementation only stores ``0`` in ``self.obs``; it is meant
    to be replaced for a specific use case.

    Returns
    -------
    None
    """
    placeholder_obs = 0
    self.obs = placeholder_obs
[docs]
def get_sim(self):
    """
    Placeholder hook for producing the simulated values.

    The base implementation only stores ``0`` in ``self.sim``; it is meant
    to be replaced for a specific use case.

    Returns
    -------
    None
    """
    placeholder_sim = 0
    self.sim = placeholder_sim
[docs]
def createFitness(self):
    """
    Creates the ``Fitness`` class in DEAP's ``creator`` module.

    The class derives from ``base.Fitness`` and is created with the weights
    ``(-1.0,)``, i.e. one objective with weight ``-1.0``.
    """
    objective_weights = (-1.0,)
    creator.create("Fitness", base.Fitness, weights=objective_weights)
[docs]
def createInd(self):
    """
    Creates the ``Individual`` class in DEAP's ``creator`` module.

    The class derives from ``list`` and is given ``creator.Fitness`` as its
    ``fitness`` attribute, so ``creator.Fitness`` must already exist.
    """
    fitness_cls = creator.Fitness
    creator.create("Individual", list, fitness=fitness_cls)
[docs]
def samplingInd(self):
    """
    Samples a new individual with random elements.

    Example implementation: five values are drawn with
    ``random.uniform(-10, 10)`` and wrapped in ``creator.Individual``.

    Returns
    -------
    Individual
        A new individual sampled with random elements.
    """
    n_elements = 5
    lower, upper = -10, 10
    sampled = []
    for _ in range(n_elements):
        sampled.append(random.uniform(lower, upper))
    return creator.Individual(sampled)
[docs]
def evaluate(self, ind):
    """
    A placeholder function for evaluating an individual's fitness.

    Returns the sum of squares of all elements of ``ind``. For a two-element
    individual this is ``x**2 + y**2``; accepting any length keeps the
    placeholder usable with the five-element individuals produced by the
    default :meth:`samplingInd`, which a fixed two-value unpacking would
    reject with ``ValueError``.

    Parameters
    ----------
    ind : Individual
        The individual to evaluate (an iterable of numbers).

    Returns
    -------
    tuple
        A one-element tuple containing the fitness value.
    """
    return (sum(gene**2 for gene in ind),)
# -----------------------------
# NSGAII operator
# -----------------------------
[docs]
@staticmethod
def operatorMate(parent1, parent2):
    """
    Crossover operator: applies ``tools.cxTwoPoint`` to two individuals.

    Parameters
    ----------
    parent1 : Individual
        The first parent individual.
    parent2 : Individual
        The second parent individual.

    Returns
    -------
    tuple
        Whatever ``tools.cxTwoPoint`` returns for the two parents.
    """
    # no extra keyword options are forwarded to the crossover
    extra_options = dict()
    children = tools.cxTwoPoint(parent1, parent2, **extra_options)
    return children
[docs]
@staticmethod
def operatorMutate(ind, indpb=0.05):
    """
    Defines the mutation operation for an individual (flip-bit mutation).

    ``tools.mutFlipBit`` requires the per-attribute probability ``indpb``.
    It is passed by keyword here; handing an empty options dict over as the
    second positional argument would make it the probability and fail on the
    first ``random.random() < indpb`` comparison.

    Parameters
    ----------
    ind : Individual
        The individual to mutate (modified in place).
    indpb : float, optional
        Independent probability for each attribute to be flipped
        (default is 0.05).

    Returns
    -------
    tuple
        A tuple containing the mutated individual.

    Notes
    -----
    NOTE(review): flip-bit mutation is intended for boolean/binary genes.
    Real-valued calibration problems should override this operator, e.g.
    with a Gaussian or polynomial-bounded mutation.
    """
    return tools.mutFlipBit(ind, indpb=indpb)
[docs]
@staticmethod
def operatorSelect(population, k=None, tournsize=2):
    """
    Defines the selection operation for choosing individuals from the population.

    Uses tournament selection (``tools.selTournament``), which needs both the
    number of individuals to select and the tournament size. The extra
    parameters have defaults, so the one-argument call still works, and the
    ``toolbox.select(population, popSize)`` call made by :meth:`run` is
    accepted as well.

    Parameters
    ----------
    population : list of Individual
        The population from which to select individuals.
    k : int, optional
        Number of individuals to select. Defaults to ``len(population)``.
    tournsize : int, optional
        Number of individuals taking part in each tournament (default is 2).

    Returns
    -------
    list
        A list of selected individuals (references, not copies).
    """
    if k is None:
        k = len(population)
    return tools.selTournament(population, k, tournsize=tournsize)
# -----------------------------
# Registering DEAP components
# -----------------------------
[docs]
def registerInd(self):
    """Registers :meth:`samplingInd` on the toolbox under the alias ``"individual"``."""
    sampler = self.samplingInd
    self.toolbox.register("individual", sampler)
[docs]
def registerPop(self):
    """
    Registers the population initializer on the toolbox as ``"population"``.

    The initializer is ``tools.initRepeat`` filling a ``list`` with the
    toolbox's ``individual`` function, so ``"individual"`` must be
    registered first.
    """
    box = self.toolbox
    box.register("population", tools.initRepeat, list, box.individual)
[docs]
def registerEvaluate(self):
    """Registers :meth:`evaluate` on the toolbox under the alias ``"evaluate"``."""
    evaluator = self.evaluate
    self.toolbox.register("evaluate", evaluator)
[docs]
def registerOperators(self):
    """
    Registers the genetic operators with the toolbox.

    ``operatorMate``, ``operatorMutate`` and ``operatorSelect`` are
    registered under the aliases ``"mate"``, ``"mutate"`` and ``"select"``,
    in that order.
    """
    operator_table = (
        ("mate", self.operatorMate),
        ("mutate", self.operatorMutate),
        ("select", self.operatorSelect),
    )
    for alias, operator in operator_table:
        self.toolbox.register(alias, operator)
# -----------------------------
# Generation in NSGAII
# -----------------------------
[docs]
def evaluatePop(self, population):
    """
    Evaluates every individual of a population and stores the fitness.

    All individuals are evaluated with ``self.toolbox.evaluate`` first;
    afterwards each result is written to ``ind.fitness.values``.
    Individuals that already carry a fitness are evaluated again.

    Parameters
    ----------
    population : list of Individual
        The population to evaluate.
    """
    evaluate_one = self.toolbox.evaluate
    # finish all evaluations before any fitness is overwritten
    scores = [evaluate_one(candidate) for candidate in population]
    for candidate, score in zip(population, scores):
        candidate.fitness.values = score
[docs]
def apply_genetic_operators(self, offspring):
    """
    Applies crossover and then mutation to the offspring, in place.

    Consecutive pairs ``(0, 1), (2, 3), ...`` are mated with probability
    ``toolbox.cxProb`` (a trailing unpaired individual is skipped), then each
    individual is mutated with probability ``toolbox.mutateProb``. The
    fitness values of every modified individual are deleted.

    Parameters
    ----------
    offspring : list of Individual
        The offspring to apply the genetic operators to.
    """
    # it can be implemented by algorithms.varAnd
    box = self.toolbox

    # crossover: walk the odd indices and pair each with its predecessor
    for right in range(1, len(offspring), 2):
        first, second = offspring[right - 1], offspring[right]
        if random.random() < box.cxProb:
            box.mate(first, second)
            del first.fitness.values
            del second.fitness.values

    # mutation: every individual gets its own random draw
    for candidate in offspring:
        if random.random() < box.mutateProb:
            box.mutate(candidate)
            del candidate.fitness.values
[docs]
def select_next_generation(self, combined):
    """
    Selects the next generation from the combined population and offspring.

    The individuals are split into non-dominated fronts. Fronts are taken
    over whole as long as they fit into ``self.popSize``; the first front
    that does not fit is ranked by crowding distance (largest first) and
    only as many individuals as still fit are taken from it.

    Parameters
    ----------
    combined : list of Individual
        The combined population of parents and offspring.

    Returns
    -------
    list
        The selected next generation.
    """
    survivors = []
    free_slots = self.popSize
    ranked_fronts = tools.sortNondominated(
        combined, len(combined), first_front_only=False
    )
    for front in ranked_fronts:
        if len(front) <= free_slots:
            # the whole front fits
            survivors.extend(front)
            free_slots -= len(front)
            continue
        # front must be cut: prefer individuals in less crowded regions
        tools.emo.assignCrowdingDist(front)
        by_crowding = sorted(
            front, key=lambda ind: ind.fitness.crowding_dist, reverse=True
        )
        survivors.extend(by_crowding[:free_slots])
        break
    return survivors
# -----------------------------
# Save & Load
# -----------------------------
[docs]
def load_state(self):
    """
    Restores the algorithm state from ``self.save_path`` or starts fresh.

    If the path does not exist, a new population of ``self.popSize``
    individuals is created with the toolbox and a shallow copy of that list
    is kept as ``initial_population``. Otherwise the pickled state is read
    and ``current_generation``, ``initial_population``, ``population`` and
    ``history`` are taken from it.
    """
    if not os.path.exists(self.save_path):
        fresh = self.toolbox.population(n=self.popSize)
        self.population = fresh
        self.initial_population = fresh[:]
        return

    # NOTE(security): pickle can execute arbitrary code while loading;
    # only point save_path at checkpoint files from a trusted source.
    with open(self.save_path, "rb") as checkpoint:
        saved = pickle.load(checkpoint)

    for field in ("current_generation", "initial_population", "population", "history"):
        setattr(self, field, saved[field])
[docs]
def save_state(self):
    """
    Saves the current algorithm state to ``self.save_path``.

    The state holds the current generation, the population, the initial
    population and the history. It is first written to a temporary file next
    to the target and then moved over it with ``os.replace``, so an
    interruption while writing cannot leave a truncated checkpoint behind.

    No deep copy is made before pickling: ``pickle.dump`` already serializes
    a snapshot of the objects, and copying the whole history on every call
    only costs time and memory.
    """
    state = {
        "current_generation": self.current_generation,
        "population": self.population,
        "initial_population": self.initial_population,
        "history": self.history,
    }
    tmp_path = f"{self.save_path}.tmp"
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(state, f)
        # atomic on the same filesystem: readers see the old or the new file
        os.replace(tmp_path, self.save_path)
    except BaseException:
        # do not leave a partial temporary file behind
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
# -----------------------------
# Print and Plot
# -----------------------------
[docs]
def print_results(self, population):
    """
    Logs the best individual of a population and its fitness values.

    The individual is picked with ``tools.selBest(population, k=1)``.
    NOTE(review): for more than one objective this is a single individual
    chosen by DEAP's fitness ordering, not the whole Pareto front.

    The value is embedded in the message itself. Passing it as an extra
    positional argument without a placeholder in the message would leave it
    out of the log record.

    Parameters
    ----------
    population : list of Individual
        The final population.
    """
    best_ind = tools.selBest(population, k=1)[0]
    logger.info(f"best_ind: {best_ind}")
    logger.info(f"fitness: {best_ind.fitness.values}")
[docs]
def plot_front_pairwise(self, population, front, gen, names_plot=None, plot_dir="pareto_progress", transform_func=None):
    """Plot pairwise objective scatter for full population and first front.

    An ``n_obj x n_obj`` grid is drawn; panel ``(i, j)`` shows objective
    ``j`` on the x axis against objective ``i`` on the y axis, with the
    population in gray and the front in red. The figure is written to
    ``<plot_dir>/NSGAII_process.png``, so each call overwrites the previous
    image.

    Axis limits run from the 2nd to the 100th percentile (plus 5 % padding)
    of the plotted values, ignoring every individual that has a sentinel
    objective value of ``-9999.0`` or ``9999.0``.

    Parameters
    ----------
    population : list
        Population to visualize (typically parent + offspring).
    front : list
        First non-dominated front.
    gen : int
        Generation index used in figure title.
    names_plot : list of str, optional
        Objective axis labels. If ``None``, labels are generated as
        ``obj1``, ``obj2``, ...
    plot_dir : str, optional
        Directory used to store figure output.
    transform_func : callable, optional
        Optional transformation applied to objective arrays before plotting.
        NOTE(review): assumed to keep the ``(n_individuals, n_obj)`` shape
        of its input - confirm against callers.
    """
    # check plot_dir
    os.makedirs(plot_dir, exist_ok=True)

    # default names if not provided
    n_obj = len(population[0].fitness.values)
    if names_plot is None:
        names_plot = [f"obj{i+1}" for i in range(n_obj)]

    # extract population and front values; reshape keeps an empty front 2-D
    pop_vals = np.array([ind.fitness.values for ind in population]).reshape(-1, n_obj)
    front_vals = np.array([ind.fitness.values for ind in front]).reshape(-1, n_obj)

    # Sentinels are detected on the raw values and dropped row-wise.
    # Masking single elements and reshaping afterwards would shift values
    # between objective columns (or fail when the count does not divide).
    raw_vals = np.vstack([pop_vals, front_vals])
    valid_rows = ((raw_vals != -9999.0) & (raw_vals != 9999.0)).all(axis=1)

    if transform_func is not None:
        pop_vals = transform_func(pop_vals)
        front_vals = transform_func(front_vals)

    # Limits must be computed in the same (transformed) space as the
    # scattered points, otherwise the data can fall outside the axes.
    limit_vals = np.vstack([pop_vals, front_vals])[valid_rows]
    if limit_vals.size:
        lows, highs = np.percentile(limit_vals, [2, 100], axis=0)
        pads = (highs - lows) * 0.05
    else:
        # nothing valid to derive limits from: keep matplotlib's autoscale
        lows = highs = pads = None

    # create subplots; squeeze=False keeps ``axes`` 2-D even for one objective
    fig, axes = plt.subplots(nrows=n_obj, ncols=n_obj, figsize=(2*n_obj, 2*n_obj),
                             squeeze=False,
                             gridspec_kw={"wspace": 0.25, "hspace": 0.25,
                                          "left": 0.1, "right": 0.95,
                                          "bottom": 0.1, "top": 0.95}
                             )

    for i in range(n_obj):
        for j in range(n_obj):
            ax = axes[i, j]
            ax.scatter(pop_vals[:, j], pop_vals[:, i], color='gray', s=10, alpha=0.7, zorder=5)
            ax.scatter(front_vals[:, j], front_vals[:, i], color='red', s=15, zorder=10)

            # set axis labels only on the bottom row / left column
            if i == n_obj - 1:
                ax.set_xlabel(names_plot[j], fontdict={'weight': 'bold'})
            if j == 0:
                ax.set_ylabel(names_plot[i], fontdict={'weight': 'bold'})

            if lows is not None:
                ax.set_xlim((lows[j] - pads[j], highs[j] + pads[j]))
                ax.set_ylim((lows[i] - pads[i], highs[i] + pads[i]))

    fig.suptitle(f'Generation {gen}', fontsize=14, weight='bold')
    fig.tight_layout(rect=[0, 0, 1, 0.97])
    fig.savefig(os.path.join(plot_dir, 'NSGAII_process.png'))
    plt.close(fig)
# -----------------------------
# Run
# -----------------------------
[docs]
@clock_decorator(print_arg_ret=False)
def run(
    self,
    plot_progress=False,
    plot_dir="pareto_progress",
    names_plot=None,
    transform_func=None
):
    """
    Runs the NSGA-II algorithm for the specified number of generations.

    Starting at ``self.current_generation`` each generation selects and
    clones offspring, applies the genetic operators, evaluates them, records
    a history entry, selects the next population and writes a checkpoint.

    The checkpoint is written *after* the next population has been selected
    and with ``current_generation`` already advanced to the next generation.
    A resumed run therefore continues with the following generation instead
    of repeating the one that was already recorded in ``history``.

    Parameters
    ----------
    plot_progress : bool, optional
        Whether to save a pairwise Pareto plot each generation.
    plot_dir : str, optional
        Output directory for progress plots.
    names_plot : list of str, optional
        Objective labels passed to :meth:`plot_front_pairwise`.
    transform_func : callable, optional
        Optional transform function passed to :meth:`plot_front_pairwise`.

    Returns
    -------
    list
        The final population after all generations.
    """
    # evaluate population (also re-evaluates a population loaded from a checkpoint)
    self.evaluatePop(self.population)

    # loop for generations
    start_gen = self.current_generation
    for gen in tqdm(
        range(start_gen, self.maxGen),
        desc="loop for NSGAII generation",
        colour="green",
    ):
        # current generation (visible to hooks while it is processed)
        self.current_generation = gen

        # generate offspring; clones keep the parents untouched
        offspring = self.toolbox.select(self.population, self.popSize)
        offspring = list(map(self.toolbox.clone, offspring))

        # apply_genetic_operators and evaluate it
        self.apply_genetic_operators(offspring)
        self.evaluatePop(offspring)

        # combine population and offspring
        combined = self.population + offspring

        # sortNondominated to get all fronts (first_front_only=False)
        fronts = tools.sortNondominated(
            combined, len(combined), first_front_only=False
        )

        # calculate crowding distance only for first front for history
        first_front = fronts[0]
        tools.emo.assignCrowdingDist(first_front)

        # save history (parents of this generation, combined pool and fronts)
        self.history.append({
            "population": deepcopy(self.population),
            "combined_population": deepcopy(combined),
            "fronts": deepcopy(fronts),
            "first_front": deepcopy(first_front),
        })

        # update population: select next generation
        self.population[:] = self.select_next_generation(combined)

        # this generation is complete: checkpoint the state the next one starts from
        self.current_generation = gen + 1
        self.save_state()

        # plot after the checkpoint so a plotting failure cannot lose the generation
        if plot_progress:
            self.plot_front_pairwise(
                combined, first_front, gen,
                names_plot=names_plot,
                plot_dir=plot_dir,
                transform_func=transform_func
            )

    self.print_results(self.population)
    return self.population