Smbo

smac.main.smbo #

SMBO #

SMBO(
    scenario: Scenario,
    runner: AbstractRunner,
    runhistory: RunHistory,
    intensifier: AbstractIntensifier,
    overwrite: bool = False,
)

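Implements the main Bayesian optimization loop.

Parameters#

scenario : Scenario The scenario object, holding all environmental information.
runner : AbstractRunner The runner (containing the target function) is called internally to judge a trial's performance.
runhistory : RunHistory The runhistory stores all trials.
intensifier : AbstractIntensifier The intensifier decides which trial (combination of configuration, seed, budget, and instance) should be run next.
overwrite : bool, defaults to False When True, overwrites the run results if a previous run is found whose metadata is inconsistent with the current setup. When False, the user is asked for the exact behaviour (overwrite completely, save the old run, or use the old results).

Warning#

This model should only be initialized through a facade.

Source code in smac/main/smbo.py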
def __init__(
    self,
    scenario: Scenario,
    runner: AbstractRunner,
    runhistory: RunHistory,
    intensifier: AbstractIntensifier,
    overwrite: bool = False,
):
    self._scenario = scenario
    self._configspace = scenario.configspace
    self._runhistory = runhistory
    self._intensifier = intensifier
    self._trial_generator = iter(intensifier)
    self._runner = runner
    self._overwrite = overwrite

    # Internal variables
    self._finished = False
    self._stop = False  # Gracefully stop SMAC
    self._callbacks: list[Callback] = []

    # Stats variables
    self._start_time: float | None = None
    self._used_target_function_walltime = 0.0
    self._used_target_function_cputime = 0.0

    # Set walltime used method for intensifier
    self._intensifier.used_walltime = lambda: self.used_walltime  # type: ignore

    # We initialize the state based on previous data.
    # If no previous data is found then we take care of the initial design.
    self._initialize_state()

budget_exhausted property #

budget_exhausted: bool

Checks whether the remaining walltime, CPU time, or number of trials has been exceeded.

intensifier property #

intensifier: AbstractIntensifier

The intensifier, which decides which trial (combination of configuration, seed, budget, and instance) to run next.

remaining_cputime property #

remaining_cputime: float

The CPU time budget for target function runs minus the CPU time used so far.

remaining_trials property #

remaining_trials: int

The number of target function runs allowed by the scenario minus the number of runs already used.

remaining_walltime property #

remaining_walltime: float

The walltime budget of the configuration run minus the wallclock time used so far.

runhistory property #

runhistory: RunHistory

The run history, which is filled with all information collected during the optimization process.

used_target_function_cputime property #

used_target_function_cputime: float

Returns how much time the target function has spent on the hardware so far.

used_target_function_walltime property #

used_target_function_walltime: float

Returns how much walltime the target function has spent so far.

used_walltime property #

used_walltime: float

Returns the walltime used so far.
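
The budget properties above amount to simple subtractions. The following is a minimal sketch rather than SMAC's implementation: the `Budget` dataclass and its field names are hypothetical, and treating a remaining value of zero or less as exhausted is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Budget:
    """Hypothetical stand-in for the scenario limits plus the usage counters."""

    walltime_limit: float
    cputime_limit: float
    n_trials: int
    used_walltime: float = 0.0
    used_cputime: float = 0.0
    submitted_trials: int = 0

    @property
    def remaining_walltime(self) -> float:
        return self.walltime_limit - self.used_walltime

    @property
    def remaining_cputime(self) -> float:
        return self.cputime_limit - self.used_cputime

    @property
    def remaining_trials(self) -> int:
        return self.n_trials - self.submitted_trials

    @property
    def budget_exhausted(self) -> bool:
        # Assumption: a single exhausted resource is enough to stop.
        return (
            self.remaining_walltime <= 0
            or self.remaining_cputime <= 0
            or self.remaining_trials <= 0
        )


budget = Budget(walltime_limit=60.0, cputime_limit=120.0, n_trials=10)
budget.submitted_trials = 10  # all trials used, time budgets untouched
```

In this sketch the trial counter alone exhausts the budget, even though both time budgets still have room.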

ask #

ask() -> TrialInfo

Asks the intensifier for the next trial.

Returns#

info : TrialInfo Information about the trial (config, instance, seed, budget).

Source code in smac/main/smbo.py
def ask(self) -> TrialInfo:
    """Asks the intensifier for the next trial.

    Returns
    -------
    info : TrialInfo
        Information about the trial (config, instance, seed, budget).
    """
    logger.debug("Calling ask...")

    for callback in self._callbacks:
        callback.on_ask_start(self)

    # Now we use our generator to get the next trial info
    trial_info = next(self._trial_generator)

    # Track the fact that the trial was returned
    # This is really important because otherwise the intensifier would most likely sample the same trial again
    self._runhistory.add_running_trial(trial_info)

    for callback in self._callbacks:
        callback.on_ask_end(self, trial_info)

    logger.debug("...and received a new trial.")

    return trial_info
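
The generator pattern used by `ask` can be illustrated in isolation. This is a simplified sketch under stated assumptions: `Trial`, `toy_intensifier`, and `ToyOptimizer` are hypothetical stand-ins, not SMAC classes, and the running-trial bookkeeping is reduced to a plain list.

```python
from typing import Iterator, List, NamedTuple


class Trial(NamedTuple):
    """Hypothetical stand-in for TrialInfo."""

    config_id: int
    seed: int


def toy_intensifier(n_configs: int) -> Iterator[Trial]:
    """Yields a fixed number of trials; next() raises StopIteration afterwards."""
    for config_id in range(n_configs):
        yield Trial(config_id=config_id, seed=0)


class ToyOptimizer:
    def __init__(self, n_configs: int) -> None:
        self._trial_generator = toy_intensifier(n_configs)
        self.running: List[Trial] = []

    def ask(self) -> Trial:
        trial = next(self._trial_generator)
        # Record the trial as running so that it is not handed out twice.
        self.running.append(trial)
        return trial


optimizer = ToyOptimizer(n_configs=2)
first = optimizer.ask()
second = optimizer.ask()
```

A third call to `optimizer.ask()` raises `StopIteration`, which is the signal the `optimize` loop catches to set its stop flag.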

exists #

exists(filename: str | Path) -> bool

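Checks if the files associated with the run already exist. Checks all files that are created by the optimizer.

Parameters#

filename : str | Path The name of the folder of the SMAC run.

Source code in smac/main/smbo.py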
def exists(self, filename: str | Path) -> bool:
    """Checks if the files associated with the run already exist.
    Checks all files that are created by the optimizer.

    Parameters
    ----------
    filename : str | Path
        The name of the folder of the SMAC run.
    """
    if isinstance(filename, str):
        filename = Path(filename)

    optimization_fn = filename / "optimization.json"
    runhistory_fn = filename / "runhistory.json"
    intensifier_fn = filename / "intensifier.json"

    if optimization_fn.exists() and runhistory_fn.exists() and intensifier_fn.exists():
        return True

    return False
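
The check returns True only when all three files are present. A small standalone sketch of the same logic, run against a temporary folder; the helper name `run_exists` is hypothetical and this is not a call into SMAC:

```python
import tempfile
from pathlib import Path
from typing import Union

REQUIRED_FILES = ("optimization.json", "runhistory.json", "intensifier.json")


def run_exists(folder: Union[str, Path]) -> bool:
    """Returns True only if every required file is present in the folder."""
    folder = Path(folder)
    return all((folder / name).exists() for name in REQUIRED_FILES)


with tempfile.TemporaryDirectory() as tmp:
    empty_result = run_exists(tmp)  # no files yet
    for name in REQUIRED_FILES[:2]:
        (Path(tmp) / name).write_text("{}")
    partial_result = run_exists(tmp)  # one file still missing
    (Path(tmp) / REQUIRED_FILES[2]).write_text("{}")
    complete_result = run_exists(tmp)  # all three files present
```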

load #

load() -> None

Loads the optimizer, intensifier, and runhistory from the output directory specified in the scenario.

Source code in smac/main/smbo.py
def load(self) -> None:
    """Loads the optimizer, intensifier, and runhistory from the output directory specified in the scenario."""
    filename = self._scenario.output_directory

    optimization_fn = filename / "optimization.json"
    runhistory_fn = filename / "runhistory.json"
    intensifier_fn = filename / "intensifier.json"

    if filename is not None:
        with open(optimization_fn) as fp:
            data = json.load(fp)

        self._runhistory.load(runhistory_fn, configspace=self._scenario.configspace)
        self._intensifier.load(intensifier_fn)

        self._used_target_function_walltime = data["used_target_function_walltime"]
        self._used_target_function_cputime = data["used_target_function_cputime"]
        self._finished = data["finished"]
        self._start_time = time.time() - data["used_walltime"]
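
The last line of `load` back-dates the start time so that the elapsed walltime continues from the saved value instead of restarting at zero. A minimal sketch of that arithmetic; the function name `restored_start_time` is hypothetical:

```python
import time


def restored_start_time(used_walltime: float, now: float) -> float:
    """Back-dates the start time by the walltime already used in the previous run."""
    return now - used_walltime


now = time.time()
start_time = restored_start_time(used_walltime=30.0, now=now)

# Measured against the restored start time, 30 seconds have already elapsed.
elapsed = now - start_time
```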

optimize #

optimize(
    *, data_to_scatter: dict[str, Any] | None = None
) -> Configuration | list[Configuration]

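Runs the Bayesian optimization loop.

Parameters#

data_to_scatter: dict[str, Any] | None When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion, grouped by number of cores. Roughly speaking, the data can be kept in memory, so it does not have to be (de-)serialized every time a target function is executed with a big dataset. For example, this argument is very useful when your target function relies on a big dataset shared across all target function calls.

Returns#

incumbent : Configuration The best found configuration. If the scenario has more than one objective, a list of incumbents is returned instead.

Source code in smac/main/smbo.py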
def optimize(self, *, data_to_scatter: dict[str, Any] | None = None) -> Configuration | list[Configuration]:
    """Runs the Bayesian optimization loop.

    Parameters
    ----------
    data_to_scatter: dict[str, Any] | None
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    incumbent : Configuration
        The best found configuration.
    """
    # We return the incumbent if we already finished the process (we don't want to allow calling
    # optimize more than once).
    if self._finished:
        logger.info("Optimization process was already finished. Returning incumbent...")
        if self._scenario.count_objectives() == 1:
            return self.intensifier.get_incumbent()
        else:
            return self.intensifier.get_incumbents()

    # Start the timer before we do anything
    # If we continue the optimization, the starting time is set by the load method
    if self._start_time is None:
        self._start_time = time.time()

    for callback in self._callbacks:
        callback.on_start(self)

    dask_data_to_scatter = {}
    if isinstance(self._runner, DaskParallelRunner) and data_to_scatter is not None:
        dask_data_to_scatter = dict(data_to_scatter=self._runner._client.scatter(data_to_scatter, broadcast=True))
    elif data_to_scatter is not None:
        raise ValueError(
            "data_to_scatter is valid only for DaskParallelRunner, "
            f"but {data_to_scatter} was provided for {self._runner.__class__.__name__}"
        )

    # Main BO loop
    while True:
        for callback in self._callbacks:
            callback.on_iteration_start(self)

        try:
            # Sample next trial from the intensification
            trial_info = self.ask()

            # We submit the trial to the runner
            # In multi-worker mode, SMAC waits till a new worker is available here
            self._runner.submit_trial(trial_info=trial_info, **dask_data_to_scatter)
        except StopIteration:
            self._stop = True

        # We add results from the runner if results are available
        self._add_results()

        # Some statistics
        logger.debug(
            f"Remaining wallclock time: {self.remaining_walltime}; "
            f"Remaining cpu time: {self.remaining_cputime}; "
            f"Remaining trials: {self.remaining_trials}"
        )

        if self.runhistory.finished % 50 == 0:
            logger.info(f"Finished {self.runhistory.finished} trials.")

        for callback in self._callbacks:
            callback.on_iteration_end(self)

        # Now we check whether we have to stop the optimization
        if self.budget_exhausted or self._stop:
            if self.budget_exhausted:
                logger.info("Configuration budget is exhausted:")
                logger.info(f"--- Remaining wallclock time: {self.remaining_walltime}")
                logger.info(f"--- Remaining cpu time: {self.remaining_cputime}")
                logger.info(f"--- Remaining trials: {self.remaining_trials}")
            else:
                logger.info("Shutting down because the stop flag was set.")

            # Wait for the trials to finish
            while self._runner.is_running():
                self._runner.wait()
                self._add_results()

            # Break from the intensification loop, as there are no more resources
            break

    for callback in self._callbacks:
        callback.on_end(self)

    # We only set the finished flag if the budget really was exhausted
    if self.budget_exhausted:
        self._finished = True

    if self._scenario.count_objectives() == 1:
        return self.intensifier.get_incumbent()
    else:
        return self.intensifier.get_incumbents()
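
The control flow of the loop above (ask, evaluate, check the budget, stop on `StopIteration`) can be shown with a much simplified synchronous sketch. Everything here is hypothetical: `toy_optimize` has no runner, callbacks, or surrogate model, and it evaluates each trial immediately, whereas SMAC submits trials to a runner and collects results later.

```python
from typing import Callable, Iterator, List, Tuple


def toy_optimize(
    trials: Iterator[float],
    target_function: Callable[[float], float],
    n_trials: int,
) -> Tuple[float, float]:
    """Evaluates trials until the generator or the trial budget is exhausted.

    Returns the input with the lowest cost together with that cost.
    """
    results: List[Tuple[float, float]] = []
    stop = False
    while True:
        try:
            x = next(trials)
            results.append((x, target_function(x)))
        except StopIteration:
            # The generator has nothing left to propose.
            stop = True

        budget_exhausted = len(results) >= n_trials
        if budget_exhausted or stop:
            break

    return min(results, key=lambda pair: pair[1])


candidates = iter([3.0, 1.0, -2.0, 0.5, 4.0])
best_x, best_cost = toy_optimize(candidates, lambda x: (x - 1.0) ** 2, n_trials=4)
```

With a budget of four trials the fifth candidate is never evaluated, and the incumbent is the input 1.0 with cost 0.0.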

print_stats #

print_stats() -> None

Prints all statistics.

Source code in smac/main/smbo.py
def print_stats(self) -> None:
    """Prints all statistics."""
    logger.info(
        "\n"
        f"--- STATISTICS -------------------------------------\n"
        f"--- Incumbent changed: {self.intensifier.incumbents_changed}\n"
        f"--- Submitted trials: {self.runhistory.submitted} / {self._scenario.n_trials}\n"
        f"--- Finished trials: {self.runhistory.finished} / {self._scenario.n_trials}\n"
        f"--- Configurations: {self.runhistory._n_id}\n"
        f"--- Used wallclock time: {round(self.used_walltime)} / {self._scenario.walltime_limit} sec\n"
        "--- Used target function runtime: "
        f"{round(self.used_target_function_walltime, 2)} / {self._scenario.cputime_limit} sec\n"
        "--- Used target function CPU time: "
        f"{round(self.used_target_function_cputime, 2)} / {self._scenario.cputime_limit} sec\n"
        f"----------------------------------------------------"
    )

register_callback #

register_callback(
    callback: Callback, index: int | None = None
) -> None

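Registers a callback to be called before, during, and after the Bayesian optimization loop.

By default, the callback is appended to the list.

Parameters#

callback : Callback The callback to be registered.
index : int | None, optional The index at which the callback should be registered. The default is None. If it is None, the callback is appended to the list.

Source code in smac/main/smbo.py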
def register_callback(self, callback: Callback, index: int | None = None) -> None:
    """
    Registers a callback to be called before, in between, and after the Bayesian optimization loop.

    Callback is appended to the list by default.

    Parameters
    ----------
    callback : Callback
        The callback to be registered.
    index : int, optional
        The index at which the callback should be registered. The default is None.
        If it is None, append the callback to the list.
    """
    if index is None:
        index = len(self._callbacks)
    self._callbacks.insert(index, callback)
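
The effect of `index` on the callback order follows directly from `list.insert`. A standalone sketch, using strings in place of `Callback` objects; the helper `register` is hypothetical:

```python
from typing import List, Optional


def register(callbacks: List[str], callback: str, index: Optional[int] = None) -> None:
    """Inserts the callback at the index, or appends it when index is None."""
    if index is None:
        index = len(callbacks)
    callbacks.insert(index, callback)


callbacks: List[str] = []
register(callbacks, "logging")
register(callbacks, "early_stopping")
register(callbacks, "metadata", index=0)  # runs before the other two
```

Because callbacks are invoked in list order, passing `index=0` is one way to make a callback run first.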

reset #

reset() -> None

Resets the internal variables of the optimizer, intensifier, and runhistory.

Source code in smac/main/smbo.py
def reset(self) -> None:
    """Resets the internal variables of the optimizer, intensifier, and runhistory."""
    self._used_target_function_walltime = 0
    self._used_target_function_cputime = 0
    self._finished = False

    # We also reset runhistory and intensifier here
    self._runhistory.reset()
    self._intensifier.reset()

save #

save() -> None

Saves the current stats, runhistory, and intensifier.

Source code in smac/main/smbo.py
def save(self) -> None:
    """Saves the current stats, runhistory, and intensifier."""
    path = self._scenario.output_directory

    if path is not None:
        data = {
            "used_walltime": self.used_walltime,
            "used_target_function_walltime": self.used_target_function_walltime,
            "used_target_function_cputime": self.used_target_function_cputime,
            "last_update": time.time(),
            "finished": self._finished,
        }

        # Save optimization data
        with open(str(path / "optimization.json"), "w") as file:
            json.dump(data, file, indent=2, cls=NumpyEncoder)

        # And save runhistory and intensifier
        self._runhistory.save(path / "runhistory.json")
        self._intensifier.save(path / "intensifier.json")
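
The statistics file is plain JSON, so it can be inspected with the standard library. A minimal round-trip sketch under stated assumptions: `save_stats` is a hypothetical helper that writes only a subset of the keys, and the `NumpyEncoder` used in the source is omitted because the values here are plain Python types.

```python
import json
import tempfile
import time
from pathlib import Path


def save_stats(folder: Path, used_walltime: float, finished: bool) -> Path:
    """Writes a reduced optimization.json and returns its path."""
    path = folder / "optimization.json"
    data = {
        "used_walltime": used_walltime,
        "last_update": time.time(),
        "finished": finished,
    }
    with open(path, "w") as file:
        json.dump(data, file, indent=2)
    return path


with tempfile.TemporaryDirectory() as tmp:
    stats_path = save_stats(Path(tmp), used_walltime=12.5, finished=False)
    with open(stats_path) as file:
        loaded = json.load(file)
```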

tell #

tell(
    info: TrialInfo, value: TrialValue, save: bool = True
) -> None

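Adds the result of a trial to the runhistory and updates the stats object.

Parameters#

info : TrialInfo Describes the trial from which to process the results.
value : TrialValue Contains relevant information regarding the execution of a trial.
save : bool, defaults to True Whether the runhistory should be saved.

Source code in smac/main/smbo.py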
def tell(
    self,
    info: TrialInfo,
    value: TrialValue,
    save: bool = True,
) -> None:
    """Adds the result of a trial to the runhistory and updates the stats object.

    Parameters
    ----------
    info : TrialInfo
        Describes the trial from which to process the results.
    value : TrialValue
        Contains relevant information regarding the execution of a trial.
    save : bool, defaults to True
        Whether the runhistory should be saved.
    """
    if info.config.origin is None:
        info.config.origin = "Custom"

    for callback in self._callbacks:
        response = callback.on_tell_start(self, info, value)

        # If a callback returns False, the optimization loop should be interrupted
        # the other callbacks are still being called.
        if response is False:
            logger.info("A callback returned False. Abort is requested.")
            self._stop = True

    # Some sanity checks here
    if self._intensifier.uses_instances and info.instance is None:
        raise ValueError("Passed instance is None but intensifier requires instances.")

    if self._intensifier.uses_budgets and info.budget is None:
        raise ValueError("Passed budget is None but intensifier requires budgets.")

    self._runhistory.add(
        config=info.config,
        cost=value.cost,
        time=value.time,
        cpu_time=value.cpu_time,
        status=value.status,
        instance=info.instance,
        seed=info.seed,
        budget=info.budget,
        starttime=value.starttime,
        endtime=value.endtime,
        additional_info=value.additional_info,
        force_update=True,  # Important to overwrite the status RUNNING
    )

    logger.debug(f"Tell method was called with cost {value.cost} ({StatusType(value.status).name}).")

    for callback in self._callbacks:
        response = callback.on_tell_end(self, info, value)

        # If a callback returns False, the optimization loop should be interrupted
        # the other callbacks are still being called.
        if response is False:
            logger.info("A callback returned False. Abort is requested.")
            self._stop = True

    if save:
        self.save()
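
In `tell`, a callback that returns `False` requests a stop, but the remaining callbacks are still called. The sketch below isolates that behaviour; `run_callbacks` and the two toy callbacks are hypothetical and take no arguments, unlike real `Callback` hooks.

```python
from typing import Callable, List, Optional


def run_callbacks(callbacks: List[Callable[[], Optional[bool]]]) -> bool:
    """Calls every callback and reports whether any of them requested a stop."""
    stop = False
    for callback in callbacks:
        # Only an explicit False requests a stop; None (no return value) does not.
        if callback() is False:
            stop = True
    return stop


calls: List[str] = []


def keep_going() -> None:
    calls.append("keep_going")


def request_stop() -> bool:
    calls.append("request_stop")
    return False


stop_requested = run_callbacks([request_stop, keep_going])
```

Even though `request_stop` comes first, `keep_going` is still invoked. The stop flag only takes effect when the loop next checks it.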

update_acquisition_function #

update_acquisition_function(
    acquisition_function: AbstractAcquisitionFunction,
) -> None

Updates the acquisition function, including the associated model and the acquisition optimizer.

Source code in smac/main/smbo.py
def update_acquisition_function(self, acquisition_function: AbstractAcquisitionFunction) -> None:
    """Updates the acquisition function including the associated model and the acquisition
    optimizer.
    """
    if (config_selector := self._intensifier._config_selector) is not None:
        config_selector._acquisition_function = acquisition_function
        config_selector._acquisition_function.model = config_selector._model

        assert config_selector._acquisition_maximizer is not None
        config_selector._acquisition_maximizer.acquisition_function = acquisition_function

update_model #

update_model(model: AbstractModel) -> None

Updates the model and updates the acquisition function.

Source code in smac/main/smbo.py
def update_model(self, model: AbstractModel) -> None:
    """Updates the model and updates the acquisition function."""
    if (config_selector := self._intensifier._config_selector) is not None:
        config_selector._model = model

        assert config_selector._acquisition_function is not None
        config_selector._acquisition_function.model = model
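
Both update methods exist to keep several references in sync: the config selector and the acquisition function must point to the same model object. A toy illustration of why both references are rebound; the classes and the module-level `update_model` function are hypothetical stand-ins, not SMAC's types.

```python
class ToyAcquisitionFunction:
    def __init__(self) -> None:
        self.model = None


class ToyConfigSelector:
    def __init__(self, model: object, acquisition_function: ToyAcquisitionFunction) -> None:
        self.model = model
        self.acquisition_function = acquisition_function
        self.acquisition_function.model = model


def update_model(selector: ToyConfigSelector, model: object) -> None:
    """Rebinds the model on the selector and on its acquisition function."""
    selector.model = model
    selector.acquisition_function.model = model


old_model, new_model = object(), object()
selector = ToyConfigSelector(old_model, ToyAcquisitionFunction())
update_model(selector, new_model)
```

If only `selector.model` were replaced, the acquisition function would keep scoring candidates with the old model.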

validate #

validate(
    config: Configuration, *, seed: int | None = None
) -> float | ndarray[float]

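Validates a configuration on seeds other than the ones used in the optimization process and on the highest budget (if the budget type is real-valued). Does not exceed the maximum number of config calls or seeds as defined in the scenario.

Parameters#

config : Configuration The configuration to validate.
If the budget type is real-valued, this argument is ignored.
seed : int | None, defaults to None If None, the seed from the scenario is used.

Returns#

cost : float | ndarray[float] The averaged cost of the configuration. In case of multi-fidelity, the cost of each objective is averaged.

Source code in smac/main/smbo.py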
def validate(
    self,
    config: Configuration,
    *,
    seed: int | None = None,
) -> float | ndarray[float]:
    """Validates a configuration on other seeds than the ones used in the optimization process and on the highest
    budget (if budget type is real-valued). Does not exceed the maximum number of config calls or seeds as defined
    in the scenario.

    Parameters
    ----------
    config : Configuration
        Configuration to validate.
        If the budget type is real-valued, this argument is ignored.
    seed : int | None, defaults to None
        If None, the seed from the scenario is used.

    Returns
    -------
    cost : float | ndarray[float]
        The averaged cost of the configuration. In case of multi-fidelity, the cost of each objective is
        averaged.
    """
    if seed is None:
        seed = self._scenario.seed

    costs = []
    for trial in self._intensifier.get_trials_of_interest(config, validate=True, seed=seed):
        kwargs: dict[str, Any] = {}
        if trial.seed is not None:
            kwargs["seed"] = trial.seed
        if trial.budget is not None:
            kwargs["budget"] = trial.budget
        if trial.instance is not None:
            kwargs["instance"] = trial.instance

        # TODO: Use submit run for faster evaluation
        # self._runner.submit_trial(trial_info=trial)
        _, cost, _, _, _ = self._runner.run(config, **kwargs)
        costs += [cost]

    np_costs = np.array(costs)
    return np.mean(np_costs, axis=0)
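
The final `np.mean(np_costs, axis=0)` averages over trials: a list of scalar costs yields one float, and a list of per-objective cost vectors yields one average per objective. The sketch below reproduces that behaviour in pure Python for illustration; `average_costs` is a hypothetical helper, and it returns a list where the source returns a NumPy array.

```python
from statistics import fmean
from typing import List, Sequence, Union

Cost = Union[float, Sequence[float]]


def average_costs(costs: List[Cost]) -> Union[float, List[float]]:
    """Averages over trials; with several objectives, averages each objective separately."""
    first = costs[0]
    if isinstance(first, (int, float)):
        # Single objective: one scalar cost per trial.
        return fmean(costs)
    # Multiple objectives: average column-wise, one column per objective.
    return [fmean(column) for column in zip(*costs)]


single = average_costs([0.2, 0.4, 0.6])
multi = average_costs([[1.0, 10.0], [3.0, 30.0]])
```

Here `multi` holds one mean per objective, matching the shape that `axis=0` produces.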