Source code for optuna.terminator.improvement.emmr

from __future__ import annotations

import math
import sys
from typing import cast
from typing import TYPE_CHECKING
import warnings

import numpy as np

from optuna._experimental import experimental_class
from optuna.samplers._lazy_random_state import LazyRandomState
from optuna.search_space import intersection_search_space
from optuna.study import StudyDirection
from optuna.terminator.improvement.evaluator import _compute_standardized_regret_bound
from optuna.terminator.improvement.evaluator import BaseImprovementEvaluator
from optuna.trial import FrozenTrial
from optuna.trial import TrialState


if TYPE_CHECKING:
    import scipy.stats as scipy_stats
    import torch

    from optuna._gp import acqf
    from optuna._gp import gp
    from optuna._gp import prior
    from optuna._gp import search_space as gp_search_space
else:
    from optuna._imports import _LazyImport

    torch = _LazyImport("torch")
    gp = _LazyImport("optuna._gp.gp")
    acqf = _LazyImport("optuna._gp.acqf")
    prior = _LazyImport("optuna._gp.prior")
    gp_search_space = _LazyImport("optuna._gp.search_space")
    scipy_stats = _LazyImport("scipy.stats")

# Multiplied with ``sys.float_info.max`` when returning a value that must not trigger
# termination, so that the returned value stays finite.
MARGIN_FOR_NUMARICAL_STABILITY = 0.1


@experimental_class("4.0.0")
class EMMREvaluator(BaseImprovementEvaluator):
    """Evaluates a kind of regret, called the Expected Minimum Model Regret (EMMR).

    EMMR is an upper bound of the "expected minimum simple regret" in the optimization
    process. Expected minimum simple regret is a quantity that converges to zero only if
    the optimization process has found the global optimum.

    For further information about expected minimum simple regret and the algorithm,
    please refer to the following paper:

    - `A stopping criterion for Bayesian optimization by the gap of expected minimum simple
      regrets <https://proceedings.mlr.press/v206/ishibashi23a.html>`__

    Also, there is our blog post explaining this evaluator:

    - `Introducing A New Terminator: Early Termination of Black-box Optimization Based on
      Expected Minimum Model Regret
      <https://medium.com/optuna/introducing-a-new-terminator-early-termination-of-black-box-optimization-based-on-expected-9a660774fcdb>`__

    Args:
        deterministic_objective:
            A boolean value which indicates whether the objective function is deterministic.
            Default is :obj:`False`.

        delta:
            A float number related to the criterion for termination. Defaults to 0.1.
            For further information about this parameter, please see the aforementioned paper.

        min_n_trials:
            The minimum number of complete trials required to compute the criterion.
            Defaults to 2.

        seed:
            A random seed for EMMREvaluator.

    Example:

        .. testcode::

            import optuna
            from optuna.terminator import EMMREvaluator
            from optuna.terminator import MedianErrorEvaluator
            from optuna.terminator import Terminator

            sampler = optuna.samplers.TPESampler(seed=0)
            study = optuna.create_study(sampler=sampler, direction="minimize")
            emmr_improvement_evaluator = EMMREvaluator()
            median_error_evaluator = MedianErrorEvaluator(emmr_improvement_evaluator)
            terminator = Terminator(
                improvement_evaluator=emmr_improvement_evaluator,
                error_evaluator=median_error_evaluator,
            )

            for i in range(1000):
                trial = study.ask()

                ys = [trial.suggest_float(f"x{i}", -10.0, 10.0) for i in range(5)]
                value = sum(ys[i] ** 2 for i in range(5))

                study.tell(trial, value)

                if terminator.should_terminate(study):
                    # Terminated by Optuna Terminator!
                    break

    """

    def __init__(
        self,
        deterministic_objective: bool = False,
        delta: float = 0.1,
        min_n_trials: int = 2,
        seed: int | None = None,
    ) -> None:
        if min_n_trials <= 1 or not np.isfinite(min_n_trials):
            raise ValueError("`min_n_trials` is expected to be a finite integer greater than one.")

        self._deterministic = deterministic_objective
        self._delta = delta
        self.min_n_trials = min_n_trials
        self._rng = LazyRandomState(seed)

    def evaluate(self, trials: list[FrozenTrial], study_direction: StudyDirection) -> float:
        optuna_search_space = intersection_search_space(trials)
        complete_trials = [t for t in trials if t.state == TrialState.COMPLETE]

        if len(complete_trials) < self.min_n_trials:
            return sys.float_info.max * MARGIN_FOR_NUMARICAL_STABILITY  # Do not terminate.

        search_space, normalized_params = gp_search_space.get_search_space_and_normalized_params(
            complete_trials, optuna_search_space
        )

        if len(search_space.scale_types) == 0:
            warnings.warn(
                f"{self.__class__.__name__} cannot consider any search space. "
                "Termination will never occur in this study."
            )
            return sys.float_info.max * MARGIN_FOR_NUMARICAL_STABILITY  # Do not terminate.
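
        # Outline of the computation below (per the paper linked in the class docstring):
        # standardize the observed objective values, fit GP kernel parameters with the
        # first t-1 observations and with all t observations, and combine the resulting
        # posterior means/variances into the four terms of the EMMR bound
        # (cf. Theorem 1, Eq. 4, and Algorithm 1 of the paper).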

        len_trials = len(complete_trials)
        len_params = len(search_space.scale_types)
        assert normalized_params.shape == (len_trials, len_params)

        # _gp module assumes that optimization direction is maximization.
        sign = -1 if study_direction == StudyDirection.MINIMIZE else 1
        score_vals = np.array([cast(float, t.value) for t in complete_trials]) * sign

        if np.any(~np.isfinite(score_vals)):
            warnings.warn(
                f"{self.__class__.__name__} cannot handle infinite values. "
                "Those values are clamped to the worst/best finite value."
            )

            finite_score_vals = score_vals[np.isfinite(score_vals)]
            best_finite_score = np.max(finite_score_vals, initial=0.0)
            worst_finite_score = np.min(finite_score_vals, initial=0.0)

            score_vals = np.clip(score_vals, worst_finite_score, best_finite_score)

        standarized_score_vals = (score_vals - score_vals.mean()) / max(
            sys.float_info.min, score_vals.std()
        )

        assert len(standarized_score_vals) == len(normalized_params)

        kernel_params_t1 = gp.fit_kernel_params(  # Fit kernel with up to (t-1)-th observation.
            X=normalized_params[..., :-1, :],
            Y=standarized_score_vals[:-1],
            is_categorical=(search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL),
            log_prior=prior.default_log_prior,
            minimum_noise=prior.DEFAULT_MINIMUM_NOISE_VAR,
            initial_kernel_params=None,
            deterministic_objective=self._deterministic,
        )
        kernel_params_t = gp.fit_kernel_params(  # Fit kernel with up to t-th observation.
            X=normalized_params,
            Y=standarized_score_vals,
            is_categorical=(search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL),
            log_prior=prior.default_log_prior,
            minimum_noise=prior.DEFAULT_MINIMUM_NOISE_VAR,
            initial_kernel_params=kernel_params_t1,
            deterministic_objective=self._deterministic,
        )

        theta_t_star_index = int(np.argmax(standarized_score_vals))
        theta_t1_star_index = int(np.argmax(standarized_score_vals[:-1]))
        theta_t_star = normalized_params[theta_t_star_index, :]
        theta_t1_star = normalized_params[theta_t1_star_index, :]

        cov_t_between_theta_t_star_and_theta_t1_star = _compute_gp_posterior_cov_two_thetas(
            search_space,
            normalized_params,
            standarized_score_vals,
            kernel_params_t,
            theta_t_star_index,
            theta_t1_star_index,
        )

        mu_t1_theta_t_with_nu_t, variance_t1_theta_t_with_nu_t = _compute_gp_posterior(
            search_space,
            normalized_params[:-1, :],
            standarized_score_vals[:-1],
            normalized_params[-1, :],
            kernel_params_t,
            # Use kernel_params_t instead of kernel_params_t1.
            # Use "t" under the assumption that "t" and "t1" are approximately the same.
            # This is because the kernel should be the same when computing the KLD.
            # For detailed information, please see section 4.4 of the paper:
            # https://proceedings.mlr.press/v206/ishibashi23a/ishibashi23a.pdf
        )
        _, variance_t_theta_t1_star = _compute_gp_posterior(
            search_space,
            normalized_params,
            standarized_score_vals,
            theta_t1_star,
            kernel_params_t,
        )
        mu_t_theta_t_star, variance_t_theta_t_star = _compute_gp_posterior(
            search_space,
            normalized_params,
            standarized_score_vals,
            theta_t_star,
            kernel_params_t,
        )
        mu_t1_theta_t1_star, _ = _compute_gp_posterior(
            search_space,
            normalized_params[:-1, :],
            standarized_score_vals[:-1],
            theta_t1_star,
            kernel_params_t1,
        )

        y_t = standarized_score_vals[-1]
        kappa_t1 = _compute_standardized_regret_bound(
            kernel_params_t1,
            search_space,
            normalized_params[:-1, :],
            standarized_score_vals[:-1],
            self._delta,
            rng=self._rng.rng,
        )

        theorem1_delta_mu_t_star = mu_t1_theta_t1_star - mu_t_theta_t_star
        alg1_delta_r_tilde_t_term1 = theorem1_delta_mu_t_star
        theorem1_v = math.sqrt(
            max(
                1e-10,
                variance_t_theta_t_star
                - 2.0 * cov_t_between_theta_t_star_and_theta_t1_star
                + variance_t_theta_t1_star,
            )
        )
        theorem1_g = (mu_t_theta_t_star - mu_t1_theta_t1_star) / theorem1_v
        alg1_delta_r_tilde_t_term2 = theorem1_v * scipy_stats.norm.pdf(theorem1_g)
        alg1_delta_r_tilde_t_term3 = theorem1_v * theorem1_g * scipy_stats.norm.cdf(theorem1_g)

        _lambda = prior.DEFAULT_MINIMUM_NOISE_VAR**-1
        eq4_rhs_term1 = 0.5 * math.log(1.0 + _lambda * variance_t1_theta_t_with_nu_t)
        eq4_rhs_term2 = (
            -0.5 * variance_t1_theta_t_with_nu_t / (variance_t1_theta_t_with_nu_t + _lambda**-1)
        )
        eq4_rhs_term3 = (
            0.5
            * variance_t1_theta_t_with_nu_t
            * (y_t - mu_t1_theta_t_with_nu_t) ** 2
            / (variance_t1_theta_t_with_nu_t + _lambda**-1) ** 2
        )
        alg1_delta_r_tilde_t_term4 = kappa_t1 * math.sqrt(
            0.5 * (eq4_rhs_term1 + eq4_rhs_term2 + eq4_rhs_term3)
        )

        return min(
            sys.float_info.max * 0.5,
            alg1_delta_r_tilde_t_term1
            + alg1_delta_r_tilde_t_term2
            + alg1_delta_r_tilde_t_term3
            + alg1_delta_r_tilde_t_term4,
        )
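

# The helper functions below compute GP posterior statistics used by
# ``EMMREvaluator.evaluate`` above: the posterior mean/variance at a single point,
# a batched posterior (mean vector and covariance matrix) over several points, and
# the posterior covariance between two observed parameter vectors.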
def _compute_gp_posterior(
    search_space: gp_search_space.SearchSpace,
    X: np.ndarray,
    Y: np.ndarray,
    x_params: np.ndarray,
    kernel_params: gp.KernelParamsTensor,
) -> tuple[float, float]:  # mean, var
    acqf_params = acqf.create_acqf_params(
        acqf_type=acqf.AcquisitionFunctionType.LOG_EI,
        kernel_params=kernel_params,
        search_space=search_space,
        X=X,  # normalized_params[..., :-1, :]
        Y=Y,  # standarized_score_vals[:-1]
    )
    mean, var = gp.posterior(
        acqf_params.kernel_params,
        torch.from_numpy(acqf_params.X),
        torch.from_numpy(
            acqf_params.search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL
        ),
        torch.from_numpy(acqf_params.cov_Y_Y_inv),
        torch.from_numpy(acqf_params.cov_Y_Y_inv_Y),
        torch.from_numpy(x_params),  # best_params or normalized_params[..., -1, :]
    )
    mean = mean.detach().numpy().flatten()
    var = var.detach().numpy().flatten()
    assert len(mean) == 1 and len(var) == 1

    return float(mean[0]), float(var[0])


def _posterior_of_batched_theta(
    kernel_params: gp.KernelParamsTensor,
    X: torch.Tensor,  # [len(trials), len(params)]
    is_categorical: torch.Tensor,  # bool[len(params)]
    cov_Y_Y_inv: torch.Tensor,  # [len(trials), len(trials)]
    cov_Y_Y_inv_Y: torch.Tensor,  # [len(trials)]
    theta: torch.Tensor,  # [batch, len(params)]
) -> tuple[torch.Tensor, torch.Tensor]:  # (mean: [batch], var: [batch, batch])
    assert len(X.shape) == 2
    len_trials, len_params = X.shape
    assert len(theta.shape) == 2
    len_batch = theta.shape[0]
    assert theta.shape == (len_batch, len_params)
    assert is_categorical.shape == (len_params,)
    assert cov_Y_Y_inv.shape == (len_trials, len_trials)
    assert cov_Y_Y_inv_Y.shape == (len_trials,)

    cov_ftheta_fX = gp.kernel(is_categorical, kernel_params, theta[..., None, :], X)[..., 0, :]
    assert cov_ftheta_fX.shape == (len_batch, len_trials)
    cov_ftheta_ftheta = gp.kernel(is_categorical, kernel_params, theta[..., None, :], theta)[
        ..., 0, :
    ]
    assert cov_ftheta_ftheta.shape == (len_batch, len_batch)
    assert torch.allclose(cov_ftheta_ftheta.diag(), gp.kernel_at_zero_distance(kernel_params))
    assert torch.allclose(cov_ftheta_ftheta, cov_ftheta_ftheta.T)

    mean = cov_ftheta_fX @ cov_Y_Y_inv_Y
    assert mean.shape == (len_batch,)
    var = cov_ftheta_ftheta - cov_ftheta_fX @ cov_Y_Y_inv @ cov_ftheta_fX.T
    assert var.shape == (len_batch, len_batch)

    # We need to clamp the variance to avoid negative values due to numerical errors.
    return mean, torch.clamp(var, min=0.0)


def _compute_gp_posterior_cov_two_thetas(
    search_space: gp_search_space.SearchSpace,
    normalized_params: np.ndarray,
    standarized_score_vals: np.ndarray,
    kernel_params: gp.KernelParamsTensor,
    theta1_index: int,
    theta2_index: int,
) -> float:  # cov
    if theta1_index == theta2_index:
        return _compute_gp_posterior(
            search_space,
            normalized_params,
            standarized_score_vals,
            normalized_params[theta1_index],
            kernel_params,
        )[1]

    assert normalized_params.shape[0] == standarized_score_vals.shape[0]

    acqf_params = acqf.create_acqf_params(
        acqf_type=acqf.AcquisitionFunctionType.LOG_EI,
        kernel_params=kernel_params,
        search_space=search_space,
        X=normalized_params,
        Y=standarized_score_vals,
    )
    _, var = _posterior_of_batched_theta(
        acqf_params.kernel_params,
        torch.from_numpy(acqf_params.X),
        torch.from_numpy(
            acqf_params.search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL
        ),
        torch.from_numpy(acqf_params.cov_Y_Y_inv),
        torch.from_numpy(acqf_params.cov_Y_Y_inv_Y),
        torch.from_numpy(normalized_params[[theta1_index, theta2_index]]),
    )
    assert var.shape == (2, 2)
    var = var.detach().numpy()[0, 1]

    return float(var)
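
As a minimal illustration (not part of this module), ``EMMREvaluator`` can also be called
directly on a study's trials without a ``Terminator``; ``evaluate`` returns the EMMR upper
bound for the completed trials. The study setup below is an assumed example:

    import optuna
    from optuna.terminator import EMMREvaluator

    study = optuna.create_study(direction="minimize")
    study.optimize(lambda t: t.suggest_float("x", -1.0, 1.0) ** 2, n_trials=10)

    evaluator = EMMREvaluator(min_n_trials=2, seed=0)
    emmr_bound = evaluator.evaluate(study.get_trials(), study.direction)
    print(emmr_bound)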