Target Function Runner

smac.runner.target_function_runner #

TargetFunctionRunner #

TargetFunctionRunner(
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] = None,
)

Bases: AbstractSerialRunner

Class to execute target functions which are Python functions. Evaluates the function for a given configuration and resource limit.

The target function can either return a float (the loss) or a tuple whose first element is a float and whose second element is additional run information. In a multi-objective setting, the float is replaced by a list of floats.
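
To make the accepted return shapes concrete, here is a minimal sketch; the function names and bodies are illustrative only and not part of SMAC:

from ConfigSpace import Configuration

# Single objective: return a float (the loss).
def train(config: Configuration, seed: int = 0) -> float:
    return 0.05

# Single objective with additional run information: return (loss, info).
def train_with_info(config: Configuration, seed: int = 0) -> tuple[float, dict]:
    return 0.05, {"epochs_used": 10}

# Multi-objective: the float is replaced by a list of floats
# (a dict mapping objective names to floats is also accepted; see `run`).
def train_multi_objective(config: Configuration, seed: int = 0) -> list[float]:
    return [0.05, 120.0]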

Parameters #

target_function : Callable
    The target function.
scenario : Scenario
required_arguments : list[str], defaults to []
    A list of required arguments that are passed to the target function.

Source code in smac/runner/target_function_runner.py
def __init__(
    self,
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] = None,
):
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function

    # Check if target function is callable
    if not callable(self._target_function):
        raise TypeError(
            "Argument `target_function` must be a callable but is type" f"`{type(self._target_function)}`."
        )

    # Check that all required arguments appear in the target function's signature
    signature = inspect.signature(self._target_function).parameters
    for argument in required_arguments:
        if argument not in signature.keys():
            raise RuntimeError(
                f"Target function needs to have the arguments {required_arguments} "
                f"but could not find {argument}."
            )

    # Now we check for additional arguments which are not used by SMAC.
    # However, we only want to warn the user, not raise an error.
    for key in list(signature.keys())[1:]:
        if key not in required_arguments:
            logger.warning(f"The argument {key} is not set by SMAC: Consider removing it from the target function.")

    # Pynisher limitations
    if (memory := self._scenario.trial_memory_limit) is not None:
        unit = None
        if isinstance(memory, (tuple, list)):
            memory, unit = memory
        memory = int(math.ceil(memory))
        if unit is not None:
            memory = (memory, unit)

    if (time := self._scenario.trial_walltime_limit) is not None:
        time = int(math.ceil(time))

    self._memory_limit = memory
    self._algorithm_walltime_limit = time

__call__ #

__call__(
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
)

Calls the algorithm, which is processed in the run method.

Source code in smac/runner/target_function_runner.py
def __call__(
    self,
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
):
    """Calls the algorithm, which is processed in the ``run`` method."""
    return algorithm(config, **algorithm_kwargs)

count_available_workers #

count_available_workers() -> int

Returns the number of available workers. Serial runners only have one worker.

Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1

run #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

Calls the target function with pynisher if an algorithm walltime limit or memory limit is set. Otherwise, the function is called directly.

Parameters #

config : Configuration
    Configuration to be passed to the target function.
instance : str | None, defaults to None
    The problem instance.
budget : float | None, defaults to None
    A positive, real-valued number representing an arbitrary limit to the target function, handled by the target function internally.
seed : int, defaults to None
dask_data_to_scatter : dict[str, Any]
    These kwargs must be empty when dask is not used! When a user scatters data from their local process to the distributed network, the data is distributed in a round-robin fashion, grouped by the number of cores. Roughly speaking, this data can be kept in memory, so it does not have to be (de-)serialized every time a target function with a big dataset is executed. This argument is very useful, for example, when the target function uses a big dataset shared across all target function calls.

Returns #

status : StatusType
    Status of the trial.
cost : float | list[float]
    Resulting cost(s) of the trial.
runtime : float
    The time the target function took to run.
cpu_time : float
    The time the target function took on the hardware to run.
additional_info : dict
    All further additional trial information.
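
The resource limiting follows the pynisher pattern visible in the source below: the target function is wrapped by limit and a dedicated exception is raised when a limit is violated. A minimal standalone sketch; the waste_time function and the 1-second limit are made up for illustration:

import time

from pynisher import WallTimeoutException, limit

def waste_time(x: float) -> float:
    time.sleep(10)  # exceeds the walltime limit below
    return x

# Wrap the function, mirroring how TargetFunctionRunner wraps the target function.
limited = limit(waste_time, wall_time=1, wrap_errors=True)

try:
    limited(3.0)
except WallTimeoutException:
    # `run` maps this exception to StatusType.TIMEOUT
    print("timed out")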

Source code in smac/runner/target_function_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
    **dask_data_to_scatter: dict[str, Any],
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function with pynisher if algorithm wall time limit or memory limit is
    set. Otherwise, the function is called directly.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int, defaults to None
    dask_data_to_scatter: dict[str, Any]
        These kwargs must be empty when dask is not used!
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion, grouped by the number of cores.
        Roughly speaking, this data can be kept in memory, so it does not have to be
        (de-)serialized every time a target function with a big dataset is executed.
        For example, this argument is very useful when your target function uses a big
        dataset shared across all target function calls.

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    kwargs.update(dask_data_to_scatter)

    if "seed" in self._required_arguments:
        kwargs["seed"] = seed

    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    # Preset results in case the target function call fails
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    additional_info = {}
    status = StatusType.CRASHED

    # If a memory limit or walltime limit is set, we want to use pynisher
    target_function: Callable
    if self._memory_limit is not None or self._algorithm_walltime_limit is not None:
        target_function = limit(
            self._target_function,
            memory=self._memory_limit,
            wall_time=self._algorithm_walltime_limit,
            wrap_errors=True,  # Hard to describe; see https://github.com/automl/pynisher
        )
    else:
        target_function = self._target_function

    # We don't want the user to change the configuration
    config_copy = copy.deepcopy(config)

    # Call target function
    try:
        start_time = time.time()
        cpu_time = time.process_time()
        rval = self(config_copy, target_function, kwargs)
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start_time
        status = StatusType.SUCCESS
    except WallTimeoutException:
        status = StatusType.TIMEOUT
    except MemoryLimitException:
        status = StatusType.MEMORYOUT
    except Exception as e:
        cost = np.asarray(cost).squeeze().tolist()
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }
        status = StatusType.CRASHED

    if status != StatusType.SUCCESS:
        return status, cost, runtime, cpu_time, additional_info

    if isinstance(rval, tuple):
        result, additional_info = rval
    else:
        result, additional_info = rval, {}

    # Do some sanity checking (for multi objective)
    error = f"Returned costs {result} does not match the number of objectives {self._objectives}."

    # If dict convert to array and make sure the order is correct
    if isinstance(result, dict):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

        ordered_cost: list[float] = []
        for name in self._objectives:
            if name not in result:
                raise RuntimeError(f"Objective {name} was not found in the returned costs.")  # noqa: E713

            ordered_cost.append(result[name])

        result = ordered_cost

    if isinstance(result, list):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

    if isinstance(result, float):
        if isinstance(self._objectives, list) and len(self._objectives) != 1:
            raise RuntimeError(error)

    cost = result

    if cost is None:
        status = StatusType.CRASHED
        cost = self._crash_cost

    # We want to get either a float or a list of floats.
    cost = np.asarray(cost).squeeze().tolist()

    return status, cost, runtime, cpu_time, additional_info

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

Wrapper around run() to execute and check the execution of a given configuration. This function encapsulates common handling/processing so that the implementation of run() is simplified.

Parameters #

trial_info : TrialInfo
    An object that contains enough information to execute a configuration run in isolation.
dask_data_to_scatter : dict[str, Any]
    When a user scatters data from their local process to the distributed network, the data is distributed in a round-robin fashion, grouped by the number of cores. Roughly speaking, this data can be kept in memory, so it does not have to be (de-)serialized every time a target function with a big dataset is executed. This argument is very useful, for example, when the target function uses a big dataset shared across all target function calls.

Returns #

info : TrialInfo
    An object containing the configuration launched.
value : TrialValue
    Contains information about the status/performance of the configuration.
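
A hedged end-to-end sketch of run_wrapper; import paths assume SMAC 2.x and may differ between versions, and the quadratic function and hyperparameter are illustrative:

from ConfigSpace import Configuration, ConfigurationSpace
from smac import Scenario
from smac.runhistory import TrialInfo
from smac.runner.target_function_runner import TargetFunctionRunner

def quadratic(config: Configuration, seed: int = 0) -> float:
    return config["x"] ** 2

# A one-dimensional continuous search space.
cs = ConfigurationSpace({"x": (-5.0, 5.0)})
scenario = Scenario(cs, n_trials=10)
runner = TargetFunctionRunner(scenario, quadratic, required_arguments=["seed"])

info, value = runner.run_wrapper(TrialInfo(config=cs.sample_configuration(), seed=42))
print(value.status, value.cost, value.time)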

Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion, grouped by the number of cores.
        Roughly speaking, this data can be kept in memory, so it does not have to be
        (de-)serialized every time a target function with a big dataset is executed.
        For example, this argument is very useful when your target function uses a big
        dataset shared across all target function calls.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

This function submits a trial_info object in a serial fashion. As there is a single worker for this task, this interface can be considered a wrapper over the run method.

Both results and exceptions can be completely determined in this step, so both lists are properly filled.

Parameters #

trial_info : TrialInfo
    An object containing the configuration launched.

Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
     worker for this task, this interface can be considered a wrapper over the `run` method.

    Both result/exceptions can be completely determined in this step so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))

wait #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision. For serial runners, no wait is needed as the result is immediately available.

Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When a trial is launched via submit,
    # the serial trial uses the same process to run, so the result is always available
    # immediately afterwards. This method is just an implementation of the abstract
    # method via a simple return, again, because there is no need to wait.
    return