首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >扭曲:等待子任务完成

扭曲:等待子任务完成
EN

Stack Overflow用户
提问于 2013-12-02 19:41:24
回答 1查看 2.7K关注 0票数 6

在我的代码中,我有两个假设的任务:一个从生成器获取urls并使用Twisted的Cooperator批量下载它们,另一个使用下载的源代码并异步地解析它。我试图将所有的提取和解析任务封装到一个延迟对象中,这个对象在下载所有页面和解析所有源时调用。

我想出了以下解决方案:

代码语言:javascript
复制
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

代码可以工作,但我感觉好像我忽略了一些显而易见的东西,或者不知道一个简单的扭曲模式,这会使事情变得更简单。是否有更好的方法来返回在所有获取和解析完成后调用的单个延迟?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-12-04 13:04:19

正如目前所写的,在我看来,这段代码的并行下载数量有限,但并行解析作业的数量却是无限的。这是故意的吗?我将假定为“否”,因为如果您的网络碰巧速度快,解析器的速度恰好慢,因为URL的数量接近无穷大,您的内存使用量也是如此:)。

因此,这里有一件事将具有有限的并行性,但是可以对下载执行顺序分析,而不是:

代码语言:javascript
复制
from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

这是因为parse (显然)返回一个Deferred,所以将它作为回调添加到getPage返回的回调会导致一个Deferred,在parse完成其业务之前不会调用coiterate添加的回调。

由于您询问的是惯用的Twisted代码,我还冒昧地对其进行了现代化(使用task.react而不是手动运行反应堆、内联表达式以使事情变得更简短等等)。

如果您确实希望拥有比并行获取更多的并行解析,那么这样的操作可能会更好:

代码语言:javascript
复制
from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

您可以看到,parseWhenReady返回了从acquire返回的Deferred,因此,并行抓取将在并行解析一开始就继续进行,因此即使解析器重载,也不会继续任意提取。但是,parallelParse谨慎地避免返回parserelease返回的Deferred,因为获取应该能够在这些过程中继续。

(请注意,由于您最初的示例无法运行,所以我根本没有测试过这两个示例。希望即使存在bug,意图也是明确的。)

票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20336476

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档