现在我有了一个包,它的类如下所示:
class Opc(object):
def __init__(self):
client = Client("server_url")
client.connect()
opc = Opc()现在我想使用opcua-异步库,所以我需要使用一个异步函数来连接到服务器,但是我不能等待init提供它。在导入包之后,如何使用异步连接函数连接到服务器?
发布于 2022-01-08 11:23:24
有多种解决方案:
使用类方法的
class Opc:
@classmethod
async def create(cls):
self = Opc()
self.client = Client("server_url")
await self.client.connect()
opc = await Opc.create()使用异步上下文管理器的
class Opc:
def __init__(self) -> None:
self.client = Client("server_url")
async def __aenter__(self):
await self.client.connect()
async def __aexit__(self, exc_type, exc, tb):
await self.client.disconnect()
opc = Opc()
async with opc:
...发布于 2022-06-11 08:17:33
现在我想使用opcua-异步库,所以我需要使用异步函数连接到服务器,
opcua-异步有同步包装器,所以您只需保留完全相同的代码:https://github.com/FreeOpcUa/opcua-asyncio/blob/master/asyncua/sync.py。
https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/sync/client-minimal.py
现在,如果您想要移植到异步代码,可以像在https://stackoverflow.com/a/70631847/2033030中那样做。
或添加连接方法。
class Opc:
def __init__(self):
self.client = Client("server_url")
async def connect(self):
await self.client.connect()
opc = Opc()
await opc.connect()https://stackoverflow.com/questions/70627170
复制相似问题