Source code for optuna.importance._ped_anova.evaluator

from __future__ import annotations

from collections import defaultdict
from typing import cast
from typing import TYPE_CHECKING

import numpy as np

from optuna._deprecated import _DEPRECATION_WARNING_TEMPLATE
from optuna._experimental import experimental_class
from optuna._warnings import optuna_warn
from optuna.importance._base import _check_evaluate_args
from optuna.importance._base import _sort_dict_by_importance
from optuna.importance._base import BaseImportanceEvaluator
from optuna.importance._ped_anova.scott_parzen_estimator import build_parzen_estimator_on_grid
from optuna.study import StudyDirection
from optuna.trial import TrialState


if TYPE_CHECKING:
    from collections.abc import Callable

    from optuna.distributions import BaseDistribution
    from optuna.study import Study
    from optuna.trial import FrozenTrial


class _QuantileFilter:
    def __init__(
        self,
        quantile: float,
        is_lower_better: bool,
        min_n_top_trials: int,
        target: Callable[[FrozenTrial], float] | None,
    ) -> None:
        assert 0 < quantile <= 1, "quantile must be in (0, 1]."
        assert min_n_top_trials > 0, "min_n_top_trials must be positive."

        self._quantile = quantile
        self._is_lower_better = is_lower_better
        self._min_n_top_trials = min_n_top_trials
        self._target = target

    def filter(self, trials: list[FrozenTrial]) -> list[FrozenTrial]:
        target, min_n_top_trials = self._target, self._min_n_top_trials
        sign = 1.0 if self._is_lower_better else -1.0
        loss_values = sign * np.asarray([t.value if target is None else target(t) for t in trials])
        err_msg = "len(trials) must be larger than or equal to min_n_top_trials"
        assert min_n_top_trials <= loss_values.size, err_msg

        def _quantile(v: np.ndarray, q: float) -> float:
            cutoff_index = int(np.ceil(q * loss_values.size)) - 1
            return float(np.partition(loss_values, cutoff_index)[cutoff_index])

        cutoff_val = max(
            np.partition(loss_values, min_n_top_trials - 1)[min_n_top_trials - 1],
            # TODO(nabenabe0928): After dropping Python3.10, replace below with
            # np.quantile(loss_values, self._quantile, method="inverted_cdf").
            _quantile(loss_values, self._quantile),
        )
        should_keep_trials = loss_values <= cutoff_val
        return [t for t, should_keep in zip(trials, should_keep_trials) if should_keep]


[docs] @experimental_class("3.6.0") class PedAnovaImportanceEvaluator(BaseImportanceEvaluator): """PED-ANOVA importance evaluator. Implements the PED-ANOVA hyperparameter importance evaluation algorithm. PED-ANOVA fits Parzen estimators of :class:`~optuna.trial.TrialState.COMPLETE` trials better than a user-specified ``target_quantile``. The importance can be interpreted as how important each hyperparameter is to get the performance better than ``target_quantile``. For further information about PED-ANOVA algorithm, please refer to the following paper: - `PED-ANOVA: Efficiently Quantifying Hyperparameter Importance in Arbitrary Subspaces <https://arxiv.org/abs/2304.10255>`__ (IJCAI 2023) For further information on how conditional parameters are handled, please refer to the following paper: - `Conditional PED-ANOVA: Hyperparameter Importance in Hierarchical & Dynamic Search Spaces <https://arxiv.org/abs/2601.20800>`__ (KDD 2026) ``target_quantile`` and ``region_quantile`` correspond to the parameters :math:`\\gamma'` and :math:`\\gamma` in the original paper, respectively. .. note:: The performance of PED-ANOVA depends on how many trials to consider above ``target_quantile``. To stabilize the analysis, it is preferable to include at least 5 trials above ``target_quantile``. .. note:: Please also refer to the original implementations: - `PED-ANOVA <https://github.com/nabenabe0928/local-anova>`__ - `condPED-ANOVA <https://github.com/kAIto47802/condPED-ANOVA>`__ Args: target_quantile: Compute the importance of achieving top-``target_quantile`` quantile objective value. For example, ``target_quantile=0.1`` means that the importances give the information of which parameters were important to achieve the top-10% performance during optimization. region_quantile: Define the region where we compute the importance. For example, ``region_quantile=0.5`` means that we compute the importance in the region where trials achieve top-50% performance. If ``region_quantile=1.0``, the importance is computed in the whole search space. baseline_quantile: Compute the importance of achieving top-``baseline_quantile`` quantile objective value. For example, ``baseline_quantile=0.1`` means that the importances give the information of which parameters were important to achieve the top-10% performance during optimization. .. warning:: Deprecated in v4.7.0. This feature will be removed in the future. The removal of this feature is currently scheduled for v5.0.0, but this schedule is subject to change. ``baseline_quantile`` is currently ignored. Use ``target_quantile`` instead. See https://github.com/optuna/optuna/releases/tag/v4.7.0. evaluate_on_local: Whether we measure the importance in the local or global space. If :obj:`True`, the importances imply how importance each parameter is during optimization. Meanwhile, ``evaluate_on_local=False`` gives the importances in the specified search_space. ``evaluate_on_local=True`` is especially useful when users modify search space during optimization. Example: An example of using PED-ANOVA is as follows: .. testcode:: import optuna from optuna.importance import PedAnovaImportanceEvaluator def objective(trial): x1 = trial.suggest_float("x1", -10, 10) x2 = trial.suggest_float("x2", -10, 10) return x1 + x2 / 1000 study = optuna.create_study() study.optimize(objective, n_trials=100) evaluator = PedAnovaImportanceEvaluator() importance = optuna.importance.get_param_importances(study, evaluator=evaluator) """ def __init__( self, *, target_quantile: float = 0.1, # gamma' in the original paper region_quantile: float = 1.0, # gamma in the original paper baseline_quantile: float | None = None, evaluate_on_local: bool = True, ) -> None: assert 0.0 < target_quantile < region_quantile <= 1.0, ( "condition 0.0 < `target_quantile` < `region_quantile` <= 1.0 must be satisfied" ) if baseline_quantile is not None: msg = _DEPRECATION_WARNING_TEMPLATE.format( name="`baseline_quantile`", d_ver="4.7.0", r_ver="5.0.0" ) optuna_warn( f"{msg} `baseline_quantile` is currently ignored. Use `target_quantile` instead.", ) if region_quantile != 1.0 and not evaluate_on_local: optuna_warn("If `evaluate_on_local` is False, `region_quantile` has no effect.") self._target_quantile = target_quantile self._region_quantile = region_quantile self._evaluate_on_local = evaluate_on_local # Advanced Setups. # Discretize a domain [low, high] as `np.linspace(low, high, n_steps)`. self._n_steps: int = 50 # Control the regularization effect by prior. self._prior_weight = 1.0 # How many `trials` must be included in `top_trials`. self._min_n_top_trials = 2 # How many `trials` must be included in each regime. self._min_n_trials_in_regime = 2 def _get_top_quantile_trials( self, study: Study, trials: list[FrozenTrial], quantile: float, target: Callable[[FrozenTrial], float] | None, ) -> list[FrozenTrial]: if quantile == 1.0: return trials is_lower_better = study.directions[0] == StudyDirection.MINIMIZE if target is not None: optuna_warn( f"{self.__class__.__name__} computes the importances of params to achieve " "low `target` values. If this is not what you want, " "please modify target, e.g., by multiplying the output by -1." ) is_lower_better = True top_trials = _QuantileFilter( quantile, is_lower_better, self._min_n_top_trials, target ).filter(trials) return top_trials def _compute_pearson_divergence( self, param_name: str, dist: BaseDistribution, target_trials: list[FrozenTrial], region_trials: list[FrozenTrial], ) -> float: # When pdf_all == pdf_top, i.e. all_trials == top_trials, this method will give 0.0. prior_weight = self._prior_weight pe_top, grid_size = build_parzen_estimator_on_grid( param_name, dist, target_trials, self._n_steps, prior_weight ) grids = np.arange(grid_size) pdf_top = pe_top.pdf({param_name: grids}) + 1e-12 if self._evaluate_on_local: # The importance of param during the study. pe_local, _ = build_parzen_estimator_on_grid( param_name, dist, region_trials, self._n_steps, prior_weight ) pdf_local = pe_local.pdf({param_name: grids}) + 1e-12 else: # The importance of param in the search space. pdf_local = np.full(grid_size, 1.0 / grid_size) return float(pdf_local @ ((pdf_top / pdf_local - 1) ** 2))
[docs] def evaluate( self, study: Study, params: list[str] | None = None, *, target: Callable[[FrozenTrial], float] | None = None, ) -> dict[str, float]: if target is None and study._is_multi_objective(): raise ValueError( "If the `study` is being used for multi-objective optimization, " "please specify the `target`. For example, use " "`target=lambda t: t.values[0]` for the first objective value. " f"{self.__class__.__name__} computes the importances of params to achieve " "low `target` values. If this is not what you want, " "please modify target, e.g., by multiplying the output by -1." ) dists = _get_distributions_list(study, params=params) if params is None: params = list(dict.fromkeys(k for d in dists for k in d)) assert params is not None trials = _get_filtered_trials(study, target=target) # The following should be tested at _get_filtered_trials. assert target is not None or max([len(t.values) for t in trials], default=1) == 1 if len(trials) <= self._min_n_top_trials: return {k: 0.0 for k in params} target_trials = self._get_top_quantile_trials(study, trials, self._target_quantile, target) region_trials = self._get_top_quantile_trials(study, trials, self._region_quantile, target) if len(target_trials) == len(region_trials): optuna_warn( "Target and region quantiles select the same set of trials. " "Parameter importances will be equal." ) if len(target_trials) == 0: return {k: 0.0 for k in params} # Theorem 4.2 and Algorithm 1 in the original paper: # https://arxiv.org/abs/2601.20800 quantile = len(target_trials) / len(region_trials) # gamma' / gamma param_importances = {k: 0.0 for k in params} target_trial_ids = set(t._trial_id for t in target_trials) for param_name in params: regime_trials = _partition_by_regime( param_name, region_trials, self._min_n_trials_in_regime ) for dist, region_trials_regime in regime_trials.items(): target_trials_regime = [ t for t in region_trials_regime if t._trial_id in target_trial_ids ] target_prob_regime = len(target_trials_regime) / len(target_trials) # alpha_i region_prob_regime = len(region_trials_regime) / len(region_trials) # beta_i if dist is not None and not dist.single() and len(target_trials_regime): param_importances[param_name] += ( target_prob_regime**2 / region_prob_regime * self._compute_pearson_divergence( param_name, dist, target_trials=target_trials_regime, region_trials=region_trials_regime, ) ) param_importances = {k: v * quantile**2 for k, v in param_importances.items()} return _sort_dict_by_importance(param_importances)
def _partition_by_regime( param_name: str, trials: list[FrozenTrial], min_n_trials_in_regime: int ) -> dict[BaseDistribution | None, list[FrozenTrial]]: # None for the inactive regime regime_trials: dict[BaseDistribution | None, list[FrozenTrial]] = defaultdict(list) for trial in trials: regime_trials[trial.distributions.get(param_name)].append(trial) # NOTE(kAIto47802): We support the domain that takes one of several discrete values depending # on the condition. However, when the domain changes smoothly, some ranges need to be merged # into the same regime to stabilize the KDE within each regime. # TODO(kAIto47802): Implement this. if any(len(v) < min_n_trials_in_regime for v in regime_trials.values()): optuna_warn( f"Some regimes for parameter `{param_name}` have less than " f"{min_n_trials_in_regime} trials. " "The importance of the parameter may be inaccurate." ) regime_trials = {k: v for k, v in regime_trials.items() if len(v) >= min_n_trials_in_regime} return regime_trials def _get_filtered_trials( study: Study, target: Callable[[FrozenTrial], float] | None ) -> list[FrozenTrial]: trials = study.get_trials(deepcopy=False, states=(TrialState.COMPLETE,)) return [ trial for trial in trials if np.isfinite(target(trial) if target is not None else cast(float, trial.value)) # TC006 ] def _get_distributions_list( study: Study, params: list[str] | None ) -> list[dict[str, BaseDistribution]]: trials = study.get_trials(deepcopy=False, states=(TrialState.COMPLETE,)) _check_evaluate_args(trials, params) params_set = set(params) if params is not None else None return [ {k: v for k, v in t.distributions.items() if params_set is None or k in params_set} for t in trials ]