首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Connexion处理AioHttp上的错误

如何使用Connexion处理AioHttp上的错误
EN

Stack Overflow用户
提问于 2020-11-03 11:27:31
回答 1查看 224关注 0票数 0

我希望在我的python web中使用AioHttp和Connexion来处理错误,就像Flask通过@app.errorhandler(Exception)一样

换句话说,假设我的服务引发了SomethingAlreadyExists,而我想返回409冲突,而不是在我所有的apis中添加以下代码:

代码语言:javascript
复制
try:
  myservice.create_something(..)
except SomethingAlreadyExists as error:     # Repeated code -> DRY
  return json_response({"message": str(error)}, status=409)

我只想在API层调用myservice.create_something(..),对于SomethingAlreadyExists异常,错误处理将返回409;对于SomethingNotFound,错误处理将返回404。

注意:在Flask land中,它将类似于以下内容:

代码语言:javascript
复制
import connexion


def create_api_app(version='api'):
    connexion_app = connexion.FlaskApp(__name__, specification_dir='../api/')
    connexion_app.add_api('openapi.yaml', validate_responses=True)
    app = connexion_app.app

    # It intercepts the specific exception and returns the respective status_code
    @app.errorhandler(InvalidValueException)
    def bad_request_handler(error):
        return 'Bad Request: {}'.format(str(error)), 400

    @app.errorhandler(NotFoundException)
    def not_found_handler(error):
        return 'Not found: {}'.format(str(error)), 404

    @app.errorhandler(AlreadyExistsException)
    def conflict_handler(error):
        return 'Conflict: {}'.format(str(error)), 409


# my_service.py
def get_model(i):

    model = get_model_or_none(id)
    if btask is None:
        raise NotFoundException(f"Model id:{id} not found.")
    ...

# api.py
def get_model(id):
    model = my_service.get_model(id)
    # handle errors not required ;)
    return btask.to_dict()

我想在我的AioHttp连接应用程序中做同样的事情:

代码语言:javascript
复制
from connexion import AioHttpApp


def create_app():
    connexion_app = AioHttpApp(__name__, port=8000, specification_dir="../", only_one_api=True)
    connexion_app.add_api("openapi.yaml", pass_context_arg_name="request")

    # Do something here.. like
    # web.Application(middlewares=[handle_already_exists_errors]) --> doesn't work
    # OR
    # connex_app.add_error_handler(
    #       AlreadyExistsException, handle_already_exists_errors) --> doesn't work too 
    return connexion_app

干杯,我将感谢任何帮助!

罗杰

EN

回答 1

Stack Overflow用户

发布于 2020-11-04 04:57:48

我深入研究了connexionaiohttp代码,并想出了一种使用中间件实现这一点的方法:

代码语言:javascript
复制
import json
from aiohttp.web import middleware
from connexion.lifecycle import ConnexionResponse
from connexion import AioHttpApp
from .exceptions import NotFoundException, AlreadyExistsException


def create_app():
    connexion_app = AioHttpApp(
        __name__, port=8000, specification_dir="../", only_one_api=True
    )
    connexion_app.add_api("openapi.yaml", pass_context_arg_name="request")
    connexion_app.app.middlewares.append(errors_handler_middleware)
    return connexion_app


@middleware
async def errors_handler_middleware(request, handler):
    """ Handle standard errors returning response following the connexion style messages"""
    try:
        response = await handler(request)
        return response

    except NotFoundException as error:
        return json_error(message=str(error), title='Not Found', status_code=404)
    except AlreadyExistsException as error:
        return json_error(message=str(error), title='Conflict', status_code=409)


def json_error(message, title, status_code):
    return ConnexionResponse(
        body=json.dumps({
            'title': title,
            'detail': message,
            'status': status_code,
        }).encode('utf-8'),
        status_code=status_code,
        content_type='application/json'
    )
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64656413

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档