首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >服务器长时间轮询并开始子处理每个呼叫的RFID卡号。如果时间太长,我的服务器就会崩溃。我怎么才能做得更好?

服务器长时间轮询并开始子处理每个呼叫的RFID卡号。如果时间太长,我的服务器就会崩溃。我怎么才能做得更好?
EN

Stack Overflow用户
提问于 2014-05-15 22:51:02
回答 1查看 147关注 0票数 1

我需要从RFID读卡器那里得到RFID卡号。这是客户端发起的。客户端AJAX发布到"/user/rfid“路由。这条路线要么返回“错误.”或者是rfid卡号。服务器打开检索此编号的子进程。如果服务器停留一段时间,用户尝试使用RFID登录,服务器就会中断。我需要一个更好的方法来完成这个任务,或者至少清除子进程?

有什么建议吗?

烧瓶路线

代码语言:javascript
复制
@app.route("/user/rfid", methods=["POST"])
def user_rfid():
    rfid = subprocess.Popen(["python", "rfidSimple.py"], stdout=subprocess.PIPE).stdout.read()
    print("rfid read: %s" % rfid)
    if(rfid == "Exiting...."):
        return json.dumps({"error":"rfid reader failure"})
    else:
        user = db.getUserByRFID(rfid.replace(" ",""))
        if("id" in user.keys()):
            # if credentials match, add user to session
            session["id"]       = user["id"]
            # return redirect url
            return json.dumps({"url":url_for("kiosk")})
        else:
            return json.dumps({"error":"rfid reader failure"})

rfidSimple.py

代码语言:javascript
复制
#!/usr/bin/env python

#Basic imports
from ctypes import *
import sys
#Phidget specific imports
from Phidgets.PhidgetException import PhidgetErrorCodes, PhidgetException
from Phidgets.Events.Events import AttachEventArgs, DetachEventArgs, ErrorEventArgs, OutputChangeEventArgs, TagEventArgs
from Phidgets.Devices.RFID import RFID, RFIDTagProtocol

# Rudimentary wait system variable
x = True
def rfidTagGained(e):
    global x
    source = e.device
    sys.stdout.write(e.tag)
    x = False

def main():
     #Create an RFID object
    try:
        rfid = RFID()
    except RuntimeError as e:
        sys.stdout.write("Exiting...")
    try:
        rfid.setOnTagHandler(rfidTagGained)
    except PhidgetException as e:
        sys.stdout.write("Exiting...")
    try:
        rfid.openPhidget()
    except PhidgetException as e:
        sys.stdout.write("Exiting...")

    try:
        rfid.waitForAttach(10000)
    except PhidgetException as e:
        # print("Phidget Exception %i: %s" % (e.code, e.details))
        try:
            rfid.closePhidget()
        except PhidgetException as e:
            sys.stdout.write("Exiting...")
    rfid.setAntennaOn(True)
    while x:
        # wait wait wait!
        pass
    exit(1)

if __name__ == '__main__':
    main()
EN

回答 1

Stack Overflow用户

发布于 2014-05-16 00:16:07

您可以启动一个单独的线程来读取进程规范。在线程上执行连接(超时),如果线程没有终止,您知道调用花费的时间太长,可以终止它。下面是一个示例,其中所有的内容都封装在一个类中。我没有测试它,它可能有错误,但您可能会发现这个想法很有用。

代码语言:javascript
复制
import threading
import subprocess
import time

class RfidSimple(threading.Thread):

    def __init__(self, timeout=10):
        super(theading.Thread, self).__init__()
        self.end = time.time() + timeout
        self.proc = subprocess.Popen(["python", "rfidSimple.py"],
            stdout=subprocess.PIPE)
        self.start()

    def run(self):
        self._result = self.proc.stdout.read()

    @property
    def result(self):
        timeout = self.end - time.time()
        if timeout > 0:
            self.join(timeout)
            if not self.isAlive():
                self.proc.wait()
                return self._result
        self.proc.terminate()
        self.join(5)
        if not self.isAlive():
            self.proc.wait()
            return "Exiting..."
        raise Exception("could not kill subprocess")

def user_rfid():
    rfid = RfidSimple().result
    print("rfid read: %s" % rfid)
    if(rfid == "Exiting...."):
        return json.dumps({"error":"rfid reader failure"})
    else:
        user = db.getUserByRFID(rfid.replace(" ",""))
        if("id" in user.keys()):
            # if credentials match, add user to session
            session["id"]       = user["id"]
            # return redirect url
            return json.dumps({"url":url_for("kiosk")})
        else:
            return json.dumps({"error":"rfid reader failure"})
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23689993

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档