from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app)上面的例子来自官方文件。
问题是,如何在没有web.run_app(app) ( KeyBoardInterrupt,Ctrl+C)的情况下终止KeyBoardInterrupt操作?
我看起来像是:
await app.shutdown()
await app.cleanup()但是我不知道我能把这些代码放在哪里,如何使用它。
发布于 2022-02-11 06:12:36
在来源中,它查找KeyboardInterrupt和GracefulExit的响应
try:
asyncio.set_event_loop(loop)
loop.run_until_complete(main_task)
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
_cancel_tasks({main_task}, loop)
_cancel_tasks(asyncio.all_tasks(loop), loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
asyncio.set_event_loop(None)由于您不想停留在手册中,所以您可以在自己的代码中引发GracefulExit,如下所示:
test.py:
import signal
from aiohttp import web
from aiohttp.web_runner import GracefulExit
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def shutdown(request):
print("will shutdown now")
raise GracefulExit()
app = web.Application()
app.add_routes([web.get('/shutdown', shutdown),
web.get('/', handle),
web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app)一旦您想关闭服务器,就可以将http://ip:8080/shutdown发送到服务器。
附加,,如果您想在接收请求后直接退出服务器,也可以使用signal.alarm(3),那么您就不需要使用/shutdown来定义处理程序了。这意味着在收到任何请求后3秒后向程序发送警报(aiohttp内部已经向loop.add_signal_handler(signal.SIGINT, _raise_graceful_exit)注册了一个信号处理程序):
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
signal.alarm(3)
return web.Response(text=text)无论如何,关闭服务器的方法是或send signal,但是退出服务器的时间取决于您的场景。
发布于 2022-02-11 06:47:43
基于@atline回答,我编写了以下代码:
import signal
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import cpu_count
import requests
import time
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def shutdown(request):
print("will shutdown now")
raise GracefulExit()
app = web.Application()
app.add_routes([web.get('/shutdown', shutdown),
web.get('/', handle),
web.get('/{name}', handle)])
def init_aiohttp():
web.run_app(app, host="127.0.0.1", port="8080", ssl_context=None)
if __name__ == '__main__':
executor = ProcessPoolExecutor()
for i in range(0, 1):
aiohttp = executor.submit(init_aiohttp)
total_time = 0
while(True):
time.sleep(1)
total_time += 1
if total_time == 5:
try:
requests.get('http://127.0.0.1:8080/shutdown')
except Exception as e:
print(e)
print(total_time)上述守则:
aiohttp服务器。希望这会对未来的人有所帮助。
https://stackoverflow.com/questions/71062375
复制相似问题