首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法从.get()从multiprocessing.Queue

无法从.get()从multiprocessing.Queue
EN

Stack Overflow用户
提问于 2014-12-09 17:05:55
回答 1查看 782关注 0票数 3

我正在构建一个web应用程序,用于处理60000个(和不断增长的)大型文件,执行一些分析,并返回一个需要用户验证的“最佳猜测”。这些文件将按类别进行细化,以避免加载每个文件,但我仍然需要一次处理1000+文件的场景。

这些是大文件,每个文件需要8到9秒的时间来处理,在1000+文件的情况下,让用户在评审之间等待8秒或在处理文件之前等待2 hours+是不切实际的。

为了克服这一问题,我决定使用多重处理来生成几个工作人员,每个工作人员将从一个文件队列中选择、处理并插入到一个输出队列中。我有另一个方法,它基本上轮询输出队列中的项目,然后在其中一个可用时将其流到客户端。

这很好,直到队列任意停止返回项时,整个过程中的一部分。我们在我们的环境中使用带Django和uwsgi的gevent,我知道在gevent上下文中通过多进程创建子进程会在子进程中产生一个不想要的事件循环状态。在分叉之前,绿林就在孩子身上复制了。因此,我决定使用gipc来帮助处理子进程。

我的代码示例(我不能发布实际代码):

代码语言:javascript
复制
import multiprocessing
import gipc
from item import Item

MAX_WORKERS = 10

class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query db for records of files to process.
        # Return results and set self.file_count equal to
        # the number of records returned.
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return

            item = self.input_queue.get(item)
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store in Queue for processing
        results = self.query_for_results()
        for result in results:
             item = Item(result)
             self.input_queue.put(item)

        # Spawn workers to process files.
        for _ in xrange(MAX_WORKERS):
            process = gipc.start_process(self.worker)

        # Poll for items to send to client.
        return self.get_processed_items()

    def get_processed_items(self):

        # Wait for the output queue to hold at least 1 item.
        # When an item becomes available, yield it to client.
        count = 0
        while count != self.file_count:
            #item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item
        yield 'end'

我期望输出在处理和生成项之后显示队列的当前计数:

代码语言:javascript
复制
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1

但是,脚本只能在失败之前生成几个项目:

代码语言:javascript
复制
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    ...

我的问题是:到底发生了什么?为什么我不能从队列中get?我如何返回我期望的项目,并避免这种情况?

EN

回答 1

Stack Overflow用户

发布于 2014-12-09 21:09:13

当您无法获得一个项目时,抛出的实际异常是什么?你在盲目地捕捉所有可能抛出的异常。此外,为什么不直接使用get而没有超时呢?你立刻再试一次,什么也不做。可能只是调用一个块,直到一个项目准备好为止。

关于这个问题,我认为正在发生的事情是,gipc关闭了与您的队列相关的管道,从而中断了队列。我预计会抛出一个OSError,而不是queue.Empty。有关详细信息,请参阅此错误报告

作为另一种选择,您可以使用流程池,在发生任何gevent事件之前启动池(这意味着您不必担心事件循环问题)。使用imap_unordered将作业提交到池中,您应该会很好。

您的开始函数看起来如下所示:

代码语言:javascript
复制
def start(self):
    results = self.query_for_results()
    return self.pool.imap_unordered(self.worker, results, 
        chunksize=len(results) // self.num_procs_in_pool)

@staticmethod
def worker(item):
    item.process()
    return item
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27384647

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档