首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >龙卷风异步协程

龙卷风异步协程
EN

Stack Overflow用户
提问于 2021-02-02 18:47:50
回答 1查看 78关注 0票数 1

很久没用过龙卷风了。我想有一个从龙卷风运行的主机的串行设备上获取更新的网络插座。因此,我尝试使用tornado进行多进程处理,但该进程无法访问tornado websocket。我试着将它合并为协程,但似乎没有产生。

代码语言:javascript
复制
class WebApplication(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', IndexPageHandler),
            (r"/config", ConfigHandler),
            (r"/shutdown", ShutdownHandler),
            (r'/websocket', WebSocketHandler),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
        ]

        settings = {
            'debug': debug,
            'static_path': resourcesWeb,
            'template_path': 'templates'
        }
        tornado.web.Application.__init__(self, handlers, **settings)

    @gen.coroutine
    def serial_reader(self):
        log('serial_reader: start')
        done = False
        while not done:
            sh.read()
            serial_data_from = str(sh.data)
            if len(serial_data_from) > 0:
                if debug:
                    log('serial read:' + serial_data_from)
                    yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
            yield gen.sleep(0.3)
        log('serial_reader: exit')

Python 3.8.5,Tornad 6.1

我如何正确地不断地用tornado应用程序外部的数据更新websocket

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-02-03 22:12:05

因为sh.read是阻塞的,所以你需要在一个执行器中运行它。为了通知主线程中的客户端,您需要使用IOLoop.add_callback (从任何线程调用都是安全的)。这也意味着阅读器方法变成了常规的同步方法。

示例:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor
import functools

from tornado import web, websocket, ioloop

log = print


class IndexHandler(web.RequestHandler):
    def get(self):
        self.write("""<html>
            <textarea cols="30" rows="10" id="output">%s</textarea><br />
            <a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
            <a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
            <iframe name="f" width="100" height="30"></iframe>
            <script>
                ws = new WebSocket("ws://localhost:8888/stream");
                out_el = document.getElementById("output");
                function log(data) {out_el.value = data + "\\n" + out_el.value;}
                ws.onmessage = function (ev) {log(ev.data);}
            </script>""" % "\n".join(map(str, reversed(self.application.read_data))))


class StartHandler(web.RequestHandler):
    def get(self):
        self.application.start_reader()
        self.write("Started")


class StopHandler(web.RequestHandler):
    def get(self):
        self.application.stop_reader()
        self.write("Stopped")


class WebSocketHandler(websocket.WebSocketHandler):
    connections = set()

    def open(self):
        WebSocketHandler.connections.add(self)

    def on_close(self):
        if self in WebSocketHandler.connections:
            WebSocketHandler.connections.remove(self)


class WebApplication(web.Application):
    def __init__(self, autostart=False):
        handlers = [
            (r"/", IndexHandler),
            (r"/start", StartHandler),
            (r"/stop", StopHandler),
            (r'/stream', WebSocketHandler),
        ]
        web.Application.__init__(self, handlers)
        self._reader_executor = ThreadPoolExecutor(1)
        self._keep_reading = None
        self.read_data = []
        if autostart:
            self.start_reader()
    
    def start_reader(self):
        if not self._keep_reading:
            self._keep_reading = True
            loop = ioloop.IOLoop.current()
            self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
    
    def stop_reader(self):
        if self._keep_reading:
            self._keep_reading = False
            self._reader_future.cancel()
    
    def notify_clients(self, data=None):
        for con in WebSocketHandler.connections:
            try:
                con.write_message("{}".format(data))
            except Exception as ex:
                log("error sending to {}".format(con))
    
    def reader(self, main_loop):
        import random
        import time
        while self._keep_reading:
            time.sleep(1 + random.random())  # simulate read - block for some time
            data = random.getrandbits(32)
            print("reader: data={}".format(data))
            if data:
                main_loop.add_callback(self.notify_clients, data)
                self.read_data.append(data)
            time.sleep(0.1)


if __name__ == "__main__":
    app = WebApplication(True)
    app.listen(8888)
    loop = ioloop.IOLoop.current()
    try:
        loop.start()
    except KeyboardInterrupt as ex:
        app.stop_reader()
        for con in WebSocketHandler.connections:
            con.close()
        loop.stop()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66008154

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档