Abstract runner

smac.runner.abstract_runner #

AbstractRunner #

AbstractRunner(
    scenario: Scenario, required_arguments: list[str] = None
)

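Bases: ABC

Interface class to handle the execution of SMAC configurations. This interface defines how to interact with the SMBO loop. The complexity of running a configuration and handling its results is hidden from the SMBO loop behind the AbstractRunner.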

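From the SMBO perspective, launching a configuration follows a submit/collect scheme:

  1. A run is launched via submit_trial().
  2. submit_trial() internally calls run_wrapper(), a method that contains the processing common to all runners.
  3. A class that implements AbstractRunner defines run(), which is the actual algorithm that translates a TrialInfo into a TrialValue, i.e. a configuration into an actual result.
  4. Completed runs are collected via iter_results(), which iterates over and consumes any finished runs.
  5. The interface also offers wait() as a mechanism to make sure there is enough data for the decision in the next iteration. For example, the intensifier might need to wait for more results before it can select the next challenger.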
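
The submit/collect scheme above can be sketched with a toy serial runner. This is a minimal illustration that does not import SMAC: `ToyTrialInfo`, `ToyTrialValue` and `ToySerialRunner` are hypothetical stand-ins, and only the method names follow the interface documented here.

```python
from dataclasses import dataclass


@dataclass
class ToyTrialInfo:  # hypothetical stand-in for SMAC's TrialInfo
    config: dict
    seed: int = 0


@dataclass
class ToyTrialValue:  # hypothetical stand-in for SMAC's TrialValue
    cost: float


class ToySerialRunner:
    """Illustrative serial runner; not part of SMAC."""

    def __init__(self):
        self._results_queue = []  # FIFO of (info, value) pairs

    def run(self, config, seed):
        # The "algorithm" that turns a configuration into a cost.
        return (config["x"] - 2) ** 2

    def run_wrapper(self, trial_info):
        # Common processing shared by all runners would live here.
        cost = self.run(trial_info.config, trial_info.seed)
        return trial_info, ToyTrialValue(cost=cost)

    def submit_trial(self, trial_info):
        # Serial execution: the result is queued before this call returns.
        self._results_queue.append(self.run_wrapper(trial_info))

    def iter_results(self):
        # Consume finished runs in the order they were submitted.
        while self._results_queue:
            yield self._results_queue.pop(0)

    def wait(self):
        # Nothing is ever pending in the serial case.
        return None


runner = ToySerialRunner()
for x in (0, 1, 2):
    runner.submit_trial(ToyTrialInfo(config={"x": x}))
runner.wait()
costs = [value.cost for _, value in runner.iter_results()]
print(costs)  # [4, 1, 0]
```

A parallel runner would keep the same four entry points but hand `run_wrapper` to a worker instead of calling it inline.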
Parameters #

scenario : Scenario
required_arguments : list[str]
    A list of required arguments, which are passed to the target function.

Source code in smac/runner/abstract_runner.py
def __init__(
    self,
    scenario: Scenario,
    required_arguments: list[str] = None,
):
    if required_arguments is None:
        required_arguments = []
    self._scenario = scenario
    self._required_arguments = required_arguments

    # The results are a FIFO structure, implemented via a list
    # (because the Queue lock is not picklable). Finished runs are
    # put in this list and collected via _process_pending_runs
    self._results_queue: list[tuple[TrialInfo, TrialValue]] = []
    self._crash_cost = scenario.crash_cost
    self._supports_memory_limit = False

    if isinstance(scenario.objectives, str):
        objectives = [scenario.objectives]
    else:
        objectives = scenario.objectives

    self._objectives = objectives
    self._n_objectives = scenario.count_objectives()

    # We need to expand crash cost if the user did not do it
    if self._n_objectives > 1:
        if not isinstance(scenario.crash_cost, list):
            assert isinstance(scenario.crash_cost, float)
            self._crash_cost = [scenario.crash_cost for _ in range(self._n_objectives)]
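
The last step of the constructor expands a scalar crash cost into one entry per objective. The behaviour can be sketched in isolation as follows; `expand_crash_cost` is a hypothetical helper written for this illustration, not a SMAC function.

```python
def expand_crash_cost(crash_cost, n_objectives):
    """Hypothetical helper mirroring the expansion done in __init__."""
    if n_objectives > 1 and not isinstance(crash_cost, list):
        # One copy of the scalar crash cost per objective.
        return [crash_cost for _ in range(n_objectives)]
    # Single objective, or the user already supplied a list.
    return crash_cost


single = expand_crash_cost(1e6, n_objectives=1)
multi = expand_crash_cost(1e6, n_objectives=3)
given = expand_crash_cost([10.0, 20.0], n_objectives=2)
```

Unlike the sketch, the constructor additionally asserts that a scalar crash cost is a `float` before expanding it.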

meta property #

meta: dict[str, Any]

Returns the meta data of the created object.

count_available_workers abstractmethod #

count_available_workers() -> int

Returns the number of available workers.

Source code in smac/runner/abstract_runner.py
@abstractmethod
def count_available_workers(self) -> int:
    """Returns the number of available workers."""
    raise NotImplementedError

is_running abstractmethod #

is_running() -> bool

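Whether there are trials still running.

Generally, if the runner is serial, launching a trial instantly returns its result. On parallel runners, there might be pending configurations to complete.

Source code in smac/runner/abstract_runner.py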
@abstractmethod
def is_running(self) -> bool:
    """Whether there are trials still running.

    Generally, if the runner is serial, launching a trial instantly returns its result. On
    parallel runners, there might be pending configurations to complete.
    """
    raise NotImplementedError
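
One way a parallel runner might answer `is_running()` and `count_available_workers()` is to track futures from a worker pool. The sketch below uses the standard library's `ThreadPoolExecutor` as an assumption for illustration; `ToyPoolTracker` is hypothetical and the backend used by a real runner may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyPoolTracker:
    """Illustrative bookkeeping for a worker pool; not part of SMAC."""

    def __init__(self, n_workers):
        self._n_workers = n_workers
        self._executor = ThreadPoolExecutor(max_workers=n_workers)
        self._pending = []

    def submit(self, fn, *args):
        self._pending.append(self._executor.submit(fn, *args))

    def is_running(self):
        # True while at least one submitted job has not finished.
        return any(not f.done() for f in self._pending)

    def count_available_workers(self):
        busy = sum(1 for f in self._pending if not f.done())
        return max(self._n_workers - busy, 0)

    def shutdown(self):
        # Blocks until every submitted job has finished.
        self._executor.shutdown(wait=True)


gate = threading.Event()
tracker = ToyPoolTracker(n_workers=2)
tracker.submit(gate.wait)  # this job blocks until the gate is opened

running_before = tracker.is_running()
free_before = tracker.count_available_workers()

gate.set()  # let the job finish
tracker.shutdown()

running_after = tracker.is_running()
free_after = tracker.count_available_workers()
```

While the job is blocked, one of the two workers is busy; after the gate opens and the pool shuts down, nothing is running and both workers count as available.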

iter_results abstractmethod #

iter_results() -> Iterator[tuple[TrialInfo, TrialValue]]

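Yields any finished configurations together with the results of executing them. The class keeps adding results to self._results_queue until the finished trials are collected through this method. At that point the self._results_queue list is emptied and all trial values produced by run are returned.

Returns #

Iterator[tuple[TrialInfo, TrialValue]]: TrialInfo/TrialValue tuples, all of which have finished.

Source code in smac/runner/abstract_runner.py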
@abstractmethod
def iter_results(self) -> Iterator[tuple[TrialInfo, TrialValue]]:
    """This method returns any finished configuration, and returns a list with the
    results of executing the configurations. This class keeps populating results
    to ``self._results_queue`` until a call to ``get_finished`` trials is done. In this case,
    the `self._results_queue` list is emptied and all trial values produced by running
    `run` are returned.

    Returns
    -------
    Iterator[tuple[TrialInfo, TrialValue]]:
        A list of TrialInfo/TrialValue tuples, all of which have been finished.
    """
    raise NotImplementedError
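
Because the return type is an iterator, a subclass can implement this method as a generator that drains the FIFO list lazily. The following is a minimal sketch under that assumption; `drain_fifo` is a hypothetical free function and the string tuples merely stand in for TrialInfo/TrialValue pairs.

```python
from typing import Iterator


def drain_fifo(results_queue: list) -> Iterator[tuple]:
    """Yield queued results oldest first, removing each one as it is yielded."""
    while results_queue:
        yield results_queue.pop(0)


queue = [("info-a", "value-a"), ("info-b", "value-b"), ("info-c", "value-c")]

results = drain_fifo(queue)
first = next(results)          # consumes only the oldest entry
remaining_after_first = len(queue)

rest = list(results)           # consumes everything that is left
```

Since the generator is lazy, entries stay in the list until the caller actually iterates over them.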

run abstractmethod #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

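Runs the target function with a configuration on a single instance-budget-seed combination (aka trial).

Parameters #

config : Configuration
    Configuration to be passed to the target function.
instance : str | None, defaults to None
    The problem instance.
budget : float | None, defaults to None
    A positive, real-valued number representing an arbitrary limit that the target function handles internally.
seed : int | None, defaults to None

Returns #

status : StatusType
    Status of the trial.
cost : float | list[float]
    Resulting cost(s) of the trial.
runtime : float
    The time the target function took to run.
cpu_time : float
    The time the target function took on hardware to run.
additional_info : dict
    All further additional trial information.

Source code in smac/runner/abstract_runner.py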
@abstractmethod
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Runs the target function with a configuration on a single instance-budget-seed
    combination (aka trial).

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int, defaults to None

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    raise NotImplementedError
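
The shape of the five-element return value can be illustrated with a standalone function. This sketch does not import SMAC: `ToyStatus` is a hypothetical stand-in for StatusType, `toy_run` is not a real subclass method, and the quadratic cost is an arbitrary example.

```python
import time
from enum import Enum


class ToyStatus(Enum):  # hypothetical stand-in for SMAC's StatusType
    SUCCESS = 1
    CRASHED = 2


def toy_run(config, instance=None, budget=None, seed=None):
    """Evaluate one trial and return (status, cost, runtime, cpu_time, info)."""
    start = time.time()
    cpu_start = time.process_time()

    cost = (config["x"] - 2.0) ** 2  # the actual target function

    runtime = time.time() - start                 # wall-clock seconds
    cpu_time = time.process_time() - cpu_start    # CPU seconds of this process
    return ToyStatus.SUCCESS, cost, runtime, cpu_time, {"seed": seed}


status, cost, runtime, cpu_time, info = toy_run({"x": 5.0}, seed=7)
```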

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

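Wrapper around run() that executes a given configuration and checks the outcome. It encapsulates the common handling and processing so that implementations of run() stay simple.

Parameters #

trial_info : TrialInfo
    Object that contains enough information to execute a configuration run in isolation.
dask_data_to_scatter : dict[str, Any]
    When a user scatters data from their local process to the distributed network, the data is distributed round-robin, grouped by number of cores. Roughly speaking, the data can then stay in memory, so it does not have to be (de-)serialized every time a target function is executed with a big dataset. This argument is useful, for example, when a big dataset is shared across all target function calls.

Returns #

info : TrialInfo
    An object containing the configuration launched.
value : TrialValue
    Contains information about the status/performance of the configuration.

Source code in smac/runner/abstract_runner.py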
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value
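
The crash handling in the wrapper follows a simple rule: an exception or a non-finite cost marks the trial as crashed, and every crashed trial receives the crash cost. The reduced sketch below shows only that rule. It is not the SMAC implementation: `safe_evaluate` and `CRASH_COST` are hypothetical names, statuses are plain strings, and only scalar costs are handled (the wrapper above uses NumPy so that lists of costs also work).

```python
import math
import traceback

CRASH_COST = 1e6  # assumed value, chosen only for this example


def safe_evaluate(target, config):
    """Run target(config) and map failures to a crashed result."""
    additional_info = {}
    try:
        cost = target(config)
        status = "SUCCESS"
    except Exception as e:
        status = "CRASHED"
        cost = CRASH_COST
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }

    # NaN or infinity is treated like a crash.
    if not math.isfinite(cost):
        status = "CRASHED"

    if status == "CRASHED":
        cost = CRASH_COST

    return status, cost, additional_info


ok = safe_evaluate(lambda c: c["x"] ** 2, {"x": 3})
nan = safe_evaluate(lambda c: float("nan"), {})
boom = safe_evaluate(lambda c: 1 / 0, {})
```

Here `ok` keeps its computed cost, while `nan` and `boom` both end up crashed with the crash cost; only `boom` carries a traceback.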

submit_trial abstractmethod #

submit_trial(trial_info: TrialInfo) -> None

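Submits a configuration embedded in a TrialInfo object and uses one of the workers to produce a result. The result eventually becomes available in the self._results_queue FIFO.

SMBO calls this interface method expecting a worker to execute a function. What is executed is dictated by trial_info; how it is executed is decided by the child class that implements the run method.

Because config submission can be a serial or a parallel endeavor, it is expected to be implemented by a child class.

Parameters #

trial_info : TrialInfo
    An object containing the configuration launched.

Source code in smac/runner/abstract_runner.py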
@abstractmethod
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a configuration embedded in a TrialInfo object, and uses one of the workers to produce
    a result (such result will eventually be available on the ``self._results_queue`` FIFO).

    This interface method will be called by SMBO, with the expectation that a function will be executed by a worker.
    What will be executed is dictated by ``trial_info``, and `how` it will be executed is decided via the child
    class that implements a ``run`` method.

    Because config submission can be a serial/parallel endeavor, it is expected to be implemented by a child class.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    raise NotImplementedError

wait abstractmethod #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision.

Source code in smac/runner/abstract_runner.py
@abstractmethod
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision."""
    raise NotImplementedError
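
For a runner that tracks its pending work as futures, waiting could mean blocking until at least one of them has finished. The sketch below assumes the standard library's `concurrent.futures` as the backend, which is an illustration rather than a description of any particular SMAC runner; `wait_for_any` is a hypothetical helper.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_for_any(pending):
    """Block until at least one future in `pending` has finished."""
    done, not_done = wait(pending, return_when=FIRST_COMPLETED)
    return done, not_done


gate = threading.Event()
executor = ThreadPoolExecutor(max_workers=2)

fast = executor.submit(lambda: "fast result")
slow = executor.submit(gate.wait)  # blocks until the gate is opened

done, not_done = wait_for_any([fast, slow])

gate.set()  # release the slow job so the pool can shut down
executor.shutdown(wait=True)
```

After the call, `done` holds the finished future and `not_done` the one still blocked, so the caller can collect partial results and decide whether to keep waiting.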