跳到内容

集群上的并行化#

展开复制 examples/1_basics/7_parallelization_cluster.py (右上角)
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace, Float
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from smac import BlackBoxFacade, Scenario

__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"


class Branin:
    """Synthetic two-dimensional Branin benchmark used as the target function."""

    @property
    def configspace(self) -> ConfigurationSpace:
        """Build the search space holding the two float hyperparameters.

        Returns
        -------
        ConfigurationSpace
            Space with ``x0`` in [-5, 10] and ``x1`` in [0, 15], seeded with 0.
        """
        space = ConfigurationSpace(seed=0)
        space.add(
            [
                Float("x0", (-5, 10), default=-5, log=False),
                Float("x1", (0, 15), default=2, log=False),
            ]
        )
        return space

    def train(self, config: Configuration, seed: int = 0) -> float:
        """Evaluate the Branin function at the given configuration.

        Parameters
        ----------
        config : Configuration
            Mapping that provides the two continuous values ``x0`` and ``x1``.
        seed : int, optional
            Accepted for interface compatibility but never read, by default 0.

        Returns
        -------
        float
            Branin function value at ``(x0, x1)``.
        """
        x0, x1 = config["x0"], config["x1"]

        # Constants of the Branin function.
        a, r, s = 1.0, 6.0, 10.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        t = 1.0 / (8.0 * np.pi)

        # The terms are summed left to right: quadratic part, cosine part, offset.
        quadratic_term = a * (x1 - b * x0**2 + c * x0 - r) ** 2
        cosine_term = s * (1 - t) * np.cos(x0)
        return quadratic_term + cosine_term + s


if __name__ == "__main__":
    # Example entry point: optimize the Branin function with SMAC while the
    # trials are evaluated by Dask workers that run as SLURM jobs.
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(model.configspace, deterministic=True, n_trials=100, trial_walltime_limit=100)

    # Create cluster
    n_workers = 4  # Use 4 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        # This is the partition of our slurm cluster.
        queue="cpu_short",
        # Your account name
        # account="myaccount",
        cores=1,
        memory="1 GB",
        # Walltime limit for each worker. Ensure that your function evaluations
        # do not exceed this limit.
        # More tips on this here: https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
        walltime="00:10:00",
        processes=1,
        log_directory="tmp/smac_dask_slurm",
        # if you would like to limit the resources consumption of each function evaluation with pynisher, you need to
        # set nanny as False
        # Otherwise, an error `daemonic processes are not allowed to have children` will raise!
        nanny=False,  # if you do not use pynisher to limit the memory/time usage, feel free to set this one as True
    )

    # The cluster and the client are created by this script, so this script is
    # also responsible for shutting them down. The context managers close both
    # even when the optimization raises, so no connection or worker job is
    # left open longer than needed.
    with cluster:
        cluster.scale(jobs=n_workers)

        # Dask will create n_workers jobs on the cluster which stay open.
        # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
        # Instead of constructing the client directly, you can also use
        # `cluster.get_client()`.
        with Client(address=cluster) as client:
            # Now we use SMAC to find the best hyperparameters
            smac = BlackBoxFacade(
                scenario,
                model.train,  # We pass the target function here
                overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
                dask_client=client,
            )

            incumbent = smac.optimize()

            # Get cost of default configuration
            default_cost = smac.validate(model.configspace.get_default_configuration())
            print(f"Default cost: {default_cost}")

            # Let's calculate the cost of the incumbent
            incumbent_cost = smac.validate(incumbent)
            print(f"Incumbent cost: {incumbent_cost}")

描述#

这是一个使用 Dask 客户端在 SLURM 集群上通过并行化优化 Branin 函数的 SMAC 示例。如果您不想使用集群,而想使用本地机器,请将 `dask_client` 设置为 `None`,并将 `n_workers` 传递给 `Scenario`。

有时,Slurm 客户端提交的作业在启动后可能会被取消。在这种情况下,您可以尝试从计算节点启动作业。

⚠ 在某些集群上,当您在某个作业内部运行 SLURMCluster 而非在登录节点上时,您无法生成新的作业。这可能不会引发明显的错误,但可能会静默地挂起。

有时您需要修改提交(或取消)作业所用的命令,这可以通过设置 `cluster.job_cls.submit_command`(以及 `cluster.job_cls.cancel_command`)来完成:

cluster.job_cls.submit_command = submit_command
cluster.job_cls.cancel_command = cancel_command

在这里,我们优化合成二维函数 Branin。我们使用黑盒外观模式(black-box facade),因为它专为黑盒函数优化而设计。黑盒外观模式使用高斯过程作为其代理模型(surrogate model)。该外观模式最适用于数值超参数配置空间,并且面向评估预算较小(至多约 1000 次评估)的问题,不宜用于评估预算很大的问题。

import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace, Float
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from smac import BlackBoxFacade, Scenario

__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"


class Branin:
    """Synthetic two-dimensional Branin benchmark used as the target function."""

    @property
    def configspace(self) -> ConfigurationSpace:
        """Build the search space holding the two float hyperparameters.

        Returns
        -------
        ConfigurationSpace
            Space with ``x0`` in [-5, 10] and ``x1`` in [0, 15], seeded with 0.
        """
        space = ConfigurationSpace(seed=0)
        space.add(
            [
                Float("x0", (-5, 10), default=-5, log=False),
                Float("x1", (0, 15), default=2, log=False),
            ]
        )
        return space

    def train(self, config: Configuration, seed: int = 0) -> float:
        """Evaluate the Branin function at the given configuration.

        Parameters
        ----------
        config : Configuration
            Mapping that provides the two continuous values ``x0`` and ``x1``.
        seed : int, optional
            Accepted for interface compatibility but never read, by default 0.

        Returns
        -------
        float
            Branin function value at ``(x0, x1)``.
        """
        x0, x1 = config["x0"], config["x1"]

        # Constants of the Branin function.
        a, r, s = 1.0, 6.0, 10.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        t = 1.0 / (8.0 * np.pi)

        # The terms are summed left to right: quadratic part, cosine part, offset.
        quadratic_term = a * (x1 - b * x0**2 + c * x0 - r) ** 2
        cosine_term = s * (1 - t) * np.cos(x0)
        return quadratic_term + cosine_term + s


if __name__ == "__main__":
    # Example entry point: optimize the Branin function with SMAC while the
    # trials are evaluated by Dask workers that run as SLURM jobs.
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(model.configspace, deterministic=True, n_trials=100, trial_walltime_limit=100)

    # Create cluster
    n_workers = 4  # Use 4 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        # This is the partition of our slurm cluster.
        queue="cpu_short",
        # Your account name
        # account="myaccount",
        cores=1,
        memory="1 GB",
        # Walltime limit for each worker. Ensure that your function evaluations
        # do not exceed this limit.
        # More tips on this here: https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
        walltime="00:10:00",
        processes=1,
        log_directory="tmp/smac_dask_slurm",
        # if you would like to limit the resources consumption of each function evaluation with pynisher, you need to
        # set nanny as False
        # Otherwise, an error `daemonic processes are not allowed to have children` will raise!
        nanny=False,  # if you do not use pynisher to limit the memory/time usage, feel free to set this one as True
    )

    # The cluster and the client are created by this script, so this script is
    # also responsible for shutting them down. The context managers close both
    # even when the optimization raises, so no connection or worker job is
    # left open longer than needed.
    with cluster:
        cluster.scale(jobs=n_workers)

        # Dask will create n_workers jobs on the cluster which stay open.
        # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
        # Instead of constructing the client directly, you can also use
        # `cluster.get_client()`.
        with Client(address=cluster) as client:
            # Now we use SMAC to find the best hyperparameters
            smac = BlackBoxFacade(
                scenario,
                model.train,  # We pass the target function here
                overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
                dask_client=client,
            )

            incumbent = smac.optimize()

            # Get cost of default configuration
            default_cost = smac.validate(model.configspace.get_default_configuration())
            print(f"Default cost: {default_cost}")

            # Let's calculate the cost of the incumbent
            incumbent_cost = smac.validate(incumbent)
            print(f"Incumbent cost: {incumbent_cost}")