首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在ASGI中使用StreamingHttpResponse运行Django通道

如何在ASGI中使用StreamingHttpResponse运行Django通道
EN

Stack Overflow用户
提问于 2021-06-07 17:48:08
回答 1查看 282关注 0票数 3

我有一个简单的应用程序,它使用开放的cv和wsgi中的服务器来流图像。但是每当我将Django频道介绍到图片中,并从WSGI切换到ASGI时,流就停止了。我如何从cv2中流图像,同时使用Django channels?先谢谢你

我的流媒体代码:

代码语言:javascript
复制
def camera_feed(request):
    stream = CameraStream()
    frames = stream.get_frames()
    return StreamingHttpResponse(frames, content_type='multipart/x-mixed-replace; boundary=frame')

settings.py:

代码语言:javascript
复制
ASGI_APPLICATION = 'photon.asgi.application'

asgi.py

代码语言:javascript
复制
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(URLRouter(ws_urlpatterns))
})
EN

回答 1

Stack Overflow用户

发布于 2022-11-28 13:26:04

首先,我们根本不需要StramingHTTPResponse发送图像数据.

为此,首先,确保您有一个包含3.x和Python的Django版本。

然后,安装django-频道第三方包。

按照以下方式配置ASGI应用程序:

代码语言:javascript
复制
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import .routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            app.routing.websocket_urlpatterns
        )
    )
})

然后需要在ASGI_APPLICATION文件中设置settings.py常量:

代码语言:javascript
复制
ASGI_APPLICATION = "myproject.asgi.application"

之后,只需在应用程序中的WebSocket文件中创建异步consumers.py使用者:

代码语言:javascript
复制
import json
from channels.generic.websocket import AsyncWebsocketConsumer


class PairingChat(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
    
        await self.accept()

    async def disconnect(self):

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )


    # Asyncwebsocket consumer can send any type of data ...

    async def receive(self, text_data):
        data_json = json.loads(your_data)
        message = data_json['message']

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': '# send your data from here ...',
                'message': message,
                'user': self.scope['session']['name']
            }
        )


    async def chat_message(self, event):
        message = event['message']

        await self.send(data=json.dumps({
            'user': event['user'],
            'message': message,
        }))

为异步well使用者创建一个路由.

代码语言:javascript
复制
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat1/(?P<room_name>\w+)/$', consumers.PairingChat.as_asgi()),
]

然后,只需在javascript中创建一个WebSocket客户机.你可以走了..。

JS Websocket创建链接:javascript-websocket

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67876456

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档