我有一个CPU密集的芹菜任务。我希望在许多think).实例中使用所有的处理能力(核心)来使这个工作完成得更快,(一个带有多处理的芹菜并行分布式任务- I )。
术语、线程、分布式计算、分布式并行处理都是我想要更好理解的术语。
示例任务:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()使用上面的代码(如果可能的话)(如果可能的话),人们将如何使用芹菜来分配这个任务,允许使用云中所有可用机器的所有计算CPU能力来分割这个任务?
发布于 2014-06-03 15:43:50
你的目标是:
芹菜可以很容易地为你做这两件事。首先要理解的是,每个芹菜工人都是默认配置,可以在系统上运行尽可能多的任务:
并发性是用于并发处理任务的预叉工作进程的数量,当所有这些都忙于工作时,新任务必须等待其中一个任务完成后才能处理。 默认并发号是计算机(包括核心)上的CPU数,可以使用-c选项指定自定义数字。没有推荐的值,因为最优的数量取决于许多因素,但是如果您的任务主要是I/O绑定的,那么您可以尝试增加它,实验已经表明,增加CPU数量的两倍以上是很少有效的,而且可能会降低性能。
这意味着每个单独的任务不需要担心使用多进程/线程来使用多个CPU/核心。相反,芹菜将同时运行足够的任务来使用每个可用的CPU。
这样,下一步就是创建一个处理list_of_millions_of_ids的某些子集的任务。这里有几个选项-一个是让每个任务处理一个ID,因此运行N个任务,在其中运行N == len(list_of_millions_of_ids)。这将保证工作在所有任务中平均分配,因为永远不会出现这样的情况:一个工作人员完成得很早,只是在等待;如果它需要工作,它可以从队列中提取一个id。您可以使用芹菜group (正如John提到的那样)这样做。
tasks.py:
@app.task
def process_ids(item):
id = item #long complicated equation here
database.objects(newid=id).save()并执行以下任务:
from celery import group
from tasks import process_id
jobs = group(process_ids(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()另一种选择是将列表分解为较小的部分,并将其分发给员工。这种方法有浪费某些周期的风险,因为您可能最终会让一些工作人员在等着,而其他人还在工作。然而,芹菜文件说明认为这种担心往往是没有根据的:
有些人可能担心分块任务会导致并行性降低,但对于繁忙的集群和实践中,这很少是正确的,因为您正在避免消息传递的开销,这可能会大大提高性能。
因此,您可能会发现,由于减少了消息传递开销,分组列表并将块分发给每个任务执行得更好。通过计算每个id,将其存储在一个列表中,然后在完成后将整个列表添加到DB中,您可能还可以以这种方式减轻数据库的负担,而不是一次只执行一个id。分块的方法看起来会像这样
tasks.py:
@app.task
def process_ids(items):
for item in items:
id = item #long complicated equation here
database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.并开始执行以下任务:
from tasks import process_ids
jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()你可以试一试什么块状的大小给你最好的结果。你想要找到一个甜蜜的地方,在那里你可以减少消息开销,同时保持足够小的大小,这样你就不会比其他员工更快地完成他们的任务,然后无所事事地等待。
发布于 2014-06-06 02:10:02
在分销的世界里,最重要的是你应该记住一件事:
过早的优化是万恶之源。D. Knuth
我知道这听起来很明显,但在分发双重检查之前,您使用的是最佳算法(如果存在.)。话虽如此,优化分配是三件事之间的平衡:
计算机是制造的,所以你越接近你的处理单元(3),更快和更有效率(1)和(2)将是。经典集群的顺序是:网络硬盘驱动器、本地硬盘驱动器、RAM、内部处理单元领土.如今处理器变得越来越复杂,被认为是独立硬件处理单元(通常称为核)的集合,这些核心通过线程(2)处理数据(3)。假设您的核心速度如此之快,当您用一个线程发送数据时,您使用的是计算机功率的50%,如果内核有2个线程,那么您将使用100%。每个内核有两个线程称为超线程,您的操作系统将看到每个超线程内核有两个CPU。
在处理器中管理线程通常称为多线程。从操作系统管理CPU通常称为多处理。管理集群中的并发任务通常称为并行编程。在集群中管理依赖任务通常称为分布式编程。
,那么瓶颈在哪里?
芹菜怎么样?
Celery是用于分布式编程的消息传递框架,它将使用代理模块进行通信(2)和后端模块用于持久性(1),这意味着您可以通过更改配置来避免网络上和网络上的大多数瓶颈(如果可能的话)。首先,分析您的代码,以便在一台计算机上实现最佳性能。然后使用默认配置在集群中使用芹菜,并设置CELERY_RESULT_PERSISTENT=True:
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost')
@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
#code that does stuff
return result在执行过程中,打开您最喜欢的监控工具,我使用默认的rabbitMQ和花卉作为芹菜和顶部的cpus,您的结果将保存在您的后端。网络瓶颈的一个例子是任务队列增长过大以至于延迟执行,您可以继续更改模块或芹菜配置,如果不是瓶颈在其他地方。
发布于 2014-05-30 07:48:48
为什么不使用group芹菜任务呢?
http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
基本上,您应该将ids划分为块(或范围),并将它们分配给group中的一组任务。
为了获得更复杂的结果,比如聚合特定芹菜任务的结果,我成功地将chord任务用于类似的目的:
http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
将settings.CELERYD_CONCURRENCY增加到一个合理的、你负担得起的数字,然后那些芹菜工人会一直在一个小组或一个和弦中执行你的任务,直到完成为止。
注意:由于kombu中的一个bug,过去在大量任务中重用工作人员遇到了麻烦,我不知道现在是否已经修复了。也许是,但如果不是,减少CELERYD_MAX_TASKS_PER_CHILD。
基于我运行的简化和修改代码的示例:
@app.task
def do_matches():
match_data = ...
result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())summarize获取所有single_batch_processor任务的结果。每个任务都运行在任何芹菜工人身上,kombu会协调这一点。
现在我明白了:single_batch_processor和summarize也必须是芹菜任务,而不是常规函数--否则它当然不会被并行化(如果不是芹菜任务,我甚至不确定chord构造函数会接受它)。
https://stackoverflow.com/questions/23916413
复制相似问题