我想在multiprocessing中做一些事情,我想推迟得到结果,如下所示:
from multiprocessing import Pool
from twisted.internet import defer
import time
def f(x):
time.sleep(0.5)
print(x)
return x*x
pool = Pool(processes=4) # start 4 worker processes
def get_result(i):
res = pool.apply_async(f, (i, )) # do work in process pool
return defer.Deferred(res.get()) # now, I want to make process do something else, so it should not be blocked
def main():
from twisted.internet import reactor
@defer.inlineCallbacks
def _run():
for i in range(4):
yield get_result(i)
reactor.stop()
reactor.callLater(1, _run)
reactor.run()
if __name__ == '__main__':
main()发布于 2017-08-30 20:18:46
Pool.apply_async()有一个callback arg,您可以利用它在Deferred中启动回调链。捕获(这是绝对关键的记住)是池回调函数将在另一个线程中执行!因此,在将结果应用到Deferred时必须调用reactor.callFromThread,以便回调链与reactor在同一个线程中发生。如果不这样做,就会在反应堆没有上下文的不同线程中执行回调。下面是一个稍微修改过的示例:
from functools import partial
from multiprocessing import Pool
import threading
import time
from twisted.internet import defer, reactor
def f(x):
time.sleep(5)
return x*x
def get_result(pool, i):
deferred = defer.Deferred() # create a Deferred that will eventually provide a result
_set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback
pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn
return deferred
def set_result(result, deferred):
"""
Set the result in the deferred
"""
print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result))
reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread
def display_result(result):
"""
Just display the result
"""
print('Thread ID: %d, Display %d' % (threading.get_ident(), result))
def kill_reactor(null):
print('Thread ID: %d, Stopping reactor' % threading.get_ident())
reactor.stop()
def main():
print('Thread ID: %d, Main' % threading.get_ident())
pool = Pool(processes=4)
d = get_result(pool, 3)
d.addCallback(display_result)
d.addCallback(kill_reactor)
reactor.run()
main()
#---------- OUTPUT ----------#
# Thread ID: 803872, Main
# Thread ID: 533632, Setting result 9
# Thread ID: 803872, Display 9
# Thread ID: 803872, Stopping reactor我打印了线程id,这样您就可以看到在另一个线程中确实调用了set_result() (进程?)而不是在主螺纹(反应堆螺纹)。在本例中,Ommiting reactor.callFromThread(deferred.callback, result)将导致在reactor.stop()无法工作的线程中执行回调,Twisted在任何地方都会抛出呕吐物(跟踪)!考虑在reactor.spawnProcess使用,因为这将限制您(或我)在其他方面可能犯的错误。和往常一样,如果你可以在一个线程中做任何你要做的事情,而忽略了multiprocessing或threading,我建议你去做。
https://stackoverflow.com/questions/45930518
复制相似问题