MCMC Gaussian Process

smac.model.gaussian_process.mcmc_gaussian_process

MCMCGaussianProcess

MCMCGaussianProcess(
    configspace: ConfigurationSpace,
    kernel: Kernel,
    n_mcmc_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    mcmc_sampler: str = "emcee",
    average_samples: bool = False,
    normalize_y: bool = True,
    instance_features: dict[str, list[int | float]]
    | None = None,
    pca_components: int | None = 7,
    seed: int = 0,
)

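Bases: AbstractGaussianProcess

Implementation of a Gaussian process model that integrates over its hyperparameters using Markov chain Monte Carlo (MCMC). If you use this class, make sure you also use an integrated acquisition function, so that the GP's hyperparameters are integrated out as proposed by Snoek et al.

This code is based on the implementation in RoBO:

Klein, A., Falkner, S., Mansur, N., and Hutter, F. RoBO: A Flexible and Robust Bayesian Optimization Framework in Python. In: NIPS 2017 Bayesian Optimization Workshop.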
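As a rough illustration of what integrating an acquisition function over GP hyperparameters means, the sketch below averages expected improvement over the predictions obtained under several hyperparameter samples. It is plain NumPy and does not use SMAC's API; the function names and the toy numbers are hypothetical.

```python
import math

import numpy as np


def expected_improvement(mean: np.ndarray, std: np.ndarray, best: float) -> np.ndarray:
    """Expected improvement for minimization under a Gaussian prediction."""
    std = np.maximum(std, 1e-12)  # guard against division by zero
    z = (best - mean) / std
    cdf = 0.5 * (1.0 + np.vectorize(math.erf)(z / math.sqrt(2.0)))
    pdf = np.exp(-0.5 * z**2) / math.sqrt(2.0 * math.pi)
    return (best - mean) * cdf + std * pdf


def integrated_ei(means: np.ndarray, stds: np.ndarray, best: float) -> np.ndarray:
    """Average EI over hyperparameter samples.

    means, stds: arrays of shape [n_hyperparameter_samples, n_candidates].
    """
    return expected_improvement(means, stds, best).mean(axis=0)


# Toy predictions from 3 hyperparameter samples at 2 candidate points.
means = np.array([[0.2, 0.8], [0.3, 0.7], [0.1, 0.9]])
stds = np.array([[0.1, 0.3], [0.2, 0.3], [0.1, 0.4]])
acq = integrated_ei(means, stds, best=0.5)
```

Each row of means and stds plays the role of one GP fitted with one hyperparameter sample; the averaged acquisition value is what the optimizer would then maximize.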

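Parameters

configspace : ConfigurationSpace
kernel : Kernel
    Kernel used for the Gaussian process.
n_mcmc_walkers : int, defaults to 20
    The number of hyperparameter samples. This also determines the number of walkers used for MCMC sampling, since each walker returns one hyperparameter sample.
chain_length : int, defaults to 50
    The length of the MCMC chain. n_mcmc_walkers walkers are run for chain_length steps, and the last sample of each chain is used as a hyperparameter sample.
burning_steps : int, defaults to 50
    The number of burn-in steps before the actual MCMC sampling starts.
mcmc_sampler : str, defaults to "emcee"
    Which self-tuning MCMC sampler to use. Can be either "emcee" or "nuts".
normalize_y : bool, defaults to True
    Zero mean unit variance normalization of the output values.
instance_features : dict[str, list[int | float]] | None, defaults to None
    Features (list of int or float) of the instances (str). The features are incorporated into the X data on which the model is trained.
pca_components : int | None, defaults to 7
    Number of components to keep when using PCA to reduce the dimensionality of the instance features.
seed : int

Source code in smac/model/gaussian_process/mcmc_gaussian_process.py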
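To make the roles of n_mcmc_walkers, chain_length, and burning_steps concrete, here is a minimal sketch in which each walker contributes the last state of its chain as one hyperparameter sample. It deliberately uses a simple random-walk Metropolis update, not emcee or NUTS, and the log_posterior function is a toy stand-in (an assumption) for the GP's log marginal likelihood plus log prior.

```python
import numpy as np


def log_posterior(theta: np.ndarray) -> float:
    """Toy stand-in for the GP log marginal likelihood plus log prior."""
    return float(-0.5 * np.sum(theta**2))


def sample_hyperparameters(
    n_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    n_dims: int = 2,
    step_size: float = 0.5,
    seed: int = 0,
) -> np.ndarray:
    """Return one hyperparameter sample per walker (last state of each chain)."""
    rng = np.random.default_rng(seed)
    walkers = rng.normal(size=(n_walkers, n_dims))  # initial positions

    # Burn-in steps run first and are discarded; only the final state is kept.
    for _ in range(burning_steps + chain_length):
        for w in range(n_walkers):
            proposal = walkers[w] + step_size * rng.normal(size=n_dims)
            log_accept = log_posterior(proposal) - log_posterior(walkers[w])
            if np.log(rng.uniform()) < log_accept:
                walkers[w] = proposal

    return walkers


samples = sample_hyperparameters()
```

With the defaults above, samples has one row per walker, which matches the statement that n_mcmc_walkers is also the number of hyperparameter samples.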
def __init__(
    self,
    configspace: ConfigurationSpace,
    kernel: Kernel,
    n_mcmc_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    mcmc_sampler: str = "emcee",
    average_samples: bool = False,
    normalize_y: bool = True,
    instance_features: dict[str, list[int | float]] | None = None,
    pca_components: int | None = 7,
    seed: int = 0,
):
    if mcmc_sampler not in ["emcee", "nuts"]:
        raise ValueError(f"MCMC Gaussian process does not support the sampler `{mcmc_sampler}`.")

    super().__init__(
        configspace=configspace,
        kernel=kernel,
        instance_features=instance_features,
        pca_components=pca_components,
        seed=seed,
    )

    self._n_mcmc_walkers = n_mcmc_walkers
    self._chain_length = chain_length
    self._burning_steps = burning_steps
    self._models: list[GaussianProcess] = []
    self._normalize_y = normalize_y
    self._mcmc_sampler = mcmc_sampler
    self._average_samples = average_samples
    self._set_has_conditions()

    # Internal statistics
    self._n_ll_evals = 0
    self._burned = False
    self._is_trained = False
    self._samples: np.ndarray | None = None

models property

Returns the internally used Gaussian processes.

predict

predict(
    X: ndarray, covariance_type: str | None = "diagonal"
) -> tuple[ndarray, ndarray | None]

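Predicts mean and variance for a given X. Internally, calls the method _predict.

Parameters

X : np.ndarray [#samples, #hyperparameters + #features]
    Input data points.
covariance_type : str | None, defaults to "diagonal"
    Specifies what to return along with the mean. Applied only to Gaussian processes. Takes four valid inputs:
    * None: Only the mean is returned.
    * "std": Standard deviation at test points is returned.
    * "diagonal": Diagonal of the covariance matrix is returned.
    * "full": Whole covariance matrix between the test points is returned.

Returns

means : np.ndarray [#samples, #objectives]
    The predictive mean.
vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
    Predictive variance or standard deviation.

Source code in smac/model/abstract_model.py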
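The four covariance_type options can be read as four views of one predictive covariance matrix. The helper below is a hypothetical sketch of that relationship in plain NumPy; it is not SMAC's internal _predict, and select_uncertainty is an illustrative name.

```python
from typing import Optional

import numpy as np


def select_uncertainty(cov: np.ndarray, covariance_type: Optional[str]) -> Optional[np.ndarray]:
    """Pick the uncertainty output from a full predictive covariance matrix."""
    if covariance_type is None:
        return None  # only the mean would be returned
    if covariance_type == "full":
        return cov
    variances = np.diag(cov)
    if covariance_type == "diagonal":
        return variances
    if covariance_type == "std":
        return np.sqrt(variances)
    raise ValueError(f"Unknown covariance_type: {covariance_type!r}")


# Toy covariance between two test points.
cov = np.array([[4.0, 1.0], [1.0, 9.0]])
diag = select_uncertainty(cov, "diagonal")
std = select_uncertainty(cov, "std")
```

Note the shape difference: "diagonal" and "std" give one value per test point, while "full" gives a [#samples, #samples] matrix.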
def predict(
    self,
    X: np.ndarray,
    covariance_type: str | None = "diagonal",
) -> tuple[np.ndarray, np.ndarray | None]:
    """Predicts mean and variance for a given X. Internally, calls the method `_predict`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Input data points.
    covariance_type: str | None, defaults to "diagonal"
        Specifies what to return along with the mean. Applied only to Gaussian Processes.
        Takes four valid inputs:
        * None: Only the mean is returned.
        * "std": Standard deviation at test points is returned.
        * "diagonal": Diagonal of the covariance matrix is returned.
        * "full": Whole covariance matrix between the test points is returned.

    Returns
    -------
    means : np.ndarray [#samples, #objectives]
        The predictive mean.
    vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
        Predictive variance or standard deviation.
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps + self._n_features:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if self._apply_pca:
        try:
            X_feats = X[:, -self._n_features :]
            X_feats = self._scaler.transform(X_feats)
            X_feats = self._pca.transform(X_feats)
            X = np.hstack((X[:, : self._n_hps], X_feats))
        except NotFittedError:
            # PCA not fitted if only one training sample
            pass

    if X.shape[1] != len(self._types):
        raise ValueError("Rows in X should have %d entries but have %d!" % (len(self._types), X.shape[1]))

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", "Predicted variances smaller than 0. Setting those variances to 0.")
        mean, var = self._predict(X, covariance_type)

    if len(mean.shape) == 1:
        mean = mean.reshape((-1, 1))

    if var is not None and len(var.shape) == 1:
        var = var.reshape((-1, 1))

    return mean, var

predict_marginalized

predict_marginalized(X: ndarray) -> tuple[ndarray, ndarray]

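Predicts mean and variance marginalized over all instances.

Warning

The input data must not include any features.

Parameters

X : np.ndarray [#samples, #hyperparameters]
    Input data points.

Returns

means : np.ndarray [#samples, 1]
    The predictive mean.
vars : np.ndarray [#samples, 1]
    The predictive variance.

Source code in smac/model/abstract_model.py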
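The marginalization can be sketched for a single configuration as follows: the configuration is paired with every instance's feature vector, the predicted means are averaged, and the variance of that average is the sum of the per-instance variances divided by n², which assumes the per-instance predictions are independent. The toy_predict function and the var_threshold value are assumptions made for illustration only.

```python
import numpy as np


def toy_predict(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Hypothetical model: mean is the row sum, variance is constant."""
    mean = X.sum(axis=1, keepdims=True)
    var = np.full((X.shape[0], 1), 0.4)
    return mean, var


def marginalize_over_instances(
    x: np.ndarray,
    instance_features: dict[str, list[float]],
    var_threshold: float = 1e-10,  # assumed lower bound on the variance
) -> tuple[float, float]:
    """Average the prediction for one configuration over all instances."""
    features = np.array(list(instance_features.values()))
    n_instances = len(features)

    # Pair the same configuration with every instance's feature vector.
    X_ = np.hstack((np.tile(x, (n_instances, 1)), features))
    means, variances = toy_predict(X_)

    # Var[(1/n) * sum_i X_i] = (1/n^2) * sum_i Var(X_i) for independent X_i.
    var = max(float(np.sum(variances)) / n_instances**2, var_threshold)
    return float(np.mean(means)), var


instance_features = {"inst_a": [0.0, 1.0], "inst_b": [2.0, 3.0]}
mean, var = marginalize_over_instances(np.array([1.0, 1.0]), instance_features)
```

Here the two per-instance means are 3 and 7, so the marginal mean is 5, and the marginal variance is (0.4 + 0.4) / 2² = 0.2.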
def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Predicts mean and variance marginalized over all instances.

    Warning
    -------
    The input data must not include any features.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters]
        Input data points.

    Returns
    -------
    means : np.ndarray [#samples, 1]
        The predictive mean.
    vars : np.ndarray [#samples, 1]
        The predictive variance.
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters (and no features) for this method, "
            f"but has {X.shape[1]} in total."
        )

    if self._instance_features is None:
        mean, var = self.predict(X)
        assert var is not None

        var[var < self._var_threshold] = self._var_threshold
        var[np.isnan(var)] = self._var_threshold

        return mean, var
    else:
        n_instances = len(self._instance_features)

        mean = np.zeros(X.shape[0])
        var = np.zeros(X.shape[0])
        for i, x in enumerate(X):
            features = np.array(list(self._instance_features.values()))
            x_tiled = np.tile(x, (n_instances, 1))
            X_ = np.hstack((x_tiled, features))

            means, vars = self.predict(X_)
            assert vars is not None

            # VAR[1/n (X_1 + ... + X_n)] =
            # 1/n^2 * ( VAR(X_1) + ... + VAR(X_n))
            # for independent X_1 ... X_n
            var_x = np.sum(vars) / (len(vars) ** 2)
            if var_x < self._var_threshold:
                var_x = self._var_threshold

            var[i] = var_x
            mean[i] = np.mean(means)

        if len(mean.shape) == 1:
            mean = mean.reshape((-1, 1))

        if len(var.shape) == 1:
            var = var.reshape((-1, 1))

        return mean, var

train

train(X: ndarray, Y: ndarray) -> Self

Trains the model on X and Y. Internally, calls the method _train.

Parameters

X : np.ndarray [#samples, #hyperparameters + #features]
    Input data points.
Y : np.ndarray [#samples, #objectives]
    The corresponding target values.

Returns

self : AbstractModel

Source code in smac/model/abstract_model.py
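Before training, the instance features are reduced with PCA only when three conditions hold together. The helper below restates that check as a standalone function; should_apply_pca is a hypothetical name, and it assumes the fitted PCA object's n_components equals pca_components.

```python
from typing import Optional


def should_apply_pca(n_samples: int, n_features: int, pca_components: Optional[int]) -> bool:
    """Restate the three-part condition checked before scaling and PCA."""
    return (
        pca_components is not None       # PCA is enabled at all
        and n_samples > pca_components   # enough training points to fit it
        and n_features >= pca_components # enough features to be worth reducing
    )


# With the default of 7 components:
enough_data = should_apply_pca(n_samples=10, n_features=8, pca_components=7)
too_few_samples = should_apply_pca(n_samples=5, n_features=8, pca_components=7)
too_few_features = should_apply_pca(n_samples=10, n_features=3, pca_components=7)
disabled = should_apply_pca(n_samples=10, n_features=8, pca_components=None)
```

When the check fails, the features are passed through unchanged, which is also why predict skips the transform if the PCA was never fitted.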
def train(self: Self, X: np.ndarray, Y: np.ndarray) -> Self:
    """Trains the random forest on X and Y. Internally, calls the method `_train`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Input data points.
    Y : np.ndarray [#samples, #objectives]
        The corresponding target values.

    Returns
    -------
    self : AbstractModel
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps + self._n_features:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if X.shape[0] != Y.shape[0]:
        raise ValueError("X.shape[0] ({}) != y.shape[0] ({})".format(X.shape[0], Y.shape[0]))

    # Reduce dimensionality of features if larger than PCA_DIM
    if (
        self._pca_components is not None
        and X.shape[0] > self._pca.n_components
        and self._n_features >= self._pca_components
    ):
        X_feats = X[:, -self._n_features :]

        # Scale features
        X_feats = self._scaler.fit_transform(X_feats)
        X_feats = np.nan_to_num(X_feats)  # if features with max == min

        # PCA
        X_feats = self._pca.fit_transform(X_feats)
        X = np.hstack((X[:, : self._n_hps], X_feats))

        if hasattr(self, "_types"):
            # For RF, adapt types list
            # if X_feats.shape[0] < self._pca, X_feats.shape[1] == X_feats.shape[0]
            self._types = np.array(
                np.hstack((self._types[: self._n_hps], np.zeros(X_feats.shape[1]))),
                dtype=np.uint,
            )  # type: ignore

        self._apply_pca = True
    else:
        self._apply_pca = False

        if hasattr(self, "_types"):
            self._types = copy.deepcopy(self._initial_types)

    return self._train(X, Y)