我需要从RFID读卡器那里得到RFID卡号。这是客户端发起的。客户端AJAX发布到"/user/rfid“路由。这条路线要么返回“错误.”或者是rfid卡号。服务器打开检索此编号的子进程。如果服务器停留一段时间,用户尝试使用RFID登录,服务器就会中断。我需要一个更好的方法来完成这个任务,或者至少清除子进程?
有什么建议吗?
烧瓶路线
@app.route("/user/rfid", methods=["POST"])
def user_rfid():
rfid = subprocess.Popen(["python", "rfidSimple.py"], stdout=subprocess.PIPE).stdout.read()
print("rfid read: %s" % rfid)
if(rfid == "Exiting...."):
return json.dumps({"error":"rfid reader failure"})
else:
user = db.getUserByRFID(rfid.replace(" ",""))
if("id" in user.keys()):
# if credentials match, add user to session
session["id"] = user["id"]
# return redirect url
return json.dumps({"url":url_for("kiosk")})
else:
return json.dumps({"error":"rfid reader failure"})rfidSimple.py
#!/usr/bin/env python
#Basic imports
from ctypes import *
import sys
#Phidget specific imports
from Phidgets.PhidgetException import PhidgetErrorCodes, PhidgetException
from Phidgets.Events.Events import AttachEventArgs, DetachEventArgs, ErrorEventArgs, OutputChangeEventArgs, TagEventArgs
from Phidgets.Devices.RFID import RFID, RFIDTagProtocol
# Rudimentary wait system variable
x = True
def rfidTagGained(e):
global x
source = e.device
sys.stdout.write(e.tag)
x = False
def main():
#Create an RFID object
try:
rfid = RFID()
except RuntimeError as e:
sys.stdout.write("Exiting...")
try:
rfid.setOnTagHandler(rfidTagGained)
except PhidgetException as e:
sys.stdout.write("Exiting...")
try:
rfid.openPhidget()
except PhidgetException as e:
sys.stdout.write("Exiting...")
try:
rfid.waitForAttach(10000)
except PhidgetException as e:
# print("Phidget Exception %i: %s" % (e.code, e.details))
try:
rfid.closePhidget()
except PhidgetException as e:
sys.stdout.write("Exiting...")
rfid.setAntennaOn(True)
while x:
# wait wait wait!
pass
exit(1)
if __name__ == '__main__':
main()发布于 2014-05-16 00:16:07
您可以启动一个单独的线程来读取进程规范。在线程上执行连接(超时),如果线程没有终止,您知道调用花费的时间太长,可以终止它。下面是一个示例,其中所有的内容都封装在一个类中。我没有测试它,它可能有错误,但您可能会发现这个想法很有用。
import threading
import subprocess
import time
class RfidSimple(threading.Thread):
def __init__(self, timeout=10):
super(theading.Thread, self).__init__()
self.end = time.time() + timeout
self.proc = subprocess.Popen(["python", "rfidSimple.py"],
stdout=subprocess.PIPE)
self.start()
def run(self):
self._result = self.proc.stdout.read()
@property
def result(self):
timeout = self.end - time.time()
if timeout > 0:
self.join(timeout)
if not self.isAlive():
self.proc.wait()
return self._result
self.proc.terminate()
self.join(5)
if not self.isAlive():
self.proc.wait()
return "Exiting..."
raise Exception("could not kill subprocess")
def user_rfid():
rfid = RfidSimple().result
print("rfid read: %s" % rfid)
if(rfid == "Exiting...."):
return json.dumps({"error":"rfid reader failure"})
else:
user = db.getUserByRFID(rfid.replace(" ",""))
if("id" in user.keys()):
# if credentials match, add user to session
session["id"] = user["id"]
# return redirect url
return json.dumps({"url":url_for("kiosk")})
else:
return json.dumps({"error":"rfid reader failure"})https://stackoverflow.com/questions/23689993
复制相似问题