Source code for optuna.samplers._cmaes

import copy
import math
import pickle
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from cmaes import CMA
import numpy as np

import optuna
from optuna._experimental import experimental
from optuna.distributions import BaseDistribution
from optuna import logging
from optuna.samplers import BaseSampler
from optuna.trial import FrozenTrial
from optuna.trial import TrialState

_logger = logging.get_logger(__name__)

# Minimum value of sigma0 to avoid ZeroDivisionError.
_MIN_SIGMA0 = 1e-10


[docs]class CmaEsSampler(BaseSampler): """A Sampler using CMA-ES algorithm. Example: Optimize a simple quadratic function by using :class:`~optuna.samplers.CmaEsSampler`. .. testcode:: import optuna def objective(trial): x = trial.suggest_uniform('x', -1, 1) y = trial.suggest_int('y', -1, 1) return x ** 2 + y sampler = optuna.samplers.CmaEsSampler() study = optuna.create_study(sampler=sampler) study.optimize(objective, n_trials=20) Please note that this sampler does not support CategoricalDistribution. If your search space contains categorical parameters, I recommend you to use :class:`~optuna.samplers.TPESampler` instead. Furthermore, there is room for performance improvements in parallel optimization settings. This sampler cannot use some trials for updating the parameters of multivariate normal distribution. .. seealso:: You can also use :class:`optuna.integration.CmaEsSampler` which is a sampler using cma library as the backend. Args: x0: A dictionary of an initial parameter values for CMA-ES. By default, the mean of ``low`` and ``high`` for each distribution is used. sigma0: Initial standard deviation of CMA-ES. By default, ``sigma0`` is set to ``min_range / 6``, where ``min_range`` denotes the minimum range of the distributions in the search space. seed: A random seed for CMA-ES. n_startup_trials: The independent sampling is used instead of the CMA-ES algorithm until the given number of trials finish in the same study. independent_sampler: A :class:`~optuna.samplers.BaseSampler` instance that is used for independent sampling. The parameters not contained in the relative search space are sampled by this sampler. The search space for :class:`~optuna.samplers.CmaEsSampler` is determined by :func:`~optuna.samplers.intersection_search_space()`. If :obj:`None` is specified, :class:`~optuna.samplers.RandomSampler` is used as the default. .. seealso:: :class:`optuna.samplers` module provides built-in independent samplers such as :class:`~optuna.samplers.RandomSampler` and :class:`~optuna.samplers.TPESampler`. warn_independent_sampling: If this is :obj:`True`, a warning message is emitted when the value of a parameter is sampled by using an independent sampler. Note that the parameters of the first trial in a study are always sampled via an independent sampler, so no warning messages are emitted in this case. consider_pruned_trials: If this is :obj:`True`, the PRUNED trials are considered for sampling. .. note:: Added in v2.0.0 as an experimental feature. The interface may change in newer versions without prior notice. See https://github.com/optuna/optuna/releases/tag/v2.0.0. .. note:: It is suggested to set this flag :obj:`False` when the :class:`~optuna.pruners.MedianPruner` is used. On the other hand, it is suggested to set this flag :obj:`True` when the :class:`~optuna.pruners.HyperbandPruner` is used. Please see `the benchmark result <https://github.com/optuna/optuna/pull/1229>`_ for the details. """
[docs] def __init__( self, x0: Optional[Dict[str, Any]] = None, sigma0: Optional[float] = None, n_startup_trials: int = 1, independent_sampler: Optional[BaseSampler] = None, warn_independent_sampling: bool = True, seed: Optional[int] = None, *, consider_pruned_trials: bool = False ) -> None: self._x0 = x0 self._sigma0 = sigma0 self._independent_sampler = independent_sampler or optuna.samplers.RandomSampler(seed=seed) self._n_startup_trials = n_startup_trials self._warn_independent_sampling = warn_independent_sampling self._logger = optuna.logging.get_logger(__name__) self._cma_rng = np.random.RandomState(seed) self._search_space = optuna.samplers.IntersectionSearchSpace() self._consider_pruned_trials = consider_pruned_trials if self._consider_pruned_trials: self._raise_experimental_warning_for_consider_pruned_trials()
@experimental("2.0.0", name="`consider_pruned_trials = True` in CmaEsSampler") def _raise_experimental_warning_for_consider_pruned_trials(self) -> None: pass
[docs] def reseed_rng(self) -> None: # _cma_rng doesn't require reseeding because the relative sampling reseeds in each trial. self._independent_sampler.reseed_rng()
[docs] def infer_relative_search_space( self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial", ) -> Dict[str, BaseDistribution]: search_space = {} # type: Dict[str, BaseDistribution] for name, distribution in self._search_space.calculate(study).items(): if distribution.single(): # `cma` cannot handle distributions that contain just a single value, so we skip # them. Note that the parameter values for such distributions are sampled in # `Trial`. continue if not isinstance( distribution, ( optuna.distributions.UniformDistribution, optuna.distributions.LogUniformDistribution, optuna.distributions.DiscreteUniformDistribution, optuna.distributions.IntUniformDistribution, optuna.distributions.IntLogUniformDistribution, ), ): # Categorical distribution is unsupported. continue search_space[name] = distribution return search_space
[docs] def sample_relative( self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial", search_space: Dict[str, BaseDistribution], ) -> Dict[str, Any]: if len(search_space) == 0: return {} completed_trials = self._get_trials(study) if len(completed_trials) < self._n_startup_trials: return {} if len(search_space) == 1: self._logger.info( "`CmaEsSampler` only supports two or more dimensional continuous " "search space. `{}` is used instead of `CmaEsSampler`.".format( self._independent_sampler.__class__.__name__ ) ) self._warn_independent_sampling = False return {} # TODO(c-bata): Remove `ordered_keys` by passing `ordered_dict=True` # to `intersection_search_space`. ordered_keys = [key for key in search_space] ordered_keys.sort() optimizer = self._restore_or_init_optimizer(completed_trials, search_space, ordered_keys) if optimizer.dim != len(ordered_keys): self._logger.info( "`CmaEsSampler` does not support dynamic search space. " "`{}` is used instead of `CmaEsSampler`.".format( self._independent_sampler.__class__.__name__ ) ) self._warn_independent_sampling = False return {} # TODO(c-bata): Reduce the number of wasted trials during parallel optimization. # See https://github.com/optuna/optuna/pull/920#discussion_r385114002 for details. solution_trials = [ t for t in completed_trials if optimizer.generation == t.system_attrs.get("cma:generation", -1) ] if len(solution_trials) >= optimizer.population_size: solutions = [] # type: List[Tuple[np.ndarray, float]] for t in solution_trials[: optimizer.population_size]: assert t.value is not None, "completed trials must have a value" x = np.array( [_to_cma_param(search_space[k], t.params[k]) for k in ordered_keys], dtype=float, ) solutions.append((x, t.value)) optimizer.tell(solutions) optimizer_str = pickle.dumps(optimizer).hex() study._storage.set_trial_system_attr(trial._trial_id, "cma:optimizer", optimizer_str) # Caution: optimizer should update its seed value seed = self._cma_rng.randint(1, 2 ** 16) + trial.number optimizer._rng = np.random.RandomState(seed) params = optimizer.ask() study._storage.set_trial_system_attr( trial._trial_id, "cma:generation", optimizer.generation ) external_values = { k: _to_optuna_param(search_space[k], p) for k, p in zip(ordered_keys, params) } return external_values
def _restore_or_init_optimizer( self, completed_trials: "List[optuna.trial.FrozenTrial]", search_space: Dict[str, BaseDistribution], ordered_keys: List[str], ) -> CMA: # Restore a previous CMA object. for trial in reversed(completed_trials): serialized_optimizer = trial.system_attrs.get( "cma:optimizer", None ) # type: Optional[str] if serialized_optimizer is None: continue return pickle.loads(bytes.fromhex(serialized_optimizer)) # Init a CMA object. if self._x0 is None: self._x0 = _initialize_x0(search_space) if self._sigma0 is None: sigma0 = _initialize_sigma0(search_space) else: sigma0 = self._sigma0 sigma0 = max(sigma0, _MIN_SIGMA0) mean = np.array([self._x0[k] for k in ordered_keys], dtype=float) bounds = _get_search_space_bound(ordered_keys, search_space) n_dimension = len(ordered_keys) return CMA( mean=mean, sigma=sigma0, bounds=bounds, seed=self._cma_rng.randint(1, 2 ** 31 - 2), n_max_resampling=10 * n_dimension, )
[docs] def sample_independent( self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial", param_name: str, param_distribution: BaseDistribution, ) -> Any: if self._warn_independent_sampling: complete_trials = self._get_trials(study) if len(complete_trials) >= self._n_startup_trials: self._log_independent_sampling(trial, param_name) return self._independent_sampler.sample_independent( study, trial, param_name, param_distribution )
def _log_independent_sampling(self, trial: FrozenTrial, param_name: str) -> None: self._logger.warning( "The parameter '{}' in trial#{} is sampled independently " "by using `{}` instead of `CmaEsSampler` " "(optimization performance may be degraded). " "You can suppress this warning by setting `warn_independent_sampling` " "to `False` in the constructor of `CmaEsSampler`, " "if this independent sampling is intended behavior.".format( param_name, trial.number, self._independent_sampler.__class__.__name__ ) ) def _get_trials(self, study: "optuna.Study") -> List[FrozenTrial]: complete_trials = [] for t in study.get_trials(deepcopy=False): if t.state == TrialState.COMPLETE: complete_trials.append(t) elif ( t.state == TrialState.PRUNED and len(t.intermediate_values) > 0 and self._consider_pruned_trials ): _, value = max(t.intermediate_values.items()) if value is None: continue # We rewrite the value of the trial `t` for sampling, so we need a deepcopy. copied_t = copy.deepcopy(t) copied_t.value = value complete_trials.append(copied_t) return complete_trials
def _to_cma_param(distribution: BaseDistribution, optuna_param: Any) -> float: if isinstance(distribution, optuna.distributions.LogUniformDistribution): return math.log(optuna_param) if isinstance(distribution, optuna.distributions.IntUniformDistribution): return float(optuna_param) if isinstance(distribution, optuna.distributions.IntLogUniformDistribution): return math.log(optuna_param) return optuna_param def _to_optuna_param(distribution: BaseDistribution, cma_param: float) -> Any: if isinstance(distribution, optuna.distributions.LogUniformDistribution): return math.exp(cma_param) if isinstance(distribution, optuna.distributions.DiscreteUniformDistribution): v = np.round(cma_param / distribution.q) * distribution.q + distribution.low # v may slightly exceed range due to round-off errors. return float(min(max(v, distribution.low), distribution.high)) if isinstance(distribution, optuna.distributions.IntUniformDistribution): r = np.round((cma_param - distribution.low) / distribution.step) v = r * distribution.step + distribution.low return int(v) if isinstance(distribution, optuna.distributions.IntLogUniformDistribution): r = np.round(cma_param - math.log(distribution.low)) v = r + math.log(distribution.low) return int(math.exp(v)) return cma_param def _initialize_x0(search_space: Dict[str, BaseDistribution]) -> Dict[str, np.ndarray]: x0 = {} for name, distribution in search_space.items(): if isinstance(distribution, optuna.distributions.UniformDistribution): x0[name] = np.mean([distribution.high, distribution.low]) elif isinstance(distribution, optuna.distributions.DiscreteUniformDistribution): x0[name] = np.mean([distribution.high, distribution.low]) elif isinstance(distribution, optuna.distributions.IntUniformDistribution): x0[name] = int(np.mean([distribution.high, distribution.low])) elif isinstance(distribution, optuna.distributions.IntLogUniformDistribution): log_high = math.log(distribution.high) log_low = math.log(distribution.low) x0[name] = np.mean([log_high, log_low]) elif isinstance(distribution, optuna.distributions.LogUniformDistribution): log_high = math.log(distribution.high) log_low = math.log(distribution.low) x0[name] = np.mean([log_high, log_low]) else: raise NotImplementedError( "The distribution {} is not implemented.".format(distribution) ) return x0 def _initialize_sigma0(search_space: Dict[str, BaseDistribution]) -> float: sigma0 = [] for name, distribution in search_space.items(): if isinstance(distribution, optuna.distributions.UniformDistribution): sigma0.append((distribution.high - distribution.low) / 6) elif isinstance(distribution, optuna.distributions.DiscreteUniformDistribution): sigma0.append((distribution.high - distribution.low) / 6) elif isinstance(distribution, optuna.distributions.IntUniformDistribution): sigma0.append((distribution.high - distribution.low) / 6) elif isinstance(distribution, optuna.distributions.IntLogUniformDistribution): log_high = math.log(distribution.high) log_low = math.log(distribution.low) sigma0.append((log_high - log_low) / 6) elif isinstance(distribution, optuna.distributions.LogUniformDistribution): log_high = math.log(distribution.high) log_low = math.log(distribution.low) sigma0.append((log_high - log_low) / 6) else: raise NotImplementedError( "The distribution {} is not implemented.".format(distribution) ) return min(sigma0) def _get_search_space_bound( keys: List[str], search_space: Dict[str, BaseDistribution], ) -> np.ndarray: bounds = [] for param_name in keys: dist = search_space[param_name] if isinstance( dist, ( optuna.distributions.UniformDistribution, optuna.distributions.LogUniformDistribution, optuna.distributions.DiscreteUniformDistribution, optuna.distributions.IntUniformDistribution, optuna.distributions.IntLogUniformDistribution, ), ): bounds.append([_to_cma_param(dist, dist.low), _to_cma_param(dist, dist.high)]) else: raise NotImplementedError("The distribution {} is not implemented.".format(dist)) return np.array(bounds, dtype=float)