我有一个小的事件异步事件系统,如下所示:
from collections import defaultdict
from uuid import uuid4
class EventSystem:
def __init__(self):
self.handlers = defaultdict(dict)
def register_handler(self, event, callback, register_id=None):
register_id = register_id or uuid4()
self.handlers[event][register_id] = callback
return register_id
def unregister_handler(self, event, register_id):
del self.handlers[event][register_id]
def clear_handlers(self, event):
handler_register_ids = list(self.handlers[event].keys())
for register_id in handler_register_ids:
self.unregister_handler(event, register_id)
async def fire_event(self, event, data):
handlers = self.handlers[event]
for register_id, callback in handlers.items():
await callback(data)
return len(handlers)它当前强制处理程序是async函数。
我不能决定哪个更有pythonic风格,执行这个策略,并为同步函数使用async2sync包装器:
async def async2sync(func, *args, **kwargs):
return func(*args, **kwargs)或者使用inspect.isawaitable将fire_event更改为检查处理程序类型
async def fire_event(self, event, data):
handlers = self.handlers[event]
for register_id, callback in handlers.items():
ret = callback(data)
if inspect.isawaitable(ret):
await ret
return len(handlers)我不担心长时间运行或阻塞同步函数。
发布于 2020-07-29 05:52:55
既然第一种方法中的包装器将同步函数包装到异步中,那么它不应该被称为sync2async而不是async2sync吗
如果不关心长时间运行或阻塞同步函数,这两种方法都可以。两者都有好处和缺点。第一种方法更简约,更容易理解。第二种方法更聪明(它可以在你最意想不到的时候咬你),但也更容易使用,因为你可以只为handler编写任何一种函数,事情就会“正常工作”。如果您的API的用户不是您自己,他们可能会喜欢它。
TL;DR都不错;我个人可能会选择第二种。
https://stackoverflow.com/questions/63100765
复制相似问题