首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何改进异步锁机制?

如何改进异步锁机制?
EN

Stack Overflow用户
提问于 2021-12-30 08:23:07
回答 1查看 144关注 0票数 0

下面是我实现异步锁机制的代码,该机制应该在方法中阻止请求,如果具有相同名称的请求已经在执行中,并且尚未完成,但问题是,如果请求带有不同的名称,同时也会被阻塞,这是不理想的,理想的情况是,如果请求带有不同的请求名称,则不需要等待即可开始执行。

代码语言:javascript
复制
import asyncio
from contextlib import asynccontextmanager
 
@asynccontextmanager
async def get_lock(req_name_):
    locks = {}
    logger.info(f"Creating lock for stack {req_name_} if not created")
    if not locks.get(req_name_):
        logger.info("creating key for a lock")
        locks[req_name_] = asyncio.Lock()
    async with locks[req_name_]:
        yield
    if len(locks[req_name_]._waiters) == 0:
        del locks[req_name_]
        logger.info(f"lock released")
    logger.info(len(locks))

async def handle_lock_request(req_json_):
    logger.info(f"ocupying lock")
    req_name = req_json_.get('req_name')
    async with get_lock(req_name):
        logger.info(f"lock acquired by stack {req_name}")
        await _handle_request(req_json_)

async def _req_handler():
    tasks = []
    loop = asyncio.get_running_loop()
    logger.debug("Await receiver.recv_string")
    req = await receiver.recv_string()
    logger.debug(f"Request received {req}")
    req_json = json.loads(req)
    logger.debug("Await create_task")
    tasks.append(loop.create_task(handle_lock_request(req_json)))
    await asyncio.gather(*[task for task in tasks if not task.done()])
  
def _handle_request(req_json_):
    # ...
    # ...
    logger.info(f"Request finished with req name {req_name} for action patch stack")
EN

回答 1

Stack Overflow用户

发布于 2022-01-02 08:43:41

更新后,从代码中删除asyncio.gather()部分,只需调用create_task

代码语言:javascript
复制
import asyncio
from contextlib import asynccontextmanager

locks = {}
    @asynccontextmanager
    async def get_lock(req_name_):
        logger.info(f"Creating lock for stack {req_name_} if not created")
        if not locks.get(req_name_):
            logger.info("creating key for a lock")
            locks[req_name_] = asyncio.Lock()
        async with locks[req_name_]:
            yield
        if len(locks[req_name_]._waiters) == 0:
            del locks[req_name_]
            logger.info(f"lock released")
        logger.info(len(locks))


    async def handle_lock_request(req_json_):
        logger.info(f"ocupying lock")
        req_name = req_json_.get('req_name')
        async with get_lock(req_name):
            logger.info(f"lock acquired by stack {req_name}")
            await _handle_request(req_json_)
     
     async def _req_handler():
         tasks = []
         loop = asyncio.get_running_loop()
         logger.debug("Await receiver.recv_string")
          req = await receiver.recv_string()
          logger.debug(f"Request received {req}")
          req_json = json.loads(req)
          logger.debug("Await create_task")
          loop.create_task(handle_lock_request(req_json))
          
  
     def _handle_request(req_json_):
       ...
       ...
       logger.info(f"Request finished with req name {req_name} for action patch stack")
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70529093

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档