不久前,我写了一个应用程序来计划和管理无人机比赛。(这可能是任何通过检查站的身份号码的东西。)当时,大门(检查点)与运行此应用程序的设备之间的通信是通过XBee无线电完成的。当门检测到无人驾驶飞机时,它发送一个XBee数据,其中包含门识别信和检测到的无人机的编号ID。
应用程序负责读取和解释这些XBee消息。这是通过xbee模块完成的。我还包括了一个用于调试/测试的stdin阅读器。
对于一个新事件,决定从XBee通信切换到WiFi。因此,我被要求通过UDP数据报处理消息。我想把数据阅读器和他们的选择在启动审查。整个代码都是可访问的论GitHub,其他一些部分在不久的将来可能会成为新的问题。
import os
from argparse import ArgumentParser
import drone_racer
# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))
parser = ArgumentParser(description='Interface graphique "Drone Racer"')
# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
help='Utilise une barre de titre un peu plus Gtk3')
# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
default=None, help='Spécifie le port série à utiliser '
'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
type=int, default=9600, help='Débit du port série '
'utilisé pour la connexion avec le module XBee')
# UDP args
parser.add_argument('--use-udp', dest='udp', action='store_true',
help='Spécifie si la communication doit se faire '
'par datagrames UDP.')
parser.add_argument('--port', dest='port', metavar='NUM', type=int,
default=4387, help='Port à utiliser pour l’écoute UDP')
# Choose the appropriate reader
args = parser.parse_args()
if args.serial is not None:
reader = drone_racer.XBeeReader(
args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
reader = drone_racer.UDPReader(args.port)
else:
reader = drone_racer.StdInReader
# Launch the GUI (which will, in turn, start the reader)
app = drone_racer.Application(reader, args.fancy)
app.run()在drone_racer.Application的某个时候,reader将被实例化为回调函数:reader(self.console.compute_data)。
from .ui import DroneRacer as Application
from .threads import StdInReader, XBeeReader, UDPReaderimport os
import sys
import socket
from threading import Thread
from select import select
try:
from serial import Serial
from xbee import XBee, ZigBee
except ImportError:
XBee = None
class BaseReader(Thread):
"""Base class for custom data readers."""
def __init__(self, update_function):
"""Spawn a thread that continuously read data for drones statuses.
Parameter:
- update_function: the function that will be called each time a
valid data is read.
"""
super().__init__(name="reader")
self._update_data = update_function
self._should_continue = True
self.start()
def run(self):
"""The main action of the thread.
Wait for data, read them and send them to the rest of the application
for further computation.
"""
while self._should_continue:
try:
gate, drone = self.read_new_value()
except TypeError:
pass
else:
self._process_value(gate, drone)
def stop(self):
"""Signal that the thread has to stop reading its inputs."""
self._should_continue = False
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier, drone
number). Subclasses must implement this method.
"""
raise NotImplementedError("Subclasses must implement this method")
def _process_value(self, gate, drone):
"""Send input data to the rest of the application.
Parameters:
- gate: the gate identification letter(s)
- drone: the drone identification number (0-based)
"""
if drone < 0:
return
self._update_data(gate, drone)
class StdInReader(BaseReader):
"""Read data from stdin. Primarily used for tests and debug."""
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number).
Convert data such as "0 1" to the tuple ('A', 1).
"""
raw = input('[@] ').split()
try:
gate, drone = raw
return chr(int(gate) + ord('A')), int(drone)
except ValueError:
pass
class _UDPReader(BaseReader):
"""Read data from UDP datagrams. Used when communicating via WiFi
with the gates.
"""
def __init__(self, iface, port, update_function):
"""Spawn a thread that continuously read data for drones statuses.
Parameter:
- iface: the address of the interface to listen on.
- port: the socket port to listen on.
- update_function: the function that will be called each time a
valid data is read.
"""
com = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
com.bind((iface, port))
self._socket = [com]
super().__init__(update_function)
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number).
Decode an UDP datagram containing b"C:3" to the tuple ('C', 2).
"""
# Non-blocking read so this thread will shut down with the application
ready, _, _ = select(self._socket, [], [], 0)
for socket in ready:
msg = socket.recv(128) # Way too much for messages like <A:1>
try:
gate, drone = msg.split(b':')
gate = gate.decode()
# Compensate for the drone numbering vs. its indexing
drone = int(drone) - 1
except (UnicodeError, ValueError) as e:
print('Le message', msg, 'a été reçu mais n’est pas'
'compris par l’application.', file=sys.stderr)
print(e, file=sys.stderr)
else:
return gate, drone
class UDPReader:
"""Factory of _UDPReaders."""
def __init__(self, port):
"""Save parameters for future use.
Parameter:
- port: the socket port to listen on.
"""
self._port = port
def __call__(self, callback):
"""Generate the appropriate class to read data.
Parameter:
- callback: the function that will be called each
time a valid data is read.
"""
return _UDPReader(socket.gethostname(), self._port, callback)
if XBee is None:
class _BeeReader(BaseReader):
"""Read data from a serial port bound to an XBee.
Dummy implementation because xbee module could not be loaded.
"""
def read_new_value(self):
"""Cancel this thread to avoid burning resources."""
self._should_continue = False
def XBeeReader(*args, **kwargs):
"""Wrapper around the xbee module to integrate our _BeeReaderMixin
into the appropriate base class.
Dummy implementation because xbee module could not be loaded.
"""
print('Le module XBee est instrouvable. Aucune donnée ne pourra',
'être lue', file=sys.stderr)
return _BeeReader
else:
class _BeeReaderMixin:
"""Read data from a serial port bound to an XBee."""
def __init__(self, serial, callback):
"""Initialize the XBee reader thanks to the mro.
Parameters:
- serial: the serial port object to read data from
- callback: the function that will be called each
time a valid data is read.
"""
self._update_data = callback
super().__init__(serial, callback=self._process_value)
def _process_value(self, response_dict):
"""Convert a raw data received in a frame by the XBee
into suitable data for the application.
Should be called each time a frame is read by the XBee.
"""
try:
gate, drone = response_dict['rf_data'].split(b':')
gate = gate.decode()
# Compensate for the drone numbering vs. its indexing
drone = int(drone) - 1
except (UnicodeError, ValueError) as e:
print('Le message', response_dict['rf_data'],
'a été reçu mais n’est pas compris par l’application.',
file=sys.stderr)
print(e, file=sys.stderr)
except KeyError as e:
print('Un message ne contenant pas de données a été reçu.',
file=sys.stderr)
print(e, file=sys.stderr)
else:
self._update_data(gate, drone)
def stop(self):
"""Halt the thread from reading its input and close the
underlying serial port.
"""
self.halt()
self.serial.close()
class XBeeReader:
"""Wrapper around the xbee module to integrate our _BeeReaderMixin
into the appropriate base class.
"""
def __init__(self, *args, **kwargs):
"""Save parameters for future use.
Everything is used to initialize a serial.Serial object
except for the named attribute 'zigbee' which define the
base class to use.
Parameter:
- zigbee: whether to use the xbee.ZigBee base class or
the xbee.XBee one
"""
zigbee = kwargs.pop('zigbee', False)
self._args = args
self._kwargs = kwargs
self._base_cls = ZigBee if zigbee else XBee
def __call__(self, callback):
"""Generate the appropriate class to read data.
Parameter:
- callback: the function that will be called each
time a valid data is read.
"""
serial = Serial(*self._args, **self._kwargs)
self._args = None
self._kwargs = None
return type('XBeeReader', (_BeeReaderMixin, self._base_cls), {})(
serial, callback)预期的调用包括:
python droneracer.py --use-udp --port 6329python droneracer.py --serial-port /dev/ttyUSB0 --baudrate 9600python droneracer.py我最关心的是这些部分:
UDPReader是否遵循非阻塞套接字管理的良好实践(可能更多超时)?发布于 2016-01-04 19:55:46
WRT直观的CLI参数解析,您可以考虑签出点击库,如果不是这个项目,那么下一个项目。
您的CLI代码可能如下所示:
import click
@click.command()
@click.option("--fancy-title", default=False, help="Utilise une barre de titre"
" un peu plus Gtk3")
@click.option("--serial-port", type=type=click.Path(exists=True),
help="Spécifie le port série à utiliser pour récupérer les "
"informations provenant du XBee")
@click.option("--zigbee", default=False, help="Spécifie si le module XBee"
" est un ZigBee")
@click.option("--baudrate", default=9600, help='Débit du port série '
'utilisé pour la connexion avec le module XBee')
@click.option("--use-udp", default=False, help='Spécifie si la communication doit se faire '
'par datagrames UDP.')
@click.option("--port", default=4387, help='ort à utiliser pour l’écoute UDP')
def cli(fancy_title, serial_port, zigbee, baudrate, use_udp, port):
if serial_port is not None:
reader = drone_racer.XBeeReader(serial_port, baudrate, zigbee=zigbee)
elif use_udp:
reader = drone_racer.UDPReader(port)
else:
reader = drone_racer.StdInReader
app = drone_racer.Application(reader, fancy_title)(不,不是作者,甚至是贡献者,只是一个快乐的用户)
写这篇文章时,我注意到另一件事:--use-X模式可能不一致。调用
python droneracer.py --serial-port /dev/ttyUSB0 --baudrate 9600 --use-udp --port 1234是可能的,其中一半的参数都未使用。连接类型可以更好地看作是子命令:
python droneracer.py serial_port /dev/ttyUSB0 9600
python droneracer.py udp 6329
python droneracer.py stdin看起来更像是:
import click
@click.command()
@click.option("--fancy-title", default=False, help="Utilise une barre de titre"
" un peu plus Gtk3")
@click.pass_context
def cli(ctx, fancy_title):
"""Interface graphique "Drone Racer" """
pass
@cli.result_callback(reader)
def cli_callback(reader, fancy_title)
app = drone_racer.Application(reader, fancy_title)
@cli.group()
@click.argument("device", type=type=click.Path(exists=True),
help="Spécifie le port série à utiliser pour récupérer les "
"informations provenant du XBee", required=True)
@click.argument("baudrate", default=9600, help='Débit du port série '
'utilisé pour la connexion avec le module XBee', required=False)
@click.option("--zigbee", default=False, help="Spécifie si le module XBee"
" est un ZigBee")
def serial_port(device, baudrate, zigbee):
return drone_racer.XBeeReader(device, baudrate, zigbee=zigbee)
@cli.group()
@click.argument("port", default=4387, required=False)
def udp(port):
"""Spécifie si la communication doit se faire par datagrames UDP."""
return drone_racer.UDPReader(port)
@cli.group()
def stdin():
"""Specify the use of a serial port"""
return drone_racer.StdInReader作为第二种选择,您可以设计一些类似URL的规范并对其进行解析。考虑一下CLI:
python droneracer.py udp:4387
python droneracer.py /dev/USB0:9600它并不漂亮,而且会涉及自定义解析,但至少它是明确的,避免了自我矛盾的可能性。
我想我更喜欢子命令体系结构;最后一个建议是,如果存在对端点有标准的URL格式,那么就可以了,但是没有。
发布于 2016-01-01 13:33:09
在某些情况下,鸭子输入可能很有用,但是让那些想为自己的设备提供自己的阅读器的用户了解您特定的鸭类型的内部结构并不是很友好。
看看如何在droneracer.py中定义读者:
if args.serial is not None:
reader = drone_racer.XBeeReader(
args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
reader = drone_racer.UDPReader(args.port)
else:
reader = drone_racer.StdInReader为什么在前两种情况下构造(工厂)对象,而在第三种情况中使用类对象?我们怎么才能把这两者区别开来呢?
相反,您可以使用回调函数调用BaseReader对象(而不是可初始化的),并且此时只启动线程。它使您在如何初始化对象方面具有更大的灵活性,并具有两个优点:
UDPReader工厂);xbee.XBee或xbee.ZigBee)的类可以使用工厂,并提供与BaseReader派生类完全相同的接口。PEP8建议将docstring长度限制为72个字符。
您记录类和方法,但不提供用于填充模块__doc__的docstring。
您可以在__all__中使用drone_racer/__init__.py变量来改进模块的帮助。在使用help(drone_racer)时,您在此列表中包含的类将使它们的文档合并到模块中的一个。它还将限制使用/如果使用from drone_racer import *时导入的对象数量。
"""Collection of classes to crete threaded objects allowing to read
data from various sources.
Readers should be created with whatever parameter they require and
then allow to be called with a callback function. This call return
the threaded object reading data.
These threaded objects are started immediatly and monitor incomming
data to normalize them before feeding them into the callback function.
They can easily be halted using their `stop` method.
"""
import os
import sys
import socket
from threading import Thread
from select import select
try:
from serial import Serial
from xbee import XBee, ZigBee
except ImportError:
XBee = None
class BaseReader(Thread):
"""Base class for custom data readers."""
def __init__(self):
"""Spawn a thread that will continuously read data for drones
statuses.
"""
super().__init__(name="reader")
def __call__(self, update_function):
"""Starts the thread with the given callback function to
process data with.
Parameter:
- update_function: the function that will be called each time
a valid data is read.
"""
self._update_data = update_function
self._should_continue = True
self.start()
# Return ourselves to allow for duck typing and other classes
# to return other kind of objects (see XBeeReader).
return self
def run(self):
"""The main action of the thread.
Wait for data, read them and send them to the rest of the
application for further computation.
"""
while self._should_continue:
try:
gate, drone = self.read_new_value()
except TypeError:
pass
else:
self._process_value(gate, drone)
def stop(self):
"""Signal that the thread has to stop reading its inputs."""
self._should_continue = False
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number). Subclasses must implement this method.
"""
raise NotImplementedError("Subclasses must implement this method")
def _process_value(self, gate, drone):
"""Send input data to the rest of the application.
Parameters:
- gate: the gate identification letter(s)
- drone: the drone identification number (0-based)
"""
if drone < 0:
return
self._update_data(gate, drone)
class StdInReader(BaseReader):
"""Read data from stdin. Primarily used for tests and debug."""
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number).
Convert data such as "0 1" to the tuple ('A', 1).
"""
raw = input('[@] ').split()
try:
gate, drone = raw
return chr(int(gate) + ord('A')), int(drone)
except ValueError:
pass
class UDPReader(BaseReader):
"""Read data from UDP datagrams. Used when communicating via
WiFi with the gates.
"""
def __init__(self, port):
"""Spawn a thread that continuously read data for drones
statuses.
Parameter:
- port: the socket port to listen on.
"""
super().__init__()
com = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
iface = socket.gethostname()
com.bind((iface, port))
self._socket = [com]
def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number).
Decode an UDP datagram containing b"C:3" to the tuple ('C', 2).
"""
# Non-blocking read so this thread will shut down with the application
ready, _, _ = select(self._socket, [], [], 0.05)
for socket in ready:
msg = socket.recv(128) # Way too much for messages like <A:1>
try:
gate, drone = msg.split(b':')
gate = gate.decode()
# Compensate for the drone numbering vs. its indexing
drone = int(drone) - 1
except (UnicodeError, ValueError) as e:
print('Le message', msg, 'a été reçu mais n’est pas'
'compris par l’application.', file=sys.stderr)
print(e, file=sys.stderr)
else:
return gate, drone
if XBee is None:
class XBeeReader(BaseReader):
"""Read data from a serial port bound to an XBee.
Dummy implementation because xbee module could not be loaded.
"""
def __init__(self, *args, **kwargs):
"""Accepts arguments to be compatible with the "real"
XBeeReader but prints a warning and terminate gracefully
instead.
"""
super().__init__()
print('Le module XBee est instrouvable. Aucune donnée ne pourra',
'être lue', file=sys.stderr)
def read_new_value(self):
"""Cancel this thread to avoid burning resources."""
self._should_continue = False
else:
class _BeeReaderMixin:
"""Read data from a serial port bound to an XBee."""
def __init__(self, serial, callback):
"""Initialize the XBee reader thanks to the mro.
Parameters:
- serial: the serial port object to read data from
- callback: the function that will be called each
time a valid data is read.
"""
self._update_data = callback
super().__init__(serial, callback=self._process_value)
def _process_value(self, response_dict):
"""Convert a raw data received in a frame by the XBee
into suitable data for the application.
Should be called each time a frame is read by the XBee.
"""
try:
gate, drone = response_dict['rf_data'].split(b':')
gate = gate.decode()
# Compensate for the drone numbering vs. its indexing
drone = int(drone) - 1
except (UnicodeError, ValueError) as e:
print('Le message', response_dict['rf_data'],
'a été reçu mais n’est pas compris par l’application.',
file=sys.stderr)
print(e, file=sys.stderr)
except KeyError as e:
print('Un message ne contenant pas de données a été reçu.',
file=sys.stderr)
print(e, file=sys.stderr)
else:
self._update_data(gate, drone)
def stop(self):
"""Halt the thread from reading its input and close the
underlying serial port.
"""
self.halt()
self.serial.close()
class XBeeReader:
"""Wrapper around the xbee module to integrate our
_BeeReaderMixin into the appropriate base class.
"""
def __init__(self, *args, **kwargs):
"""Save parameters for future use.
Every parameter is used to initialize a serial.Serial
object except for the named attribute 'zigbee' which
define the base class to use.
Parameter:
- zigbee: whether to use the xbee.ZigBee base class or
the xbee.XBee one
"""
zigbee = kwargs.pop('zigbee', False)
base_cls = ZigBee if zigbee else XBee
self._serial = Serial(*args, **kwargs)
self._cls = type('XBeeReader', (_BaseReaderMixin, base_cls), {})
def __call__(self, callback):
"""Generate the appropriate object to read data.
Parameter:
- callback: the function that will be called each
time a valid data is read.
"""
return self._cls(self._serial, callback)"""Pubilc interface to the various components defined in this package.
Allows to construct the GUI responsible of the whole application
and to select a reader from the built-in ones.
"""
from .ui import DroneRacer as Application
from .threads import StdInReader, XBeeReader, UDPReader
__all__ = [
'Application',
'StdInReader',
'XBeeReader',
'UDPReader',
]"""Drone Racer is a project primarily developed for the DroneFest
organized as part of the FabLab Festival 2015. Its aim is to provide
an all-in-one interface for races organizers to:
- create different events for drones competition;
- register contestants and their associated drones;
- classify drones into categories;
- create several routes with their own set of rules for each event;
- setup and monitor races on a designated route;
- gather statistics on races for drivers, event or kind of route.
To reduce the overhead of having extraneous services for database
access, Drone Racer makes use of the python's built-in sqlite module.
It uses it to store informations on the contestants, the drones, the
different type of routes and the races leaderboards.
Additionally, setup, updates & leaderboard for each race can be sent
to a RESTful API for the audience.
"""
import os
from argparse import ArgumentParser
import drone_racer
# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))
parser = ArgumentParser(description='Interface graphique "Drone Racer"')
# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
help='Utilise une barre de titre un peu plus Gtk3')
# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
default=None, help='Spécifie le port série à utiliser '
'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
type=int, default=9600, help='Débit du port série '
'utilisé pour la connexion avec le module XBee')
# UDP args
parser.add_argument('--use-udp', dest='udp', action='store_true',
help='Spécifie si la communication doit se faire '
'par datagrames UDP.')
parser.add_argument('--port', dest='port', metavar='NUM', type=int,
default=4387, help='Port à utiliser pour l’écoute UDP')
# Choose the appropriate reader
args = parser.parse_args()
if args.serial is not None:
reader = drone_racer.XBeeReader(
args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
reader = drone_racer.UDPReader(args.port)
else:
reader = drone_racer.StdInReader()
# Launch the GUI (which will, in turn, start the reader)
app = drone_racer.Application(reader, args.fancy)
app.run()为了提高可读性和共享性,您可能有兴趣了解一下gettext模块。
https://codereview.stackexchange.com/questions/115181
复制相似问题