Source code for optuna.terminator.improvement.evaluator

from __future__ import annotations

import abc
from typing import Dict
from typing import List
from typing import TYPE_CHECKING

import numpy as np

from optuna._experimental import experimental_class
from optuna.distributions import BaseDistribution
from optuna.samplers._lazy_random_state import LazyRandomState
from optuna.search_space import intersection_search_space
from optuna.study import StudyDirection
from optuna.trial import FrozenTrial
from optuna.trial import TrialState


# The ``optuna._gp`` submodules are imported for real only while type checking; at runtime
# the same names are bound to ``_LazyImport`` proxies instead.
# NOTE(review): presumably ``_LazyImport`` defers the actual import until first attribute
# access so that importing this module stays cheap -- confirm against ``optuna._imports``.
if TYPE_CHECKING:
    from optuna._gp import acqf
    from optuna._gp import gp
    from optuna._gp import optim_sample
    from optuna._gp import prior
    from optuna._gp import search_space
else:
    from optuna._imports import _LazyImport

    gp = _LazyImport("optuna._gp.gp")
    optim_sample = _LazyImport("optuna._gp.optim_sample")
    acqf = _LazyImport("optuna._gp.acqf")
    prior = _LazyImport("optuna._gp.prior")
    search_space = _LazyImport("optuna._gp.search_space")

# Default value of ``top_trials_ratio`` in ``RegretBoundEvaluator.__init__``.
DEFAULT_TOP_TRIALS_RATIO = 0.5
# Default value of ``min_n_trials`` in ``RegretBoundEvaluator.__init__``.
DEFAULT_MIN_N_TRIALS = 20


def _get_beta(n_params: int, n_trials: int, delta: float = 0.1) -> float:
    # TODO(nabenabe0928): Check the original implementation to verify.
    # Especially, |D| seems to be the domain size, but not the dimension based on Theorem 1.
    beta = 2 * np.log(n_params * n_trials**2 * np.pi**2 / 6 / delta)

    # The following div is according to the original paper: "We then further scale it down
    # by a factor of 5 as defined in the experiments in
    # `Srinivas et al. (2010) <https://dl.acm.org/doi/10.5555/3104322.3104451>`_"
    beta /= 5

    return beta


@experimental_class("3.2.0")
class BaseImprovementEvaluator(metaclass=abc.ABCMeta):
    """Abstract interface shared by all improvement evaluators.

    Concrete evaluators implement :meth:`evaluate`, which reduces a list of trials and the
    study direction to a single float.
    """

    @abc.abstractmethod
    def evaluate(
        self,
        trials: List[FrozenTrial],
        study_direction: StudyDirection,
    ) -> float:
        """Evaluate the given trials.

        Args:
            trials: Trials to evaluate.
            study_direction: Optimization direction of the study the trials belong to.

        Returns:
            The evaluator-specific score as a float.
        """
@experimental_class("3.2.0")
class RegretBoundEvaluator(BaseImprovementEvaluator):
    """An error evaluator for upper bound on the regret with high-probability confidence.

    This evaluator evaluates the regret of the current best solution, which is defined as the
    difference between the objective value of the best solution and of the global optimum. To be
    specific, this evaluator calculates the upper bound on the regret based on the fact that
    the empirical estimator of the objective function is bounded by lower and upper confidence
    bounds with high probability under the Gaussian process model assumption.

    Args:
        top_trials_ratio:
            A ratio of top trials to be considered when estimating the regret. Default to 0.5.
        min_n_trials:
            A minimum number of top trials used to estimate the regret. When fewer complete
            trials are available, all of them are used. Default to 20.
        seed:
            A seed for the random number generator that is passed to the sampling-based
            maximization of the upper confidence bound over the search space.

    For further information about this evaluator, please refer to the following paper:

    - `Automatic Termination for Hyperparameter Optimization <https://proceedings.mlr.press/v188/makarova22a.html>`_
    """  # NOQA: E501

    def __init__(
        self,
        top_trials_ratio: float = DEFAULT_TOP_TRIALS_RATIO,
        min_n_trials: int = DEFAULT_MIN_N_TRIALS,
        seed: int | None = None,
    ) -> None:
        self._top_trials_ratio = top_trials_ratio
        self._min_n_trials = min_n_trials
        self._log_prior = prior.default_log_prior
        self._minimum_noise = prior.DEFAULT_MINIMUM_NOISE_VAR
        # Number of samples requested from ``optim_sample.optimize_acqf_sample``.
        self._optimize_n_samples = 2048
        self._rng = LazyRandomState(seed)

    def _get_top_n(
        self, normalized_params: np.ndarray, values: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        """Select the trials with the largest values.

        Args:
            normalized_params: Normalized parameters, one row per trial.
            values: Objective values (larger is better), one entry per trial.

        Returns:
            The rows of ``normalized_params`` and the entries of ``values`` whose value is at
            least the ``top_n``-th largest one. Ties at the threshold are all kept, so more than
            ``top_n`` trials may be returned.
        """
        assert len(normalized_params) == len(values)
        n_trials = len(normalized_params)
        # Equivalent to clipping ``int(n_trials * ratio)`` into ``[min_n_trials, n_trials]``
        # (the upper bound wins when ``min_n_trials > n_trials``), but additionally floored at
        # one: a ``top_n`` of zero would make the partition index below fall out of bounds.
        top_n = min(
            n_trials,
            max(int(n_trials * self._top_trials_ratio), self._min_n_trials, 1),
        )
        threshold_index = n_trials - top_n
        top_n_val = np.partition(values, threshold_index)[threshold_index]
        top_n_mask = values >= top_n_val
        return normalized_params[top_n_mask], values[top_n_mask]

    def _compute_standardized_regret_bound(
        self,
        kernel_params: gp.KernelParamsTensor,
        gp_search_space: search_space.SearchSpace,
        normalized_top_n_params: np.ndarray,
        standarized_top_n_values: np.ndarray,
    ) -> float:
        """Compute the regret bound in the standardized objective scale.

        In the original paper, f(x) was intended to be minimized, but here we would like to
        maximize f(x). Hence, the following changes happen:

        1. min(ucb) over top trials becomes max(lcb) over top trials, and
        2. min(lcb) over the search space becomes max(ucb) over the search space, and
        3. Regret bound becomes max(ucb) over the search space minus max(lcb) over top trials.

        Args:
            kernel_params: Fitted Gaussian process kernel parameters.
            gp_search_space: Search space in the representation used by the ``_gp`` module.
            normalized_top_n_params: Normalized parameters of the top trials.
            standarized_top_n_values: Standardized objective values of the top trials.

        Returns:
            ``max(ucb) - max(lcb)`` in the standardized scale.
        """
        n_trials, n_params = normalized_top_n_params.shape

        # calculate max_ucb
        beta = _get_beta(n_params, n_trials)
        ucb_acqf_params = acqf.create_acqf_params(
            acqf_type=acqf.AcquisitionFunctionType.UCB,
            kernel_params=kernel_params,
            search_space=gp_search_space,
            X=normalized_top_n_params,
            Y=standarized_top_n_values,
            beta=beta,
        )
        # UCB over the search space. (Original: LCB over the search space. See Change 2 above.)
        # The sampled optimum is combined with the UCB at the observed top trials so that the
        # result is never below the best UCB among the observed points.
        standardized_ucb_value = max(
            acqf.eval_acqf_no_grad(ucb_acqf_params, normalized_top_n_params).max(),
            optim_sample.optimize_acqf_sample(
                ucb_acqf_params, n_samples=self._optimize_n_samples, rng=self._rng.rng
            )[1],
        )

        # calculate max_lcb
        lcb_acqf_params = acqf.create_acqf_params(
            acqf_type=acqf.AcquisitionFunctionType.LCB,
            kernel_params=kernel_params,
            search_space=gp_search_space,
            X=normalized_top_n_params,
            Y=standarized_top_n_values,
            beta=beta,
        )
        # LCB over the top trials. (Original: UCB over the top trials. See Change 1 above.)
        standardized_lcb_value = np.max(
            acqf.eval_acqf_no_grad(lcb_acqf_params, normalized_top_n_params)
        )

        # max(UCB) - max(LCB). (Original: min(UCB) - min(LCB). See Change 3 above.)
        return standardized_ucb_value - standardized_lcb_value  # standardized regret bound

    def evaluate(
        self,
        trials: List[FrozenTrial],
        study_direction: StudyDirection,
    ) -> float:
        """Estimate the upper bound on the regret from the complete trials.

        Args:
            trials: Trials of the study; only those in the ``COMPLETE`` state are used.
            study_direction: Optimization direction of the study.

        Returns:
            The regret bound in the scale of the objective values.

        Raises:
            ValueError: If no trial is complete or the intersection search space is empty.
        """
        optuna_search_space = intersection_search_space(trials)
        self._validate_input(trials, optuna_search_space)

        complete_trials = [t for t in trials if t.state == TrialState.COMPLETE]

        # _gp module assumes that optimization direction is maximization
        sign = -1 if study_direction == StudyDirection.MINIMIZE else 1
        values = np.array([t.value for t in complete_trials]) * sign
        gp_search_space, normalized_params = search_space.get_search_space_and_normalized_params(
            complete_trials, optuna_search_space
        )
        normalized_top_n_params, top_n_values = self._get_top_n(normalized_params, values)
        top_n_values_mean = top_n_values.mean()
        # Lower-bound the standard deviation to avoid dividing by zero when all values coincide.
        top_n_values_std = max(1e-10, top_n_values.std())
        standarized_top_n_values = (top_n_values - top_n_values_mean) / top_n_values_std

        kernel_params = gp.fit_kernel_params(
            X=normalized_top_n_params,
            Y=standarized_top_n_values,
            is_categorical=(gp_search_space.scale_types == search_space.ScaleType.CATEGORICAL),
            log_prior=self._log_prior,
            minimum_noise=self._minimum_noise,
            # TODO(contramundum53): Add option to specify this.
            deterministic_objective=False,
            # TODO(y0z): Add `kernel_params_cache` to speedup.
            initial_kernel_params=None,
        )

        standardized_regret_bound = self._compute_standardized_regret_bound(
            kernel_params,
            gp_search_space,
            normalized_top_n_params,
            standarized_top_n_values,
        )
        # Undo the standardization so that the bound is in the objective's own scale.
        return standardized_regret_bound * top_n_values_std  # regret bound

    @classmethod
    def _validate_input(
        cls, trials: List[FrozenTrial], search_space: Dict[str, BaseDistribution]
    ) -> None:
        """Check that the regret bound can be evaluated for the given input.

        Args:
            trials: Trials passed to :meth:`evaluate`.
            search_space: Intersection search space of ``trials``.

        Raises:
            ValueError: If no trial is complete or ``search_space`` is empty.
        """
        if not any(t.state == TrialState.COMPLETE for t in trials):
            raise ValueError(
                "Because no trial has been completed yet, the regret bound cannot be evaluated."
            )

        if not search_space:
            raise ValueError(
                "The intersection search space is empty. This condition is not supported by "
                f"{cls.__name__}."
            )
@experimental_class("3.4.0")
class BestValueStagnationEvaluator(BaseImprovementEvaluator):
    """Evaluates the stagnation period of the best value in an optimization process.

    The evaluator is configured with a maximum stagnation period (`max_stagnation_trials`) and
    reports how many trials remain before that period is exhausted. Termination is meant to
    happen once the remaining count reaches zero; therefore, the default error evaluator is
    instantiated by StaticErrorEvaluator(const=0).

    Args:
        max_stagnation_trials:
            The maximum number of trials allowed for stagnation.
    """

    def __init__(
        self,
        max_stagnation_trials: int = 30,
    ) -> None:
        if max_stagnation_trials < 0:
            raise ValueError("The maximum number of stagnant trials must not be negative.")
        self._max_stagnation_trials = max_stagnation_trials

    def evaluate(
        self,
        trials: List[FrozenTrial],
        study_direction: StudyDirection,
    ) -> float:
        """Return the number of trials left before the stagnation limit is reached.

        Args:
            trials: Trials of the study; only those in the ``COMPLETE`` state are considered.
            study_direction: Optimization direction of the study.

        Returns:
            ``max_stagnation_trials`` minus the number of complete trials that came after the
            earliest trial holding the best value.

        Raises:
            ValueError: If no trial is complete.
        """
        self._validate_input(trials)
        maximizing = study_direction == StudyDirection.MAXIMIZE
        finished = [t for t in trials if t.state == TrialState.COMPLETE]
        last_index = len(finished) - 1

        best_index = 0
        for index, candidate in enumerate(finished):
            incumbent_value = finished[best_index].value
            candidate_value = candidate.value
            assert incumbent_value is not None
            assert candidate_value is not None
            # Only a strict improvement moves the best step, so ties keep the earlier trial.
            if maximizing:
                improved = incumbent_value < candidate_value
            else:
                improved = incumbent_value > candidate_value
            if improved:
                best_index = index

        stagnant_trials = last_index - best_index
        return self._max_stagnation_trials - stagnant_trials

    @classmethod
    def _validate_input(
        cls,
        trials: List[FrozenTrial],
    ) -> None:
        """Raise ``ValueError`` unless at least one trial is in the ``COMPLETE`` state."""
        if not any(t.state == TrialState.COMPLETE for t in trials):
            raise ValueError(
                "Because no trial has been completed yet, the improvement cannot be evaluated."
            )