首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >芹菜:不能让精灵进程生孩子。

芹菜:不能让精灵进程生孩子。
EN

Stack Overflow用户
提问于 2015-06-03 15:21:46
回答 4查看 17.5K关注 0票数 22

在Python (2.7)中,我尝试在芹菜任务(芹菜3.1.17)中创建进程(使用多处理),但它给出了错误:

代码语言:javascript
复制
daemonic processes are not allowed to have children

在谷歌上,我发现最新版本的台球修复了"bug“,但我有最新版本(3.3.0.20),而且错误仍在发生。我也尝试在我的芹菜任务中实现这个解决办法,但是它也会产生同样的错误。

有人知道怎么做吗?任何帮助都是感激的,帕特里克

编辑:代码片段

任务:

代码语言:javascript
复制
from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder

@shared_task
def embedder_update_task(embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.test()

人工测试函数(从这里开始):

代码语言:javascript
复制
def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = mp.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test(self):
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

我真正的职责是:

代码语言:javascript
复制
import mulitprocessing as mp

def test(self):
    self.init()
    for saveindex in range(self.start_index,self.start_index+self.nsaves):
        self.create_storage(saveindex)
        # process creation:
        procs = [mp.Process(name="Process-"+str(i),target=getattr(self,self.training_method),args=(saveindex,)) for i in range(self.nproc)]
        for p in procs: p.start()
        for p in procs: p.join()
    print "End of task"

init函数定义了一个多处理数组和一个共享相同内存的对象,以便我的所有进程可以同时更新这个数组:

代码语言:javascript
复制
mp_arr = mp.Array(c.c_double, np.random.rand(1000000)) # example
self.V = numpy.frombuffer(mp_arr.get_obj()) #all the processes can update V

调用任务时生成的错误:

代码语言:javascript
复制
[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process.
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]     raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
   R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
   return self.run(*args, **kwargs)
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task
    embedder.test()
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test
    pool = MyPool(5)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
self._repopulate_pool()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
    w.start()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2015-06-04 14:01:23

billiardmultiprocessing是不同的库-- billiard是芹菜项目的multiprocessing分支。您将需要导入billiard并使用它而不是multiprocessing

然而,更好的答案可能是您应该重构您的代码,以便您产生更多的芹菜任务,而不是使用两种不同的方式分配您的工作。

你可以用芹菜画布做这件事

代码语言:javascript
复制
from celery import group

@app.task
def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs)])

def test(self):
    my_group = group(work(randint(1, 5)) for x in range(5))
    result = my_group.apply_async()
    result.get()

我尝试制作一个使用画布原语而不是多处理的代码的工作版本。然而,由于你的例子是相当人工的,所以想出一些有意义的东西是不容易的。

更新:

下面是使用芹菜画布的真实代码的翻译:

tasks.py

代码语言:javascript
复制
@shared_task
run_training_method(saveindex, embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.training_method(saveindex)

models.py

代码语言:javascript
复制
from tasks import run_training_method
from celery import group

class Embedder(Model):

    def embedder_update_task(self):
        my_group = []

        for saveindex in range(self.start_index, self.start_index + self.nsaves):
            self.create_storage(saveindex)
            # Add to list
            my_group.extend([run_training_method.subtask((saveindex, self.id)) 
                         for i in range(self.nproc)])

        result = group(my_group).apply_async()
票数 13
EN

Stack Overflow用户

发布于 2019-02-28 02:44:08

我在django的芹菜任务中调用多处理方法时也遇到了类似的错误。我用台球代替了多处理

代码语言:javascript
复制
import billiard as multiprocessing

希望能帮上忙。

票数 15
EN

Stack Overflow用户

发布于 2020-03-09 18:00:13

如果您使用的子模块/库中已经包含了多个处理,那么设置工作人员的-P threads参数可能更有意义:

代码语言:javascript
复制
celery worker -P threads

https://github.com/celery/celery/issues/4525#issuecomment-566503932

更新:芹菜< v5.1.1中的命令行解析中存在一个错误,即使支持它,也不允许使用-P threads。它是用>= v5.1.1修复的。它从v4.4开始就得到了官方的支持。

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30624290

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档