MCMCGaussianProcess(
configspace: ConfigurationSpace,
kernel: Kernel,
n_mcmc_walkers: int = 20,
chain_length: int = 50,
burning_steps: int = 50,
mcmc_sampler: str = "emcee",
average_samples: bool = False,
normalize_y: bool = True,
instance_features: dict[str, list[int | float]] | None = None,
pca_components: int | None = 7,
seed: int = 0,
)
基类: AbstractGaussianProcess
实现了通过马尔可夫链蒙特卡罗 (MCMC) 对超参数进行积分的高斯过程模型。如果你使用这个类,请确保同时使用集成采集函数,按照 Snoek 等人的建议对 GP 的超参数进行积分。
此代码基于 RoBO 的实现
Klein, A. and Falkner, S. and Mansur, N. and Hutter, F. RoBO: A Flexible and Robust Bayesian Optimization Framework in Python In: NIPS 2017 Bayesian Optimization Workshop
参数
configspace : ConfigurationSpace
kernel : Kernel 用于高斯过程的核函数。
n_mcmc_walkers : int, 默认为 20 超参数样本的数量。这也决定了用于 MCMC 采样的 walker 数量,因为每个 walker 将返回一个超参数样本。
chain_length : int, 默认为 50 MCMC 链的长度。我们启动 n_mcmc_walkers 个 walker,每个运行 chain_length 步,并使用链中的最后一个样本作为超参数样本。
burning_steps : int, 默认为 50 实际 MCMC 采样开始前的预烧 (burn-in) 步数。
mcmc_sampler : str, 默认为 "emcee" 选择一个自适应调整的 MCMC 采样器。可以是 emcee 或 nuts。
normalize_y : bool, 默认为 True 输出值的零均值单位方差归一化。
instance_features : dict[str, list[int | float]] | None, 默认为 None 实例(字符串)的特征(整数或浮点数列表)。这些特征被合并到 X 数据中,模型在此数据上进行训练。
pca_components : int | None, 默认为 7 使用 PCA 减少实例特征维度时保留的成分数量。
seed : int
源码位于 smac/model/gaussian_process/mcmc_gaussian_process.py
def __init__(
    self,
    configspace: ConfigurationSpace,
    kernel: Kernel,
    n_mcmc_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    mcmc_sampler: str = "emcee",
    average_samples: bool = False,
    normalize_y: bool = True,
    instance_features: dict[str, list[int | float]] | None = None,
    pca_components: int | None = 7,
    seed: int = 0,
):
    """Set up the MCMC Gaussian process.

    The sampler name is validated first, then the shared model arguments are
    forwarded to the parent constructor, and finally the MCMC settings and the
    internal bookkeeping fields are stored on the instance.

    Raises
    ------
    ValueError
        If `mcmc_sampler` is neither "emcee" nor "nuts".
    """
    supported_samplers = ("emcee", "nuts")
    if mcmc_sampler not in supported_samplers:
        raise ValueError(f"MCMC Gaussian process does not support the sampler `{mcmc_sampler}`.")

    super().__init__(
        configspace=configspace,
        kernel=kernel,
        instance_features=instance_features,
        pca_components=pca_components,
        seed=seed,
    )

    # MCMC configuration
    self._mcmc_sampler = mcmc_sampler
    self._n_mcmc_walkers = n_mcmc_walkers
    self._chain_length = chain_length
    self._burning_steps = burning_steps
    self._average_samples = average_samples
    self._normalize_y = normalize_y

    # Starts empty; nothing in this constructor fills it.
    self._models: list[GaussianProcess] = []

    # Called only after every setting above has been stored.
    self._set_has_conditions()

    # Internal statistics
    self._n_ll_evals = 0
    self._burned = False
    self._is_trained = False
    self._samples: np.ndarray | None = None
|
predict
预测给定 X 的均值和方差。内部调用方法 _predict。
参数
X : np.ndarray [#samples, #hyperparameters + #features] 输入数据点。 covariance_type: str | None, 默认为 "diagonal" 指定除了均值之外返回什么。仅应用于高斯过程。接受四种有效输入: * None: 只返回均值。 * "std": 返回测试点的标准差。 * "diagonal": 返回协方差矩阵的对角线。 * "full": 返回测试点之间的整个协方差矩阵。
返回值
means : np.ndarray [#samples, #objectives] 预测均值。 vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None 预测方差或标准差。
源码位于 smac/model/abstract_model.py
def predict(
    self,
    X: np.ndarray,
    covariance_type: str | None = "diagonal",
) -> tuple[np.ndarray, np.ndarray | None]:
    """Validate the input, apply the fitted feature reduction and delegate to `_predict`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Points at which the model is evaluated.
    covariance_type: str | None, defaults to "diagonal"
        Passed through unchanged to `_predict`. Documented values:
        * None: only the mean is returned.
        * "std": standard deviation at the test points.
        * "diagonal": diagonal of the covariance matrix.
        * "full": full covariance matrix between the test points.

    Returns
    -------
    means : np.ndarray [#samples, #objectives]
        Predictive mean; a 1d result is reshaped into a column.
    vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
        Predictive variance or standard deviation; a 1d result is reshaped into a column.

    Raises
    ------
    ValueError
        If `X` is not 2d or its column count does not match what the model expects.
    """
    n_dims = len(X.shape)
    if n_dims != 2:
        raise ValueError("Expected 2d array, got %dd array!" % n_dims)

    expected_columns = self._n_hps + self._n_features
    if X.shape[1] != expected_columns:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if self._apply_pca:
        try:
            # Scale, then project, the trailing feature columns and re-attach them
            # to the untouched hyperparameter columns.
            reduced_feats = self._pca.transform(self._scaler.transform(X[:, -self._n_features :]))
            X = np.hstack((X[:, : self._n_hps], reduced_feats))
        except NotFittedError:
            # PCA not fitted if only one training sample
            pass

    n_types = len(self._types)
    if X.shape[1] != n_types:
        raise ValueError("Rows in X should have %d entries but have %d!" % (n_types, X.shape[1]))

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", "Predicted variances smaller than 0. Setting those variances to 0.")
        mean, var = self._predict(X, covariance_type)

    # Callers always receive column-shaped results.
    if len(mean.shape) == 1:
        mean = mean.reshape((-1, 1))
    if var is not None and len(var.shape) == 1:
        var = var.reshape((-1, 1))

    return mean, var
|
predict_marginalized
预测对所有实例进行边缘化后的均值和方差。
警告
输入数据不得包含任何特征。
参数
X : np.ndarray [#samples, #hyperparameters] 输入数据点。
返回值
means : np.ndarray [#samples, 1] 预测均值。 vars : np.ndarray [#samples, 1] 预测方差。
源码位于 smac/model/abstract_model.py
def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Predict mean and variance marginalized over all instances.

    Without instance features this is a plain `predict` call whose variances are
    clamped to `_var_threshold`. With instance features, every row of `X` is
    paired with each instance's feature vector, predicted, and averaged.

    Warning
    -------
    The input data must not include any features.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters]
        Input data points.

    Returns
    -------
    means : np.ndarray [#samples, 1]
        The predictive mean.
    vars : np.ndarray [#samples, 1]
        The predictive variance, never below `_var_threshold` and never NaN.

    Raises
    ------
    ValueError
        If `X` is not 2d or does not have exactly `_n_hps` columns.
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters (and no features) for this method, "
            f"but has {X.shape[1]} in total."
        )

    if self._instance_features is None:
        mean, var = self.predict(X)
        assert var is not None

        var[var < self._var_threshold] = self._var_threshold
        var[np.isnan(var)] = self._var_threshold

        return mean, var

    # The feature matrix is identical for every query point, so build it once
    # instead of once per loop iteration.
    features = np.array(list(self._instance_features.values()))
    n_instances = len(self._instance_features)

    mean = np.zeros(X.shape[0])
    var = np.zeros(X.shape[0])

    for i, x in enumerate(X):
        # One row per instance: the same configuration followed by that instance's features.
        X_ = np.hstack((np.tile(x, (n_instances, 1)), features))
        instance_means, instance_vars = self.predict(X_)
        assert instance_vars is not None

        # VAR[1/n (X_1 + ... + X_n)] =
        # 1/n^2 * ( VAR(X_1) + ... + VAR(X_n))
        # for independent X_1 ... X_n
        var_x = np.sum(instance_vars) / (len(instance_vars) ** 2)

        # NaN compares False against the threshold, so test for it explicitly;
        # this matches the NaN clamping of the branch without instance features.
        if np.isnan(var_x) or var_x < self._var_threshold:
            var_x = self._var_threshold

        var[i] = var_x
        mean[i] = np.mean(instance_means)

    # Both accumulators are 1d by construction; return them as columns.
    return mean.reshape((-1, 1)), var.reshape((-1, 1))
|
train
在 X 和 Y 上训练模型。内部调用方法 _train。
参数
X : np.ndarray [#samples, #hyperparameters + #features] 输入数据点。 Y : np.ndarray [#samples, #objectives] 相应的目标值。
返回值
self : AbstractModel
源码位于 smac/model/abstract_model.py
def train(self: Self, X: np.ndarray, Y: np.ndarray) -> Self:
    """Validate the training data, optionally reduce the instance features, and call `_train`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Input data points.
    Y : np.ndarray [#samples, #objectives]
        The corresponding target values.

    Returns
    -------
    self : AbstractModel
        Whatever `_train` returns.

    Raises
    ------
    ValueError
        If `X` is not 2d, has the wrong number of columns, or has a different
        number of rows than `Y`.
    """
    n_dims = len(X.shape)
    if n_dims != 2:
        raise ValueError("Expected 2d array, got %dd array!" % n_dims)

    expected_columns = self._n_hps + self._n_features
    if X.shape[1] != expected_columns:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if X.shape[0] != Y.shape[0]:
        raise ValueError("X.shape[0] ({}) != y.shape[0] ({})".format(X.shape[0], Y.shape[0]))

    # Reduce the feature dimensionality only when PCA is configured, there are
    # more samples than PCA components, and there are at least as many features
    # as requested components.
    use_pca = (
        self._pca_components is not None
        and X.shape[0] > self._pca.n_components
        and self._n_features >= self._pca_components
    )

    if not use_pca:
        self._apply_pca = False
        if hasattr(self, "_types"):
            # Undo any earlier PCA-driven change to the types list.
            self._types = copy.deepcopy(self._initial_types)
        return self._train(X, Y)

    # Scale the trailing feature columns.
    scaled_feats = self._scaler.fit_transform(X[:, -self._n_features :])
    scaled_feats = np.nan_to_num(scaled_feats)  # if features with max == min

    # Project them and re-attach to the hyperparameter columns.
    reduced_feats = self._pca.fit_transform(scaled_feats)
    X = np.hstack((X[:, : self._n_hps], reduced_feats))

    if hasattr(self, "_types"):
        # For RF, adapt types list: keep the hyperparameter entries and use one
        # zero entry per reduced feature column.
        hp_types = self._types[: self._n_hps]
        feat_types = np.zeros(reduced_feats.shape[1])
        self._types = np.array(np.hstack((hp_types, feat_types)), dtype=np.uint)  # type: ignore

    self._apply_pca = True
    return self._train(X, Y)
|