Source code for optuna.samplers._gp.sampler

from __future__ import annotations

from typing import Any
from typing import Callable
from typing import cast
from typing import Sequence
from typing import TYPE_CHECKING
import warnings

import numpy as np

import optuna
from optuna._experimental import experimental_class
from optuna.distributions import BaseDistribution
from optuna.samplers._base import BaseSampler
from optuna.samplers._lazy_random_state import LazyRandomState
from optuna.study import StudyDirection
from optuna.trial import FrozenTrial
from optuna.trial import TrialState


if TYPE_CHECKING:
    import torch

    import optuna._gp.acqf as acqf
    import optuna._gp.gp as gp
    import optuna._gp.optim_mixed as optim_mixed
    import optuna._gp.prior as prior
    import optuna._gp.search_space as gp_search_space
    from optuna.study import Study
else:
    from optuna._imports import _LazyImport

    torch = _LazyImport("torch")
    gp_search_space = _LazyImport("optuna._gp.search_space")
    gp = _LazyImport("optuna._gp.gp")
    optim_mixed = _LazyImport("optuna._gp.optim_mixed")
    acqf = _LazyImport("optuna._gp.acqf")
    prior = _LazyImport("optuna._gp.prior")


@experimental_class("3.6.0")
class GPSampler(BaseSampler):
    """Sampler using Gaussian process-based Bayesian optimization.

    This sampler fits a Gaussian process (GP) to the objective function and optimizes
    the acquisition function to suggest the next parameters.

    The current implementation uses:

    - Matern kernel with nu=2.5 (twice differentiable),
    - Automatic relevance determination (ARD) for the length scale of each parameter,
    - Gamma prior for inverse squared lengthscales, kernel scale, and noise variance,
    - Log Expected Improvement (logEI) as the acquisition function, and
    - Quasi-Monte Carlo (QMC) sampling to optimize the acquisition function.

    .. note::
        This sampler requires ``scipy`` and ``torch``.
        You can install these dependencies with ``pip install scipy torch``.

    Args:
        seed:
            Random seed to initialize internal random number generator.
            Defaults to :obj:`None` (a seed is picked randomly).

        independent_sampler:
            Sampler used for initial sampling (for the first ``n_startup_trials`` trials)
            and for conditional parameters. Defaults to :obj:`None`
            (a random sampler with the same ``seed`` is used).

        n_startup_trials:
            Number of initial trials. Defaults to 10.

        deterministic_objective:
            Whether the objective function is deterministic or not.
            If :obj:`True`, the sampler will fix the noise variance of the surrogate model
            to the minimum value (slightly above 0 to ensure numerical stability).
            Defaults to :obj:`False`.
    """

    def __init__(
        self,
        *,
        seed: int | None = None,
        independent_sampler: BaseSampler | None = None,
        n_startup_trials: int = 10,
        deterministic_objective: bool = False,
    ) -> None:
        self._rng = LazyRandomState(seed)
        self._independent_sampler = independent_sampler or optuna.samplers.RandomSampler(
            seed=seed
        )
        self._intersection_search_space = optuna.search_space.IntersectionSearchSpace()
        self._n_startup_trials = n_startup_trials
        self._log_prior: "Callable[[gp.KernelParamsTensor], torch.Tensor]" = (
            prior.default_log_prior
        )
        self._minimum_noise: float = prior.DEFAULT_MINIMUM_NOISE_VAR
        # Cache the kernel parameters to use as initial values for the next fit.
        self._kernel_params_cache: "gp.KernelParamsTensor | None" = None
        self._optimize_n_samples: int = 2048
        self._deterministic = deterministic_objective
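
    # A minimal usage sketch: the sampler plugs into the standard Optuna API
    # (``optuna.create_study``, ``study.optimize``). The objective below is a
    # placeholder chosen only for illustration.
    #
    #     import optuna
    #
    #     def objective(trial):
    #         x = trial.suggest_float("x", -10.0, 10.0)
    #         return x**2
    #
    #     sampler = optuna.samplers.GPSampler(seed=0, n_startup_trials=10)
    #     study = optuna.create_study(direction="minimize", sampler=sampler)
    #     study.optimize(objective, n_trials=30)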

    def reseed_rng(self) -> None:
        self._rng.rng.seed()
        self._independent_sampler.reseed_rng()

    def infer_relative_search_space(
        self, study: Study, trial: FrozenTrial
    ) -> dict[str, BaseDistribution]:
        search_space = {}
        for name, distribution in self._intersection_search_space.calculate(study).items():
            if distribution.single():
                continue
            search_space[name] = distribution

        return search_space

    def _optimize_acqf(
        self,
        acqf_params: "acqf.AcquisitionFunctionParams",
        best_params: np.ndarray,
    ) -> np.ndarray:
        # Advanced users can override this method to change the optimization algorithm.
        # However, we do not make any effort to keep backward compatibility between versions.
        # In particular, we may remove this function in future refactoring.
        normalized_params, _acqf_val = optim_mixed.optimize_acqf_mixed(
            acqf_params,
            warmstart_normalized_params_array=best_params[None, :],
            n_preliminary_samples=2048,
            n_local_search=10,
            tol=1e-4,
            rng=self._rng.rng,
        )
        return normalized_params
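
    # A hedged sketch of the override mentioned in the comment above: a hypothetical
    # subclass that spends more of the optimization budget on local search. The keyword
    # values below are illustrative, not recommended defaults, and this private interface
    # may change without notice.
    #
    #     class MyGPSampler(GPSampler):
    #         def _optimize_acqf(self, acqf_params, best_params):
    #             normalized_params, _acqf_val = optim_mixed.optimize_acqf_mixed(
    #                 acqf_params,
    #                 warmstart_normalized_params_array=best_params[None, :],
    #                 n_preliminary_samples=4096,  # illustrative value
    #                 n_local_search=20,  # illustrative value
    #                 tol=1e-4,
    #                 rng=self._rng.rng,
    #             )
    #             return normalized_params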

    def sample_relative(
        self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
    ) -> dict[str, Any]:
        self._raise_error_if_multi_objective(study)

        if search_space == {}:
            return {}

        states = (TrialState.COMPLETE,)
        trials = study._get_trials(deepcopy=False, states=states, use_cache=True)
        if len(trials) < self._n_startup_trials:
            return {}

        (
            internal_search_space,
            normalized_params,
        ) = gp_search_space.get_search_space_and_normalized_params(trials, search_space)

        _sign = -1.0 if study.direction == StudyDirection.MINIMIZE else 1.0
        score_vals = np.array([_sign * cast(float, trial.value) for trial in trials])

        if np.any(~np.isfinite(score_vals)):
            warnings.warn(
                "GPSampler cannot handle infinite values. "
                "We clamp those values to the worst/best finite value."
            )

            finite_score_vals = score_vals[np.isfinite(score_vals)]
            best_finite_score = np.max(finite_score_vals, initial=0.0)
            worst_finite_score = np.min(finite_score_vals, initial=0.0)

            score_vals = np.clip(score_vals, worst_finite_score, best_finite_score)

        standardized_score_vals = (score_vals - score_vals.mean()) / max(1e-10, score_vals.std())

        if self._kernel_params_cache is not None and len(
            self._kernel_params_cache.inverse_squared_lengthscales
        ) != len(internal_search_space.scale_types):
            # Clear the cache if the search space changes.
            self._kernel_params_cache = None

        kernel_params = gp.fit_kernel_params(
            X=normalized_params,
            Y=standardized_score_vals,
            is_categorical=(
                internal_search_space.scale_types == gp_search_space.ScaleType.CATEGORICAL
            ),
            log_prior=self._log_prior,
            minimum_noise=self._minimum_noise,
            initial_kernel_params=self._kernel_params_cache,
            deterministic_objective=self._deterministic,
        )
        self._kernel_params_cache = kernel_params

        acqf_params = acqf.create_acqf_params(
            acqf_type=acqf.AcquisitionFunctionType.LOG_EI,
            kernel_params=kernel_params,
            search_space=internal_search_space,
            X=normalized_params,
            Y=standardized_score_vals,
        )

        normalized_param = self._optimize_acqf(
            acqf_params, normalized_params[np.argmax(standardized_score_vals), :]
        )
        return gp_search_space.get_unnormalized_param(search_space, normalized_param)
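
    # Illustrative walk-through of the score preprocessing above, for a minimization study
    # with objective values [1.0, inf, 3.0] (the numbers are made up for this example):
    #   - _sign = -1.0, so score_vals = [-1.0, -inf, -3.0].
    #   - The finite scores are [-1.0, -3.0]; with initial=0.0, best_finite_score = 0.0 and
    #     worst_finite_score = -3.0, so clipping maps -inf to -3.0 and leaves the finite
    #     scores unchanged: [-1.0, -3.0, -3.0].
    #   - Standardization then yields roughly [1.41, -0.71, -0.71], which is what the GP is
    #     fit to (higher is better after the sign flip).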

    def sample_independent(
        self,
        study: Study,
        trial: FrozenTrial,
        param_name: str,
        param_distribution: BaseDistribution,
    ) -> Any:
        self._raise_error_if_multi_objective(study)

        return self._independent_sampler.sample_independent(
            study, trial, param_name, param_distribution
        )

    def before_trial(self, study: Study, trial: FrozenTrial) -> None:
        self._independent_sampler.before_trial(study, trial)

    def after_trial(
        self,
        study: Study,
        trial: FrozenTrial,
        state: TrialState,
        values: Sequence[float] | None,
    ) -> None:
        self._independent_sampler.after_trial(study, trial, state, values)