System: Windows 10 Enterprise
BlueZ 版本(bluetoothctl -v,仅限 Linux):
描述
我正在为 Windows 编写一个桌面应用程序来管理 BLE 传感器。该应用程序使用 PyQt5 开发,提供与传感器连接的图形用户界面(GUI),并通过 UART 服务进行发送/接收。因此,我想在 PyQt 的 GUI 中集成基于 asyncio 的 bleak 库。
我可以毫无问题地扫描设备。但是当我想连接到传感器并运行无限循环读写字节时,比如https://github.com/hbldh/bleak/blob/develop/examples/uart_service.py
GUI被冻结,没有响应。为了在后台运行这个过程,我以不同的方式使用了异步例程,但没有成功。
有没有人有在 PyQt5/PySide2 中集成 bleak 的经验?
我所做的
我有一个带有UI的main.py文件和两个简单的扫描和连接模块。我可以连接它并接收数据,但几秒钟后GUI没有响应。为此,我已经看到了一些类似qtrio的库,但我想知道Pyqt和bleak是否有很好的集成。
多谢你们的支持。
这里是UI中的初学者代码
import asyncio
import qasync
from PyQt5 import QtWidgets, uic
from PyQt5.QtWidgets import QTextEdit, QPushButton, QListWidgetItem, QListWidget, QApplication
from PyQt5 import QtBluetooth, QtCore
from PyQt5.QtCore import QThread
from PyQt5.QtBluetooth import QBluetoothAddress
import sys
from bleak import discover
from ble import discover
from ble import connect
from qasync import QEventLoop
#main.py
class MainWindow(QtWidgets.QMainWindow):
    """Main window that scans for devices and connects to the first one listed.

    The widgets are loaded from ``mainwindow.ui`` and looked up by object name.
    """

    def __init__(self):
        super(MainWindow, self).__init__()
        uic.loadUi('mainwindow.ui', self)

        # Widgets defined in the .ui file.
        self.streamtext = self.findChild(QTextEdit, "streamtext")
        self.listDevicesText = self.findChild(QListWidget, "addressList")
        self.scanButton = self.findChild(QPushButton, "scanButton")
        self.connectButton = self.findChild(QPushButton, "connectButton")
        self.exitButton = self.findChild(QPushButton, "exitButton")

        self.exitButton.clicked.connect(self.quit)
        self.scanButton.clicked.connect(self.discover_devices)
        self.connectButton.clicked.connect(self.connect_devices)
        self.show()

    def discover_devices(self):
        """Run a scan and list the address found under the name "Sensor1".

        NOTE: ``asyncio.run`` blocks the calling thread until the scan
        coroutine completes, so the Qt event loop is not serviced meanwhile.
        """
        found = asyncio.run(discover.main(), debug=True)
        address = found.get("Sensor1")
        # The sensor may not have been seen during this scan.
        if address is not None:
            self.listDevicesText.addItem(address)

    def connect_devices(self):
        """Connect to the device whose address is on the first list row.

        NOTE: ``connect.run`` loops forever, so ``asyncio.run`` does not
        return here and the GUI stops responding. Running the coroutine on
        an event loop shared with Qt (e.g. qasync) avoids this.
        """
        # Get the first line is macaddress
        first_row = self.listDevicesText.item(0)
        if first_row is None:
            # Nothing has been discovered yet; there is no address to use.
            return
        asyncio.run(connect.run(first_row.text()))

    def quit(self):
        """Terminate the process with exit status 0."""
        sys.exit(0)
if __name__ == "__main__":
    try:
        app = QtWidgets.QApplication(sys.argv)
        # Bound to a name so the window object stays referenced while the
        # event loop runs.
        window = MainWindow()
        # Propagate the event loop's return code as the process exit status.
        sys.exit(app.exec())
    except Exception as e:
        # Report on stderr and signal failure instead of exiting with 0.
        print(e, file=sys.stderr)
        sys.exit(1)
#ble/connect.py
async def run(macaddress):
    """Connect to the device at *macaddress* and service it until cancelled.

    The connection is held open by the ``async with`` block for as long as
    the loop runs. The loop never terminates on its own; it ends only when
    the task is cancelled or an exception propagates out of it.
    """
    import asyncio  # local import: this snippet shows no module-level imports

    async with BleakClient(macaddress, disconnected_callback=handle_disconnect) as client:
        while True:
            # here is reading/writing gatt char process
            # Placeholder so the loop has a body and suspends on every
            # iteration, letting other tasks on the event loop run. Replace
            # it with the awaited GATT read/write calls.
            await asyncio.sleep(0.1)
#ble/discover.py
import asyncio
from bleak import BleakScanner
async def main():
    """Scan for devices and return a ``{name: address}`` mapping.

    The device name is the dictionary key, so devices reporting the same
    name overwrite one another and only the last one scanned is kept.
    """
    devices = await BleakScanner.discover()
    return {d.name: d.address for d in devices}
# Posted on 2022-02-25 17:26:39
编程并不是把代码片段堆放在一起、指望它们原样就能拼合运行。关键是了解每个部分是如何工作的,并分析它们如何共存与协作。在本例中,最好创建一个用于处理 bleak 的 QObject,并使用 qasync,使 Qt 可以与 asyncio 共存,如下所示:
import asyncio
from dataclasses import dataclass
from functools import cached_property
import sys
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QComboBox,
QLineEdit,
QMainWindow,
QPlainTextEdit,
QPushButton,
QVBoxLayout,
QWidget,
)
import qasync
from bleak import BleakScanner, BleakClient
from bleak.backends.device import BLEDevice
# UUIDs of the UART-style GATT service and its two characteristics.
# NOTE(review): these look like the Nordic UART Service UUIDs used by bleak's
# uart_service example - confirm against the sensor's firmware.
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
# Characteristic the client writes outgoing data to (see QBleakClient.write).
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
# Characteristic the client subscribes to for incoming data (see QBleakClient.start).
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# Not referenced in the code visible here; presumably the largest payload that
# is safe to send in a single write - TODO confirm.
UART_SAFE_SIZE = 20
@dataclass
class QBleakClient(QObject):
    """QObject wrapper around a ``BleakClient`` for a UART-style GATT service.

    Notifications received on ``UART_TX_CHAR_UUID`` are re-emitted through
    the ``messageChanged`` signal; ``write`` sends to ``UART_RX_CHAR_UUID``.
    """

    device: BLEDevice

    # Emitted with the payload of each notification received from the device.
    messageChanged = pyqtSignal(bytes)

    def __post_init__(self):
        # The dataclass-generated __init__ does not initialise QObject,
        # so it is done here.
        super().__init__()

    @cached_property
    def client(self) -> BleakClient:
        """Return the BleakClient for ``self.device``, created on first access."""
        return BleakClient(self.device, disconnected_callback=self._handle_disconnect)

    async def start(self):
        """Connect to the device and subscribe to incoming notifications."""
        await self.client.connect()
        await self.client.start_notify(UART_TX_CHAR_UUID, self._handle_read)

    async def stop(self):
        """Disconnect from the device."""
        await self.client.disconnect()

    async def write(self, data):
        """Write *data* to the device's RX characteristic."""
        await self.client.write_gatt_char(UART_RX_CHAR_UUID, data)
        print("sent:", data)

    def _handle_disconnect(self, _client=None) -> None:
        """Handle a disconnect by cancelling every task on the running loop.

        bleak invokes ``disconnected_callback`` with the client as its only
        argument, so the parameter must be accepted. It defaults to ``None``
        so the method can still be called without arguments.
        """
        print("Device was disconnected, goodbye.")
        # cancelling all tasks effectively ends the program
        for task in asyncio.all_tasks():
            task.cancel()

    def _handle_read(self, _: int, data: bytearray) -> None:
        """Forward a received notification payload through ``messageChanged``."""
        print("received:", data)
        # The signal is declared with ``bytes``; convert the bytearray so the
        # emitted value matches the declared type.
        self.messageChanged.emit(bytes(data))
class MainWindow(QMainWindow):
    """Window for scanning, connecting to and messaging a UART-style device.

    The slots are coroutines wrapped with ``qasync.asyncSlot`` so that the
    Bluetooth calls are awaited on the event loop shared with Qt.
    """

    def __init__(self):
        super().__init__()
        self.resize(640, 480)

        self._client = None
        # Devices found by the most recent scan.
        self.devices = []

        scan_button = QPushButton("Scan Devices")
        self.devices_combobox = QComboBox()
        connect_button = QPushButton("Connect")
        self.message_lineedit = QLineEdit()
        send_button = QPushButton("Send Message")
        self.log_edit = QPlainTextEdit()

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        lay = QVBoxLayout(central_widget)
        for widget in (
            scan_button,
            self.devices_combobox,
            connect_button,
            self.message_lineedit,
            send_button,
            self.log_edit,
        ):
            lay.addWidget(widget)

        scan_button.clicked.connect(self.handle_scan)
        connect_button.clicked.connect(self.handle_connect)
        send_button.clicked.connect(self.handle_send)

    @property
    def current_client(self):
        """The active ``QBleakClient``, or ``None`` when not connected."""
        return self._client

    async def build_client(self, device):
        """Replace the current client with a started one for *device*."""
        if self._client is not None:
            await self._client.stop()
        self._client = QBleakClient(device)
        self._client.messageChanged.connect(self.handle_message_changed)
        await self._client.start()

    @qasync.asyncSlot()
    async def handle_connect(self):
        """Connect to the device currently selected in the combo box."""
        self.log_edit.appendPlainText("try connect")
        device = self.devices_combobox.currentData()
        # currentData() is not a BLEDevice when nothing has been scanned or
        # selected; in that case there is nothing to connect to.
        if isinstance(device, BLEDevice):
            await self.build_client(device)
            self.log_edit.appendPlainText("connected")

    @qasync.asyncSlot()
    async def handle_scan(self):
        """Scan for devices and repopulate the combo box with the results."""
        self.log_edit.appendPlainText("Started scanner")
        self.devices.clear()
        devices = await BleakScanner.discover()
        self.devices.extend(devices)
        self.devices_combobox.clear()
        for i, device in enumerate(self.devices):
            # A device may report no name; fall back to its address so the
            # entry still has a label.
            label = device.name or device.address
            self.devices_combobox.insertItem(i, label, device)
        self.log_edit.appendPlainText("Finish scanner")

    def handle_message_changed(self, message):
        """Append a received payload to the log."""
        # Payloads are raw bytes and need not be valid UTF-8; substitute
        # undecodable bytes rather than raising inside a Qt slot.
        self.log_edit.appendPlainText(f"msg: {message.decode(errors='replace')}")

    @qasync.asyncSlot()
    async def handle_send(self):
        """Send the line-edit text to the connected device, if any."""
        if self.current_client is None:
            return
        message = self.message_lineedit.text()
        if message:
            await self.current_client.write(message.encode())
def main():
    """Start the application on a qasync event loop shared by Qt and asyncio."""
    qt_app = QApplication(sys.argv)

    # Install the Qt-backed loop as the asyncio event loop before any widget
    # is created.
    event_loop = qasync.QEventLoop(qt_app)
    asyncio.set_event_loop(event_loop)

    window = MainWindow()
    window.show()

    with event_loop:
        event_loop.run_forever()
if __name__ == "__main__":
    main()
# Source: https://stackoverflow.com/questions/71268254
复制相似问题