我有一个REST包装器,它应该在交互式Python会话中运行。HTTP请求是通过自动后台线程(使用API包装器)和最终用户通过交互会话手动发出的。我正在尝试将所有的HTTP请求管理从前一个新的每个请求线程方法迁移到异步,但是由于我无法在主线程中运行异步循环(它必须是免费的,以便可以进行即席Python命令/请求),所以我编写了以下代码来在后台线程中运行它:
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
def start_thread_loop(pool=None):
"""Starts thread with running loop, bounding the loop to the thread"""
def init_loop(loop):
asyncio.set_event_loop(loop) # bound loop to thread
loop.run_forever()
_pool = ThreadPoolExecutor() if pool is None else pool
loop = asyncio.new_event_loop()
future = _pool.submit(init_loop, loop)
return future, loop
def send_to_loop(coro, loop):
"""Wraps couroutine in Task object and sends it to given loop"""
return asyncio.run_coroutine_threadsafe(coro, loop=loop)实际的API包装器如下所示:
class Foo:
def __init__(self):
_, self.loop = start_thread_loop()
self.session = aiohttp.ClientSession(loop=self.loop)
self.loop.set_debug(True)
def send_url(self, url):
async def _request(url):
print('sending request')
async with self.session.get(url) as resp:
print(resp.status)
return send_to_loop(_request(url), self.loop)但是,aiohttp强烈建议在初始化asyncio引发RuntimeError之前,不要在coroutine之外设置ClientSession并打开asyncio调试模式。因此,我尝试使用asycio.Queue制作一个稍微不同的版本,以避免在coroutine中生成ClientSession:
class Bar:
def __init__(self):
_, self.loop = start_thread_loop()
self.q = asyncio.Queue(loop=self.loop)
self.status = send_to_loop(self.main(), loop=self.loop)
async def main(self):
async with aiohttp.ClientSession(loop=self.loop) as session:
while True:
url = await self.q.get()
print('sending request')
asyncio.ensure_future(self._process_url(url, session), loop=self.loop)
def send_url(self, url):
send_to_loop(self.q.put(url), loop=self.loop)
@staticmethod
async def _process_url(url, session):
async with session.get(url) as resp:
print(resp.status)然而,这种方法更加复杂/冗长,我并不真正理解它是否真的有必要。
问题:
ClientSession是个问题?发布于 2017-07-18 18:17:06
为什么在协同线之外启动ClientSession是个问题?
这就是构建aiohttp的方式,理论上,应该可以在循环ie之外初始化某种类型的客户端会话。但这并不是aiohttp的建造方式。在提出这一警告的问题中,这是因为a)很难测试b)它容易出错
排队方式更好/更安全吗?如果是,为什么?
我不明白你想达到什么目的,所以我不知道如何回答。可能您遇到的问题是,您试图在构造函数(也就是构造函数)中初始化一个ClientSession。另一类的__init__。在这种情况下,您应该通过创建一个辅助方法来解决这个问题,该方法是完成类初始化的协同线。在使用异步代码时,这是已知的模式。
在我的方法中,在后台线程中启动循环有问题吗?
完全没问题。
https://stackoverflow.com/questions/43108418
复制相似问题