I'm trying to tune some parameters, and the search space is very large. So far I have 5 dimensions, and it will probably grow to around 10. The issue is that I could get a significant speedup if I could figure out how to multiprocess this, but I can't find any good way to do it. I'm using hyperopt, but I can't figure out how to make it use more than one core. Here is my code, with all the irrelevant parts stripped out:
from numpy import random
from pandas import DataFrame
from hyperopt import fmin, tpe, hp, Trials
def calc_result(x):
    huge_df = DataFrame(random.randn(100000, 5), columns=['A', 'B', 'C', 'D', 'E'])
    total = 0
    # Assume that I MUST iterate
    for idx_and_row in huge_df.iterrows():
        idx = idx_and_row[0]
        row = idx_and_row[1]
        # Assume there is no way to optimize here
        curr_sum = row['A'] * x['adjustment_1'] + \
                   row['B'] * x['adjustment_2'] + \
                   row['C'] * x['adjustment_3'] + \
                   row['D'] * x['adjustment_4'] + \
                   row['E'] * x['adjustment_5']
        total += curr_sum
    # In real life I want the total as high as possible, but for the minimizer it has to be a negative value
    total_as_neg = total * -1
    print(total_as_neg)
    return total_as_neg
space = {'adjustment_1': hp.quniform('adjustment_1', 0, 1, 0.001),
         'adjustment_2': hp.quniform('adjustment_2', 0, 1, 0.001),
         'adjustment_3': hp.quniform('adjustment_3', 0, 1, 0.001),
         'adjustment_4': hp.quniform('adjustment_4', 0, 1, 0.001),
         'adjustment_5': hp.quniform('adjustment_5', 0, 1, 0.001)}
trials = Trials()
best = fmin(fn=calc_result,
            space=space,
            algo=tpe.suggest,
            max_evals=20000,
            trials=trials)

So far I have 4 cores, but I can get basically as many as I need. How do I get hyperopt to use more than one core, or is there another library that can multiprocess this?
Posted on 2018-04-05 05:47:56
If you have a Mac or Linux (or the Windows Subsystem for Linux), you can add about 10 lines of code to run this in parallel with ray. If you install ray via the latest wheels here, then you can run your script with minimal modifications, shown below, to do a parallel/distributed search with HyperOpt. At a high level, it runs fmin with tpe.suggest and creates a Trials object internally, in a parallel fashion.
from numpy import random
from pandas import DataFrame
from hyperopt import fmin, tpe, hp, Trials
def calc_result(x, reporter):  # add a reporter param here
    huge_df = DataFrame(random.randn(100000, 5), columns=['A', 'B', 'C', 'D', 'E'])
    total = 0
    # Assume that I MUST iterate
    for idx_and_row in huge_df.iterrows():
        idx = idx_and_row[0]
        row = idx_and_row[1]
        # Assume there is no way to optimize here
        curr_sum = row['A'] * x['adjustment_1'] + \
                   row['B'] * x['adjustment_2'] + \
                   row['C'] * x['adjustment_3'] + \
                   row['D'] * x['adjustment_4'] + \
                   row['E'] * x['adjustment_5']
        total += curr_sum
    # In real life I want the total as high as possible, but for the minimizer it has to be a negative value
    # total_as_neg = total * -1
    # print(total_as_neg)
    # Ray will negate this by itself to feed into HyperOpt,
    # so report the raw total instead of returning total_as_neg
    reporter(timesteps_total=1, episode_reward_mean=total)
space = {'adjustment_1': hp.quniform('adjustment_1', 0, 1, 0.001),
         'adjustment_2': hp.quniform('adjustment_2', 0, 1, 0.001),
         'adjustment_3': hp.quniform('adjustment_3', 0, 1, 0.001),
         'adjustment_4': hp.quniform('adjustment_4', 0, 1, 0.001),
         'adjustment_5': hp.quniform('adjustment_5', 0, 1, 0.001)}
import ray
import ray.tune as tune
from ray.tune.hpo_scheduler import HyperOptScheduler

ray.init()
tune.register_trainable("calc_result", calc_result)
tune.run_experiments({"experiment": {
    "run": "calc_result",
    "repeat": 20000,
    "config": {"space": space}}}, scheduler=HyperOptScheduler())

Posted on 2018-03-20 03:52:39
You can use multiprocessing to run tasks that, by sidestepping Python's Global Interpreter Lock, effectively run concurrently across the available processors.
To run a multiprocessing task, instantiate a Pool and have that object execute a map function over an iterable.
The map function simply applies a function to every element of an iterable, such as a list, and returns another list with the results.
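As a minimal, self-contained sketch of that map pattern (the squaring function here is made up for illustration):

```python
from multiprocessing import Pool

def square(n):
    # applied to each element of the iterable, one call per item
    return n * n

if __name__ == '__main__':
    with Pool(4) as p:
        results = p.map(square, [1, 2, 3, 4, 5])  # order of results matches the input
    print(results)  # [1, 4, 9, 16, 25]
```

Note that the function passed to map must be picklable, which in practice means it must be defined at module level, not inside another function.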
from itertools import chain
from multiprocessing import Pool

def filter_gt_5(x):
    # return every element of the chunk that is greater than 5
    return [i for i in x if i > 5]

if __name__ == '__main__':
    p = Pool(4)
    a_list = [6, 5, 4, 3, 7, 8, 10, 9, 2]
    # find a better way to split your list.
    lists = p.map(filter_gt_5, [a_list[:3], a_list[3:6], a_list[6:]])
    # this will join the lists in one.
    filtered_list = list(chain(*lists))

Posted on 2020-04-11 11:46:39
What you are asking for can be achieved by using SparkTrials() instead of Trials() from hyperopt.
Refer to the documentation here.
SparkTrials API:
SparkTrials can be configured via 3 arguments, all of which are optional:
parallelism
The maximum number of trials to evaluate concurrently. Greater parallelism allows scale-out testing of more hyperparameter settings. Defaults to the number of Spark executors.
Trade-offs: The parallelism parameter can be set in conjunction with the max_evals parameter of fmin(). Hyperopt will test max_evals total settings of hyperparameters, in batches of size parallelism. If parallelism = max_evals, Hyperopt will do a random search: it will select all hyperparameter settings to test independently and then evaluate them in parallel. If parallelism = 1, Hyperopt can make full use of adaptive algorithms such as Tree of Parzen Estimators, which iteratively explore the hyperparameter space: each new hyperparameter setting tested will be chosen based on previous results. Setting parallelism between 1 and max_evals lets you trade off scalability (getting results faster) against adaptiveness (sometimes getting better models).
Limits: There is currently a hard cap of 128 on parallelism. SparkTrials will also check the cluster's configuration to see how many concurrent tasks Spark will allow; if the requested parallelism exceeds this maximum, Spark will reduce parallelism to that maximum.
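The capping behavior just described amounts to taking a minimum; a sketch (effective_parallelism is a hypothetical name for illustration, not part of the SparkTrials API):

```python
def effective_parallelism(requested, cluster_max_tasks, hard_cap=128):
    # SparkTrials clamps the requested parallelism to both the
    # cluster's concurrent-task limit and the hard cap of 128
    return min(requested, cluster_max_tasks, hard_cap)

print(effective_parallelism(256, 64))  # 64: clamped by the cluster limit
```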
Code snippet:
from hyperopt import SparkTrials, fmin, hp, tpe, STATUS_OK

spark_trials = SparkTrials(parallelism=4)  # set this to your number of cores
best_hyperparameters = fmin(
    fn=train,
    space=search_space,
    algo=algo,
    max_evals=32,
    trials=spark_trials)

Another useful reference:
https://stackoverflow.com/questions/49370879