首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使扭曲的延迟得到功能效果?

如何使扭曲的延迟得到功能效果?
EN

Stack Overflow用户
提问于 2017-08-29 04:05:37
回答 1查看 953关注 0票数 1

我想在multiprocessing中做一些事情,我想推迟得到结果,如下所示:

代码语言:javascript
复制
from multiprocessing import Pool
from twisted.internet import defer
import time

def f(x):
    time.sleep(0.5)
    print(x)
    return x*x

pool = Pool(processes=4)              # start 4 worker processes
def get_result(i):
    res = pool.apply_async(f, (i, )) # do work in process pool
    return defer.Deferred(res.get()) # now, I want to make process do something else, so it should not be blocked

def main():
    from twisted.internet import reactor

    @defer.inlineCallbacks
    def _run():
        for i in range(4):
            yield get_result(i)

        reactor.stop()
    reactor.callLater(1, _run)
    reactor.run()


if __name__ == '__main__':
    main()
EN

回答 1

Stack Overflow用户

发布于 2017-08-30 20:18:46

Pool.apply_async()有一个callback arg,您可以利用它在Deferred中启动回调链。捕获(这是绝对关键的记住)是池回调函数将在另一个线程中执行!因此,在将结果应用到Deferred时必须调用reactor.callFromThread,以便回调链与reactor在同一个线程中发生。如果不这样做,就会在反应堆没有上下文的不同线程中执行回调。下面是一个稍微修改过的示例:

代码语言:javascript
复制
from functools import partial
from multiprocessing import Pool
import threading
import time

from twisted.internet import defer, reactor


def f(x):
    time.sleep(5)
    return x*x

def get_result(pool, i):
    deferred = defer.Deferred()   # create a Deferred that will eventually provide a result
    _set_result = partial(set_result, deferred=deferred)  # pass the Deferred to the apply_async callback
    pool.apply_async(f, args=(i,), callback=_set_result)  # execute in a separate process, supply callback fn
    return deferred

def set_result(result, deferred):
    """
    Set the result in the deferred
    """
    print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result))
    reactor.callFromThread(deferred.callback, result)   # execute the Deferred callback chain from the reactor thread

def display_result(result):
    """
    Just display the result
    """
    print('Thread ID: %d, Display %d' % (threading.get_ident(), result))

def kill_reactor(null):
    print('Thread ID: %d, Stopping reactor' % threading.get_ident())
    reactor.stop()

def main():
    print('Thread ID: %d, Main' % threading.get_ident())
    pool = Pool(processes=4)
    d = get_result(pool, 3)
    d.addCallback(display_result)
    d.addCallback(kill_reactor)

    reactor.run()

main()


#---------- OUTPUT ----------#
# Thread ID: 803872, Main
# Thread ID: 533632, Setting result 9
# Thread ID: 803872, Display 9
# Thread ID: 803872, Stopping reactor

我打印了线程id,这样您就可以看到在另一个线程中确实调用了set_result() (进程?)而不是在主螺纹(反应堆螺纹)。在本例中,Ommiting reactor.callFromThread(deferred.callback, result)将导致在reactor.stop()无法工作的线程中执行回调,Twisted在任何地方都会抛出呕吐物(跟踪)!考虑在reactor.spawnProcess使用,因为这将限制您(或我)在其他方面可能犯的错误。和往常一样,如果你可以在一个线程中做任何你要做的事情,而忽略了multiprocessingthreading,我建议你去做。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45930518

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档