跳到内容

目标函数脚本运行器

smac.runner.target_function_script_runner #

TargetFunctionScriptRunner #

TargetFunctionScriptRunner(
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] = None,
)

继承自: AbstractSerialRunner

用于从脚本执行目标函数的类。使用 Popen 在子进程中执行脚本。

以下示例展示了脚本如何被调用:target_function --instance=test --instance_features=test --seed=0 --hyperparameter1=5323

脚本必须返回以下形式的回显(空白字符已移除):cost=0.5; runtime=0.01; status=SUCCESS; additional_info=test(单目标)cost=0.5, 0.4; runtime=0.01; status=SUCCESS; additional_info=test(多目标)

状态必须是字符串,并且必须是 StatusType 值之一。然而,runtimestatusadditional_info 是可选的。

注意#

每次传递一个实例时,也会传递一个实例特征,形式为逗号分隔的浮点数列表(无空格)。如果未给定实例的实例特征,则传递一个空列表。

参数#

target_function : Callable 目标函数。scenario : Scenario 场景。required_arguments : list[str] 必需参数列表,这些参数会传递给目标函数。

源代码位于 smac/runner/target_function_script_runner.py
def __init__(
    self,
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] = None,
):
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function

    # Check if target function is callable
    if not isinstance(self._target_function, str):
        raise TypeError(
            "Argument `target_function` must be a string but is type" f"`{type(self._target_function)}`."
        )

    if self._scenario.trial_memory_limit is not None:
        logger.warning("Trial memory limit is not supported for script target functions.")

    if self._scenario.trial_walltime_limit is not None:
        logger.warning("Trial walltime limit is not supported for script target functions.")

__call__ #

__call__(
    algorithm_kwargs: dict[str, Any]
) -> tuple[str, str]

调用算法,该算法在 `run` 方法中处理。

源代码位于 smac/runner/target_function_script_runner.py
def __call__(
    self,
    algorithm_kwargs: dict[str, Any],
) -> tuple[str, str]:
    """Calls the algorithm, which is processed in the ``run`` method."""
    cmd = [self._target_function]
    for k, v in algorithm_kwargs.items():
        v = str(v)
        k = str(k)

        # Let's remove some spaces
        v = v.replace(" ", "")

        cmd += [f"--{k}={v}"]

    logger.debug(f"Calling: {' '.join(cmd)}")
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    output, error = p.communicate()

    logger.debug("Stdout: %s" % output)
    logger.debug("Stderr: %s" % error)

    return output, error

count_available_workers #

count_available_workers() -> int

返回可用工作进程的数量。串行工作进程只有一个工作进程。

源代码位于 smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1

run #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

调用目标函数。

参数#

config : Configuration 要传递给目标函数的配置。instance : str | None, defaults to None 问题实例。budget : float | None, defaults to None 一个正的实数值,表示由目标函数内部处理的任意限制。seed : int, defaults to None

返回值#

status : StatusType 试验的状态。cost : float | list[float] 试验产生的成本。runtime : float 目标函数的运行时间。cpu_time : float 目标函数在硬件上运行的时间。additional_info : dict 所有其他附加试验信息。

源代码位于 smac/runner/target_function_script_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int, defaults to None

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    if "seed" in self._required_arguments:
        kwargs["seed"] = seed

    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

        # In contrast to the normal target function runner, we also add the instance features here.
        if self._scenario.instance_features is not None and instance in self._scenario.instance_features:
            kwargs["instance_features"] = self._scenario.instance_features[instance]
        else:
            kwargs["instance_features"] = []

    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    # Presetting
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    additional_info = {}
    status = StatusType.SUCCESS

    # Add config arguments to the kwargs
    for k, v in dict(config).items():
        if k in kwargs:
            raise RuntimeError(f"The key {k} is already in use. Please use a different one.")
        kwargs[k] = v

    # Call target function
    start_time = time.time()
    cpu_time = time.process_time()
    output, error = self(kwargs)
    cpu_time = time.process_time() - cpu_time
    runtime = time.time() - start_time

    # Now we have to parse the std output
    # First remove white-spaces
    output = output.replace(" ", "")

    outputs = {}
    for pair in output.split(";"):
        try:
            kv = pair.split("=")
            k, v = kv[0], kv[1]

            # Get rid of the trailing newline
            v = v.strip()

            outputs[k] = v
        except Exception:
            pass

    # Parse status
    if "status" in outputs:
        status = getattr(StatusType, outputs["status"])

    # Parse costs (depends on the number of objectives)
    if "cost" in outputs:
        if self._n_objectives == 1:
            cost = float(outputs["cost"])
        else:
            costs = outputs["cost"].split(",")
            costs = [float(c) for c in costs]
            cost = costs

            if len(costs) != self._n_objectives:
                raise RuntimeError("The number of costs does not match the number of objectives.")
    else:
        status = StatusType.CRASHED

    # Overwrite runtime
    if "runtime" in outputs:
        runtime = float(outputs["runtime"])

    # Overwrite CPU time
    if "cpu_time" in outputs:
        cpu_time = float(outputs["cpu_time"])

    # Add additional info
    if "additional_info" in outputs:
        additional_info["additional_info"] = outputs["additional_info"]

    if status != StatusType.SUCCESS:
        additional_info["error"] = error

        if cost != self._crash_cost:
            cost = self._crash_cost
            logger.info(
                "The target function crashed but returned a cost. The cost is ignored and replaced by crash cost."
            )

    return status, cost, runtime, cpu_time, additional_info

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

`run()` 方法的包装器,用于执行并检查给定配置的执行。此函数封装了通用的处理逻辑,从而简化了 `run()` 方法的实现。

参数#

trial_info : RunInfo 包含足够信息以便独立执行配置运行的对象。dask_data_to_scatter: dict[str, Any 当用户将数据从本地进程分散到分布式网络时,数据会以轮询方式按核心数进行分组分发。粗略地说,我们可以将这些数据保存在内存中,这样每次需要执行带有大数据集的目标函数时,就不必进行(反)序列化。例如,当您的目标函数有一个在所有目标函数间共享的大数据集时,此参数非常有用。

返回值#

info : TrialInfo 包含已启动配置的对象。value : TrialValue 包含配置的状态/性能信息。

源代码位于 smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : RunInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

此函数以串行方式提交一个 `trial_info` 对象。由于此任务只有一个工作进程,此接口可以视为对 `run` 方法的包装。

结果/异常都可以在此步骤中完全确定,因此两个列表都会被正确填充。

参数#

trial_info : TrialInfo 包含已启动配置的对象。

源代码位于 smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
     worker for this task, this interface can be considered a wrapper over the `run` method.

    Both result/exceptions can be completely determined in this step so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))

wait #

wait() -> None

SMBO/强化器可能需要等待试验完成才能做出决策。对于串行运行器,无需等待,因为结果立即可用。

源代码位于 smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When launching a trial via submit, as
    # the serial trial uses the same process to run, the result is always available
    # immediately after. This method implements is just an implementation of the
    # abstract method via a simple return, again, because there is no need to wait
    return