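I want to handle errors in my Python web API, built with AioHttp and Connexion, the same way Flask does with @app.errorhandler(Exception).
In other words, say my service raises SomethingAlreadyExists and I want to return 409 Conflict. Instead of adding the following code to every one of my APIs: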
    try:
        myservice.create_something(..)
    except SomethingAlreadyExists as error:  # Repeated code -> DRY
        return json_response({"message": str(error)}, status=409)

I would like to just call myservice.create_something(..) in the API layer and have the error handling return 409 for SomethingAlreadyExists exceptions and 404 for SomethingNotFound.
Note: In Flask land, it would look something like this:

    import connexion

    def create_api_app(version='api'):
        connexion_app = connexion.FlaskApp(__name__, specification_dir='../api/')
        connexion_app.add_api('openapi.yaml', validate_responses=True)
        app = connexion_app.app

        # It intercepts the specific exception and returns the respective status_code
        @app.errorhandler(InvalidValueException)
        def bad_request_handler(error):
            return 'Bad Request: {}'.format(str(error)), 400

        @app.errorhandler(NotFoundException)
        def not_found_handler(error):
            return 'Not found: {}'.format(str(error)), 404

        @app.errorhandler(AlreadyExistsException)
        def conflict_handler(error):
            return 'Conflict: {}'.format(str(error)), 409

    # my_service.py
    def get_model(id):
        model = get_model_or_none(id)
        if model is None:
            raise NotFoundException(f"Model id:{id} not found.")
        ...

    # api.py
    def get_model(id):
        model = my_service.get_model(id)
        # handle errors not required ;)
        return model.to_dict()

I would like to do the same in my AioHttp Connexion app:

    from connexion import AioHttpApp

    def create_app():
        connexion_app = AioHttpApp(__name__, port=8000, specification_dir="../", only_one_api=True)
        connexion_app.add_api("openapi.yaml", pass_context_arg_name="request")

        # Do something here.. like
        # web.Application(middlewares=[handle_already_exists_errors]) --> doesn't work
        # OR
        # connexion_app.add_error_handler(
        #     AlreadyExistsException, handle_already_exists_errors) --> doesn't work too

        return connexion_app

Cheers, and I would appreciate any help!
Roger

Posted on 2020-11-04 04:57:48
I dug into the connexion and aiohttp code and came up with a way to do this using middleware:

    import json
    from aiohttp.web import middleware

    from connexion.lifecycle import ConnexionResponse
    from connexion import AioHttpApp
    from .exceptions import NotFoundException, AlreadyExistsException


    def create_app():
        connexion_app = AioHttpApp(
            __name__, port=8000, specification_dir="../", only_one_api=True
        )
        connexion_app.add_api("openapi.yaml", pass_context_arg_name="request")
        connexion_app.app.middlewares.append(errors_handler_middleware)
        return connexion_app


    @middleware
    async def errors_handler_middleware(request, handler):
        """ Handle standard errors returning response following the connexion style messages"""
        try:
            response = await handler(request)
            return response
        except NotFoundException as error:
            return json_error(message=str(error), title='Not Found', status_code=404)
        except AlreadyExistsException as error:
            return json_error(message=str(error), title='Conflict', status_code=409)


    def json_error(message, title, status_code):
        return ConnexionResponse(
            body=json.dumps({
                'title': title,
                'detail': message,
                'status': status_code,
            }).encode('utf-8'),
            status_code=status_code,
            content_type='application/json'
        )

https://stackoverflow.com/questions/64656413
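
If the list of exceptions grows, the chain of except clauses in the middleware could be swapped for a lookup table. The following is only a sketch of that idea, not part of the answer above: ERROR_MAP and error_to_problem are hypothetical names, and the two exception classes are local stand-ins for the ones imported from .exceptions. It uses only the standard library and builds the same title/detail/status body as json_error.

```python
import json


# Hypothetical stand-ins for the exceptions imported from .exceptions above.
class NotFoundException(Exception):
    pass


class AlreadyExistsException(Exception):
    pass


# Exception type -> (title, HTTP status code).
# Assumption: new error types get a row here instead of a new except clause.
ERROR_MAP = {
    NotFoundException: ("Not Found", 404),
    AlreadyExistsException: ("Conflict", 409),
}


def error_to_problem(error):
    """Return (status_code, JSON body as bytes) for a mapped exception, else None."""
    for exc_type, (title, status_code) in ERROR_MAP.items():
        if isinstance(error, exc_type):
            body = json.dumps(
                {"title": title, "detail": str(error), "status": status_code}
            ).encode("utf-8")
            return status_code, body
    # Unmapped exceptions are left for the caller to re-raise.
    return None
```

Inside the middleware, one could then catch Exception, call error_to_problem(error), re-raise when it returns None, and otherwise pass the status code and body to ConnexionResponse as json_error does.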