我要替换现有程序的一部分。最初的程序使用线程。有一个特殊的类,它从threading.Thread继承我需要替换的功能,但是我需要保持接口不变。
我正在集成的功能被打包在一个经常使用asyncio的库中。
我要替换的类的最初调用如下所示:
network = Network()
network.start()
network.fetch_something() # crashes!
network.stop()我的替换类也继承了threading.Thread,并且可以通过客户端库从run方法连接到后端:
class Network(threading.Thread):
def __init__(self):
self._loop = asyncio.new_event_loop()
self._client = Client() # this is the library
def run(self):
self._loop.run_until_complete(self.__connect()) # works dandy, implementation not shown
self._loop.run_forever()
def fetch_something(self):
return self._loop.run_until_complete(self._client.fetch_something())运行此代码会引发异常:
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one我有点明白这是怎么回事。在run方法中,由于运行事件循环的同一个线程是调用方,所以事情得到了解决。在另一种情况下,另一个线程是调用者,因此出现了问题。正如您可能已经注意到的,我希望通过使用相同的事件循环来解决问题。唉,这不管用。
我真的很想让界面保持原样,否则的话,我将在今年剩下的时间里进行重构。相对来说,我可以轻松地将参数传递给Network类的构造函数。我尝试传递在主线程上创建的事件循环,但结果是一样的。
(请注意,这是作者遇到的相反的问题:Call coroutine within Thread)
发布于 2018-06-29 06:25:28
当调度来自不同线程的协同线时,必须使用asyncio.run_coroutine_threadsafe。例如:
def fetch_something(self):
future = asyncio.run_coroutine_threadsafe(
self._client.fetch_something(), loop)
return future.result()run_coroutine_threadsafe以线程安全的方式调度带有事件循环的协同线,并返回concurrent.futures.Future。您可以使用返回的未来简单地等待上面所示的结果,但也可以将其传递给其他函数、轮询结果是否到达或实现超时。
在组合线程和异步时,请记住确保所有来自其他线程的事件循环接口(甚至调用像loop.stop这样简单的东西来实现Network.stop)都是使用loop.call_soon_threadsafe和asyncio.run_coroutine_threadsafe完成的。
https://stackoverflow.com/questions/51090555
复制相似问题