首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Quart (异步Flask)应用程序中的Autobahn websocket客户端

Quart (异步Flask)应用程序中的Autobahn websocket客户端
EN

Stack Overflow用户
提问于 2019-03-29 01:10:11
回答 1查看 681关注 0票数 2

大家晚上好。我对这个地方并不是很陌生,但最终还是决定注册并寻求帮助。我使用Quart框架(异步Flask)开发了一个web应用程序。现在,随着应用程序变得更大、更复杂,我决定将不同的过程分离到不同的服务器实例,这主要是因为我想保持web服务器的干净,更抽象,没有计算负载。

因此,我计划使用一台web服务器和几台(如果需要)相同的过程服务器。所有的服务器都基于quart框架,目前只是为了开发的简单性。我决定使用Crossbar.io路由器和autobahn将所有服务器连接在一起。

这时问题就出现了。我关注了这篇文章:

Running several ApplicationSessions non-blockingly using autbahn.asyncio.wamp

How can I implement an interactive websocket client with autobahn asyncio?

How I can integrate crossbar client (python3,asyncio) with tkinter

How to send Autobahn/Twisted WAMP message from outside of protocol?

似乎我尝试了所有可能的方法来在我的quart应用程序中实现autobahn websocket客户端。我不知道如何让它成为可能,这样两件事都可以工作,是否Quart应用程序可以工作而autobahn WS客户端不能,或者反之亦然。

简化的我的quart应用程序如下所示:

代码语言:javascript
复制
from quart import Quart, request, current_app
from config import Config
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

import concurrent.futures

class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    print ("before autobahn start")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        runner = ApplicationRunner('ws://127.0.0.1:8080 /ws', 'realm1')
        future = executor.submit(runner.run(Component))
    print ("after autobahn started")

    return app


from app import models

在这种情况下,应用程序被卡在runner循环中,并且整个应用程序无法工作(不能服务于请求),只有当我通过Ctrl-C中断runner(Autobahn)循环时才有可能。

启动后的命令:

代码语言:javascript
复制
(quart-app) user@car:~/quart-app$ hypercorn --debug --error-log - --access-log - -b 0.0.0.0:8001 tengine:app
Running on 0.0.0.0:8001 over http (CTRL + C to quit)
before autobahn start
Ok, registered procedure with registration ID 4605315769796303

按下ctrl-C后:

代码语言:javascript
复制
...
^Cafter autobahn started
2019-03-29T01:06:52 <Server sockets=[<socket.socket fd=11, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]> is serving

如何让quart应用程序与autobahn客户端一起以非阻塞的方式工作?因此,autobahn打开并保持与Crossbar路由器的websocket连接,并在后台静默监听。

EN

回答 1

Stack Overflow用户

发布于 2019-03-30 18:55:24

经过几个不眠之夜,我终于找到了一个解决这个难题的好方法。

多亏了这篇文章C-Python asyncio: running discord.py in a thread

因此,我像这样重写了我的代码,并能够在其中运行我的Quart应用程序和autobahn客户端,两者都在以非阻塞的方式积极工作。整个__init__.py看起来像这样:

代码语言:javascript
复制
from quart import Quart, request, current_app
from config import Config


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    return app


# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading


class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))


    def onDisconnect(self):
        print('Autobahn disconnected')

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


async def start():
    runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
    await runner.run(Component) # use client.start instead of client.run

def run_it_forever(loop):
    loop.run_forever()

asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

在此场景中,我们使用autobahn的runner.run创建任务,并将其附加到当前循环,然后在新线程中永远运行此循环。

我对目前的解决方案相当满意……但是后来发现这个解决方案有一些缺点,这对我来说是至关重要的,例如:如果连接断开,则重新连接(即交叉开关路由器变得不可用)。使用这种方法,如果连接初始化失败或在一段时间后断开,它将不会尝试重新连接。此外,对我来说,如何注册/调用ApplicationSession API并不是很明显,比如在我的quart应用程序中注册/调用RPC。

幸运的是,我发现了autobahn在他们的文档中使用的另一个新组件API:https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py

它具有自动重新连接功能,并且很容易使用装饰器@component.register('com.something.do')为RPC注册函数,你只需要在此之前使用import component

下面是__init__.py解决方案的最终视图:

代码语言:javascript
复制
from quart import Quart, request, current_app
from config import Config

def create_app(config_class=Config):
    ...
    return app

from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading


component = Component(
    transports=[
        {
            "type": "websocket",
            "url": u"ws://localhost:8080/ws",
            "endpoint": {
                "type": "tcp",
                "host": "localhost",
                "port": 8080,
            },
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm=u"realm1",
)

@component.on_join
def join(session, details):
    print("joined {}".format(details))

async def start():
    await component.start() #used component.start() instead of run([component]) as it's async function

def run_it_forever(loop):
    loop.run_forever()

loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

我希望它能帮助一些人。干杯!

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55403347

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档