很久没用过龙卷风了。我想有一个从龙卷风运行的主机的串行设备上获取更新的网络插座。因此,我尝试使用tornado进行多进程处理,但该进程无法访问tornado websocket。我试着将它合并为协程,但似乎没有产生。
class WebApplication(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', IndexPageHandler),
(r"/config", ConfigHandler),
(r"/shutdown", ShutdownHandler),
(r'/websocket', WebSocketHandler),
(r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
]
settings = {
'debug': debug,
'static_path': resourcesWeb,
'template_path': 'templates'
}
tornado.web.Application.__init__(self, handlers, **settings)
@gen.coroutine
def serial_reader(self):
log('serial_reader: start')
done = False
while not done:
sh.read()
serial_data_from = str(sh.data)
if len(serial_data_from) > 0:
if debug:
log('serial read:' + serial_data_from)
yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
yield gen.sleep(0.3)
log('serial_reader: exit')Python 3.8.5,Tornad 6.1
我如何正确地不断地用tornado应用程序外部的数据更新websocket
发布于 2021-02-03 22:12:05
因为sh.read是阻塞的,所以你需要在一个执行器中运行它。为了通知主线程中的客户端,您需要使用IOLoop.add_callback (从任何线程调用都是安全的)。这也意味着阅读器方法变成了常规的同步方法。
示例:
from concurrent.futures import ThreadPoolExecutor
import functools
from tornado import web, websocket, ioloop
log = print
class IndexHandler(web.RequestHandler):
def get(self):
self.write("""<html>
<textarea cols="30" rows="10" id="output">%s</textarea><br />
<a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
<a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
<iframe name="f" width="100" height="30"></iframe>
<script>
ws = new WebSocket("ws://localhost:8888/stream");
out_el = document.getElementById("output");
function log(data) {out_el.value = data + "\\n" + out_el.value;}
ws.onmessage = function (ev) {log(ev.data);}
</script>""" % "\n".join(map(str, reversed(self.application.read_data))))
class StartHandler(web.RequestHandler):
def get(self):
self.application.start_reader()
self.write("Started")
class StopHandler(web.RequestHandler):
def get(self):
self.application.stop_reader()
self.write("Stopped")
class WebSocketHandler(websocket.WebSocketHandler):
connections = set()
def open(self):
WebSocketHandler.connections.add(self)
def on_close(self):
if self in WebSocketHandler.connections:
WebSocketHandler.connections.remove(self)
class WebApplication(web.Application):
def __init__(self, autostart=False):
handlers = [
(r"/", IndexHandler),
(r"/start", StartHandler),
(r"/stop", StopHandler),
(r'/stream', WebSocketHandler),
]
web.Application.__init__(self, handlers)
self._reader_executor = ThreadPoolExecutor(1)
self._keep_reading = None
self.read_data = []
if autostart:
self.start_reader()
def start_reader(self):
if not self._keep_reading:
self._keep_reading = True
loop = ioloop.IOLoop.current()
self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
def stop_reader(self):
if self._keep_reading:
self._keep_reading = False
self._reader_future.cancel()
def notify_clients(self, data=None):
for con in WebSocketHandler.connections:
try:
con.write_message("{}".format(data))
except Exception as ex:
log("error sending to {}".format(con))
def reader(self, main_loop):
import random
import time
while self._keep_reading:
time.sleep(1 + random.random()) # simulate read - block for some time
data = random.getrandbits(32)
print("reader: data={}".format(data))
if data:
main_loop.add_callback(self.notify_clients, data)
self.read_data.append(data)
time.sleep(0.1)
if __name__ == "__main__":
app = WebApplication(True)
app.listen(8888)
loop = ioloop.IOLoop.current()
try:
loop.start()
except KeyboardInterrupt as ex:
app.stop_reader()
for con in WebSocketHandler.connections:
con.close()
loop.stop()https://stackoverflow.com/questions/66008154
复制相似问题