首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >芹菜多处理并行分布式任务

芹菜多处理并行分布式任务
EN

Stack Overflow用户
提问于 2014-05-28 15:53:27
回答 4查看 50.3K关注 0票数 91

我有一个CPU密集的芹菜任务。我希望在许多think).实例中使用所有的处理能力(核心)来使这个工作完成得更快,(一个带有多处理的芹菜并行分布式任务- I )。

术语、线程、分布式计算、分布式并行处理都是我想要更好理解的术语。

示例任务:

代码语言:javascript
复制
  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

使用上面的代码(如果可能的话)(如果可能的话),人们将如何使用芹菜来分配这个任务,允许使用云中所有可用机器的所有计算CPU能力来分割这个任务?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2014-06-03 15:43:50

你的目标是:

  1. 将您的工作分配给许多机器(分布式计算/分布式并行处理)
  2. 在所有CPU(多处理/线程处理)上分配给定机器上的工作

芹菜可以很容易地为你做这两件事。首先要理解的是,每个芹菜工人都是默认配置,可以在系统上运行尽可能多的任务:

并发性是用于并发处理任务的预叉工作进程的数量,当所有这些都忙于工作时,新任务必须等待其中一个任务完成后才能处理。 默认并发号是计算机(包括核心)上的CPU数,可以使用-c选项指定自定义数字。没有推荐的值,因为最优的数量取决于许多因素,但是如果您的任务主要是I/O绑定的,那么您可以尝试增加它,实验已经表明,增加CPU数量的两倍以上是很少有效的,而且可能会降低性能。

这意味着每个单独的任务不需要担心使用多进程/线程来使用多个CPU/核心。相反,芹菜将同时运行足够的任务来使用每个可用的CPU。

这样,下一步就是创建一个处理list_of_millions_of_ids的某些子集的任务。这里有几个选项-一个是让每个任务处理一个ID,因此运行N个任务,在其中运行N == len(list_of_millions_of_ids)。这将保证工作在所有任务中平均分配,因为永远不会出现这样的情况:一个工作人员完成得很早,只是在等待;如果它需要工作,它可以从队列中提取一个id。您可以使用芹菜group (正如John提到的那样)这样做。

tasks.py:

代码语言:javascript
复制
@app.task
def process_ids(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

并执行以下任务:

代码语言:javascript
复制
from celery import group
from tasks import process_id

jobs = group(process_ids(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

另一种选择是将列表分解为较小的部分,并将其分发给员工。这种方法有浪费某些周期的风险,因为您可能最终会让一些工作人员在等着,而其他人还在工作。然而,芹菜文件说明认为这种担心往往是没有根据的:

有些人可能担心分块任务会导致并行性降低,但对于繁忙的集群和实践中,这很少是正确的,因为您正在避免消息传递的开销,这可能会大大提高性能。

因此,您可能会发现,由于减少了消息传递开销,分组列表并将块分发给每个任务执行得更好。通过计算每个id,将其存储在一个列表中,然后在完成后将整个列表添加到DB中,您可能还可以以这种方式减轻数据库的负担,而不是一次只执行一个id。分块的方法看起来会像这样

tasks.py:

代码语言:javascript
复制
@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

并开始执行以下任务:

代码语言:javascript
复制
from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

你可以试一试什么块状的大小给你最好的结果。你想要找到一个甜蜜的地方,在那里你可以减少消息开销,同时保持足够小的大小,这样你就不会比其他员工更快地完成他们的任务,然后无所事事地等待。

票数 141
EN

Stack Overflow用户

发布于 2014-06-06 02:10:02

在分销的世界里,最重要的是你应该记住一件事:

过早的优化是万恶之源。D. Knuth

我知道这听起来很明显,但在分发双重检查之前,您使用的是最佳算法(如果存在.)。话虽如此,优化分配是三件事之间的平衡:

  1. 从持久性介质写入/读取数据,
  2. 将数据从介质A移动到介质B,
  3. 处理数据,

计算机是制造的,所以你越接近你的处理单元(3),更快和更有效率(1)和(2)将是。经典集群的顺序是:网络硬盘驱动器、本地硬盘驱动器、RAM、内部处理单元领土.如今处理器变得越来越复杂,被认为是独立硬件处理单元(通常称为核)的集合,这些核心通过线程(2)处理数据(3)。假设您的核心速度如此之快,当您用一个线程发送数据时,您使用的是计算机功率的50%,如果内核有2个线程,那么您将使用100%。每个内核有两个线程称为超线程,您的操作系统将看到每个超线程内核有两个CPU。

在处理器中管理线程通常称为多线程。从操作系统管理CPU通常称为多处理。管理集群中的并发任务通常称为并行编程。在集群中管理依赖任务通常称为分布式编程。

,那么瓶颈在哪里?

  • 在(1):尝试持久化和流从上层(一个更接近你的处理单元,例如,如果网络硬盘驱动器是缓慢首先保存在本地硬盘驱动器)
  • 在(2):这是最常见的,尽量避免通信包不需要的分配或压缩“动态”的数据包(例如,如果HD是缓慢的,只保存一个“批计算”消息,并将中间结果保存在RAM中)。
  • (3):你完蛋了!你正在使用你可支配的所有处理能力。

芹菜怎么样?

Celery是用于分布式编程的消息传递框架,它将使用代理模块进行通信(2)和后端模块用于持久性(1),这意味着您可以通过更改配置来避免网络上和网络上的大多数瓶颈(如果可能的话)。首先,分析您的代码,以便在一台计算机上实现最佳性能。然后使用默认配置在集群中使用芹菜,并设置CELERY_RESULT_PERSISTENT=True

代码语言:javascript
复制
from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

在执行过程中,打开您最喜欢的监控工具,我使用默认的rabbitMQ和花卉作为芹菜和顶部的cpus,您的结果将保存在您的后端。网络瓶颈的一个例子是任务队列增长过大以至于延迟执行,您可以继续更改模块或芹菜配置,如果不是瓶颈在其他地方。

票数 13
EN

Stack Overflow用户

发布于 2014-05-30 07:48:48

为什么不使用group芹菜任务呢?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

基本上,您应该将ids划分为块(或范围),并将它们分配给group中的一组任务。

为了获得更复杂的结果,比如聚合特定芹菜任务的结果,我成功地将chord任务用于类似的目的:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

settings.CELERYD_CONCURRENCY增加到一个合理的、你负担得起的数字,然后那些芹菜工人会一直在一个小组或一个和弦中执行你的任务,直到完成为止。

注意:由于kombu中的一个bug,过去在大量任务中重用工作人员遇到了麻烦,我不知道现在是否已经修复了。也许是,但如果不是,减少CELERYD_MAX_TASKS_PER_CHILD。

基于我运行的简化和修改代码的示例:

代码语言:javascript
复制
@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize获取所有single_batch_processor任务的结果。每个任务都运行在任何芹菜工人身上,kombu会协调这一点。

现在我明白了:single_batch_processorsummarize也必须是芹菜任务,而不是常规函数--否则它当然不会被并行化(如果不是芹菜任务,我甚至不确定chord构造函数会接受它)。

票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23916413

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档