注意
点击 此处 下载完整的示例代码或通过 Binder 在浏览器中运行此示例
并行用法:从命令行生成工作节点¶
Auto-sklearn 使用 dask.distributed 进行并行优化。
此示例展示了如何从命令行手动启动 dask 调度器并为 Auto-sklearn 生成工作节点。以此示例为起点,在多台机器上并行化 Auto-sklearn。
要在单台机器上并行运行 Auto-sklearn,请查看示例 单机并行用法。
如果想在 Python 中手动启动所有组件,请参阅 :ref:sphx_glr_examples_60_search_example_parallel_manual_spawning_python.py
。
注意:由于问题 https://github.com/dask/distributed/issues/5627,以上示例已禁用
可以从 https://docs.dask.org.cn/en/latest/setup/cli.html 了解更多关于 dask 命令行接口的信息。
手动将 dask 客户端传递给 Auto-sklearn 时,所有逻辑都必须由 if __name__ == "__main__":
语句守护!我们使用多个此类语句来正确地将此示例渲染为 notebook,并允许通过命令行执行。
背景¶
要在多台机器上分布式运行 Auto-sklearn,我们需要设置三个组件
Auto-sklearn 和 dask 客户端。它将管理所有工作负载,寻找要评估的新配置,并通过 dask 客户端提交任务。由于它运行的是贝叶斯优化,应在其自身的 CPU 上执行。
dask 工作节点。它们将执行运行机器学习算法的实际工作,并且每个都需要自己的 CPU。
调度器。它管理 dask 客户端和不同 dask 工作节点之间的通信。由于客户端和所有工作节点都连接到调度器,它必须首先启动。这是一项轻量级工作,不需要自己的 CPU。
我们现在将按相反的顺序启动这三个组件:调度器、工作节点和客户端。另外,在实际设置中,调度器和工作节点应该从命令行启动,而不是像这里这样(为了有一个自包含的示例)通过 Python 文件中的 subprocess
模块启动。
导入语句¶
import multiprocessing
import subprocess
import time
import dask.distributed
import sklearn.datasets
import sklearn.metrics
from autosklearn.classification import AutoSklearnClassifier
from autosklearn.constants import MULTICLASS_CLASSIFICATION
tmp_folder = "/tmp/autosklearn_parallel_3_example_tmp"
worker_processes = []
0. 设置客户端与调度器通信¶
在此示例中,dask 调度器启动时没有指定显式地址和端口。相反,调度器会使用一个空闲端口,并将相关信息存储在一个文件中,我们为此文件提供了名称和位置。该文件名也提供给工作节点,以便它们找到连接到调度器的所有相关信息。
scheduler_file_name = "scheduler-file.json"
1. 启动调度器¶
使用以下 bash 命令启动调度器
dask-scheduler --scheduler-file scheduler-file.json --idle-timeout 10
我们现在将从 Python 中执行此 bash 命令,以便得到一个自包含的示例
def cli_start_scheduler(scheduler_file_name):
command = f"dask-scheduler --scheduler-file {scheduler_file_name} --idle-timeout 10"
proc = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
check=True,
)
while proc.returncode is None:
time.sleep(1)
if __name__ == "__main__":
process_python_worker = multiprocessing.Process(
target=cli_start_scheduler,
args=(scheduler_file_name,),
)
process_python_worker.start()
worker_processes.append(process_python_worker)
# Wait a second for the scheduler to become available
time.sleep(1)
2. 启动两个工作节点¶
使用以下 bash 命令启动调度器
DASK_DISTRIBUTED__WORKER__DAEMON=False \
dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 \
--scheduler-file scheduler-file.json
我们现在将从 Python 中执行此 bash 命令,以便得到一个自包含的示例。请注意,在这种情况下需要设置 DASK_DISTRIBUTED__WORKER__DAEMON=False
,因为 dask-worker 会创建一个新进程,默认情况下这与 Auto-sklearn 在工作节点内部创建新进程不兼容。我们通过传递 --memory-limit
禁用 dask 的内存管理,因为 Auto-sklearn 自己进行内存管理。
def cli_start_worker(scheduler_file_name):
command = (
"DASK_DISTRIBUTED__WORKER__DAEMON=False "
"dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 "
f"--scheduler-file {scheduler_file_name}"
)
proc = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True
)
while proc.returncode is None:
time.sleep(1)
if __name__ == "__main__":
for _ in range(2):
process_cli_worker = multiprocessing.Process(
target=cli_start_worker,
args=(scheduler_file_name,),
)
process_cli_worker.start()
worker_processes.append(process_cli_worker)
# Wait a second for workers to become available
time.sleep(1)
3. 在 Python 中创建客户端¶
最后,我们创建一个 dask 集群,它也通过调度器创建的文件中的信息连接到调度器。
client = dask.distributed.Client(scheduler_file=scheduler_file_name)
启动 Auto-sklearn¶
if __name__ == "__main__":
X, y = sklearn.datasets.load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
X, y, random_state=1
)
automl = AutoSklearnClassifier(
delete_tmp_folder_after_terminate=False,
time_left_for_this_task=30,
per_run_time_limit=10,
memory_limit=2048,
tmp_folder=tmp_folder,
seed=777,
# n_jobs is ignored internally as we pass a dask client.
n_jobs=1,
# Pass a dask client which connects to the previously constructed cluster.
dask_client=client,
)
automl.fit(X_train, y_train)
automl.fit_ensemble(
y_train,
task=MULTICLASS_CLASSIFICATION,
dataset_name="digits",
ensemble_kwargs={"ensemble_size": 20},
ensemble_nbest=50,
)
predictions = automl.predict(X_test)
print(automl.sprint_statistics())
print("Accuracy score", sklearn.metrics.accuracy_score(y_test, predictions))
auto-sklearn results:
Dataset name: 3f766bf6-38c4-11ed-8830-892d16569fbe
Metric: accuracy
Best validation score: 0.992908
Number of target algorithm runs: 10
Number of successful target algorithm runs: 9
Number of crashed target algorithm runs: 0
Number of target algorithms that exceeded the time limit: 1
Number of target algorithms that exceeded the memory limit: 0
Accuracy score 0.958041958041958
等待所有工作节点关闭¶
仅当工作节点是从此 Python 脚本中启动时,这才是必要的。在实际应用中,通常会直接从命令行启动它们。
if __name__ == "__main__":
process_python_worker.join()
for process in worker_processes:
process.join()
脚本总运行时间: ( 0 分 43.202 秒)