Django 3.1文件说这是关于异步视图的:
其主要优点是能够在不使用Python线程的情况下服务数百个连接。这允许您使用慢流、长轮询和其他令人兴奋的响应类型。
我相信“慢流”意味着我们可以在不垄断每个客户端线程的情况下实现一个SSE视图,因此我试图勾勒出一个简单的视图,如下所示:
async def stream(request):
async def event_stream():
while True:
yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
await asyncio.sleep(1)
return StreamingHttpResponse(event_stream(), content_type='text/event-stream')(注:我修改了这一反应的代码)
不幸的是,当调用此视图时,它会引发以下异常:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
raise exc_info[1]
File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
response = await get_response(request)
File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
response = await wrapped_callback(request, *callback_args, **callback_kwargs)
File "./chat/views.py", line 144, in watch
return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
self.streaming_content = streaming_content
File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
self._set_streaming_content(value)
File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable对我来说,这表明StreamingHttpResponse目前不支持异步生成器。
我试图修改StreamingHttpResponse以使用async for,但我做得不多。
知道我怎么能做到吗?
发布于 2020-08-17 14:08:34
老实说,Django本机不支持它,但是我为您提供了一个使用Daphne的解决方案(Daphne也在Django通道中使用)。
创建自己的StreamingHttpResponse类,该类能够从异步方法中检索数据流,并将其提供给Django的同步部分。
import asyncio
# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio
from django.http.response import StreamingHttpResponse
class AsyncStreamingHttpResponse(StreamingHttpResponse):
def __init__(self, streaming_content=(), *args, **kwargs):
sync_streaming_content = self.get_sync_iterator(streaming_content)
super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)
@staticmethod
async def convert_async_iterable(stream):
"""Accepts async_generator and async_iterator"""
return iter([chunk async for chunk in stream])
def get_sync_iterator(self, async_iterable):
nest_asyncio.apply()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
return result此外,您还需要使用达芙妮运行Django web服务器,以正确地支持服务器发送事件。它由"Django软件基金会“正式支持,具有类似于gunicorn的语法,但使用的是asgi.py而不是wsgi.py。
要使用它-您可以使用:pip install daphne安装
并更改来自:python manage.py runserver的命令
比如:daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application。
不确定它是否能与gunicorn一起工作。
如果你还有其他问题,请告诉我。
发布于 2020-08-25 08:10:13
另一种方法是使用特殊的库django-eventstream
将以下内容添加到将使用数据的HTML页面:
<script src="{% static 'django_eventstream/eventsource.min.js' %}"></script>
<script src="{% static 'django_eventstream/reconnecting-eventsource.js' %}"></script>
var es = new ReconnectingEventSource('/events/');
es.addEventListener('message', function (e) {
console.log(e.data);
}, false);
es.addEventListener('stream-reset', function (e) {
// ... client fell behind, reinitialize ...
}, false);对于后端,您需要正确设置Django,稍后您将能够在任何需要执行服务器附带事件的视图/任务/信号/方法中调用以下方法:
添加将产生数据(事件)的下列视图:
# from django_eventstream import send_event
send_event('test', 'message', {'text': 'hello world'})发布于 2021-12-30 07:33:50
我创建了一个名为stream的装饰器,它可以与coroutine函数一起使用,以使它与Django的StreamingHttpResponse兼容。下面是一个例子:
import asyncio
import functools
from django.http import StreamingHttpResponse
def stream(coroutine_function):
@functools.wraps(coroutine_function)
def wrapper():
coroutine = coroutine_function()
try:
while True:
yield asyncio.run(coroutine.__anext__())
except StopAsyncIteration:
pass
return wrapper
@stream
async def chunks():
for char in 'Hello, world!':
yield char
await asyncio.sleep(1)
async def index(request):
return StreamingHttpResponse(chunks())我还需要添加nest_asyncio并在settings.py文件的顶部调用apply(),如下所示:
import nest_asyncio
nest_asyncio.apply()nest_asyncio依赖项支持从由stream装饰器创建的wrapper函数调用stream。
最后,Django的asgi可以使用uvicorn通过gunicorn运行,例如:
$ gunicorn -k uvicorn.workers.UvicornWorker www.asgi:applicationhttps://stackoverflow.com/questions/63316840
复制相似问题