我正在尝试在python中实现ocpp libary。有两个函数,在which循环中连续运行,用于日志记录的cp.start()和作为协议实习生心跳的cp.heartbeat。当我想将它们正常地实现到我的例程中时,while循环将阻塞事件循环,因此我希望它们是线程。但柳叶草似乎有问题。
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_3',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_3', ws)
def start_logging(loop):
asyncio.set_event_loop(loop)
loop.create_task(cp.start())
loop.run_forever()
loop = asyncio.get_event_loop()
t = threading.Thread(target=start_logging, args=(loop,))
t.start()
await asyncio.gather(cp.send_heartbeat())
if __name__ == '__main__':
asyncio.run(main())错误:
ConnectionResetError: [WinError 995] Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen
AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>
AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>即使我将线程设置为守护进程,心跳也会正常工作,但我不能再关闭程序了。最后的目标是让cp.start()和心跳在线程中运行,这样我就可以在另一个逻辑中控制电动汽车的充电过程。
发布于 2021-12-12 21:25:07
查看github上的代码基础,您想要调用的所有函数都是协同函数。它们可能包含无限循环,但它们中有await语句,这使它们将控制权返回给事件循环。因此,据我所知,没有必要为任何事情使用线程。摘自版本1.6的例子:
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws)
await asyncio.gather(cp.start(), cp.send_boot_notification())我想这会让你开始的。
编辑:
好的,上面的情况仍然存在。我回答了您的问题,但您真正需要的是了解这个API应该如何工作。我告诉您,他们的例子有点令人困惑,我认为您将无法阅读他们的文档。但是,我从代码中了解到的要点是,您需要对中央类ChargePoint进行子类分类,从示例中可以看出这一点,因为它们将子类命名为与它们的基本类相同。我会尽量让他们的例子更清楚。我希望我理解得对.
# simplified and commented version of the v1.6 example
import asyncio
import logging
import websockets
from ocpp.routing import on
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
from ocpp.v16.enums import Action, RegistrationStatus
logging.basicConfig(level=logging.INFO)
class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)
@on(Action.SomeMessage) # this decorator adds your function to a mapping of hooks for that message/event
def on_some_message(*args, **kwargs):
pass # do something which probably got something to do with charging something
asyncio.create_task(self.some_coro()) # create async task from sync code
# add more decorated functions to implement your logic
async def some_coro(self):
pass # do something with I/O
async def send_boot_notification(self):
request = call.BootNotificationPayload(
charge_point_model="Optimus",
charge_point_vendor="The Mobility House"
)
response = await self.call(request)
if response.status == RegistrationStatus.accepted:
print("Connected to central system.")
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/
# this seems initializing, maybe not do it concurrently
await cp.send_boot_notification()
# this starts the infinite loop which receives and relays
# messages to their respective hooks
# (you get the concurrency you wanted out of threads by registering
# your own hooks (pieces of code)
await cp.start() # main() stays here until you somehow shut it down
if __name__ == '__main__':
asyncio.run(main())所以很明显,我不能测试这个,也不能保证这是他们想要的,但我希望它能有所帮助。
https://stackoverflow.com/questions/70319153
复制相似问题