集群上的并行化#
展开复制 examples/1_basics/7_parallelization_cluster.py
(右上角)
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace, Float
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from smac import BlackBoxFacade, Scenario
__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"
class Branin(object):
    """Synthetic 2D Branin benchmark exposing a search space and a target function."""

    @property
    def configspace(self) -> ConfigurationSpace:
        """Build the configuration space of the Branin function.

        Returns
        -------
        ConfigurationSpace
            Space seeded with 0 holding two non-log floats:
            ``x0`` in [-5, 10] (default -5) and ``x1`` in [0, 15] (default 2).
        """
        cs = ConfigurationSpace(seed=0)
        x0 = Float("x0", (-5, 10), default=-5, log=False)
        x1 = Float("x1", (0, 15), default=2, log=False)
        cs.add([x0, x1])

        return cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        """Branin function

        Parameters
        ----------
        config : Configuration
            Contains two continuous hyperparameters, x0 and x1
        seed : int, optional
            Not used, by default 0

        Returns
        -------
        float
            Branin function value
        """
        x0 = config["x0"]
        x1 = config["x1"]

        # Constants of the Branin function as used in this example.
        a = 1.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        r = 6.0
        s = 10.0
        t = 1.0 / (8.0 * np.pi)

        ret = a * (x1 - b * x0**2 + c * x0 - r) ** 2 + s * (1 - t) * np.cos(x0) + s

        return ret
if __name__ == "__main__":
    # Entry point: optimize Branin with SMAC while trials run on Dask workers
    # that are submitted as SLURM jobs.
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(model.configspace, deterministic=True, n_trials=100, trial_walltime_limit=100)

    # Create cluster
    n_workers = 4  # Use 4 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        # This is the partition of our slurm cluster.
        queue="cpu_short",
        # Your account name
        # account="myaccount",
        cores=1,
        memory="1 GB",
        # Walltime limit for each worker. Ensure that your function evaluations
        # do not exceed this limit.
        # More tips on this here: https://jobqueue.dask.org.cn/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
        walltime="00:10:00",
        processes=1,
        log_directory="tmp/smac_dask_slurm",
        # if you would like to limit the resources consumption of each function evaluation with pynisher, you need to
        # set nanny as False
        # Otherwise, an error `daemonic processes are not allowed to have children` will raise!
        nanny=False,  # if you do not use pynisher to limit the memory/time usage, feel free to set this one as True
    )
    cluster.scale(jobs=n_workers)

    # Dask will create n_workers jobs on the cluster which stay open.
    # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
    client = Client(
        address=cluster,
    )
    # Instead, you can also do
    # client = cluster.get_client()

    # Now we use SMAC to find the best hyperparameters
    smac = BlackBoxFacade(
        scenario,
        model.train,  # We pass the target function here
        overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
        dask_client=client,
    )

    incumbent = smac.optimize()

    # Get cost of default configuration
    default_cost = smac.validate(model.configspace.get_default_configuration())
    print(f"Default cost: {default_cost}")

    # Let's calculate the cost of the incumbent
    incumbent_cost = smac.validate(incumbent)
    print(f"Incumbent cost: {incumbent_cost}")
描述#
这是一个使用 Dask 客户端在 SLURM 集群上通过并行化优化 Branin 函数的 SMAC 示例。如果您不想使用集群,而想使用本地机器,请将 dask_client 设置为 None,并将 n_workers 传递给 Scenario。
有时,Slurm 客户端提交的作业启动后可能会被取消。在这种情况下,您可以尝试从计算节点启动作业。
在某些集群上,当您在某个作业内部运行 SLURMCluster 而非在登录节点上时,您无法生成新的作业。这可能不会引发明显的错误,但可能会静默地挂起。
有时您需要修改启动命令,这可以通过 SLURMCluster.job_class.submit_command 完成。
在这里,我们优化合成二维函数 Branin。我们使用黑盒外观模式(black-box facade),因为它专为黑盒函数优化而设计。黑盒外观模式使用高斯过程作为其代理模型(surrogate model)。该外观模式最适用于数值超参数配置空间,不应应用于评估预算很大(高达 1000 次评估)的问题。
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace, Float
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from smac import BlackBoxFacade, Scenario
__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"
class Branin(object):
    """Synthetic 2D Branin benchmark exposing a search space and a target function."""

    @property
    def configspace(self) -> ConfigurationSpace:
        """Build the configuration space of the Branin function.

        Returns
        -------
        ConfigurationSpace
            Space seeded with 0 holding two non-log floats:
            ``x0`` in [-5, 10] (default -5) and ``x1`` in [0, 15] (default 2).
        """
        cs = ConfigurationSpace(seed=0)
        x0 = Float("x0", (-5, 10), default=-5, log=False)
        x1 = Float("x1", (0, 15), default=2, log=False)
        cs.add([x0, x1])

        return cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        """Branin function

        Parameters
        ----------
        config : Configuration
            Contains two continuous hyperparameters, x0 and x1
        seed : int, optional
            Not used, by default 0

        Returns
        -------
        float
            Branin function value
        """
        x0 = config["x0"]
        x1 = config["x1"]

        # Constants of the Branin function as used in this example.
        a = 1.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        r = 6.0
        s = 10.0
        t = 1.0 / (8.0 * np.pi)

        ret = a * (x1 - b * x0**2 + c * x0 - r) ** 2 + s * (1 - t) * np.cos(x0) + s

        return ret
if __name__ == "__main__":
    # Entry point: optimize Branin with SMAC while trials run on Dask workers
    # that are submitted as SLURM jobs.
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(model.configspace, deterministic=True, n_trials=100, trial_walltime_limit=100)

    # Create cluster
    n_workers = 4  # Use 4 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        # This is the partition of our slurm cluster.
        queue="cpu_short",
        # Your account name
        # account="myaccount",
        cores=1,
        memory="1 GB",
        # Walltime limit for each worker. Ensure that your function evaluations
        # do not exceed this limit.
        # More tips on this here: https://jobqueue.dask.org.cn/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
        walltime="00:10:00",
        processes=1,
        log_directory="tmp/smac_dask_slurm",
        # if you would like to limit the resources consumption of each function evaluation with pynisher, you need to
        # set nanny as False
        # Otherwise, an error `daemonic processes are not allowed to have children` will raise!
        nanny=False,  # if you do not use pynisher to limit the memory/time usage, feel free to set this one as True
    )
    cluster.scale(jobs=n_workers)

    # Dask will create n_workers jobs on the cluster which stay open.
    # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
    client = Client(
        address=cluster,
    )
    # Instead, you can also do
    # client = cluster.get_client()

    # Now we use SMAC to find the best hyperparameters
    smac = BlackBoxFacade(
        scenario,
        model.train,  # We pass the target function here
        overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
        dask_client=client,
    )

    incumbent = smac.optimize()

    # Get cost of default configuration
    default_cost = smac.validate(model.configspace.get_default_configuration())
    print(f"Default cost: {default_cost}")

    # Let's calculate the cost of the incumbent
    incumbent_cost = smac.validate(incumbent)
    print(f"Incumbent cost: {incumbent_cost}")