首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >池连接的Asyncio & Asyncpg & aiohttp单圈事件

池连接的Asyncio & Asyncpg & aiohttp单圈事件
EN

Stack Overflow用户
提问于 2022-02-20 19:33:11
回答 1查看 593关注 0票数 2

我一直在努力想办法解决我的问题,我希望我来到了正确的地方。

我有一个django rest框架API,它连接到postgresql,并在我自己的API上运行机器人来完成一些事情。这是我的代码:

代码语言:javascript
复制
def get_or_create_eventloop():
    """Get the eventLoop only one time (create it if does not exist)"""
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return asyncio.get_event_loop()

My类,它使用异步DB连接/创建一个池:

代码语言:javascript
复制
Class DB():
    def __init__(self,loop):
        self.pool = loop.run_until_complete(self.connect_to_db())

    def connect_to_db():
        return await asyncpg.create_pool(host="host",
                                     database="database",
                                     user="username",
                                     password="pwd",
                                     port=5432)

我的API类:

代码语言:javascript
复制
Class Api(APIView):
    #create a loop event since its not the main thread
    loop = get_or_create_eventloop()
    nest_asyncio.apply() #to avoid the <loop already running>  problem

    #init my DB pool directly so I wont have to connect each time
    db_object = DB(loop)
    
    
    def post(self,request):
        ... #I want to be able to call "do_something()"
    
    async def do_something(self):
        ...

我让我的机器人运行并通过aiohttp向django api发送post/get请求。

我面临的问题是:

  • 如何在API中实现post函数,以便他能够在每次创建新线程时处理多个请求,因此每次创建一个新的事件循环,都会将异步it中创建的池链接到当前的事件循环,即不能创建新的事件循环,我需要继续处理最初创建的事件,以便以后(通过pool.acquire等)访问数据库

到目前为止,我一直在努力,但没有成功:

代码语言:javascript
复制
def post(self,request):
    self.loop.run_until_complete(self.do_something())

这就产生了:

代码语言:javascript
复制
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

据我所知,我们正在尝试从另一个线程调用事件循环。

我还尝试使用DJANGO的asyng_to_sync:

代码语言:javascript
复制
@async_to_sync
async def post(..):
      resp = await self.do_something()

这里的问题是,当执行async_to_sync时,它会为线程创建一个新的事件循环,因此我将无法访问我的DB池编辑: cf https://github.com/MagicStack/asyncpg/issues/293 (我很想实现类似的东西,但找不到方法)

下面是我的一个机器人(基本功能)的一个快速示例:

代码语言:javascript
复制
import asyncio
from aiohttp import ClientSession

    async def send_req(url, session):
        async with session.post(url=url) as resp:
            return await resp.text()

async def run(r):
    url = "http://localhost:8080/"
    tasks = []

    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.asyncio.create_task(send_req(url, session))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        print(responses)


if __name__ == '__main__':
    asyncio.run(main())

提前谢谢你

EN

回答 1

Stack Overflow用户

发布于 2022-02-21 13:06:01

经过几天的寻找答案,我找到了我的问题的解决方案。我只使用了包psycopg3,而不是异步can (现在我可以将@async_to_sync放到post函数中,并且它可以工作)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71198036

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档