import math
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
import numpy as np
import optuna
from optuna import distributions
from optuna._experimental import experimental
from optuna.distributions import BaseDistribution
from optuna.samplers._random import RandomSampler
from optuna.samplers._tpe.parzen_estimator import _ParzenEstimator
from optuna.samplers._tpe.parzen_estimator import _ParzenEstimatorParameters
from optuna.samplers._tpe.sampler import TPESampler
from optuna.study import StudyDirection
EPS = 1e-12
def default_gamma(x: int) -> int:
return int(np.floor(0.1 * x))
def _default_weights_above(x: int) -> np.ndarray:
return np.ones(x)
[文档]@experimental("2.4.0")
class MOTPESampler(TPESampler):
"""Multi-objective sampler using the MOTPE algorithm.
This sampler is a multiobjective version of :class:`~optuna.samplers.TPESampler`.
For further information about MOTPE algorithm, please refer to the following paper:
- `Multiobjective tree-structured parzen estimator for computationally expensive optimization
problems <https://dl.acm.org/doi/abs/10.1145/3377930.3389817>`_
Args:
consider_prior:
Enhance the stability of Parzen estimator by imposing a Gaussian prior when
:obj:`True`. The prior is only effective if the sampling distribution is
either :class:`~optuna.distributions.UniformDistribution`,
:class:`~optuna.distributions.DiscreteUniformDistribution`,
:class:`~optuna.distributions.LogUniformDistribution`,
:class:`~optuna.distributions.IntUniformDistribution`,
or :class:`~optuna.distributions.IntLogUniformDistribution`.
prior_weight:
The weight of the prior. This argument is used in
:class:`~optuna.distributions.UniformDistribution`,
:class:`~optuna.distributions.DiscreteUniformDistribution`,
:class:`~optuna.distributions.LogUniformDistribution`,
:class:`~optuna.distributions.IntUniformDistribution`,
:class:`~optuna.distributions.IntLogUniformDistribution`, and
:class:`~optuna.distributions.CategoricalDistribution`.
consider_magic_clip:
Enable a heuristic to limit the smallest variances of Gaussians used in
the Parzen estimator.
consider_endpoints:
Take endpoints of domains into account when calculating variances of Gaussians
in Parzen estimator. See the original paper for details on the heuristics
to calculate the variances.
n_startup_trials:
The random sampling is used instead of the MOTPE algorithm until the given number
of trials finish in the same study. 11 * number of variables - 1 is recommended in the
original paper.
n_ehvi_candidates:
Number of candidate samples used to calculate the expected hypervolume improvement.
gamma:
A function that takes the number of finished trials and returns the number of trials to
form a density function for samples with low grains. See the original paper for more
details.
weights_above:
A function that takes the number of finished trials and returns a weight for them. As
default, weights are automatically calculated by the MOTPE's default strategy.
seed:
Seed for random number generator.
.. note::
Initialization with Latin hypercube sampling may improve optimization performance.
However, the current implementation only supports initialization with random sampling.
Example:
.. testcode::
import optuna
seed = 128
num_variables = 2
n_startup_trials = 11 * num_variables - 1
def objective(trial):
x = []
for i in range(1, num_variables + 1):
x.append(trial.suggest_float(f"x{i}", 0.0, 2.0 * i))
return x
sampler = optuna.samplers.MOTPESampler(
n_startup_trials=n_startup_trials, n_ehvi_candidates=24, seed=seed
)
study = optuna.create_study(directions=["minimize"] * num_variables, sampler=sampler)
study.optimize(objective, n_trials=n_startup_trials + 10)
"""
def __init__(
self,
*,
consider_prior: bool = True,
prior_weight: float = 1.0,
consider_magic_clip: bool = True,
consider_endpoints: bool = True,
n_startup_trials: int = 10,
n_ehvi_candidates: int = 24,
gamma: Callable[[int], int] = default_gamma,
weights_above: Callable[[int], np.ndarray] = _default_weights_above,
seed: Optional[int] = None,
) -> None:
super().__init__(
consider_prior=consider_prior,
prior_weight=prior_weight,
consider_magic_clip=consider_magic_clip,
consider_endpoints=consider_endpoints,
n_startup_trials=n_startup_trials,
n_ei_candidates=n_ehvi_candidates,
gamma=gamma,
weights=weights_above,
seed=seed,
)
self._n_ehvi_candidates = n_ehvi_candidates
self._mo_random_sampler = RandomSampler(seed=seed)
self._split_cache: Dict[int, Any] = {}
self._weights_below: Dict[int, Any] = {}
[文档] def reseed_rng(self) -> None:
self._rng = np.random.RandomState()
self._mo_random_sampler.reseed_rng()
[文档] def infer_relative_search_space(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
) -> Dict[str, BaseDistribution]:
return {}
[文档] def sample_relative(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
search_space: Dict[str, BaseDistribution],
) -> Dict[str, Any]:
return {}
[文档] def sample_independent(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
param_name: str,
param_distribution: BaseDistribution,
) -> Any:
if len(study.directions) < 2:
raise ValueError(
"Number of objectives must be >= 2. "
"Please use optuna.samplers.TPESampler for single-objective optimization."
)
values, scores = _get_observation_pairs(study, param_name)
n = len(values)
if n < self._n_startup_trials:
return self._mo_random_sampler.sample_independent(
study, trial, param_name, param_distribution
)
below_param_values, above_param_values = self._split_mo_observation_pairs(
study, trial, values, scores
)
if isinstance(param_distribution, distributions.UniformDistribution):
return self._sample_mo_uniform(
study, trial, param_distribution, below_param_values, above_param_values
)
elif isinstance(param_distribution, distributions.LogUniformDistribution):
return self._sample_mo_loguniform(
study, trial, param_distribution, below_param_values, above_param_values
)
elif isinstance(param_distribution, distributions.DiscreteUniformDistribution):
return self._sample_mo_discrete_uniform(
study, trial, param_distribution, below_param_values, above_param_values
)
elif isinstance(param_distribution, distributions.IntUniformDistribution):
return self._sample_mo_int(
study, trial, param_distribution, below_param_values, above_param_values
)
elif isinstance(param_distribution, distributions.IntLogUniformDistribution):
return self._sample_mo_int_loguniform(
study, trial, param_distribution, below_param_values, above_param_values
)
elif isinstance(param_distribution, distributions.CategoricalDistribution):
index = self._sample_mo_categorical_index(
study, trial, param_distribution, below_param_values, above_param_values
)
return param_distribution.choices[index]
else:
distribution_list = [
distributions.UniformDistribution.__name__,
distributions.LogUniformDistribution.__name__,
distributions.DiscreteUniformDistribution.__name__,
distributions.IntUniformDistribution.__name__,
distributions.IntLogUniformDistribution.__name__,
distributions.CategoricalDistribution.__name__,
]
raise NotImplementedError(
"The distribution {} is not implemented. "
"The parameter distribution should be one of the {}".format(
param_distribution, distribution_list
)
)
def _split_mo_observation_pairs(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
config_vals: List[Optional[float]],
loss_vals: List[List[float]],
) -> Tuple[np.ndarray, np.ndarray]:
"""Split observations into observations for l(x) and g(x) with the ratio of gamma:1-gamma.
Weights for l(x) are also calculated in this method.
This splitting strategy consists of the following two steps:
1. Nondonation rank-based selection
2. Hypervolume subset selection problem (HSSP)-based selection
Please refer to the `original paper <https://dl.acm.org/doi/abs/10.1145/3377930.3389817>`_
for more details.
"""
cvals = np.asarray(config_vals)
lvals = np.asarray(loss_vals)
# Solving HSSP for variables number of times is a waste of time.
# We cache the result of splitting.
is_cached = trial._trial_id in self._split_cache
if is_cached:
split_cache = self._split_cache[trial._trial_id]
indices_below = np.asarray(split_cache["indices_below"])
weights_below = np.asarray(split_cache["weights_below"])
indices_above = np.asarray(split_cache["indices_above"])
else:
nondomination_ranks = _calculate_nondomination_rank(lvals)
n_below = self._gamma(len(lvals))
assert 0 <= n_below <= len(lvals)
indices = np.array(range(len(lvals)))
indices_below = np.empty(n_below, dtype=int)
# Nondomination rank-based selection
i = 0
last_idx = 0
while last_idx + sum(nondomination_ranks == i) <= n_below:
length = indices[nondomination_ranks == i].shape[0]
indices_below[last_idx : last_idx + length] = indices[nondomination_ranks == i]
last_idx += length
i += 1
# Hypervolume subset selection problem (HSSP)-based selection
subset_size = n_below - last_idx
if subset_size > 0:
rank_i_lvals = lvals[nondomination_ranks == i]
rank_i_indices = indices[nondomination_ranks == i]
worst_point = np.max(rank_i_lvals, axis=0)
reference_point = np.maximum(1.1 * worst_point, 0.9 * worst_point)
reference_point[reference_point == 0] = EPS
selected_indices = self._solve_hssp(
rank_i_lvals, rank_i_indices, subset_size, reference_point
)
indices_below[last_idx:] = selected_indices
indices_above = np.setdiff1d(indices, indices_below)
attrs = {
"indices_below": indices_below.tolist(),
"indices_above": indices_above.tolist(),
}
weights_below = self._calculate_weights_below(lvals, indices_below)
attrs["weights_below"] = weights_below.tolist()
self._split_cache[trial._trial_id] = attrs
below = cvals[indices_below]
self._weights_below[trial._trial_id] = np.asarray(
[w for w, v in zip(weights_below, below) if v is not None], dtype=float
)
below = np.asarray([v for v in below if v is not None], dtype=float)
above = cvals[indices_above]
above = np.asarray([v for v in above if v is not None], dtype=float)
return below, above
def _sample_mo_uniform(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.UniformDistribution,
below: np.ndarray,
above: np.ndarray,
) -> float:
low = distribution.low
high = distribution.high
return self._sample_mo_numerical(study, trial, low, high, below, above)
def _sample_mo_loguniform(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.LogUniformDistribution,
below: np.ndarray,
above: np.ndarray,
) -> float:
low = distribution.low
high = distribution.high
return self._sample_mo_numerical(study, trial, low, high, below, above, is_log=True)
def _sample_mo_discrete_uniform(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.DiscreteUniformDistribution,
below: np.ndarray,
above: np.ndarray,
) -> float:
q = distribution.q
r = distribution.high - distribution.low
# [low, high] is shifted to [0, r] to align sampled values at regular intervals.
low = 0 - 0.5 * q
high = r + 0.5 * q
# Shift below and above to [0, r]
above -= distribution.low
below -= distribution.low
best_sample = (
self._sample_mo_numerical(study, trial, low, high, below, above, q=q)
+ distribution.low
)
return min(max(best_sample, distribution.low), distribution.high)
def _sample_mo_int(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.IntUniformDistribution,
below: np.ndarray,
above: np.ndarray,
) -> int:
d = distributions.DiscreteUniformDistribution(
low=distribution.low, high=distribution.high, q=distribution.step
)
return int(self._sample_mo_discrete_uniform(study, trial, d, below, above))
def _sample_mo_int_loguniform(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.IntLogUniformDistribution,
below: np.ndarray,
above: np.ndarray,
) -> int:
low = distribution.low - 0.5
high = distribution.high + 0.5
sample = self._sample_mo_numerical(study, trial, low, high, below, above, is_log=True)
best_sample = np.round(sample)
return int(min(max(best_sample, distribution.low), distribution.high))
def _sample_mo_numerical(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
low: float,
high: float,
below: np.ndarray,
above: np.ndarray,
q: Optional[float] = None,
is_log: bool = False,
) -> float:
if is_log:
low = np.log(low)
high = np.log(high)
below = np.log(below)
above = np.log(above)
size = (self._n_ehvi_candidates,)
weights_below: Callable[[int], np.ndarray]
weights_below = lambda _: self._weights_below[trial._trial_id] # NOQA
parzen_estimator_parameters_below = _ParzenEstimatorParameters(
self._parzen_estimator_parameters.consider_prior,
self._parzen_estimator_parameters.prior_weight,
self._parzen_estimator_parameters.consider_magic_clip,
self._parzen_estimator_parameters.consider_endpoints,
weights_below,
)
parzen_estimator_below = _ParzenEstimator(
mus=below, low=low, high=high, parameters=parzen_estimator_parameters_below
)
samples_below = self._sample_from_gmm(
parzen_estimator=parzen_estimator_below,
low=low,
high=high,
q=q,
size=size,
)
log_likelihoods_below = self._gmm_log_pdf(
samples=samples_below,
parzen_estimator=parzen_estimator_below,
low=low,
high=high,
q=q,
)
weights_above = self._weights
parzen_estimator_parameters_above = _ParzenEstimatorParameters(
self._parzen_estimator_parameters.consider_prior,
self._parzen_estimator_parameters.prior_weight,
self._parzen_estimator_parameters.consider_magic_clip,
self._parzen_estimator_parameters.consider_endpoints,
weights_above,
)
parzen_estimator_above = _ParzenEstimator(
mus=above, low=low, high=high, parameters=parzen_estimator_parameters_above
)
log_likelihoods_above = self._gmm_log_pdf(
samples=samples_below,
parzen_estimator=parzen_estimator_above,
low=low,
high=high,
q=q,
)
ret = float(
TPESampler._compare(
samples=samples_below, log_l=log_likelihoods_below, log_g=log_likelihoods_above
)[0]
)
return math.exp(ret) if is_log else ret
def _sample_mo_categorical_index(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
distribution: distributions.CategoricalDistribution,
below: np.ndarray,
above: np.ndarray,
) -> int:
choices = distribution.choices
below = below.astype(int)
above = above.astype(int)
upper = len(choices)
size = (self._n_ehvi_candidates,)
weights_below = self._weights_below[trial._trial_id]
counts_below = np.bincount(below, minlength=upper, weights=weights_below)
weighted_below = counts_below + self._prior_weight
weighted_below /= weighted_below.sum()
samples_below = self._sample_from_categorical_dist(weighted_below, size)
log_likelihoods_below = TPESampler._categorical_log_pdf(samples_below, weighted_below)
weights_above = self._weights(len(above))
counts_above = np.bincount(above, minlength=upper, weights=weights_above)
weighted_above = counts_above + self._prior_weight
weighted_above /= weighted_above.sum()
log_likelihoods_above = TPESampler._categorical_log_pdf(samples_below, weighted_above)
return int(
TPESampler._compare(
samples=samples_below, log_l=log_likelihoods_below, log_g=log_likelihoods_above
)[0]
)
@staticmethod
def _compute_hypervolume(solution_set: np.ndarray, reference_point: np.ndarray) -> float:
return optuna.multi_objective._hypervolume.WFG().compute(solution_set, reference_point)
def _solve_hssp(
self,
rank_i_loss_vals: np.ndarray,
rank_i_indices: np.ndarray,
subset_size: int,
reference_point: np.ndarray,
) -> np.ndarray:
"""Solve a hypervolume subset selection problem (HSSP) via a greedy algorithm.
This method is a 1-1/e approximation algorithm to solve HSSP.
For further information about algorithms to solve HSSP, please refer to the following
paper:
- `Greedy Hypervolume Subset Selection in Low Dimensions
<https://ieeexplore.ieee.org/document/7570501>`_
"""
selected_vecs = [] # type: List[np.ndarray]
selected_indices = [] # type: List[int]
contributions = [
self._compute_hypervolume(np.asarray([v]), reference_point) for v in rank_i_loss_vals
]
hv_selected = 0.0
while len(selected_indices) < subset_size:
max_index = int(np.argmax(contributions))
contributions[max_index] = -1 # mark as selected
selected_index = rank_i_indices[max_index]
selected_vec = rank_i_loss_vals[max_index]
for j, v in enumerate(rank_i_loss_vals):
if contributions[j] == -1:
continue
p = np.max([selected_vec, v], axis=0)
contributions[j] -= (
self._compute_hypervolume(np.asarray(selected_vecs + [p]), reference_point)
- hv_selected
)
selected_vecs += [selected_vec]
selected_indices += [selected_index]
hv_selected = self._compute_hypervolume(np.asarray(selected_vecs), reference_point)
return np.asarray(selected_indices, dtype=int)
def _calculate_weights_below(
self,
lvals: np.ndarray,
indices_below: np.ndarray,
) -> np.ndarray:
# Calculate weights based on hypervolume contributions.
n_below = len(indices_below)
if n_below == 0:
return np.asarray([])
elif n_below == 1:
return np.asarray([1.0])
else:
lvals_below = lvals[indices_below].tolist()
worst_point = np.max(lvals_below, axis=0)
reference_point = np.maximum(1.1 * worst_point, 0.9 * worst_point)
reference_point[reference_point == 0] = EPS
hv = self._compute_hypervolume(np.asarray(lvals_below), reference_point)
contributions = np.asarray(
[
hv
- self._compute_hypervolume(
np.asarray(lvals_below[:i] + lvals_below[i + 1 :]), reference_point
)
for i in range(n_below)
]
)
contributions += EPS
weights_below = np.clip(contributions / np.max(contributions), 0, 1)
return weights_below
[文档] def after_trial(
self,
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
state: optuna.trial.TrialState,
values: Optional[Sequence[float]],
) -> None:
if trial._trial_id in self._split_cache:
del self._split_cache[trial._trial_id]
if trial._trial_id in self._weights_below:
del self._weights_below[trial._trial_id]
self._mo_random_sampler.after_trial(study, trial, state, values)
def _calculate_nondomination_rank(loss_vals: np.ndarray) -> np.ndarray:
vecs = loss_vals.copy()
# Normalize values
lb = vecs.min(axis=0, keepdims=True)
ub = vecs.max(axis=0, keepdims=True)
vecs = (vecs - lb) / (ub - lb)
ranks = np.zeros(len(vecs))
num_unranked = len(vecs)
rank = 0
while num_unranked > 0:
extended = np.tile(vecs, (vecs.shape[0], 1, 1))
counts = np.sum(
np.logical_and(
np.all(extended <= np.swapaxes(extended, 0, 1), axis=2),
np.any(extended < np.swapaxes(extended, 0, 1), axis=2),
),
axis=1,
)
vecs[counts == 0] = 1.1 # mark as ranked
ranks[counts == 0] = rank
rank += 1
num_unranked -= np.sum(counts == 0)
return ranks
def _get_observation_pairs(
study: optuna.study.Study,
param_name: str,
) -> Tuple[List[Optional[float]], List[List[float]]]:
"""Get observation pairs from the study.
This function collects observation pairs from the complete trials of the study.
Pruning is currently not supported.
The values for trials that don't contain the parameter named ``param_name`` are set to None.
Objective values are negated if their directions are maximization and all objectives are
treated as minimization in the MOTPE algorithm.
"""
trials = study.get_trials(deepcopy=False, states=(optuna.trial.TrialState.COMPLETE,))
values = []
scores = []
for trial in trials:
param_value = None # type: Optional[float]
if param_name in trial.params:
distribution = trial.distributions[param_name]
param_value = distribution.to_internal_repr(trial.params[param_name])
# Convert all objectives to minimization
score = [
cast(float, v) if d == StudyDirection.MINIMIZE else -cast(float, v)
for d, v in zip(study.directions, trial.values)
]
values.append(param_value)
scores.append(score)
return values, scores