jMetalPy
jMetalPy copied to clipboard
How can we save all solutions (both dominated and non-dominated)? (All function evaluations)
How can I save all dominated and non-dominated solutions?
For a population of 100 and 10 generations, I want to save all dominated and non-dominated solutions, i.e., 100 Pareto solutions and 900 non-Pareto solutions from a total of 1000 function evaluations. @ajnebro
from jmetal.algorithm.multiobjective.nsgaii import NSGAII
from jmetal.operator import SBXCrossover, PolynomialMutation
from jmetal.util.termination_criterion import StoppingByEvaluations
from jmetal.util.observer import ProgressBarObserver
from jmetal.algorithm.multiobjective.smpso import SMPSO
from jmetal.operator import PolynomialMutation
from jmetal.util.archive import CrowdingDistanceArchive
from jmetal.algorithm.multiobjective.spea2 import SPEA2
from jmetal.lab.experiment import Experiment, Job, generate_summary_from_experiment
from jmetal.core.observer import Observer
from jmetal.util.observer import PlotFrontToFileObserver, WriteFrontToFileObserver
import time
# Module-level problem instance; MyProblem is not defined in this snippet and
# must be supplied by the user.
problem = MyProblem()
# Evaluation budget handed to StoppingByEvaluations in configure_experiment.
MFES= 1000
import time
# CPU-time reference point; the elapsed time is printed once the experiment
# has finished.
start_time1 = time.process_time()
def configure_experiment(problems: dict, n_run: int) -> list:
    """Build the list of NSGA-II jobs for an experiment.

    One job is created for every combination of run index and problem.

    :param problems: Mapping from a problem tag to the problem instance to solve.
    :param n_run: Number of independent runs per problem.
    :return: List of ``Job`` objects, ordered by run and then by problem.
    """
    jobs = []

    for run in range(n_run):
        for problem_tag, problem in problems.items():
            jobs.append(
                Job(
                    algorithm=NSGAII(
                        problem=problem,
                        population_size=100,
                        offspring_population_size=100,
                        mutation=PolynomialMutation(probability=0.01, distribution_index=20),
                        crossover=SBXCrossover(probability=0.9, distribution_index=15),
                        # MFES is the module-level evaluation budget.
                        termination_criterion=StoppingByEvaluations(max_evaluations=MFES)),
                    algorithm_tag='NSGAII',
                    # Use the key of the problems mapping rather than a fixed
                    # string, so jobs for different problems are told apart.
                    problem_tag=problem_tag,
                    run=run,
                )
            )

    return jobs
if __name__ == '__main__':
    # Assemble the study: five runs of the single problem, with the results
    # directory passed straight to the experiment.
    experiment = Experiment(
        output_dir='P100G10',
        jobs=configure_experiment(problems={'My_Problem': MyProblem()}, n_run=5),
    )

    # Execute every configured job.
    experiment.run()

    # Report the CPU time consumed since start_time1 was taken.
    print("--- %s seconds ---" % (time.process_time() - start_time1))
@TimJay @Juanjdurillo @ajnebro @Canicio @cbarba
There are several ways to cope with this. For example, you could create a subclass of NSGAII overriding the evaluate()
method in this way:
def evaluate(self, solution_list: List[S], global_solution_list:List[S]) -> List[S]:
# Call the evaluate method of the super class and add the evaluated list to global_solution_list
You can create the global solution list before running the algorithm, and write its contents when the algorithm finishes.
I receive the error "NameError: name 'List' is not defined". Can you please give an example of how to include this? Also, what changes do I need to make to get the list of both dominated and non-dominated solutions for SMPSO?
For SMPSO, I changed the evaluate method to the following:
def evaluate(self, solution_list: List[FloatSolution], global_solution_list: List[FloatSolution] = None) -> List[FloatSolution]:
    """Evaluate the swarm and record a snapshot of every evaluated solution.

    :param solution_list: Particles to evaluate.
    :param global_solution_list: Optional list that receives a copy of each
        evaluated solution. When omitted, the copies are appended to
        ``self.all_evaluated_solutions``, which is created on first use. The
        parameter has to be optional because the algorithm's run loop calls
        ``evaluate`` with the swarm only.
    :return: The evaluated solutions, as returned by the swarm evaluator.
    """
    from copy import copy  # local import keeps this snippet self-contained

    # The evaluator takes the solutions and the problem; the history list is
    # managed here and must not be forwarded to it.
    evaluated = self.swarm_evaluator.evaluate(solution_list, self.problem)

    if global_solution_list is None:
        global_solution_list = getattr(self, 'all_evaluated_solutions', None)
        if global_solution_list is None:
            global_solution_list = self.all_evaluated_solutions = []

    # Store copies: the particles are modified in place on later iterations,
    # so keeping references would overwrite the recorded history.
    global_solution_list.extend(copy(solution) for solution in evaluated)
    return evaluated
But how can I access the global solution list containing both dominated and non-dominated points? To get only the non-dominated points, we use the following code:
solutions = algorithm.get_result()
The entire code for SMPSO is given below:
Part 1:
import random
import threading
from copy import copy
from math import sqrt
from typing import TypeVar, List, Optional
import numpy
from jmetal.config import store
from jmetal.core.algorithm import ParticleSwarmOptimization, DynamicAlgorithm
from jmetal.core.operator import Mutation
from jmetal.core.problem import FloatProblem, DynamicProblem
from jmetal.core.solution import FloatSolution
from jmetal.util.archive import BoundedArchive, ArchiveWithReferencePoint
from jmetal.util.comparator import DominanceComparator
from jmetal.util.evaluator import Evaluator
from jmetal.util.generator import Generator
from jmetal.util.termination_criterion import TerminationCriterion
R = TypeVar('R')
"""
.. module:: SMPSO
:platform: Unix, Windows
:synopsis: Implementation of SMPSO.
.. moduleauthor:: Antonio Benítez-Hidalgo <[email protected]>
"""
class SMPSO(ParticleSwarmOptimization):
    """SMPSO particle swarm optimizer that also records every evaluated solution."""

    def __init__(self,
                 problem: FloatProblem,
                 swarm_size: int,
                 mutation: Mutation,
                 leaders: Optional[BoundedArchive],
                 termination_criterion: TerminationCriterion = store.default_termination_criteria,
                 swarm_generator: Generator = store.default_generator,
                 swarm_evaluator: Evaluator = store.default_evaluator):
        """ This class implements the SMPSO algorithm as described in

        * SMPSO: A new PSO-based metaheuristic for multi-objective optimization
        * MCDM 2009. DOI: `<http://dx.doi.org/10.1109/MCDM.2009.4938830/>`_.

        The implementation of SMPSO provided in jMetalPy follows the algorithm template described in the algorithm
        templates section of the documentation.

        :param problem: The problem to solve.
        :param swarm_size: Size of the swarm.
        :param mutation: Mutation operator (see :py:mod:`jmetal.operator.mutation`).
        :param leaders: Archive for leaders.
        :param termination_criterion: Stopping condition; it is registered as an observer of this algorithm.
        :param swarm_generator: Factory used to create the initial particles.
        :param swarm_evaluator: Evaluator used to evaluate the swarm.
        """
        super().__init__(
            problem=problem,
            swarm_size=swarm_size)
        self.swarm_generator = swarm_generator
        self.swarm_evaluator = swarm_evaluator
        self.termination_criterion = termination_criterion
        self.observable.register(termination_criterion)

        self.mutation_operator = mutation
        self.leaders = leaders

        # Sampling ranges for the acceleration coefficients (c1, c2) and the
        # random factors (r1, r2) drawn once per particle in update_velocity.
        self.c1_min = 1.5
        self.c1_max = 2.5
        self.c2_min = 1.5
        self.c2_max = 2.5

        self.r1_min = 0.0
        self.r1_max = 1.0
        self.r2_min = 0.0
        self.r2_max = 1.0

        self.min_weight = 0.1
        self.max_weight = 0.1

        # Factors applied to a velocity component when the particle is clamped
        # to the lower (1) or upper (2) bound; -1 reverses its direction.
        self.change_velocity1 = -1
        self.change_velocity2 = -1

        self.dominance_comparator = DominanceComparator()

        self.speed = numpy.zeros((self.swarm_size, self.problem.number_of_variables), dtype=float)
        self.delta_max, self.delta_min = numpy.empty(problem.number_of_variables), \
            numpy.empty(problem.number_of_variables)

        # Snapshot of every solution passed through evaluate(), dominated or
        # not, in evaluation order.
        self.all_evaluated_solutions: List[FloatSolution] = []

    def create_initial_solutions(self) -> List[FloatSolution]:
        """Create ``swarm_size`` particles with the swarm generator."""
        return [self.swarm_generator.new(self.problem) for _ in range(self.swarm_size)]

    def evaluate(self,
                 solution_list: List[FloatSolution],
                 global_solution_list: Optional[List[FloatSolution]] = None) -> List[FloatSolution]:
        """Evaluate the swarm and record a copy of every evaluated solution.

        :param solution_list: Particles to evaluate.
        :param global_solution_list: Optional list that receives the copies;
            defaults to ``self.all_evaluated_solutions``. It must be optional
            because the algorithm template (not visible in this file) calls
            ``evaluate`` with the swarm only.
        :return: The evaluated solutions, as returned by the swarm evaluator.
        """
        # The evaluator receives the solutions and the problem only; the
        # history list is handled here.
        evaluated = self.swarm_evaluator.evaluate(solution_list, self.problem)

        if global_solution_list is None:
            global_solution_list = self.all_evaluated_solutions

        # Copies are required: update_position and perturbation modify the
        # particles in place on the next iteration.
        global_solution_list.extend(copy(solution) for solution in evaluated)
        return evaluated

    def stopping_condition_is_met(self) -> bool:
        """Return the state of the termination criterion."""
        return self.termination_criterion.is_met

    def initialize_global_best(self, swarm: List[FloatSolution]) -> None:
        """Offer a copy of every particle to the leaders archive."""
        for particle in swarm:
            self.leaders.add(copy(particle))

    def initialize_particle_best(self, swarm: List[FloatSolution]) -> None:
        """Set each particle's ``local_best`` attribute to a copy of itself."""
        for particle in swarm:
            particle.attributes['local_best'] = copy(particle)

    def initialize_velocity(self, swarm: List[FloatSolution]) -> None:
        """Compute the per-variable velocity limits from the problem bounds.

        ``delta_max`` is half the width of each variable's range and
        ``delta_min`` is its negation. The ``swarm`` argument is not used.
        """
        for i in range(self.problem.number_of_variables):
            self.delta_max[i] = (self.problem.upper_bound[i] - self.problem.lower_bound[i]) / 2.0

        self.delta_min = -1.0 * self.delta_max

    def update_velocity(self, swarm: List[FloatSolution]) -> None:
        """Recompute the velocity of every particle.

        For each particle a global leader is selected and the factors
        r1, r2, c1 and c2 are drawn (each rounded to one decimal); the new
        velocity of every variable is then limited to
        ``[delta_min, delta_max]``.
        """
        for i in range(self.swarm_size):
            best_particle = copy(swarm[i].attributes['local_best'])
            best_global = self.select_global_best()

            # Keep this draw order: it determines how the random stream is
            # consumed.
            r1 = round(random.uniform(self.r1_min, self.r1_max), 1)
            r2 = round(random.uniform(self.r2_min, self.r2_max), 1)
            c1 = round(random.uniform(self.c1_min, self.c1_max), 1)
            c2 = round(random.uniform(self.c2_min, self.c2_max), 1)
            wmax = self.max_weight

            # Depends only on c1 and c2, so compute it once per particle.
            constriction = self.__constriction_coefficient(c1, c2)

            for var in range(swarm[i].number_of_variables):
                self.speed[i][var] = self.__velocity_constriction(
                    constriction * (
                        (self.__inertia_weight(wmax) * self.speed[i][var])
                        + (c1 * r1 * (best_particle.variables[var] - swarm[i].variables[var]))
                        + (c2 * r2 * (best_global.variables[var] - swarm[i].variables[var]))
                    ),
                    self.delta_max, self.delta_min, var)

    def update_position(self, swarm: List[FloatSolution]) -> None:
        """Move every particle by its velocity and clamp it to the problem bounds.

        When a variable is clamped, the matching velocity component is
        multiplied by ``change_velocity1`` (lower bound) or
        ``change_velocity2`` (upper bound).
        """
        for i in range(self.swarm_size):
            particle = swarm[i]

            for j in range(particle.number_of_variables):
                particle.variables[j] += self.speed[i][j]

                if particle.variables[j] < self.problem.lower_bound[j]:
                    particle.variables[j] = self.problem.lower_bound[j]
                    self.speed[i][j] *= self.change_velocity1

                if particle.variables[j] > self.problem.upper_bound[j]:
                    particle.variables[j] = self.problem.upper_bound[j]
                    self.speed[i][j] *= self.change_velocity2

    def update_global_best(self, swarm: List[FloatSolution]) -> None:
        """Offer a copy of every particle to the leaders archive."""
        for particle in swarm:
            self.leaders.add(copy(particle))

    def update_particle_best(self, swarm: List[FloatSolution]) -> None:
        """Replace a particle's ``local_best`` unless the comparator returns 1."""
        for i in range(self.swarm_size):
            flag = self.dominance_comparator.compare(
                swarm[i],
                swarm[i].attributes['local_best'])

            # presumably 1 means the stored local best dominates the particle
            # - confirm against DominanceComparator
            if flag != 1:
                swarm[i].attributes['local_best'] = copy(swarm[i])

    def perturbation(self, swarm: List[FloatSolution]) -> None:
        """Apply the mutation operator to every sixth particle (indices 0, 6, 12, ...)."""
        for i in range(0, self.swarm_size, 6):
            self.mutation_operator.execute(swarm[i])

    def select_global_best(self) -> FloatSolution:
        """Pick a leader from the archive.

        With more than two leaders, two are sampled at random and the one
        preferred by the archive's comparator is returned; otherwise the
        first leader is returned. A copy is returned in both cases.
        """
        leaders = self.leaders.solution_list

        if len(leaders) > 2:
            particles = random.sample(leaders, 2)

            if self.leaders.comparator.compare(particles[0], particles[1]) < 1:
                best_global = copy(particles[0])
            else:
                best_global = copy(particles[1])
        else:
            best_global = copy(self.leaders.solution_list[0])

        return best_global

    def __velocity_constriction(self, value: float, delta_max: numpy.ndarray, delta_min: numpy.ndarray,
                                variable_index: int) -> float:
        """Clamp ``value`` to the velocity limits of the given variable."""
        result = value

        if value > delta_max[variable_index]:
            result = delta_max[variable_index]

        if value < delta_min[variable_index]:
            result = delta_min[variable_index]

        return result

    def __inertia_weight(self, wmax: float) -> float:
        """Return the inertia weight, which is ``wmax`` unchanged."""
        return wmax

    def __constriction_coefficient(self, c1: float, c2: float) -> float:
        """Return the constriction coefficient for the given acceleration factors.

        The result is 1.0 when ``c1 + c2 <= 4``.
        """
        rho = c1 + c2

        if rho <= 4:
            result = 1.0
        else:
            # NOTE(review): no absolute value is taken, so the result is
            # negative for rho > 4 - confirm this is intended.
            result = 2.0 / (2.0 - rho - sqrt(pow(rho, 2.0) - 4.0 * rho))

        return result

    def init_progress(self) -> None:
        """Initialise the evaluation counter, velocity limits, local bests and leaders."""
        self.evaluations = self.swarm_size

        self.leaders.compute_density_estimator()

        self.initialize_velocity(self.solutions)
        self.initialize_particle_best(self.solutions)
        self.initialize_global_best(self.solutions)

    def update_progress(self) -> None:
        """Advance the evaluation counter and notify observers with the current leaders."""
        self.evaluations += self.swarm_size
        self.leaders.compute_density_estimator()

        observable_data = self.get_observable_data()
        observable_data['SOLUTIONS'] = self.leaders.solution_list
        self.observable.notify_all(**observable_data)

    def get_result(self) -> List[FloatSolution]:
        """Return the contents of the leaders archive.

        The complete evaluation history is in ``all_evaluated_solutions``.
        """
        return self.leaders.solution_list

    def get_name(self) -> str:
        """Return the algorithm name."""
        return 'SMPSO'
Part 2:
import os
import sys
# Imports are executed once, ahead of the loop, instead of on every iteration.
from jmetal.operator import SBXCrossover, PolynomialMutation
from jmetal.algorithm.multiobjective.smpso import SMPSO
from jmetal.util.archive import CrowdingDistanceArchive
from jmetal.util.termination_criterion import StoppingByEvaluations
from jmetal.util.solution import get_non_dominated_solutions

# Number of independent SMPSO runs.
maxIter = 1
# Evaluation budget of each run.
MFES = 50000

# A for loop replaces the manual counter, which was named `iter` and hid the
# builtin of the same name.
for run_index in range(1, maxIter + 1):
    # MyProblem is not defined in this snippet and must be supplied by the user.
    problem = MyProblem()

    algorithm = SMPSO(
        problem=problem,
        swarm_size=1000,
        mutation=PolynomialMutation(probability=0.01, distribution_index=20),
        leaders=CrowdingDistanceArchive(1000),
        termination_criterion=StoppingByEvaluations(max_evaluations=MFES)
    )

    algorithm.run()
    solutions = algorithm.get_result()

    # Filter the returned solutions down to the non-dominated ones.
    front = get_non_dominated_solutions(solutions)

print('Loop ended.')
@TimJay @Juanjdurillo @ajnebro @Canicio @cbarba
This is something I was also looking for, for the NSGAII algorithm. I do not fully understand the suggested solution of overriding the evaluate function. Could someone maybe elaborate a little? It could very well be due to my coding skills. Any help would be appreciated.