下面是我实现异步锁机制的代码,该机制应该在方法中阻止请求,如果具有相同名称的请求已经在执行中,并且尚未完成,但问题是,如果请求带有不同的名称,同时也会被阻塞,这是不理想的,理想的情况是,如果请求带有不同的请求名称,则不需要等待即可开始执行。
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_lock(req_name_):
locks = {}
logger.info(f"Creating lock for stack {req_name_} if not created")
if not locks.get(req_name_):
logger.info("creating key for a lock")
locks[req_name_] = asyncio.Lock()
async with locks[req_name_]:
yield
if len(locks[req_name_]._waiters) == 0:
del locks[req_name_]
logger.info(f"lock released")
logger.info(len(locks))
async def handle_lock_request(req_json_):
logger.info(f"ocupying lock")
req_name = req_json_.get('req_name')
async with get_lock(req_name):
logger.info(f"lock acquired by stack {req_name}")
await _handle_request(req_json_)
async def _req_handler():
tasks = []
loop = asyncio.get_running_loop()
logger.debug("Await receiver.recv_string")
req = await receiver.recv_string()
logger.debug(f"Request received {req}")
req_json = json.loads(req)
logger.debug("Await create_task")
tasks.append(loop.create_task(handle_lock_request(req_json)))
await asyncio.gather(*[task for task in tasks if not task.done()])
def _handle_request(req_json_):
# ...
# ...
logger.info(f"Request finished with req name {req_name} for action patch stack")发布于 2022-01-02 08:43:41
更新后,从代码中删除asyncio.gather()部分,只需调用create_task
import asyncio
from contextlib import asynccontextmanager
locks = {}
@asynccontextmanager
async def get_lock(req_name_):
logger.info(f"Creating lock for stack {req_name_} if not created")
if not locks.get(req_name_):
logger.info("creating key for a lock")
locks[req_name_] = asyncio.Lock()
async with locks[req_name_]:
yield
if len(locks[req_name_]._waiters) == 0:
del locks[req_name_]
logger.info(f"lock released")
logger.info(len(locks))
async def handle_lock_request(req_json_):
logger.info(f"ocupying lock")
req_name = req_json_.get('req_name')
async with get_lock(req_name):
logger.info(f"lock acquired by stack {req_name}")
await _handle_request(req_json_)
async def _req_handler():
tasks = []
loop = asyncio.get_running_loop()
logger.debug("Await receiver.recv_string")
req = await receiver.recv_string()
logger.debug(f"Request received {req}")
req_json = json.loads(req)
logger.debug("Await create_task")
loop.create_task(handle_lock_request(req_json))
def _handle_request(req_json_):
...
...
logger.info(f"Request finished with req name {req_name} for action patch stack")https://stackoverflow.com/questions/70529093
复制相似问题