Abstract Serial Runner

smac.runner.abstract_serial_runner #

AbstractSerialRunner #

AbstractSerialRunner(
    scenario: Scenario, required_arguments: list[str] | None = None
)

Bases: AbstractRunner

Source code in smac/runner/abstract_runner.py
def __init__(
    self,
    scenario: Scenario,
    required_arguments: list[str] | None = None,
):
    if required_arguments is None:
        required_arguments = []
    self._scenario = scenario
    self._required_arguments = required_arguments

    # The results are a FIFO structure, implemented via a list
    # (because the Queue lock is not picklable). Finished runs are
    # put in this list and collected via _process_pending_runs
    self._results_queue: list[tuple[TrialInfo, TrialValue]] = []
    self._crash_cost = scenario.crash_cost
    self._supports_memory_limit = False

    if isinstance(scenario.objectives, str):
        objectives = [scenario.objectives]
    else:
        objectives = scenario.objectives

    self._objectives = objectives
    self._n_objectives = scenario.count_objectives()

    # We need to expand the crash cost if the user did not do it
    if self._n_objectives > 1:
        if not isinstance(scenario.crash_cost, list):
            assert isinstance(scenario.crash_cost, float)
            self._crash_cost = [scenario.crash_cost for _ in range(self._n_objectives)]

meta property #

meta: dict[str, Any]

Returns the meta data of the created object.

count_available_workers #

count_available_workers() -> int

Returns the number of available workers. Serial runners only have one worker.

Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1

run abstractmethod #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

Runs the target function with a given configuration on a single instance-budget-seed combination (i.e., a trial).

Parameters #

config : Configuration
    Configuration to be passed to the target function.
instance : str | None, defaults to None
    The problem instance.
budget : float | None, defaults to None
    A positive, real-valued number representing an arbitrary limit to the target function, handled by the target function internally.
seed : int | None, defaults to None

Returns #

status : StatusType
    Status of the trial.
cost : float | list[float]
    Resulting cost(s) of the trial.
runtime : float
    The wall-clock time the target function took to run.
cpu_time : float
    The time the target function took on the hardware to run.
additional_info : dict
    All further additional trial information.

Source code in smac/runner/abstract_runner.py
@abstractmethod
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Runs the target function with a configuration on a single instance-budget-seed
    combination (aka trial).

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int | None, defaults to None

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    raise NotImplementedError
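To illustrate the contract, here is a minimal sketch of a concrete `run()`. The `StatusType` stub and the toy quadratic objective are assumptions for illustration only; a real implementation would subclass `AbstractSerialRunner` and return smac's actual `StatusType`.

```python
import time
from enum import Enum


class StatusType(Enum):
    """Minimal stand-in for smac's StatusType, for this sketch only."""
    SUCCESS = 1
    CRASHED = 2


def run(config: dict, instance=None, budget=None, seed=None):
    """Toy run(): evaluate a quadratic cost on a dict-like configuration
    and return the (status, cost, runtime, cpu_time, additional_info)
    tuple described above."""
    start = time.time()
    cpu_start = time.process_time()
    cost = sum(v * v for v in config.values())  # toy single objective
    return (
        StatusType.SUCCESS,
        cost,
        time.time() - start,            # wall-clock runtime
        time.process_time() - cpu_start,  # CPU time
        {},                             # no additional info here
    )
```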

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

Wrapper around run() that executes and checks the execution of a given configuration. This function encapsulates common handling/processing so that implementations of run() are simplified.

Parameters #

trial_info : TrialInfo
    An object containing enough information to execute a configuration run in isolation.
dask_data_to_scatter : dict[str, Any]
    When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion, grouped by the number of cores. Roughly speaking, this data can be kept in memory so that it does not have to be (de-)serialized every time the target function is executed with a big dataset. This argument is useful, for example, when your target function uses a big dataset shared across all its invocations.

Returns #

info : TrialInfo
    An object containing the launched configuration.
value : TrialValue
    Contains information about the status/performance of the configuration.

Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function uses a big dataset shared across all its invocations,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value
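The crash-handling path above can be condensed into a standalone sketch. `run_wrapper_sketch` and the `StatusType` stub are hypothetical names; the real method additionally tracks CPU time, applies the NaN/inf check, and packs the result into a `TrialValue`.

```python
import time
import traceback
from enum import Enum


class StatusType(Enum):
    """Minimal stand-in for smac's StatusType."""
    SUCCESS = 1
    CRASHED = 2


def run_wrapper_sketch(run, crash_cost):
    """Condensed reproduction of the try/except path in run_wrapper:
    any exception raised by run() becomes a CRASHED result carrying the
    scenario's crash cost plus the traceback as additional info."""
    start = time.time()
    try:
        status, cost, runtime, cpu_time, additional_info = run()
    except Exception as e:
        status = StatusType.CRASHED
        cost = crash_cost
        runtime = time.time() - start
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }
    return status, cost, additional_info
```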

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

This function submits a trial_info object in a serial fashion. As there is a single worker for this task, this interface can be considered a wrapper over the run method.

Both results and exceptions can be completely determined in this step, so both lists are properly filled.

Parameters #

trial_info : TrialInfo
    An object containing the launched configuration.

Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
     worker for this task, this interface can be considered a wrapper over the `run` method.

    Both result/exceptions can be completely determined in this step so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))
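Because submit_trial runs the trial synchronously, the results list is filled before the call returns. A minimal sketch of that cycle, assuming plain values stand in for `TrialInfo`/`TrialValue` and illustrative names throughout:

```python
class SerialRunnerSketch:
    """Illustrative serial runner: submit runs the trial in-process and
    appends the finished (info, value) pair to a FIFO results list."""

    def __init__(self, run_wrapper):
        # run_wrapper: callable mapping trial_info -> (info, value)
        self._run_wrapper = run_wrapper
        self._results_queue = []

    def submit_trial(self, trial_info):
        # The trial finishes before submit returns; no waiting is needed.
        self._results_queue.append(self._run_wrapper(trial_info))

    def iter_results(self):
        # Drain the FIFO queue of finished trials.
        while self._results_queue:
            yield self._results_queue.pop(0)
```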

wait #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision. For serial runners, no wait is needed, as the result is immediately available.

Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When launching a trial via
    # submit, the serial trial uses the same process to run, so the result is
    # always available immediately afterwards. This method is just an
    # implementation of the abstract method via a simple return, because there
    # is no need to wait.
    return