并行用法:从命令行生成工作节点

Auto-sklearn 使用 dask.distributed 进行并行优化。

此示例展示了如何从命令行手动启动 dask 调度器并为 Auto-sklearn 生成工作节点。以此示例为起点,在多台机器上并行化 Auto-sklearn

要在单台机器上并行运行 Auto-sklearn,请查看示例 单机并行用法

如果想在 Python 中手动启动所有组件,请参阅 :ref:sphx_glr_examples_60_search_example_parallel_manual_spawning_python.py

注意:由于问题 https://github.com/dask/distributed/issues/5627,以上示例已禁用

可以从 https://docs.dask.org.cn/en/latest/setup/cli.html 了解更多关于 dask 命令行接口的信息。

手动将 dask 客户端传递给 Auto-sklearn 时,所有逻辑都必须由 if __name__ == "__main__": 语句守护!我们使用多个此类语句来正确地将此示例渲染为 notebook,并允许通过命令行执行。

背景

要在多台机器上分布式运行 Auto-sklearn,我们需要设置三个组件

  1. Auto-sklearn 和 dask 客户端。它将管理所有工作负载,寻找要评估的新配置,并通过 dask 客户端提交任务。由于它运行的是贝叶斯优化,应在其自身的 CPU 上执行。

  2. dask 工作节点。它们将执行运行机器学习算法的实际工作,并且每个都需要自己的 CPU。

  3. 调度器。它管理 dask 客户端和不同 dask 工作节点之间的通信。由于客户端和所有工作节点都连接到调度器,它必须首先启动。这是一项轻量级工作,不需要自己的 CPU。

我们现在将按相反的顺序启动这三个组件:调度器、工作节点和客户端。另外,在实际设置中,调度器和工作节点应该从命令行启动,而不是像这里这样(为了有一个自包含的示例)通过 Python 文件中的 subprocess 模块启动。

导入语句

import multiprocessing
import subprocess
import time

import dask.distributed
import sklearn.datasets
import sklearn.metrics

from autosklearn.classification import AutoSklearnClassifier
from autosklearn.constants import MULTICLASS_CLASSIFICATION

tmp_folder = "/tmp/autosklearn_parallel_3_example_tmp"

worker_processes = []

0. 设置客户端与调度器通信

在此示例中,dask 调度器启动时没有指定显式地址和端口。相反,调度器会使用一个空闲端口,并将相关信息存储在一个文件中,我们为此文件提供了名称和位置。该文件名也提供给工作节点,以便它们找到连接到调度器的所有相关信息。

scheduler_file_name = "scheduler-file.json"

1. 启动调度器

使用以下 bash 命令启动调度器

dask-scheduler --scheduler-file scheduler-file.json --idle-timeout 10

我们现在将从 Python 中执行此 bash 命令,以便得到一个自包含的示例

def cli_start_scheduler(scheduler_file_name):
    command = f"dask-scheduler --scheduler-file {scheduler_file_name} --idle-timeout 10"
    proc = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        check=True,
    )
    while proc.returncode is None:
        time.sleep(1)


if __name__ == "__main__":
    process_python_worker = multiprocessing.Process(
        target=cli_start_scheduler,
        args=(scheduler_file_name,),
    )
    process_python_worker.start()
    worker_processes.append(process_python_worker)

    # Wait a second for the scheduler to become available
    time.sleep(1)

2. 启动两个工作节点

使用以下 bash 命令启动调度器

DASK_DISTRIBUTED__WORKER__DAEMON=False \
    dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 \
    --scheduler-file scheduler-file.json

我们现在将从 Python 中执行此 bash 命令,以便得到一个自包含的示例。请注意,在这种情况下需要设置 DASK_DISTRIBUTED__WORKER__DAEMON=False,因为 dask-worker 会创建一个新进程,默认情况下这与 Auto-sklearn 在工作节点内部创建新进程不兼容。我们通过传递 --memory-limit 禁用 dask 的内存管理,因为 Auto-sklearn 自己进行内存管理。

def cli_start_worker(scheduler_file_name):
    command = (
        "DASK_DISTRIBUTED__WORKER__DAEMON=False "
        "dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 "
        f"--scheduler-file {scheduler_file_name}"
    )
    proc = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True
    )
    while proc.returncode is None:
        time.sleep(1)


if __name__ == "__main__":
    for _ in range(2):
        process_cli_worker = multiprocessing.Process(
            target=cli_start_worker,
            args=(scheduler_file_name,),
        )
        process_cli_worker.start()
        worker_processes.append(process_cli_worker)

    # Wait a second for workers to become available
    time.sleep(1)

3. 在 Python 中创建客户端

最后,我们创建一个 dask 集群,它也通过调度器创建的文件中的信息连接到调度器。

client = dask.distributed.Client(scheduler_file=scheduler_file_name)

启动 Auto-sklearn

if __name__ == "__main__":
    X, y = sklearn.datasets.load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
        X, y, random_state=1
    )

    automl = AutoSklearnClassifier(
        delete_tmp_folder_after_terminate=False,
        time_left_for_this_task=30,
        per_run_time_limit=10,
        memory_limit=2048,
        tmp_folder=tmp_folder,
        seed=777,
        # n_jobs is ignored internally as we pass a dask client.
        n_jobs=1,
        # Pass a dask client which connects to the previously constructed cluster.
        dask_client=client,
    )
    automl.fit(X_train, y_train)

    automl.fit_ensemble(
        y_train,
        task=MULTICLASS_CLASSIFICATION,
        dataset_name="digits",
        ensemble_kwargs={"ensemble_size": 20},
        ensemble_nbest=50,
    )

    predictions = automl.predict(X_test)
    print(automl.sprint_statistics())
    print("Accuracy score", sklearn.metrics.accuracy_score(y_test, predictions))
auto-sklearn results:
  Dataset name: 3f766bf6-38c4-11ed-8830-892d16569fbe
  Metric: accuracy
  Best validation score: 0.992908
  Number of target algorithm runs: 10
  Number of successful target algorithm runs: 9
  Number of crashed target algorithm runs: 0
  Number of target algorithms that exceeded the time limit: 1
  Number of target algorithms that exceeded the memory limit: 0

Accuracy score 0.958041958041958

等待所有工作节点关闭

仅当工作节点是从此 Python 脚本中启动时,这才是必要的。在实际应用中,通常会直接从命令行启动它们。

if __name__ == "__main__":
    process_python_worker.join()
    for process in worker_processes:
        process.join()

脚本总运行时间: ( 0 分 43.202 秒)

图库由 Sphinx-Gallery 生成