Source code for optuna.samplers._gp.sampler

from __future__ import annotations

from collections.abc import Callable
from collections.abc import Sequence
from typing import Any
from typing import cast
from typing import TYPE_CHECKING
import warnings

import numpy as np

import optuna
from optuna._experimental import experimental_class
from optuna._experimental import warn_experimental_argument
from optuna.distributions import BaseDistribution
from optuna.samplers._base import _CONSTRAINTS_KEY
from optuna.samplers._base import _process_constraints_after_trial
from optuna.samplers._base import BaseSampler
from optuna.samplers._lazy_random_state import LazyRandomState
from optuna.study import StudyDirection
from optuna.trial import FrozenTrial
from optuna.trial import TrialState


if TYPE_CHECKING:
    import torch

    import optuna._gp.acqf as acqf
    import optuna._gp.gp as gp
    import optuna._gp.optim_mixed as optim_mixed
    import optuna._gp.prior as prior
    import optuna._gp.search_space as gp_search_space
    from optuna.study import Study
else:
    from optuna._imports import _LazyImport

    torch = _LazyImport("torch")
    gp_search_space = _LazyImport("optuna._gp.search_space")
    gp = _LazyImport("optuna._gp.gp")
    optim_mixed = _LazyImport("optuna._gp.optim_mixed")
    acqf = _LazyImport("optuna._gp.acqf")
    prior = _LazyImport("optuna._gp.prior")


EPS = 1e-10


@experimental_class("3.6.0")
class GPSampler(BaseSampler):
    """Sampler using Gaussian process-based Bayesian optimization.

    This sampler fits a Gaussian process (GP) to the objective function and optimizes
    the acquisition function to suggest the next parameters.

    The current implementation uses:

    - Matern kernel with nu=2.5 (twice differentiable),
    - Automatic relevance determination (ARD) for the length scale of each parameter,
    - Gamma prior for inverse squared lengthscales, kernel scale, and noise variance,
    - Log Expected Improvement (logEI) as the acquisition function, and
    - Quasi-Monte Carlo (QMC) sampling to optimize the acquisition function.

    .. note::
        This sampler requires ``scipy`` and ``torch``.
        You can install these dependencies with ``pip install scipy torch``.

    Args:
        seed:
            Random seed to initialize internal random number generator.
            Defaults to :obj:`None` (a seed is picked randomly).

        independent_sampler:
            Sampler used for initial sampling (for the first ``n_startup_trials`` trials)
            and for conditional parameters. Defaults to :obj:`None`
            (a random sampler with the same ``seed`` is used).

        n_startup_trials:
            Number of initial trials. Defaults to 10.

        deterministic_objective:
            Whether the objective function is deterministic or not.
            If :obj:`True`, the sampler will fix the noise variance of the surrogate model
            to the minimum value (slightly above 0 to ensure numerical stability).
            Defaults to :obj:`False`.

        constraints_func:
            An optional function that computes constraint values for a trial. It must take a
            :class:`~optuna.trial.FrozenTrial` and return a sequence of floats; a trial is
            regarded as feasible only if all returned values are less than or equal to 0.
            Defaults to :obj:`None` (no constraints). This argument is experimental.
    """

    def __init__(
        self,
        *,
        seed: int | None = None,
        independent_sampler: BaseSampler | None = None,
        n_startup_trials: int = 10,
        deterministic_objective: bool = False,
        constraints_func: Callable[[FrozenTrial], Sequence[float]] | None = None,
    ) -> None:
        self._rng = LazyRandomState(seed)
        self._independent_sampler = independent_sampler or optuna.samplers.RandomSampler(
            seed=seed
        )
        self._intersection_search_space = optuna.search_space.IntersectionSearchSpace()
        self._n_startup_trials = n_startup_trials
        self._log_prior: "Callable[[gp.KernelParamsTensor], torch.Tensor]" = (
            prior.default_log_prior
        )
        self._minimum_noise: float = prior.DEFAULT_MINIMUM_NOISE_VAR
        # We cache the kernel parameters for initial values of fitting the next time.
        self._kernel_params_cache: "gp.KernelParamsTensor | None" = None
        self._constraints_kernel_params_cache: "list[gp.KernelParamsTensor] | None" = None
        self._optimize_n_samples: int = 2048
        self._deterministic = deterministic_objective
        self._constraints_func = constraints_func

        if constraints_func is not None:
            warn_experimental_argument("constraints_func")

    def reseed_rng(self) -> None:
        self._rng.rng.seed()
        self._independent_sampler.reseed_rng()

    def infer_relative_search_space(
        self, study: Study, trial: FrozenTrial
    ) -> dict[str, BaseDistribution]:
        search_space = {}
        for name, distribution in self._intersection_search_space.calculate(study).items():
            if distribution.single():
                continue
            search_space[name] = distribution

        return search_space

    def _optimize_acqf(
        self,
        acqf_params: "acqf.AcquisitionFunctionParams",
        best_params: np.ndarray | None,
    ) -> np.ndarray:
        # Advanced users can override this method to change the optimization algorithm.
        # However, we do not make any effort to keep backward compatibility between versions.
        # Particularly, we may remove this function in future refactoring.
        normalized_params, _acqf_val = optim_mixed.optimize_acqf_mixed(
            acqf_params,
            warmstart_normalized_params_array=(
                None if best_params is None else best_params[None, :]
            ),
            n_preliminary_samples=2048,
            n_local_search=10,
            tol=1e-4,
            rng=self._rng.rng,
        )
        return normalized_params

    def _get_constraints_acqf_params(
        self,
        constraint_vals: np.ndarray,
        internal_search_space: gp_search_space.SearchSpace,
        normalized_params: np.ndarray,
    ) -> list[acqf.AcquisitionFunctionParams]:
        constraint_vals = _warn_and_convert_inf(constraint_vals)
        means = np.mean(constraint_vals, axis=0)
        stds = np.std(constraint_vals, axis=0)
        standardized_constraint_vals = (constraint_vals - means) / np.maximum(EPS, stds)

        if self._kernel_params_cache is not None and len(
            self._kernel_params_cache.inverse_squared_lengthscales
        ) != len(internal_search_space.scale_types):
            # Clear cache if the search space changes.
            self._constraints_kernel_params_cache = None

        is_categorical = (
            internal_search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL
        )

        constraints_kernel_params = []
        constraints_acqf_params = []
        for i, (vals, mean, std) in enumerate(zip(standardized_constraint_vals.T, means, stds)):
            cache = (
                self._constraints_kernel_params_cache
                and self._constraints_kernel_params_cache[i]
            )
            assert isinstance(cache, gp.KernelParamsTensor) or cache is None
            kernel_params = gp.fit_kernel_params(
                X=normalized_params,
                Y=vals,
                is_categorical=is_categorical,
                log_prior=self._log_prior,
                minimum_noise=self._minimum_noise,
                initial_kernel_params=cache,
                deterministic_objective=self._deterministic,
            )
            constraints_kernel_params.append(kernel_params)

            constraints_acqf_params.append(
                acqf.create_acqf_params(
                    acqf_type=acqf.AcquisitionFunctionType.LOG_PI,
                    kernel_params=kernel_params,
                    search_space=internal_search_space,
                    X=normalized_params,
                    Y=vals,
                    # Since 0 is the threshold value, we use the normalized value of 0.
                    max_Y=-mean / max(EPS, std),
                )
            )
        self._constraints_kernel_params_cache = constraints_kernel_params

        return constraints_acqf_params

    def sample_relative(
        self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
    ) -> dict[str, Any]:
        self._raise_error_if_multi_objective(study)

        if search_space == {}:
            return {}

        states = (TrialState.COMPLETE,)
        trials = study._get_trials(deepcopy=False, states=states, use_cache=True)
        if len(trials) < self._n_startup_trials:
            return {}

        (
            internal_search_space,
            normalized_params,
        ) = gp_search_space.get_search_space_and_normalized_params(trials, search_space)

        _sign = -1.0 if study.direction == StudyDirection.MINIMIZE else 1.0
        score_vals = np.array([_sign * cast(float, trial.value) for trial in trials])
        score_vals = _warn_and_convert_inf(score_vals)
        standardized_score_vals = (score_vals - np.mean(score_vals)) / max(
            EPS, np.std(score_vals)
        )

        if self._kernel_params_cache is not None and len(
            self._kernel_params_cache.inverse_squared_lengthscales
        ) != len(internal_search_space.scale_types):
            # Clear cache if the search space changes.
            self._kernel_params_cache = None

        kernel_params = gp.fit_kernel_params(
            X=normalized_params,
            Y=standardized_score_vals,
            is_categorical=(
                internal_search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL
            ),
            log_prior=self._log_prior,
            minimum_noise=self._minimum_noise,
            initial_kernel_params=self._kernel_params_cache,
            deterministic_objective=self._deterministic,
        )
        self._kernel_params_cache = kernel_params

        best_params: np.ndarray | None
        if self._constraints_func is None:
            acqf_params = acqf.create_acqf_params(
                acqf_type=acqf.AcquisitionFunctionType.LOG_EI,
                kernel_params=kernel_params,
                search_space=internal_search_space,
                X=normalized_params,
                Y=standardized_score_vals,
            )
            best_params = normalized_params[np.argmax(standardized_score_vals), :]
        else:
            constraint_vals, is_feasible = _get_constraint_vals_and_feasibility(study, trials)
            is_all_infeasible = not np.any(is_feasible)

            # TODO(kAIto47802): If is_all_infeasible, the acquisition function for the objective
            # function is ignored, so skipping the computation of kernel_params and acqf_params
            # can improve speed.
            # TODO(kAIto47802): Consider the case where all trials are feasible. We can ignore
            # constraints in this case.
            max_Y = (
                -np.inf if is_all_infeasible else np.max(standardized_score_vals[is_feasible])
            )
            acqf_params = acqf.create_acqf_params(
                acqf_type=acqf.AcquisitionFunctionType.LOG_EI,
                kernel_params=kernel_params,
                search_space=internal_search_space,
                X=normalized_params,
                Y=standardized_score_vals,
                max_Y=max_Y,
            )
            constraints_acqf_params = self._get_constraints_acqf_params(
                constraint_vals, internal_search_space, normalized_params
            )
            acqf_params = acqf.ConstrainedAcquisitionFunctionParams.from_acqf_params(
                acqf_params, constraints_acqf_params
            )
            # Use the best feasible observation as the warm-start point for acquisition
            # optimization. Note that the argmax is taken over the feasible subset, so the
            # parameters must be indexed through the same feasibility mask.
            best_params = (
                None
                if is_all_infeasible
                else normalized_params[is_feasible][
                    np.argmax(standardized_score_vals[is_feasible]), :
                ]
            )

        normalized_param = self._optimize_acqf(acqf_params, best_params)
        return gp_search_space.get_unnormalized_param(search_space, normalized_param)

    def sample_independent(
        self,
        study: Study,
        trial: FrozenTrial,
        param_name: str,
        param_distribution: BaseDistribution,
    ) -> Any:
        self._raise_error_if_multi_objective(study)

        return self._independent_sampler.sample_independent(
            study, trial, param_name, param_distribution
        )

    def before_trial(self, study: Study, trial: FrozenTrial) -> None:
        self._independent_sampler.before_trial(study, trial)

    def after_trial(
        self,
        study: Study,
        trial: FrozenTrial,
        state: TrialState,
        values: Sequence[float] | None,
    ) -> None:
        if self._constraints_func is not None:
            _process_constraints_after_trial(self._constraints_func, study, trial, state)
        self._independent_sampler.after_trial(study, trial, state, values)
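

# A minimal usage sketch (illustrative only, not part of the sampler implementation above).
# It shows a study driven by ``GPSampler`` using only the public Optuna API referenced in the
# class docstring; the objective, parameter bounds, seed, and trial budget are arbitrary
# choices made for this example.
def _example_usage() -> None:  # hypothetical helper, for illustration only
    def objective(trial: optuna.Trial) -> float:
        x = trial.suggest_float("x", -10.0, 10.0)
        y = trial.suggest_int("y", 0, 10)
        return (x - 2.0) ** 2 + (y - 5) ** 2

    # The objective above has no noise, so deterministic_objective=True is appropriate here.
    sampler = GPSampler(seed=0, deterministic_objective=True)
    study = optuna.create_study(direction="minimize", sampler=sampler)
    study.optimize(objective, n_trials=30)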


def _warn_and_convert_inf(
    values: np.ndarray,
) -> np.ndarray:
    if np.any(~np.isfinite(values)):
        warnings.warn(
            "GPSampler cannot handle +/-inf, so we clip them to the best/worst finite value."
        )

        finite_vals = values[np.isfinite(values)]
        best_finite_val = np.max(finite_vals, axis=0, initial=0.0)
        worst_finite_val = np.min(finite_vals, axis=0, initial=0.0)

        return np.clip(values, worst_finite_val, best_finite_val)
    return values


def _get_constraint_vals_and_feasibility(
    study: Study, trials: list[FrozenTrial]
) -> tuple[np.ndarray, np.ndarray]:
    _constraint_vals = [
        study._storage.get_trial_system_attrs(trial._trial_id).get(_CONSTRAINTS_KEY, ())
        for trial in trials
    ]
    if any(len(_constraint_vals[0]) != len(c) for c in _constraint_vals):
        raise ValueError("The number of constraints must be the same for all trials.")

    constraint_vals = np.array(_constraint_vals)
    assert len(constraint_vals.shape) == 2, "constraint_vals must be a 2d array."
    is_feasible = np.all(constraint_vals <= 0, axis=1)
    assert not isinstance(is_feasible, np.bool_), "MyPy Redefinition for NumPy v2.2.0."
    return constraint_vals, is_feasible
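

# A sketch of constrained optimization with ``constraints_func`` (illustrative only, not part
# of the sampler implementation). Constraint values are stored in trial system attributes by
# ``after_trial`` and read back by ``_get_constraint_vals_and_feasibility`` above; a trial is
# feasible only when every returned value is <= 0. The constraint (x >= 1) and the use of a
# user attribute to pass its value to ``constraints_func`` are arbitrary illustrative choices.
def _example_constrained_usage() -> None:  # hypothetical helper, for illustration only
    def objective(trial: optuna.Trial) -> float:
        x = trial.suggest_float("x", -10.0, 10.0)
        # Stash the constraint value so that ``constraints_func`` can read it later without
        # recomputing anything.
        trial.set_user_attr("c0", 1.0 - x)  # feasible iff x >= 1
        return x**2

    def constraints_func(trial: FrozenTrial) -> Sequence[float]:
        return (trial.user_attrs["c0"],)

    sampler = GPSampler(seed=0, constraints_func=constraints_func)
    study = optuna.create_study(direction="minimize", sampler=sampler)
    study.optimize(objective, n_trials=30)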